View Javadoc

1   /*
2    *  XNap - A P2P framework and client.
3    *
4    *  See the file AUTHORS for copyright information.
5    *
6    *  This program is free software; you can redistribute it and/or modify
7    *  it under the terms of the GNU General Public License as published by
8    *  the Free Software Foundation.
9    *
10   *  This program is distributed in the hope that it will be useful,
11   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   *  GNU General Public License for more details.
14   *
15   *  You should have received a copy of the GNU General Public License
16   *  along with this program; if not, write to the Free Software
17   *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   */
19  
20  package org.xnap.plugin.overnet.net;
21  
22  import java.io.IOException;
23  import java.net.*;
24  import java.nio.ByteBuffer;
25  import java.nio.ByteOrder;
26  import java.nio.channels.ClosedChannelException;
27  import java.nio.channels.SelectionKey;
28  import java.nio.channels.Selector;
29  import java.nio.channels.SocketChannel;
30  import java.util.Hashtable;
31  import java.util.Iterator;
32  
33  import org.apache.log4j.Logger;
34  import org.xnap.XNap;
35  import org.xnap.event.StateListener;
36  import org.xnap.event.StateSupport;
37  import org.xnap.plugin.overnet.OvernetPlugin;
38  import org.xnap.plugin.overnet.net.msg.client.*;
39  import org.xnap.plugin.overnet.net.msg.*;
40  import org.xnap.plugin.overnet.net.msg.client.GetOptionsMessage;
41  import org.xnap.plugin.overnet.net.msg.client.GetServerListMessage;
42  import org.xnap.plugin.overnet.net.msg.client.GetSharedFilesMessage;
43  import org.xnap.plugin.overnet.net.msg.client.LoginMessage;
44  import org.xnap.plugin.overnet.net.msg.client.SendDownloadStatusMessage;
45  import org.xnap.plugin.overnet.net.msg.client.SendUploadStatusMessage;
46  import org.xnap.plugin.overnet.net.msg.client.StopDownloadStatusMessage;
47  import org.xnap.plugin.overnet.net.msg.client.StopUploadStatusMessage;
48  import org.xnap.plugin.overnet.net.msg.core.OvernetCoreMessage;
49  import org.xnap.plugin.overnet.net.msg.core.MessageFactory;
50  import org.xnap.plugin.overnet.util.OvernetPreferences;
51  import org.xnap.search.SearchManager;
52  import org.xnap.util.FiniteStateMachine;
53  import org.xnap.util.Preferences;
54  import org.xnap.util.Scheduler;
55  import org.xnap.util.State;
56  
57  /***
58   * Epitomizes a connection to the overnet core.
59   * 
60   * This class handles all the low level writing and reading of messages. The
61   * transitions from one connection state to the next are handled in its
62   * private subclass <code>StateMachine</code>.
63   */
64  public class OvernetCore
65  {
66      //--- Constant(s) ---
67  
68    	public static final byte ED2K_BYTE = (byte)0xe3;
69  
70  	private static final Hashtable TRANSITION_TABLE;
71  	static {
72  		State[][] table = new State[][] {
73  			{ State.DISCONNECTED, State.CONNECTING, },
74  			{ State.CONNECTING, State.CONNECTED, State.DISCONNECTED },
75  			{ State.CONNECTED, State.DISCONNECTING, State.DISCONNECTED },
76  			{ State.DISCONNECTING, State.DISCONNECTED }
77  		};
78  		TRANSITION_TABLE = FiniteStateMachine.createStateTable(table);
79  	}
80  
81  	//--- Data Field(s) ---
82  
83  	/***
84  	 * The non blocking channel for this connection.
85  	 */
86  	private SocketChannel channel;
87  	/***
88  	 * The buffer used for reading from the channel.
89  	 */
90  	private ByteBuffer buffer = ByteBuffer.allocate(1024);
91  	private Selector selector;
92  	private boolean die = false;
93  	/***
94  	 * Queues the messages to be written out.
95  	 */
96  	private MessageQueue sendQueue = new MessageQueue();
97  	/***
98  	 * Messages are forwarded to the message handler after their creation.
99  	 */
100 	private MessageHandler messageHandler;
101 
102 	private OvernetPreferences prefs = OvernetPreferences.getInstance();
103 	/***
104 	 * Delay connection time.
105 	 */
106 	private int delay = 0;
107 
108 	private StateMachine sm = new StateMachine();
109     private StateSupport listeners = new StateSupport(this);
110 
111 	private static Logger logger = Logger.getLogger(OvernetCore.class);
112 
113     //--- Constructor(s) ---
114 
115 	public OvernetCore(MessageHandler handler)
116 	{
117 		buffer.order(ByteOrder.LITTLE_ENDIAN);
118 		if (handler == null) {
119 			throw new NullPointerException();
120 		}
121 		messageHandler = handler;
122 	}
123 
124 	/***
125 	 * Connect to the overnet core.
126 	 */
127 	public void connect()
128 	{
129 		connect(0);
130 	}
131 	
132 	/***
133 	 * Connect to core after a certain delay.
134 	 *
135 	 * The core's state will be immediately connecting.
136 	 */
137 	public void connect(int delay)
138 	{
139 		this.delay = delay;
140 		setState(State.CONNECTING);
141 	}
142 	
143 
144 	private void read() throws IOException
145 	{
146 		try {
147 			if (channel.read(buffer) == -1) {
148 				stop();
149 				return;
150 			}
151 		}
152 		catch (IOException ie) {
153 //  			logger.debug("channel read", ie);
154 		}
155 		byte type;
156 		while ((type = check(buffer)) != -1) {
157 //  			logger.debug("read " + (int)type);
158 			OvernetCoreMessage msg = MessageFactory.create(type, buffer);
159 			if (msg != null && msg.isValid()) {
160 				messageHandler.handle(msg);
161 			}
162 			int size = buffer.getInt(1);
163 //  			logger.debug("Computed size " + size + " as to b.limit() "
164 //  						 + buffer.limit());
165 			buffer.position(Math.min(size + 5, buffer.limit()));
166 			buffer.compact();
167 		}
168 		buffer.compact();
169 	}
170 	
171 	private byte check(ByteBuffer buffer) throws IOException
172 	{
173 		buffer.flip();
174 		
175 		if (buffer.remaining() < 6) {
176 			return -1;
177 		}
178 		
179 		byte b = buffer.get();
180 		int size = buffer.getInt();
181 		byte type = buffer.get();
182 		buffer.rewind();
183 		
184 		if (b != ED2K_BYTE) {
185 			logger.debug("invalid packet" + toString(buffer.array())
186 						 + " offset " + buffer.arrayOffset()
187 						 + " position " + buffer.position()
188 						 + " limit " + buffer.limit());
189 			throw new IOException("Invalid Packet");
190 		}
191 		
192 		if (buffer.capacity() < size + 5) {
193 			resizeBuffer(size + 5);
194 			return -1;
195 		}
196 
197 		if (buffer.remaining() < size + 5) {
198 			logger.debug(type + " message of size " + size 
199 						 + "  not fully read");
200 			return -1;
201 		}
202 
203 		return type;
204 	}
205 
206 	private void resizeBuffer(int size)
207 	{
208 		logger.debug("reallocating buffer to " + size);
209 
210 		byte[] bytes = new byte[size];
211 		buffer.get(bytes, 0, buffer.remaining());
212 		ByteBuffer newBuf = ByteBuffer.wrap(bytes);
213 		newBuf.order(ByteOrder.LITTLE_ENDIAN);
214 		newBuf.limit(buffer.limit());
215 		buffer = newBuf;
216 
217 	}
218 
219 	private String toString(byte[] array)
220 	{
221 		StringBuffer sb = new StringBuffer(array.length * 2);
222 		for (int i = 0; i < array.length; i++) {
223 			sb.append(array[i]);
224 			sb.append(" ");
225 		}
226 		return sb.toString();
227 	}
228 
229 	private synchronized void write() throws IOException
230 	{
231 		OvernetClientMessage msg = sendQueue.getNextMessage();
232 		if (msg != null) {
233 			channel.write(msg.getBuffer());
234 		}
235 		/* if there is still data to be written out, register the write op
236 		   bit.  */
237 		if (sendQueue.hasRemaining()) {
238 			addOp(SelectionKey.OP_WRITE);
239 		}
240 		else {
241 			removeOp(SelectionKey.OP_WRITE);
242 		}
243 	}
244 
245 	private void addOp(int op)
246 	{
247 		SelectionKey key = channel.keyFor(selector);
248 		try {
249 			if (key != null) {
250 				channel.register(selector, key.interestOps() | op);
251 			}
252 			else {
253 				channel.register(selector, op);
254 			}
255 		}
256 		catch (ClosedChannelException ce) {
257 			logger.debug("addOp", ce);
258 			setState(State.DISCONNECTED, ce.getLocalizedMessage());
259 		}
260 	}
261 
262 	private void removeOp(int op)
263 	{
264 		SelectionKey key = channel.keyFor(selector);
265 		try {
266 			if (key != null) {
267 				channel.register(selector, key.interestOps() ^ op);
268 			}
269 		}
270 		catch (ClosedChannelException ce) {
271 			logger.debug("removeOp", ce);
272 			setState(State.DISCONNECTED, ce.getLocalizedMessage());
273 		}
274 	}
275 
276 	/***
277 	 * Adds the message to the <code>sendQueue</code> and calls {@link
278 	 * #write()}
279 	 */
280 	private void write(OvernetClientMessage cm)
281 	{
282 		sendQueue.add(cm);
283 		try {
284 			write();
285 		}
286 		catch (IOException ie) {
287 			stop();
288 			logger.debug("error writing", ie);
289 		}
290 	}
291 
292 	/***
293 	 * Sends a message to the overnet core in a non-blocking way.
294 	 */
295 	public static void send(OvernetClientMessage cm)
296 	{
297 		OvernetCore c = OvernetPlugin.getInstance().getCore();
298 //  		logger.debug("Sending " + cm.getClass().getName());
299 		c.write(cm);
300 	}
301 
302 	/***
303 	 * Stops the connection.
304 	 */
305 	public void stop()
306 	{
307 		die = true;
308 		if (selector != null) {
309 			selector.wakeup();
310 		}
311 	}
312 
313     public void addStateListener(StateListener listener) 
314     {
315         listeners.addStateListener(listener);
316     }
317 
318     public void removeStateListener(StateListener listener) 
319     {
320         listeners.removeStateListener(listener);
321     }
322 
323 	public State getState()
324 	{
325 		return sm.getState();
326 	}
327 
328 	public String getDescription()
329 	{
330 		return sm.getDescription();
331 	}
332 
333 	private void setState(State newState)
334     {
335 		sm.setState(newState);
336 		listeners.fireStateChanged();
337     }
338 
339     private void setState(State newState, String message)
340     {
341 		sm.setState(newState, message);
342 		listeners.fireStateChanged();
343     }
344 
345 	//--- Inner Class(es) ---
346 	
347 	private class StateMachine extends FiniteStateMachine implements Runnable
348     {
349 		//---- Data Field(s) ---
350 
351 		private Thread runner;
352 		
353 		//--- Constructor(s) ---
354 		
355 		public StateMachine()
356 		{
357 			super(State.DISCONNECTED, TRANSITION_TABLE);
358 		}
359 		
360 		protected synchronized void stateChanged(State oldState,
361 												 State newState)
362 		{
363 			if (newState == State.CONNECTING) {
364 				if (delay > 0) {
365 					Scheduler.run(delay, new Runnable() {
366 							public void run()
367 							{
368 								connect();
369 							}
370 						});
371 				}
372 				else {
373 					connect();
374 				}
375 			}
376 			else if (newState == State.CONNECTED) {
377 				connected();
378 			}
379 			else if (newState == State.DISCONNECTING) {
380 				disconnect();
381 			}
382 			else if (newState == State.DISCONNECTED) {
383 				close();
384 			}
385 		}
386 
387 		private void connect() 
388 		{
389 			try {
390 				sendQueue.clear();
391 				die = false;
392 				channel = SocketChannel.open();
393 				channel.configureBlocking(false);
394 				InetSocketAddress isa = new InetSocketAddress
395 					(prefs.getCoreHost(), prefs.getCorePort());
396 				if (isa.isUnresolved()) {
397 					throw new IOException("Address could not me be resolved");
398 				}
399 				channel.connect(isa);
400 			}
401 			catch (IOException ie) {
402 				logger.debug("connecting failed", ie);
403 				this.setState(State.DISCONNECTED, ie.getLocalizedMessage());
404 			}
405 			// start selector thread here
406 			runner = new Thread(this, "Overnet core connection");
407 			runner.start();
408 		}
409 
410 		private void connected()
411 		{
412 			removeOp(SelectionKey.OP_CONNECT);
413 			addOp(SelectionKey.OP_READ);
414 			
415 			/* todo: move this somewhere else, it's not the core's task to
416                send messages on its own. */
417 			write(new LoginMessage(prefs.getUsername(), prefs.getPassword()));
418 			write(new CommandMessage("g"));
419 			if (prefs.getUseXNapDownloadDirs()) {
420 				// these are the global preferences
421 				Preferences p = Preferences.getInstance();
422 				write(new CommandMessage("temp " + p.getIncompleteDir()));
423 				write(new CommandMessage("in " + p.getDownloadDir()));
424 			}
425 			write(new GetOptionsMessage());
426 			write(new SendDownloadStatusMessage());
427 			write(new SendUploadStatusMessage());
428 			write(new GetServerListMessage());
429 			//  write(new GetSharedDirsMessage());
430 			//  write(new GetSharedFilesMessage());
431 			SearchManager.getInstance().add(OvernetPlugin.getInstance());
432 		}
433 		
434 		public void run()
435 		{
436 			try {
437 				select();
438 				OvernetCore.this.setState(State.DISCONNECTING);
439 				OvernetCore.this.setState(State.DISCONNECTED);
440 			}
441 			catch (IOException ie) {
442 				logger.debug("overnet core connection", ie);
443 				OvernetCore.this.setState(State.DISCONNECTED,
444 										  ie.getLocalizedMessage());
445 			}
446 		}
447 
448 		private void select() throws IOException
449 		{
450 			selector = Selector.open();
451 			channel.register(selector, SelectionKey.OP_CONNECT);
452 			
453 			while (selector.select() > 0 && !die) {
454 				for (Iterator i = selector.selectedKeys().iterator(); 
455 					 i.hasNext();) {
456 					
457 					SelectionKey key = (SelectionKey)i.next();
458 					i.remove();
459 
460 					SocketChannel sc = (SocketChannel)key.channel();
461 
462 					if (key.isConnectable()) {
463 						if (sc.isConnectionPending()) {
464 							logger.debug("finish connect");
465 							try {
466 								if (sc.finishConnect()) {
467 									OvernetCore.this.setState(State.CONNECTED);
468 								}
469 								else {
470 									throw new
471 										IOException("finish connect failed");
472 								}
473 							}
474 							catch (ConnectException ce) {
475 								logger.debug("connect exp", ce);
476 								throw new ConnectException(XNap.tr("Could not connect to overnet daemon\nThis can have several reasons:\n-overnet is not installed.This plugin requires an external program called overnet that handles the network connection. Please see http://www.overnet.com/ for more information and download links.\n-The plugin settings are incorrect, e.g. the overnet daemon port is wrong. Please check the settings under Settings -> Configure Plugins.\n-The overnet daemon is not running: Please start it manually or configure XNap to start it for you and restart XNap.\n"));
477 							}
478 						}
479 					}
480 					else if (key.isReadable()) {
481 						read();
482 					}
483 					else if (key.isWritable()) {
484 						write();
485 					}
486 				}
487 			}
488 		}
489 
490 		private void disconnect()
491 		{
492 			// that's what the gtk gui sends on deliberate shutdown
493 			// this shouldn't be sent here
494 			write(new StopDownloadStatusMessage());
495 			write(new StopUploadStatusMessage());
496 			write(new GetSharedFilesMessage());
497 		}
498 
499 		/***
500 		 * Clean up and close channel.
501 		 */
502 		private void close()
503 		{
504 			if (channel != null) {
505 				try {
506 					logger.debug("closing channel");
507 					channel.socket().shutdownInput();
508 					channel.socket().shutdownOutput();
509 					channel.socket().close();
510 				}
511 				catch (IOException ie) {
512 					logger.debug("closing channel", ie);
513 				}
514 			}
515 			SearchManager.getInstance().remove(OvernetPlugin.getInstance());
516 		}
517 	}
518 }