1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.xnap.plugin.opennap.net;
21
22 import java.awt.event.ActionEvent;
23 import java.io.*;
24 import java.util.*;
25
26 import javax.swing.Action;
27 import javax.swing.Icon;
28
29 import org.apache.log4j.Logger;
30 import org.xnap.XNap;
31 import org.xnap.peer.Peer;
32 import org.xnap.plugin.Plugin;
33 import org.xnap.plugin.opennap.OpenNapPlugin;
34 import org.xnap.plugin.opennap.net.msg.ExceptionListener;
35 import org.xnap.plugin.opennap.net.msg.MessageHandler;
36 import org.xnap.plugin.opennap.net.msg.MessageListener;
37 import org.xnap.plugin.opennap.net.msg.client.UploadAckMessage;
38 import org.xnap.plugin.opennap.net.msg.server.AltDownloadAckMessage;
39 import org.xnap.plugin.opennap.net.msg.server.ServerMessage;
40 import org.xnap.plugin.opennap.user.OpenNapUser;
41 import org.xnap.plugin.opennap.util.*;
42 import org.xnap.transfer.AbstractTransfer;
43 import org.xnap.transfer.Queueable;
44 import org.xnap.transfer.Segment;
45 import org.xnap.transfer.Upload;
46 import org.xnap.transfer.UploadManager;
47 import org.xnap.transfer.action.*;
48 import org.xnap.util.*;
49 import org.xnap.util.IllegalOperationException;
50 import org.xnap.util.State;
51
52 /***
53 *
54 */
55 public class OpenNapUpload extends AbstractTransfer
56 implements ExceptionListener, MessageListener,
57 Queueable, Upload, SocketListener {
58
59
60
61 /***
62 * If the upload is not received within this timeout after the ack
63 * message has been sent, the upload will be cancelled. */
64 public static final int ACK_TIMEOUT = 1 * 60 * 1000;
65
66 /***
67 * If the upload request is not renewed within this interval, the
68 * upload will be cancelled. */
69 public static final int MAX_REREQUST_INTERVAL = 15 * 60 * 1000;
70
71 /***
72 * The state transition table.
73 */
74 private static final Hashtable TRANSITION_TABLE;
75 static {
76 State[][] table = new State[][] {
77 { State.NOT_STARTED,
78 State.WAITING, State.STOPPED },
79 { State.WAITING,
80 State.CONNECTING, State.STOPPED, State.STOPPING, },
81 { State.CONNECTING,
82 State.UPLOADING, State.STOPPED, State.STOPPING, },
83 { State.UPLOADING,
84 State.SUCCEEDED, State.STOPPED, State.STOPPING, },
85 { State.STOPPING,
86 State.STOPPED, }
87 };
88
89 TRANSITION_TABLE = FiniteStateMachine.createStateTable(table);
90 }
91
92
93
94 protected static Logger logger = Logger.getLogger(OpenNapUpload.class);
95
96 private int queuePosition;
97 private StateMachine sm = new StateMachine();
98 private long bytesTransferred;
99 private long totalBytesTransferred;
100
101 private OpenNapServer server;
102 private OpenNapUser user;
103 private File file;
104 private String requestFilename;
105
106 private OpenNapUploadRunner runner;
107 private UploadSocket inSocket;
108
109 private long startTime;
110 private int priorityFactor = 1;
111
112 private long lastRequest;
113 private String hash;
114
115 private AbstractTransferAction startAction = new StartAction();
116 private AbstractTransferAction stopAction = new StopAction();
117
118
119
120 public OpenNapUpload(OpenNapServer server, OpenNapUser user, File file,
121 String requestFilename)
122 {
123 this.server = server;
124 this.user = user;
125 this.file = file;
126 this.requestFilename = requestFilename;
127
128 startTime = System.currentTimeMillis();
129 lastRequest = System.currentTimeMillis();
130 }
131
132
133
134 public void exceptionThrown(Exception e)
135 {
136 setState(State.STOPPED, e.getLocalizedMessage());
137 }
138
139 public File getFile()
140 {
141 return file;
142 }
143
144 /***
145 *
146 */
147 public long getFilesize()
148 {
149 return file.length();
150 }
151
152 public Plugin getPlugin()
153 {
154 return OpenNapPlugin.getInstance();
155 }
156
157 /***
158 * @see xnap.transfer.Transfer#getActions()
159 */
160 public Action[] getActions()
161 {
162 return new Action[] {
163 startAction, stopAction,
164 new LowerPriorityAction(), new RaisePriorityAction(),
165 };
166 }
167
168 /***
169 *
170 */
171 public long getBytesTransferred()
172 {
173 return bytesTransferred;
174 }
175
176 public long getEnqueueTime()
177 {
178 return startTime;
179 }
180
181 public String getHash()
182 {
183 return hash;
184 }
185
186 public Icon getIcon()
187 {
188 return OpenNapPlugin.ICON_16;
189 }
190
191 public int getPriority()
192 {
193 return priorityFactor;
194 }
195
196 /***
197 * Returns the position in the {@link UploadManager} queue.
198 */
199 public int getQueuePosition()
200 {
201 return queuePosition;
202 }
203
204 /***
205 * @see xnap.transfer.Transfer#getPeer()
206 */
207 public Peer getPeer()
208 {
209 return user;
210 }
211
212 public Segment[] getSegments()
213 {
214 return null;
215 }
216
217 /***
218 * @see xnap.transfer.Transfer#getStatus()
219 */
220 public String getStatus()
221 {
222 return sm.getDescription();
223 }
224
225 /***
226 * @see xnap.transfer.Transfer#getTotalBytesTransferred()
227 */
228 public long getTotalBytesTransferred()
229 {
230 return totalBytesTransferred;
231 }
232
233 public boolean isDone()
234 {
235 State s = sm.getState();
236 return s == State.SUCCEEDED || s == State.STOPPED;
237 }
238
239 public boolean isRunning()
240 {
241 return sm.getState() == State.UPLOADING;
242 }
243
244 public void messageReceived(ServerMessage msg)
245 {
246 AltDownloadAckMessage m = (AltDownloadAckMessage)msg;
247
248 if (m.getServer() == server && m.nick.equals(user.getName())
249 && m.filename.equals(requestFilename)) {
250 m.consume();
251
252 user.setHost(m.ip);
253 user.setPort(m.port);
254
255 setState(State.CONNECTING);
256 }
257 }
258
259 /***
260 * Invoked by {@link
261 * OpenNapTransferManager#received(UploadRequestMessage)}. */
262 void requested()
263 {
264 lastRequest = System.currentTimeMillis();
265 }
266
267 public void setHash(String hash)
268 {
269 this.hash = hash;
270 }
271
272 /***
273 *
274 */
275 public void setQueuePosition(int position)
276 {
277 queuePosition = position;
278 stateChanged();
279 }
280
281 public boolean socketReceived(IncomingSocket s)
282 {
283 if (s instanceof UploadSocket) {
284 UploadSocket u = (UploadSocket)s;
285 if (getPeer().getName().equals(u.nick)
286 && getRequestFilename().equals(u.requestFilename)) {
287 inSocket = u;
288 try {
289 setState(State.CONNECTING);
290 }
291 catch (IllegalOperationException e) {
292 u.close();
293 }
294 return true;
295 }
296 }
297 return false;
298 }
299
300 /***
301 * Inovked by OpenNapTransferManager after the instanciation.
302 */
303 public void start()
304 {
305 UploadManager.getInstance().getQueue().add(this);
306 }
307
308 /***
309 * Starts the transfer. Invoked by TransferQueue.
310 */
311 public boolean startTransfer()
312 {
313 synchronized (user) {
314 if (!user.isUploadLimitReached()) {
315 try {
316 queuePosition = 0;
317 if (user.isUploadDenied()) {
318 setState(State.STOPPED, XNap.tr("Upload denied"));
319 }
320 else if (!OpenNapFileHelper.isShared(getFile())) {
321 setState
322 (State.STOPPED,
323 XNap.tr("File is not shared anymore"));
324 }
325 else {
326 setState(State.WAITING);
327 }
328 return true;
329 }
330 catch (IllegalOperationException e) {
331 logger.error("unexpected state", e);
332 }
333 }
334 }
335 return false;
336 }
337
338 public void stop()
339 {
340 try {
341 setState(State.STOPPING);
342 }
343 catch (IllegalOperationException e) {
344 logger.warn("unexpected state", e);
345 }
346 }
347
348 synchronized void commit(int transferred)
349 {
350 bytesTransferred += transferred;
351 totalBytesTransferred += transferred;
352 }
353
354 /***
355 * @return the server
356 */
357 OpenNapServer getServer()
358 {
359 return server;
360 }
361
362 String getRequestFilename()
363 {
364 return requestFilename;
365 }
366
367 void setTotalBytesTransferred(long totalBytesTransferred)
368 {
369 this.totalBytesTransferred = totalBytesTransferred;
370 }
371
372 void setState(State newState, String description)
373 {
374 sm.setState(newState, description);
375 stateChanged();
376 }
377
378 void setState(State newState)
379 {
380 sm.setState(newState);
381 stateChanged();
382 }
383
384
385
386 private class StateMachine extends FiniteStateMachine {
387
388
389
390 private Thread runThread;
391 private XNapTask timeoutTask;
392
393
394
395 public StateMachine()
396 {
397 super(State.NOT_STARTED, TRANSITION_TABLE);
398
399 timeoutTask = new TimeoutWatcherTask(MAX_REREQUST_INTERVAL);
400 Scheduler.run(MAX_REREQUST_INTERVAL, 60 * 1000, timeoutTask);
401 }
402
403
404
405 protected synchronized void stateChanged(State oldState,
406 State newState)
407 {
408 if (oldState == State.NOT_STARTED) {
409 timeoutTask.cancel();
410 timeoutTask = null;
411 }
412
413 if (newState == State.WAITING) {
414 MessageHandler.subscribe
415 (AltDownloadAckMessage.TYPE, OpenNapUpload.this);
416
417 server.getListener().addSocketListener(OpenNapUpload.this);
418
419 user.getParent().uploadStarted();
420
421 UploadAckMessage msg
422 = new UploadAckMessage(user.getName(), requestFilename);
423 msg.setExceptionListener(OpenNapUpload.this);
424 MessageHandler.send(server, msg);
425
426 startAction.setEnabledLater(false);
427
428 timeoutTask = new StopTask();
429 Scheduler.run(ACK_TIMEOUT, timeoutTask);
430
431
432
433
434
435 }
436 else if (newState == State.CONNECTING) {
437 runner = new OpenNapUploadRunner(OpenNapUpload.this, inSocket);
438 runThread
439 = new Thread(runner, "OpenNapUpload:" + getFilename());
440 runThread.start();
441 }
442 else if (newState == State.UPLOADING) {
443 transferStarted();
444 }
445 else if (newState == State.STOPPING) {
446 if (runThread != null) {
447 runner.stop();
448 runThread.interrupt();
449 runner = null;
450 runThread = null;
451 }
452
453 if (oldState == State.WAITING) {
454 this.setState(State.STOPPED);
455 }
456 }
457 else if (newState == State.SUCCEEDED
458 || newState == State.STOPPED) {
459 if (oldState != State.NOT_STARTED) {
460 user.getParent().uploadStopped();
461 }
462
463 UploadManager.getInstance().getQueue().remove
464 (OpenNapUpload.this);
465 OpenNapPlugin.getTransferManager().done(OpenNapUpload.this);
466
467 stopAction.setEnabledLater(false);
468 }
469
470 if (oldState == State.WAITING) {
471 MessageHandler.unsubscribe(AltDownloadAckMessage.TYPE,
472 OpenNapUpload.this);
473 server.getListener().removeSocketListener(OpenNapUpload.this);
474
475 timeoutTask.cancel();
476 timeoutTask = null;
477 }
478 else if (oldState == State.UPLOADING) {
479 transferStopped();
480 }
481 }
482 }
483
484 /***
485 * Stops the upload if it has not been rerequested within timeout. */
486 private class TimeoutWatcherTask extends XNapTask {
487
488 private long timeout;
489
490 public TimeoutWatcherTask(long timeout)
491 {
492 this.timeout = timeout;
493 }
494
495 public void run()
496 {
497 if (System.currentTimeMillis() - lastRequest > timeout) {
498 try {
499 setState(State.STOPPED, XNap.tr("Remote timed out"));
500 }
501 catch (IllegalOperationException e) {
502 logger.error("unexpected state", e);
503 }
504 }
505 }
506
507 }
508
509 /***
510 * Stops the upload if ever run. */
511 private class StopTask extends XNapTask {
512
513 public void run()
514 {
515 try {
516 setState(State.STOPPED, XNap.tr("Remote timed out"));
517 }
518 catch (IllegalOperationException e) {
519 logger.error("unexpected state", e);
520 }
521 }
522
523 }
524
525 private class LowerPriorityAction extends AbstractLowerPriorityAction {
526
527 public void actionPerformed(ActionEvent e)
528 {
529 priorityFactor -= 1;
530 UploadManager.getInstance().getQueue().resort();
531 }
532
533 }
534
535 private class RaisePriorityAction extends AbstractRaisePriorityAction {
536
537 public void actionPerformed(ActionEvent e)
538 {
539 priorityFactor += 1;
540 UploadManager.getInstance().getQueue().resort();
541 }
542
543 }
544
545 private class StartAction extends AbstractStartAction {
546
547 public void actionPerformed(ActionEvent event)
548 {
549 try {
550 setState(State.WAITING);
551 }
552 catch (IllegalOperationException e) {
553 logger.error("unexpected state", e);
554 }
555 }
556
557 }
558
559 private class StopAction extends AbstractStopAction {
560
561 public void actionPerformed(ActionEvent event)
562 {
563 stop();
564 }
565
566 }
567
568 }
569