View Javadoc

1   /*
2    *  XNap - A P2P framework and client.
3    *
4    *  See the file AUTHORS for copyright information.
5    *
6    *  This program is free software; you can redistribute it and/or modify
7    *  it under the terms of the GNU General Public License as published by
8    *  the Free Software Foundation.
9    *
10   *  This program is distributed in the hope that it will be useful,
11   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   *  GNU General Public License for more details.
14   *
15   *  You should have received a copy of the GNU General Public License
16   *  along with this program; if not, write to the Free Software
17   *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   */
19  
20  package org.xnap.plugin.opennap.net;
21  
22  import java.awt.event.ActionEvent;
23  import java.io.*;
24  import java.util.*;
25  
26  import javax.swing.Action;
27  import javax.swing.Icon;
28  
29  import org.apache.log4j.Logger;
30  import org.xnap.XNap;
31  import org.xnap.peer.Peer;
32  import org.xnap.plugin.Plugin;
33  import org.xnap.plugin.opennap.OpenNapPlugin;
34  import org.xnap.plugin.opennap.net.msg.ExceptionListener;
35  import org.xnap.plugin.opennap.net.msg.MessageHandler;
36  import org.xnap.plugin.opennap.net.msg.MessageListener;
37  import org.xnap.plugin.opennap.net.msg.client.UploadAckMessage;
38  import org.xnap.plugin.opennap.net.msg.server.AltDownloadAckMessage;
39  import org.xnap.plugin.opennap.net.msg.server.ServerMessage;
40  import org.xnap.plugin.opennap.user.OpenNapUser;
41  import org.xnap.plugin.opennap.util.*;
42  import org.xnap.transfer.AbstractTransfer;
43  import org.xnap.transfer.Queueable;
44  import org.xnap.transfer.Segment;
45  import org.xnap.transfer.Upload;
46  import org.xnap.transfer.UploadManager;
47  import org.xnap.transfer.action.*;
48  import org.xnap.util.*;
49  import org.xnap.util.IllegalOperationException;
50  import org.xnap.util.State;
51  
52  /***
53   * 
54   */
55  public class OpenNapUpload extends AbstractTransfer	
56  	implements ExceptionListener, MessageListener,
57  			   Queueable, Upload, SocketListener {
58  
59  	//--- Constant(s) ---
60  
61  	/***
62  	 * If the upload is not received within this timeout after the ack
63  	 * message has been sent, the upload will be cancelled.  */
64  	public static final int ACK_TIMEOUT = 1 * 60 * 1000;
65  
66  	/***
67  	 * If the upload request is not renewed within this interval, the
68  	 * upload will be cancelled.  */
69  	public static final int MAX_REREQUST_INTERVAL = 15 * 60 * 1000;
70  
71  	/***
72  	 * The state transition table.
73  	 */
74      private static final Hashtable TRANSITION_TABLE;
75      static {
76  		State[][] table = new State[][] {
77  			{ State.NOT_STARTED,  
78  			  State.WAITING, State.STOPPED },
79  			{ State.WAITING,
80  			  State.CONNECTING, State.STOPPED, State.STOPPING, },
81  			{ State.CONNECTING, 
82  			  State.UPLOADING, State.STOPPED, State.STOPPING, },
83  			{ State.UPLOADING, 
84  			  State.SUCCEEDED, State.STOPPED, State.STOPPING, },
85  			{ State.STOPPING,
86  			  State.STOPPED, }
87  		};
88  		
89  		TRANSITION_TABLE = FiniteStateMachine.createStateTable(table);
90      }
91  
92  	//--- Data Field(s) ---
93  
94      protected static Logger logger = Logger.getLogger(OpenNapUpload.class);
95  
96      private int queuePosition;
97      private StateMachine sm = new StateMachine();
98  	private long bytesTransferred;
99  	private long totalBytesTransferred;
100 
101 	private OpenNapServer server;
102 	private OpenNapUser user;
103 	private File file;
104 	private String requestFilename;
105 
106 	private OpenNapUploadRunner runner;
107 	private UploadSocket inSocket;
108 
109 	private long startTime;
110 	private int priorityFactor = 1;
111 
112 	private long lastRequest;
113 	private String hash;
114 
115 	private AbstractTransferAction startAction = new StartAction();
116 	private AbstractTransferAction stopAction = new StopAction();
117 
118     //--- Constructor(s) ---
119 
120     public OpenNapUpload(OpenNapServer server, OpenNapUser user, File file, 
121 						 String requestFilename)
122     {
123 		this.server = server;
124 		this.user = user;
125 		this.file = file;
126 		this.requestFilename = requestFilename;
127 
128 		startTime = System.currentTimeMillis();
129 		lastRequest = System.currentTimeMillis();
130     }
131 
132     //--- Methods ---
133     
134     public void exceptionThrown(Exception e)
135     {
136 		setState(State.STOPPED, e.getLocalizedMessage());
137     }
138 
139 	public File getFile()
140 	{
141 		return file;
142 	}
143 
144     /***
145      * 
146      */
147     public long getFilesize() 
148 	{
149         return file.length();
150     }
151 
152     public Plugin getPlugin()
153     {
154 		return OpenNapPlugin.getInstance();
155     }
156 
157 	/***
158 	 * @see xnap.transfer.Transfer#getActions()
159 	 */
160 	public Action[] getActions() 
161 	{
162 		return new Action[] { 
163 			startAction, stopAction, 
164 			new LowerPriorityAction(), new RaisePriorityAction(),
165 		};
166 	}
167 
168 	/***
169 	 * 
170 	 */
171 	public long getBytesTransferred() 
172 	{
173 		return bytesTransferred;
174 	}
175 
176     public long getEnqueueTime()
177     {
178 		return startTime;
179     }
180 
181 	public String getHash()
182 	{
183 		return hash;
184 	}
185 
186     public Icon getIcon()
187     {
188 		return OpenNapPlugin.ICON_16;
189     }
190 
191 	public int getPriority()
192 	{
193 		return priorityFactor;
194 	}
195 
196     /***
197      * Returns the position in the {@link UploadManager} queue.
198      */
199     public int getQueuePosition()
200     {
201 		return queuePosition;
202     }
203 
204 	/***
205 	 * @see xnap.transfer.Transfer#getPeer()
206 	 */
207 	public Peer getPeer() 
208 	{
209 		return user;
210 	}
211 
212     public Segment[] getSegments()
213     {
214 		return null;
215     }
216 
217 	/***
218 	 * @see xnap.transfer.Transfer#getStatus()
219 	 */
220 	public String getStatus() 
221 	{
222 		return sm.getDescription();
223 	}
224 
225 	/***
226 	 * @see xnap.transfer.Transfer#getTotalBytesTransferred()
227 	 */
228 	public long getTotalBytesTransferred() 
229 	{
230 		return totalBytesTransferred;
231 	}
232 
233     public boolean isDone()
234     {
235 		State s = sm.getState();
236 		return s == State.SUCCEEDED || s == State.STOPPED;
237     }
238 
239     public boolean isRunning()
240     {
241 		return sm.getState() == State.UPLOADING;
242     }
243 
244     public void messageReceived(ServerMessage msg)
245     {
246 	    AltDownloadAckMessage m = (AltDownloadAckMessage)msg;
247 
248 	    if (m.getServer() == server && m.nick.equals(user.getName()) 
249 			&& m.filename.equals(requestFilename)) {
250 			m.consume();
251 
252 			user.setHost(m.ip);
253 			user.setPort(m.port);
254 
255 			setState(State.CONNECTING);
256 	    }
257     }
258 
259 	/***
260 	 * Invoked by {@link
261 	 * OpenNapTransferManager#received(UploadRequestMessage)}.  */
262 	void requested()
263 	{
264 		lastRequest = System.currentTimeMillis();
265 	}
266 
267 	public void setHash(String hash)
268 	{
269 		this.hash = hash;
270 	}
271 
272     /***
273      *
274      */
275     public void setQueuePosition(int position)
276     {
277 		queuePosition = position;
278 		stateChanged();
279 	}
280 
281 	public boolean socketReceived(IncomingSocket s)
282 	{
283 		if (s instanceof UploadSocket) {
284 			UploadSocket u = (UploadSocket)s;
285 			if (getPeer().getName().equals(u.nick)
286 				&& getRequestFilename().equals(u.requestFilename)) {
287 				inSocket = u;
288 				try {
289 					setState(State.CONNECTING);
290 				}
291 				catch (IllegalOperationException e) {
292 					u.close();
293 				}
294 				return true;
295 			}
296 	    }
297 		return false;
298 	}
299 
300 	/***
301 	 * Inovked by OpenNapTransferManager after the instanciation.
302 	 */
303     public void start()
304 	{
305     	UploadManager.getInstance().getQueue().add(this);
306 	}
307 
308 	/***
309 	 * Starts the transfer. Invoked by TransferQueue.  
310 	 */
311 	public boolean startTransfer()
312 	{
313 		synchronized (user) {
314 			if (!user.isUploadLimitReached()) {
315 				try {
316 					queuePosition = 0;
317 					if (user.isUploadDenied()) {
318 						setState(State.STOPPED, XNap.tr("Upload denied"));
319 					}
320 					else if (!OpenNapFileHelper.isShared(getFile())) {
321 						setState
322 							(State.STOPPED, 
323 							 XNap.tr("File is not shared anymore"));
324 					}
325 					else {
326 						setState(State.WAITING);
327 					}
328 					return true;
329 				}
330 				catch (IllegalOperationException e) {
331 					logger.error("unexpected state", e);
332 				}
333 			}
334 		}
335 		return false;
336     }
337 
338 	public void stop()
339 	{
340 		try {
341 			setState(State.STOPPING);
342 		}
343 		catch (IllegalOperationException e) {
344 			logger.warn("unexpected state", e);
345 		}
346     }
347 
348 	synchronized void commit(int transferred)
349 	{
350 		bytesTransferred += transferred;
351 		totalBytesTransferred += transferred;
352 	}
353 	
354 	/***
355 	 * @return the server
356 	 */
357 	OpenNapServer getServer() 
358 	{
359 		return server;
360 	}
361 
362 	String getRequestFilename() 
363 	{
364 		return requestFilename;
365 	}
366 
367 	void setTotalBytesTransferred(long totalBytesTransferred)
368 	{
369 		this.totalBytesTransferred = totalBytesTransferred;
370 	}
371 
372     void setState(State newState, String description)
373     {
374 		sm.setState(newState, description);
375 		stateChanged();
376     }
377 
378     void setState(State newState)
379     {
380 		sm.setState(newState);
381 		stateChanged();
382     }
383 
384     //--- Inner Class(es) ---
385 
386     private class StateMachine extends FiniteStateMachine {
387 
388 		//--- Data Field(s) ---
389 
390 		private Thread runThread;
391 		private XNapTask timeoutTask;
392 
393 		//--- Constructor(s) ---
394 
395 		public StateMachine()
396 		{
397 			super(State.NOT_STARTED, TRANSITION_TABLE);
398 
399 			timeoutTask = new TimeoutWatcherTask(MAX_REREQUST_INTERVAL);
400 			Scheduler.run(MAX_REREQUST_INTERVAL, 60 * 1000, timeoutTask);
401 		}
402 
403 		//--- Method(s) ---
404 
405 		protected synchronized void stateChanged(State oldState,
406 												 State newState) 
407 		{
408 			if (oldState == State.NOT_STARTED) {
409 				timeoutTask.cancel();
410 				timeoutTask = null;
411 			}
412 
413 			if (newState == State.WAITING) {
414 				MessageHandler.subscribe
415 					(AltDownloadAckMessage.TYPE, OpenNapUpload.this);
416 
417 				server.getListener().addSocketListener(OpenNapUpload.this);
418 
419 				user.getParent().uploadStarted();
420 
421 				UploadAckMessage msg
422 					= new UploadAckMessage(user.getName(), requestFilename);
423 				msg.setExceptionListener(OpenNapUpload.this);
424 				MessageHandler.send(server, msg);
425 				
426 				startAction.setEnabledLater(false);
427 
428 				timeoutTask = new StopTask();
429 				Scheduler.run(ACK_TIMEOUT, timeoutTask);
430 
431 				// once an alternative download request is received
432 				// or an incoming connect is registered, the state will
433 				// be set to CONNECTING or set to STOPPED in case of a 
434 				// timeout
435 			}
436 			else if (newState == State.CONNECTING) {
437 				runner = new OpenNapUploadRunner(OpenNapUpload.this, inSocket);
438 				runThread
439 					= new Thread(runner, "OpenNapUpload:" + getFilename());
440 				runThread.start();
441 			}
442 			else if (newState == State.UPLOADING) {
443 				transferStarted();
444 			}
445 			else if (newState == State.STOPPING) {
446 				if (runThread != null) {
447 					runner.stop();
448 					runThread.interrupt();
449 					runner = null;
450 					runThread = null;
451 				}
452 
453 				if (oldState == State.WAITING) {
454 					this.setState(State.STOPPED);
455 				}
456 			}
457 			else if (newState == State.SUCCEEDED 
458 					 || newState == State.STOPPED) {
459 				if (oldState != State.NOT_STARTED) {
460 					user.getParent().uploadStopped();
461 				}
462 				
463 				UploadManager.getInstance().getQueue().remove
464 					(OpenNapUpload.this);
465 				OpenNapPlugin.getTransferManager().done(OpenNapUpload.this);
466 
467 				stopAction.setEnabledLater(false);
468 			}
469 
470 			if (oldState == State.WAITING) {
471 				MessageHandler.unsubscribe(AltDownloadAckMessage.TYPE,
472 										   OpenNapUpload.this);
473 				server.getListener().removeSocketListener(OpenNapUpload.this);
474 
475 				timeoutTask.cancel();
476 				timeoutTask = null;
477 			}
478 			else if (oldState == State.UPLOADING) {
479 				transferStopped();
480 			}
481 		}
482 	}
483 
484     /***
485      * Stops the upload if it has not been rerequested within timeout.  */
486 	private class TimeoutWatcherTask extends XNapTask {
487 
488 		private long timeout;
489 
490 		public TimeoutWatcherTask(long timeout)
491 		{
492 			this.timeout = timeout;
493 		}
494 
495 		public void run() 
496 		{
497 			if (System.currentTimeMillis() - lastRequest > timeout) {
498 				try {
499 					setState(State.STOPPED, XNap.tr("Remote timed out"));
500 				}
501 				catch (IllegalOperationException e) {
502 					logger.error("unexpected state", e);
503 				}
504 			}
505 		}
506 
507 	}
508 
509     /***
510      * Stops the upload if ever run.  */
511 	private class StopTask extends XNapTask {
512 
513 		public void run() 
514 		{
515 			try {
516 				setState(State.STOPPED, XNap.tr("Remote timed out"));
517 			}
518 			catch (IllegalOperationException e) {
519 				logger.error("unexpected state", e);
520 			}
521 		}
522 
523 	}
524 
525 	private class LowerPriorityAction extends AbstractLowerPriorityAction {
526 
527 		public void actionPerformed(ActionEvent e) 
528 		{
529 			priorityFactor -= 1;
530 			UploadManager.getInstance().getQueue().resort();
531 		}
532 
533 	}
534 
535 	private class RaisePriorityAction extends AbstractRaisePriorityAction {
536 
537 		public void actionPerformed(ActionEvent e) 
538 		{
539 			priorityFactor += 1;
540 			UploadManager.getInstance().getQueue().resort();
541 		}
542 
543 	}
544 
545 	private class StartAction extends AbstractStartAction {
546 
547 		public void actionPerformed(ActionEvent event) 
548 		{
549 			try {
550 				setState(State.WAITING);
551 			}
552 			catch (IllegalOperationException e) {
553 				logger.error("unexpected state", e);
554 			}
555 		}
556  
557 	}
558 
559 	private class StopAction extends AbstractStopAction {
560 
561 		public void actionPerformed(ActionEvent event) 
562 		{
563 			stop();
564 		}
565  
566 	}
567     
568 }
569