1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.xnap.plugin.overnet.net;
21
22 import java.io.IOException;
23 import java.net.*;
24 import java.nio.ByteBuffer;
25 import java.nio.ByteOrder;
26 import java.nio.channels.ClosedChannelException;
27 import java.nio.channels.SelectionKey;
28 import java.nio.channels.Selector;
29 import java.nio.channels.SocketChannel;
30 import java.util.Hashtable;
31 import java.util.Iterator;
32
33 import org.apache.log4j.Logger;
34 import org.xnap.XNap;
35 import org.xnap.event.StateListener;
36 import org.xnap.event.StateSupport;
37 import org.xnap.plugin.overnet.OvernetPlugin;
38 import org.xnap.plugin.overnet.net.msg.client.*;
39 import org.xnap.plugin.overnet.net.msg.*;
40 import org.xnap.plugin.overnet.net.msg.client.GetOptionsMessage;
41 import org.xnap.plugin.overnet.net.msg.client.GetServerListMessage;
42 import org.xnap.plugin.overnet.net.msg.client.GetSharedFilesMessage;
43 import org.xnap.plugin.overnet.net.msg.client.LoginMessage;
44 import org.xnap.plugin.overnet.net.msg.client.SendDownloadStatusMessage;
45 import org.xnap.plugin.overnet.net.msg.client.SendUploadStatusMessage;
46 import org.xnap.plugin.overnet.net.msg.client.StopDownloadStatusMessage;
47 import org.xnap.plugin.overnet.net.msg.client.StopUploadStatusMessage;
48 import org.xnap.plugin.overnet.net.msg.core.OvernetCoreMessage;
49 import org.xnap.plugin.overnet.net.msg.core.MessageFactory;
50 import org.xnap.plugin.overnet.util.OvernetPreferences;
51 import org.xnap.search.SearchManager;
52 import org.xnap.util.FiniteStateMachine;
53 import org.xnap.util.Preferences;
54 import org.xnap.util.Scheduler;
55 import org.xnap.util.State;
56
57 /***
58 * Epitomizes a connection to the overnet core.
59 *
60 * This class handles all the low level writing and reading of messages. The
61 * transitions from one connection state to the next are handled in its
62 * private subclass <code>StateMachine</code>.
63 */
64 public class OvernetCore
65 {
66
67
68 public static final byte ED2K_BYTE = (byte)0xe3;
69
70 private static final Hashtable TRANSITION_TABLE;
71 static {
72 State[][] table = new State[][] {
73 { State.DISCONNECTED, State.CONNECTING, },
74 { State.CONNECTING, State.CONNECTED, State.DISCONNECTED },
75 { State.CONNECTED, State.DISCONNECTING, State.DISCONNECTED },
76 { State.DISCONNECTING, State.DISCONNECTED }
77 };
78 TRANSITION_TABLE = FiniteStateMachine.createStateTable(table);
79 }
80
81
82
83 /***
84 * The non blocking channel for this connection.
85 */
86 private SocketChannel channel;
87 /***
88 * The buffer used for reading from the channel.
89 */
90 private ByteBuffer buffer = ByteBuffer.allocate(1024);
91 private Selector selector;
92 private boolean die = false;
93 /***
94 * Queues the messages to be written out.
95 */
96 private MessageQueue sendQueue = new MessageQueue();
97 /***
98 * Messages are forwarded to the message handler after their creation.
99 */
100 private MessageHandler messageHandler;
101
102 private OvernetPreferences prefs = OvernetPreferences.getInstance();
103 /***
104 * Delay connection time.
105 */
106 private int delay = 0;
107
108 private StateMachine sm = new StateMachine();
109 private StateSupport listeners = new StateSupport(this);
110
111 private static Logger logger = Logger.getLogger(OvernetCore.class);
112
113
114
115 public OvernetCore(MessageHandler handler)
116 {
117 buffer.order(ByteOrder.LITTLE_ENDIAN);
118 if (handler == null) {
119 throw new NullPointerException();
120 }
121 messageHandler = handler;
122 }
123
124 /***
125 * Connect to the overnet core.
126 */
127 public void connect()
128 {
129 connect(0);
130 }
131
132 /***
133 * Connect to core after a certain delay.
134 *
135 * The core's state will be immediately connecting.
136 */
137 public void connect(int delay)
138 {
139 this.delay = delay;
140 setState(State.CONNECTING);
141 }
142
143
144 private void read() throws IOException
145 {
146 try {
147 if (channel.read(buffer) == -1) {
148 stop();
149 return;
150 }
151 }
152 catch (IOException ie) {
153
154 }
155 byte type;
156 while ((type = check(buffer)) != -1) {
157
158 OvernetCoreMessage msg = MessageFactory.create(type, buffer);
159 if (msg != null && msg.isValid()) {
160 messageHandler.handle(msg);
161 }
162 int size = buffer.getInt(1);
163
164
165 buffer.position(Math.min(size + 5, buffer.limit()));
166 buffer.compact();
167 }
168 buffer.compact();
169 }
170
171 private byte check(ByteBuffer buffer) throws IOException
172 {
173 buffer.flip();
174
175 if (buffer.remaining() < 6) {
176 return -1;
177 }
178
179 byte b = buffer.get();
180 int size = buffer.getInt();
181 byte type = buffer.get();
182 buffer.rewind();
183
184 if (b != ED2K_BYTE) {
185 logger.debug("invalid packet" + toString(buffer.array())
186 + " offset " + buffer.arrayOffset()
187 + " position " + buffer.position()
188 + " limit " + buffer.limit());
189 throw new IOException("Invalid Packet");
190 }
191
192 if (buffer.capacity() < size + 5) {
193 resizeBuffer(size + 5);
194 return -1;
195 }
196
197 if (buffer.remaining() < size + 5) {
198 logger.debug(type + " message of size " + size
199 + " not fully read");
200 return -1;
201 }
202
203 return type;
204 }
205
206 private void resizeBuffer(int size)
207 {
208 logger.debug("reallocating buffer to " + size);
209
210 byte[] bytes = new byte[size];
211 buffer.get(bytes, 0, buffer.remaining());
212 ByteBuffer newBuf = ByteBuffer.wrap(bytes);
213 newBuf.order(ByteOrder.LITTLE_ENDIAN);
214 newBuf.limit(buffer.limit());
215 buffer = newBuf;
216
217 }
218
219 private String toString(byte[] array)
220 {
221 StringBuffer sb = new StringBuffer(array.length * 2);
222 for (int i = 0; i < array.length; i++) {
223 sb.append(array[i]);
224 sb.append(" ");
225 }
226 return sb.toString();
227 }
228
229 private synchronized void write() throws IOException
230 {
231 OvernetClientMessage msg = sendQueue.getNextMessage();
232 if (msg != null) {
233 channel.write(msg.getBuffer());
234 }
235
236
237 if (sendQueue.hasRemaining()) {
238 addOp(SelectionKey.OP_WRITE);
239 }
240 else {
241 removeOp(SelectionKey.OP_WRITE);
242 }
243 }
244
245 private void addOp(int op)
246 {
247 SelectionKey key = channel.keyFor(selector);
248 try {
249 if (key != null) {
250 channel.register(selector, key.interestOps() | op);
251 }
252 else {
253 channel.register(selector, op);
254 }
255 }
256 catch (ClosedChannelException ce) {
257 logger.debug("addOp", ce);
258 setState(State.DISCONNECTED, ce.getLocalizedMessage());
259 }
260 }
261
262 private void removeOp(int op)
263 {
264 SelectionKey key = channel.keyFor(selector);
265 try {
266 if (key != null) {
267 channel.register(selector, key.interestOps() ^ op);
268 }
269 }
270 catch (ClosedChannelException ce) {
271 logger.debug("removeOp", ce);
272 setState(State.DISCONNECTED, ce.getLocalizedMessage());
273 }
274 }
275
276 /***
277 * Adds the message to the <code>sendQueue</code> and calls {@link
278 * #write()}
279 */
280 private void write(OvernetClientMessage cm)
281 {
282 sendQueue.add(cm);
283 try {
284 write();
285 }
286 catch (IOException ie) {
287 stop();
288 logger.debug("error writing", ie);
289 }
290 }
291
292 /***
293 * Sends a message to the overnet core in a non-blocking way.
294 */
295 public static void send(OvernetClientMessage cm)
296 {
297 OvernetCore c = OvernetPlugin.getInstance().getCore();
298
299 c.write(cm);
300 }
301
302 /***
303 * Stops the connection.
304 */
305 public void stop()
306 {
307 die = true;
308 if (selector != null) {
309 selector.wakeup();
310 }
311 }
312
313 public void addStateListener(StateListener listener)
314 {
315 listeners.addStateListener(listener);
316 }
317
318 public void removeStateListener(StateListener listener)
319 {
320 listeners.removeStateListener(listener);
321 }
322
323 public State getState()
324 {
325 return sm.getState();
326 }
327
328 public String getDescription()
329 {
330 return sm.getDescription();
331 }
332
333 private void setState(State newState)
334 {
335 sm.setState(newState);
336 listeners.fireStateChanged();
337 }
338
339 private void setState(State newState, String message)
340 {
341 sm.setState(newState, message);
342 listeners.fireStateChanged();
343 }
344
345
346
347 private class StateMachine extends FiniteStateMachine implements Runnable
348 {
349
350
351 private Thread runner;
352
353
354
355 public StateMachine()
356 {
357 super(State.DISCONNECTED, TRANSITION_TABLE);
358 }
359
360 protected synchronized void stateChanged(State oldState,
361 State newState)
362 {
363 if (newState == State.CONNECTING) {
364 if (delay > 0) {
365 Scheduler.run(delay, new Runnable() {
366 public void run()
367 {
368 connect();
369 }
370 });
371 }
372 else {
373 connect();
374 }
375 }
376 else if (newState == State.CONNECTED) {
377 connected();
378 }
379 else if (newState == State.DISCONNECTING) {
380 disconnect();
381 }
382 else if (newState == State.DISCONNECTED) {
383 close();
384 }
385 }
386
387 private void connect()
388 {
389 try {
390 sendQueue.clear();
391 die = false;
392 channel = SocketChannel.open();
393 channel.configureBlocking(false);
394 InetSocketAddress isa = new InetSocketAddress
395 (prefs.getCoreHost(), prefs.getCorePort());
396 if (isa.isUnresolved()) {
397 throw new IOException("Address could not me be resolved");
398 }
399 channel.connect(isa);
400 }
401 catch (IOException ie) {
402 logger.debug("connecting failed", ie);
403 this.setState(State.DISCONNECTED, ie.getLocalizedMessage());
404 }
405
406 runner = new Thread(this, "Overnet core connection");
407 runner.start();
408 }
409
410 private void connected()
411 {
412 removeOp(SelectionKey.OP_CONNECT);
413 addOp(SelectionKey.OP_READ);
414
415
416
417 write(new LoginMessage(prefs.getUsername(), prefs.getPassword()));
418 write(new CommandMessage("g"));
419 if (prefs.getUseXNapDownloadDirs()) {
420
421 Preferences p = Preferences.getInstance();
422 write(new CommandMessage("temp " + p.getIncompleteDir()));
423 write(new CommandMessage("in " + p.getDownloadDir()));
424 }
425 write(new GetOptionsMessage());
426 write(new SendDownloadStatusMessage());
427 write(new SendUploadStatusMessage());
428 write(new GetServerListMessage());
429
430
431 SearchManager.getInstance().add(OvernetPlugin.getInstance());
432 }
433
434 public void run()
435 {
436 try {
437 select();
438 OvernetCore.this.setState(State.DISCONNECTING);
439 OvernetCore.this.setState(State.DISCONNECTED);
440 }
441 catch (IOException ie) {
442 logger.debug("overnet core connection", ie);
443 OvernetCore.this.setState(State.DISCONNECTED,
444 ie.getLocalizedMessage());
445 }
446 }
447
448 private void select() throws IOException
449 {
450 selector = Selector.open();
451 channel.register(selector, SelectionKey.OP_CONNECT);
452
453 while (selector.select() > 0 && !die) {
454 for (Iterator i = selector.selectedKeys().iterator();
455 i.hasNext();) {
456
457 SelectionKey key = (SelectionKey)i.next();
458 i.remove();
459
460 SocketChannel sc = (SocketChannel)key.channel();
461
462 if (key.isConnectable()) {
463 if (sc.isConnectionPending()) {
464 logger.debug("finish connect");
465 try {
466 if (sc.finishConnect()) {
467 OvernetCore.this.setState(State.CONNECTED);
468 }
469 else {
470 throw new
471 IOException("finish connect failed");
472 }
473 }
474 catch (ConnectException ce) {
475 logger.debug("connect exp", ce);
476 throw new ConnectException(XNap.tr("Could not connect to overnet daemon\nThis can have several reasons:\n-overnet is not installed.This plugin requires an external program called overnet that handles the network connection. Please see http://www.overnet.com/ for more information and download links.\n-The plugin settings are incorrect, e.g. the overnet daemon port is wrong. Please check the settings under Settings -> Configure Plugins.\n-The overnet daemon is not running: Please start it manually or configure XNap to start it for you and restart XNap.\n"));
477 }
478 }
479 }
480 else if (key.isReadable()) {
481 read();
482 }
483 else if (key.isWritable()) {
484 write();
485 }
486 }
487 }
488 }
489
490 private void disconnect()
491 {
492
493
494 write(new StopDownloadStatusMessage());
495 write(new StopUploadStatusMessage());
496 write(new GetSharedFilesMessage());
497 }
498
499 /***
500 * Clean up and close channel.
501 */
502 private void close()
503 {
504 if (channel != null) {
505 try {
506 logger.debug("closing channel");
507 channel.socket().shutdownInput();
508 channel.socket().shutdownOutput();
509 channel.socket().close();
510 }
511 catch (IOException ie) {
512 logger.debug("closing channel", ie);
513 }
514 }
515 SearchManager.getInstance().remove(OvernetPlugin.getInstance());
516 }
517 }
518 }