1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.xnap.plugin.opennap.net;
21
22 import java.awt.event.ActionEvent;
23 import java.io.File;
24 import java.io.IOException;
25 import java.util.Hashtable;
26
27 import javax.swing.Action;
28
29 import org.apache.log4j.Logger;
30 import org.xnap.XNap;
31 import org.xnap.event.StateEvent;
32 import org.xnap.event.StateListener;
33 import org.xnap.gui.util.IconHelper;
34 import org.xnap.peer.Peer;
35 import org.xnap.plugin.Plugin;
36 import org.xnap.plugin.opennap.OpenNapPlugin;
37 import org.xnap.plugin.opennap.net.msg.ExceptionListener;
38 import org.xnap.plugin.opennap.net.msg.FilenameMessage;
39 import org.xnap.plugin.opennap.net.msg.MessageHandler;
40 import org.xnap.plugin.opennap.net.msg.MessageListener;
41 import org.xnap.plugin.opennap.net.msg.client.AltDownloadRequestMessage;
42 import org.xnap.plugin.opennap.net.msg.client.DownloadRequestMessage;
43 import org.xnap.plugin.opennap.net.msg.client.PrivateMessage;
44 import org.xnap.plugin.opennap.net.msg.server.AcceptFailedMessage;
45 import org.xnap.plugin.opennap.net.msg.server.DownloadAckMessage;
46 import org.xnap.plugin.opennap.net.msg.server.GetErrorMessage;
47 import org.xnap.plugin.opennap.net.msg.server.QueueLimitMessage;
48 import org.xnap.plugin.opennap.net.msg.server.ServerMessage;
49 import org.xnap.plugin.opennap.net.msg.server.UserSignoffMessage;
50 import org.xnap.plugin.opennap.net.msg.server.UserSignonMessage;
51 import org.xnap.transfer.AbstractTransfer;
52 import org.xnap.transfer.Download;
53 import org.xnap.transfer.Segment;
54 import org.xnap.transfer.action.AbstractRaisePriorityAction;
55 import org.xnap.transfer.action.AbstractStartAction;
56 import org.xnap.transfer.action.AbstractStopAction;
57 import org.xnap.transfer.action.AbstractTransferAction;
58 import org.xnap.util.FileHelper;
59 import org.xnap.util.FiniteStateMachine;
60 import org.xnap.util.IllegalOperationException;
61 import org.xnap.util.Scheduler;
62 import org.xnap.util.State;
63 import org.xnap.util.XNapTask;
64
65 /***
66 * Downloads a file.
67 */
68 public class OpenNapDownload extends AbstractTransfer
69 implements Download, ExceptionListener, MessageListener, SocketListener,
70 StateListener {
71
72
73
74 /***
75 * Do not send //WantQueue more times than this. Once should be
76 * enough anyway. */
77 public static final int MAX_WANT_QUEUE = 2;
78
79 /***
80 * Requests are sent in this interval if this download could not
81 * be remotely queued. */
82 public static final int REQUEST_INTERVAL = 1 * 60 * 1000;
83
84 /***
85 * Requests are sent in REQUEST_INTERVAL * QUEUED_REQUEST_FACTOR
86 * if the download is remotely queued. */
87 public static final int QUEUED_REQUEST_FACTOR = 5;
88
89 /***
90 * If no reponse is received within MAX_RESPONSE_LAG milli seconds
91 * the peer is considered to be offline and the download is
92 * stopped.
93 *
94 * <p>This should be greater than REQUEST_INTERVAL *
95 * QUEUED_REQUEST_FACTOR. */
96 public static final int MAX_RESPONSE_LAG
97 = 2 * REQUEST_INTERVAL * QUEUED_REQUEST_FACTOR + 2 * 1000;
98
99 /***
100 * The maximum time to wait for a connect from the other peer in
101 * case of a firewalled download.
102 */
103 public static final int WAITING_TIMEOUT = 2 * 60 * 1000;
104
105 /***
106 * Operation has been aborted.
107 */
108 public static final State AQUIRING_SLOT
109 = new State(XNap.tr("Waiting for local download slot"));
110
/**
 * The state transition table.
 *
 * <p>NOTE(review): each row is assumed to be a source state followed by
 * the states that may be entered from it, as consumed by
 * <code>FiniteStateMachine.createStateTable()</code> -- confirm.
 *
 * <p>Once State.SUCCEEDED is entered the download was successful
 * and the state will not change anymore.
 */
private static final Hashtable TRANSITION_TABLE;
static {
    State[][] table = new State[][] {
        { State.NOT_STARTED,
          AQUIRING_SLOT, },
        { AQUIRING_SLOT,
          State.REQUESTING, State.STOPPING, },
        { State.REQUESTING,
          State.CONNECTING, State.FAILED, State.STOPPING, State.WAITING, },
        // WAITING -> CONNECTING is requested by socketReceived() once the
        // firewalled peer has connected to us; WAITING -> REQUESTING is
        // requested by WaitingTimeoutTask when the wait timed out. Both
        // were missing, so these transitions were always rejected.
        { State.WAITING,
          State.CONNECTING, State.DOWNLOADING, State.FAILED,
          State.REQUESTING, State.STOPPING, },
        { State.CONNECTING,
          State.DOWNLOADING, State.FAILED, State.STOPPING, },
        { State.DOWNLOADING,
          State.SUCCEEDED, State.FAILED, State.STOPPING, },
        { State.STOPPING,
          State.NOT_STARTED, State.FAILED, },
        // a failed download may only be restarted
        { State.FAILED,
          AQUIRING_SLOT, },
    };

    TRANSITION_TABLE = FiniteStateMachine.createStateTable(table);
}
141
142
143
144 private static Logger logger = Logger.getLogger(OpenNapDownload.class);
145
146 private OpenNapDownloadContainer parent;
147 private OpenNapSegment[] segments = new OpenNapSegment[0];
148 private OpenNapSegment currentSegment;
149 private OpenNapSearchResult result;
150 private StateMachine sm = new StateMachine();
151 private OpenNapDownloadRunner runner;
152 private OpenNapServer server;
153 private int queuePos = -1;
154 private DownloadSocket inSocket;
155 private AbstractTransferAction startAction = new StartAction();
156 private AbstractTransferAction stopAction = new StopAction();
157 private AbstractTransferAction requestAction = new RequestAction();
158 private long bytesTransferred = 0;
159 private long offset = 0;
160
161 /***
162 * The number of //WantQueue messages that have been sent.
163 */
164 private int wantQueueCount = 0;
165
166 private int priority = 1;
167
168 /***
169 * The number of times we have waited for the peer to connect to us.
170 */
171 private int waitingCount = 0;
172
173 /***
174 * The time of the answer of the last response to a download request.
175 */
176 private long lastResponseReceived;
177 private long lastRequestSend;
178
179
180
/**
 * Creates a download of <code>result</code> that belongs to
 * <code>parent</code>. The server is taken from the user of the result.
 *
 * @param parent the container that manages this download
 * @param result the search result to download
 */
public OpenNapDownload(OpenNapDownloadContainer parent,
                       OpenNapSearchResult result)
{
    this.parent = parent;
    this.result = result;
    this.server = result.getOpenNapUser().getServer();

    // a new download can be started but there is nothing to stop yet
    startAction.setEnabledLater(true);
    stopAction.setEnabledLater(false);

    result.getOpenNapUser().update();
}
193
194
195
196 /***
197 * Tries to aquire a download slot. */
198 private void aquireDownloadSlot()
199 {
200 synchronized(getResult().getOpenNapUser()) {
201 if (!getResult().getOpenNapUser().isDownloadLimitReached()) {
202 try {
203 setState(State.REQUESTING);
204 }
205 catch (IllegalOperationException e) {
206 logger.warn("unexpected state", e);
207 }
208 }
209 }
210 }
211
212 /***
213 * Increased the number of transferred bytes.
214 *
215 * <p>Invoked by {@link OpenNapDownloadRunner#requestSegment()}.
216 */
217 void commit(int bytes)
218 {
219 parent.commit(bytes);
220 bytesTransferred += bytes;
221 }
222
223 public long getBytesTransferred()
224 {
225 return bytesTransferred;
226 }
227
228 public Action[] getActions()
229 {
230 return new Action[] { startAction, stopAction, requestAction, };
231 }
232
233 public File getFile()
234 {
235 OpenNapSegment segment = getSegment();
236 return (segment != null) ? segment.getFile() : null;
237 }
238
239 public long getFilesize()
240 {
241 OpenNapSegment segment = getSegment();
242 return (segment != null) ? segment.getEnd() - offset : 0;
243 }
244
245 public Peer getPeer()
246 {
247 return getResult().getPeer();
248 }
249
250 public Plugin getPlugin()
251 {
252 return OpenNapPlugin.getInstance();
253 }
254
255 public OpenNapSearchResult getResult()
256 {
257 return result;
258 }
259
260 public OpenNapSegment getSegment()
261 {
262 return currentSegment;
263 }
264
265 public Segment[] getSegments()
266 {
267 return segments;
268 }
269
270 public String getStatus()
271 {
272 return sm.getDescription();
273 }
274
275 public long getTotalBytesTransferred()
276 {
277 return getBytesTransferred();
278 }
279
280 public boolean isDone()
281 {
282 State s = sm.getState();
283 return s == State.SUCCEEDED || s == State.FAILED;
284 }
285
286 public boolean isRestartable()
287 {
288 return sm.getState() != State.SUCCEEDED;
289 }
290
291 public boolean isRunning()
292 {
293 return sm.getState() == State.DOWNLOADING;
294 }
295
296 public boolean isQueued()
297 {
298 return (getQueuePosition() > 0);
299 }
300
301 public boolean socketReceived(IncomingSocket s)
302 {
303 if (s instanceof DownloadSocket) {
304 DownloadSocket d = (DownloadSocket)s;
305 if (getPeer().getName().equals(d.nick)
306 && getResult().getFilename().equals(d.filename)) {
307 inSocket = d;
308 try {
309 setState(State.CONNECTING);
310 }
311 catch (IllegalOperationException e) {
312 logger.warn("unexpected state", e);
313 }
314 return true;
315 }
316 }
317 return false;
318 }
319
320 public int getQueuePosition()
321 {
322 return queuePos;
323 }
324
325 File createIncompleteFile() throws IOException
326 {
327 return FileHelper.createIncompleteFile(parent.getFilename());
328 }
329
330 OpenNapDownloadContainer getParent()
331 {
332 return parent;
333 }
334
335 OpenNapServer getServer()
336 {
337 return server;
338 }
339
340 /***
341 * Requests a segment from the parent {@link OpenNapDownloadContainer}
342 * and adds it to the list of segments if successful.
343 *
344 * <p>Invoked by {@link OpenNapDownloadRunner#requestSegment()}.
345 *
346 * @see #getSegments()
347 */
348 OpenNapSegment requestSegment()
349 {
350 currentSegment = parent.getSegmentManager().requestSegment(this);
351
352 if (currentSegment != null) {
353 OpenNapSegment[] tmp = new OpenNapSegment[segments.length + 1];
354 System.arraycopy(segments, 0, tmp, 0, segments.length);
355 tmp[segments.length] = currentSegment;
356 segments = tmp;
357
358 bytesTransferred = 0;
359 offset = currentSegment.getStart()
360 + currentSegment.getTransferred();
361 }
362
363 return currentSegment;
364 }
365
366 /***
367 * Clones segment and returns it to the parent
368 * {@link OpenNapDownloadContainer}.
369 *
370 * <p>Invoked by {@link OpenNapDownloadRunner}.
371 */
372 void returnSegment(OpenNapSegment segment) throws IOException
373 {
374
375 currentSegment = (OpenNapSegment)segment.clone();
376 segments[segments.length - 1] = currentSegment;
377 currentSegment.trim();
378
379 parent.getSegmentManager().returnSegment(segment);
380 }
381
382 void setState(State newState, String description)
383 {
384 sm.setState(newState, description);
385 stateChanged();
386 }
387
388 void setState(State newState)
389 {
390 sm.setState(newState);
391 stateChanged();
392 }
393
394 void setStateDescription(String description)
395 {
396 sm.setDescription(description);
397 stateChanged();
398 }
399
400 void setTotalMinimum(String description)
401 {
402 sm.setDescription(description);
403 stateChanged();
404 }
405
406 /***
407 * Invoked by {@link OpenNapDownloadContainer}.
408 */
409 void start()
410 {
411 try {
412 setState(AQUIRING_SLOT);
413 }
414 catch (IllegalOperationException e) {
415 logger.warn("unexpected state", e);
416 }
417 }
418
419 void stop(String reason)
420 {
421 setQueuePosition(-1);
422
423 try {
424 setState(State.STOPPING, reason);
425 }
426 catch (IllegalOperationException e) {
427
428
429
430
431
432 }
433 }
434
435 private void setQueuePosition(int queuePos)
436 {
437 this.queuePos = queuePos;
438 parent.remotelyQueued(queuePos);
439 }
440
441 /***
442 * Sends a download request.
443 */
444 private void sendRequest()
445 {
446 DownloadRequestMessage ms = new DownloadRequestMessage
447 (getPeer().getName(), getResult().getFilename());
448 ms.setExceptionListener(this);
449 MessageHandler.send(getServer(), ms);
450
451 lastRequestSend = System.currentTimeMillis();
452 setStateDescription(XNap.tr("Requesting"));
453 }
454
455 /***
456 * Sends a '//WantQueue' message, if client is known to honor
457 * want queue requests.
458 *
459 * Returns true, if a download request should be sent.
460 */
461 private boolean sendWantQueue()
462 {
463 if (wantQueueCount >= MAX_WANT_QUEUE) {
464
465
466 return false;
467 }
468
469
470
471
472 String client = getResult().getOpenNapUser().getClientInfo();
473
474 boolean send = false;
475 if (client != null) {
476 send |= client.startsWith("WinMX v2.6");
477 send |= client.startsWith("Napigator");
478 send |= client.startsWith("TrippyMX");
479 send |= client.startsWith("Utatane");
480 send |= wantQueueCount > 1 && client.startsWith("WinMX");
481 }
482
483 if (send) {
484 MessageHandler.send
485 (getServer(),
486 new PrivateMessage(getPeer().getName(), "//WantQueue"));
487 }
488 wantQueueCount++;
489
490 return true;
491 }
492
493 /***
494 * Invoked when the user download slot state changes.
495 */
496 public void stateChanged(StateEvent e)
497 {
498 aquireDownloadSlot();
499 }
500
501 public void exceptionThrown(Exception e)
502 {
503 try {
504 setState(State.NOT_STARTED, e.getLocalizedMessage());
505 }
506 catch (IllegalOperationException e2) {
507 logger.warn("unexpected state", e2);
508 }
509 }
510
511 /***
512 * Handles messages received from the server as response to the download
513 * request. Notifies the parent download container if download is ready to
514 * start or if it should be removed due to an error message received from
515 * the server.
516 */
517 public void messageReceived(ServerMessage m)
518 {
519 if (m instanceof FilenameMessage) {
520 FilenameMessage msg = (FilenameMessage)m;
521 if (m.getServer() == getServer()
522 && msg.getFilename().equals(getResult().getFilename())
523 && msg.getNick().equals(getResult().getPeer().getName())) {
524
525
526 m.consume();
527
528 try {
529 handleMessage(m);
530 }
531 catch (IllegalOperationException e) {
532 logger.warn("unexpected state", e);
533 }
534 }
535 }
536 else if (m instanceof UserSignonMessage) {
537
538 }
539 else if (m instanceof UserSignoffMessage) {
540 if (((UserSignoffMessage)m).nick.equals
541 (getResult().getPeer().getName())) {
542
543 stop(XNap.tr("User is offline"));
544 }
545 }
546 }
547
548 /***
549 * @throws IllegalOperationException
550 */
551 private void handleMessage(ServerMessage m)
552 {
553 if (m instanceof QueueLimitMessage) {
554 lastResponseReceived = System.currentTimeMillis();
555
556 int pos = ((QueueLimitMessage)m).maxDownloads;
557 if (pos == 0 || pos >= 10000) {
558 setQueuePosition(0);
559 if (sendWantQueue()) {
560 sendRequest();
561 }
562 }
563 else if (pos > 0 && pos < 10000) {
564 setQueuePosition(pos);
565 }
566 else {
567 setQueuePosition(-1);
568 }
569
570 setStateDescription(XNap.tr("Remotely queued"));
571 logger.debug(getPeer().getName() + " has queued us at pos "
572 + getQueuePosition());
573 }
574 else if (m instanceof AcceptFailedMessage
575 || m instanceof GetErrorMessage) {
576 setState(State.FAILED, XNap.tr("Request was rejected"));
577 }
578 else if (m instanceof DownloadAckMessage) {
579 DownloadAckMessage ackMsg = (DownloadAckMessage)m;
580
581 getResult().getOpenNapUser().setHost(ackMsg.ip);
582 getResult().getOpenNapUser().setPort(ackMsg.port);
583
584 if (ackMsg.port != 0) {
585 setState(State.CONNECTING);
586 }
587 else if (getServer().getLocalPort() == 0) {
588 setState(State.FAILED, XNap.tr("Both parties are firewalled"));
589 }
590 else {
591 setState(State.WAITING, XNap.tr("Waiting for connect") + "...");
592 }
593 }
594 }
595
596 private void subscribe(MessageListener ms)
597 {
598 MessageHandler.subscribe(DownloadAckMessage.TYPE, ms);
599 MessageHandler.subscribe(QueueLimitMessage.TYPE, ms);
600 MessageHandler.subscribe(GetErrorMessage.TYPE, ms);
601 MessageHandler.subscribe(AcceptFailedMessage.TYPE, ms);
602 }
603
604 private void unsubscribe(MessageListener ms)
605 {
606 MessageHandler.unsubscribe(DownloadAckMessage.TYPE, ms);
607 MessageHandler.unsubscribe(QueueLimitMessage.TYPE, ms);
608 MessageHandler.unsubscribe(GetErrorMessage.TYPE, ms);
609 MessageHandler.unsubscribe(AcceptFailedMessage.TYPE, ms);
610 }
611
612
613
614 private class StateMachine extends FiniteStateMachine
615 {
616
617
618
619 private Thread t;
620 private RequestorTask requestSender;
621 private WaitingTimeoutTask waitingTask;
622
623
624
625 public StateMachine()
626 {
627 super(State.NOT_STARTED, TRANSITION_TABLE);
628 }
629
630
631
632 protected synchronized void stateChanged(State oldState,
633 State newState)
634 {
635 if (oldState == AQUIRING_SLOT) {
636 getResult().getOpenNapUser().getParent().removeStateListener
637 (OpenNapDownload.this);
638 }
639
640 if (newState == AQUIRING_SLOT) {
641 MessageHandler.subscribe
642 (UserSignonMessage.TYPE, OpenNapDownload.this);
643 MessageHandler.subscribe
644 (UserSignoffMessage.TYPE, OpenNapDownload.this);
645
646 waitingCount = 0;
647
648 getResult().getOpenNapUser().getParent().addStateListener
649 (OpenNapDownload.this);
650 aquireDownloadSlot();
651 }
652 else if (newState == State.REQUESTING) {
653 if (!getParent().started(OpenNapDownload.this)) {
654 this.setState
655 (State.FAILED,
656 XNap.tr("Could not start download"));
657 return;
658 }
659
660
661 queuePos = -1;
662 wantQueueCount = 0;
663 lastResponseReceived = System.currentTimeMillis();
664
665 subscribe(OpenNapDownload.this);
666
667 requestSender = new RequestorTask();
668 Scheduler.run(0, REQUEST_INTERVAL, requestSender);
669
670 startAction.setEnabledLater(false);
671 stopAction.setEnabledLater(true);
672 requestAction.setEnabledLater(true);
673 }
674 else if (newState == State.WAITING) {
675 getServer().getListener().addSocketListener
676 (OpenNapDownload.this);
677
678 AltDownloadRequestMessage msg
679 = new AltDownloadRequestMessage
680 (getPeer().getName(), getResult().getFilename());
681 MessageHandler.send(getServer(), msg);
682
683 waitingCount++;
684 waitingTask = new WaitingTimeoutTask();
685 Scheduler.run(WAITING_TIMEOUT, waitingTask);
686 }
687 else if (newState == State.CONNECTING) {
688 runner = new OpenNapDownloadRunner
689 (OpenNapDownload.this, inSocket);
690 t = new Thread(runner, "OpenNapDownload");
691 t.start();
692 }
693 else if (newState == State.DOWNLOADING) {
694 transferStarted();
695 parent.transferStarted();
696 }
697 else if (newState == State.STOPPING) {
698 if (runner != null) {
699
700 runner.stop();
701 t.interrupt();
702
703 }
704 else {
705 this.setState(State.NOT_STARTED, getDescription());
706 }
707 }
708 else if (newState == State.NOT_STARTED
709 || newState == State.FAILED) {
710 startAction.setEnabledLater(true);
711 }
712
713 if (newState == State.STOPPING
714 || newState == State.FAILED) {
715
716 getResult().getOpenNapUser().getParent().downloadStopped();
717 }
718
719 if (newState == State.NOT_STARTED
720 || newState == State.FAILED
721 || newState == State.SUCCEEDED) {
722
723 runner = null;
724 t = null;
725
726 MessageHandler.unsubscribe
727 (UserSignonMessage.TYPE, OpenNapDownload.this);
728 MessageHandler.unsubscribe
729 (UserSignoffMessage.TYPE, OpenNapDownload.this);
730
731 stopAction.setEnabledLater(false);
732 getParent().stopped(OpenNapDownload.this);
733 setQueuePosition(-1);
734 }
735
736 if (oldState == State.REQUESTING) {
737 requestAction.setEnabledLater(false);
738
739
740
741 if (requestSender != null) {
742 requestSender.cancel();
743 requestSender = null;
744
745 unsubscribe(OpenNapDownload.this);
746 }
747 }
748 else if (oldState == State.WAITING) {
749 getServer().getListener().removeSocketListener
750 (OpenNapDownload.this);
751
752 if (waitingTask != null) {
753 waitingTask.cancel();
754 waitingTask = null;
755 }
756 }
757 else if (oldState == State.DOWNLOADING) {
758 transferStopped();
759 parent.transferStopped();
760 }
761 }
762
763 }
764
765 /***
766 * Resends the download request in regular intervals. */
767 private class RequestorTask extends XNapTask {
768
769 int skippedRequests = 0;
770
771 public void run()
772 {
773 if (lastResponseReceived + MAX_RESPONSE_LAG
774 < System.currentTimeMillis()) {
775
776 stop(XNap.tr("Request timed out."));
777 }
778
779 if (getQueuePosition() >= 0) {
780 String client = getResult().getOpenNapUser().getClientInfo();
781 if (client != null
782 && client.startsWith("nap")) {
783
784
785 sendRequest();
786 }
787 else {
788
789
790 if (skippedRequests == QUEUED_REQUEST_FACTOR - 1) {
791 skippedRequests = 0;
792 sendRequest();
793 }
794 else {
795 skippedRequests++;
796 }
797 }
798 }
799 else {
800 sendRequest();
801 }
802 }
803
804 }
805
806 /***
807 * Resends the download request in regular intervals. */
808 private class WaitingTimeoutTask extends XNapTask {
809
810 public void run()
811 {
812 if (waitingCount < 3) {
813 setState(State.REQUESTING,
814 XNap.tr("Rerequesting, wait for connect timed out"));
815 }
816 else {
817 setState(State.FAILED,
818 XNap.tr("Giving up, wait timed out 3 times"));
819 }
820 }
821
822 }
823
824 private class RaisePriorityAction extends AbstractRaisePriorityAction {
825
826 public void actionPerformed(ActionEvent e)
827 {
828 priority += 1;
829
830 }
831
832 }
833
834 private class StartAction extends AbstractStartAction {
835
836 public void actionPerformed(ActionEvent e)
837 {
838 start();
839 }
840
841 }
842
843 private class StopAction extends AbstractStopAction {
844
845 public void actionPerformed(ActionEvent e)
846 {
847 stop(XNap.tr("Download stopped"));
848 }
849
850 }
851
852 private class RequestAction extends AbstractTransferAction {
853
854 public RequestAction()
855 {
856 putValue(Action.NAME, XNap.tr("Send Request"));
857 putValue(Action.SHORT_DESCRIPTION,
858 XNap.tr("Requeries the selected transfers."));
859 putValue(IconHelper.XNAP_ICON, "reload.png");
860
861 setEnabled(false);
862 }
863
864 public void actionPerformed(ActionEvent e)
865 {
866 sendRequest();
867 }
868
869 }
870
871 }