1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.xnap.plugin.opennap.net;
21
22 import java.io.BufferedInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.ConnectException;
27 import java.net.Socket;
28 import java.util.Iterator;
29 import java.util.LinkedList;
30 import java.util.StringTokenizer;
31
32 import org.apache.log4j.Logger;
33 import org.xnap.XNap;
34 import org.xnap.net.NetHelper;
35 import org.xnap.plugin.opennap.OpenNapPlugin;
36 import org.xnap.plugin.opennap.net.msg.MessageFactory;
37 import org.xnap.plugin.opennap.net.msg.MessageHandler;
38 import org.xnap.plugin.opennap.net.msg.MessageStrings;
39 import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
40 import org.xnap.plugin.opennap.net.msg.client.LoginMessage;
41 import org.xnap.plugin.opennap.net.msg.client.NewUserLoginMessage;
42 import org.xnap.plugin.opennap.net.msg.client.NickCheckMessage;
43 import org.xnap.plugin.opennap.net.msg.server.BrowseResponseMessage;
44 import org.xnap.plugin.opennap.net.msg.server.EndBrowseMessage;
45 import org.xnap.plugin.opennap.net.msg.server.EndSearchMessage;
46 import org.xnap.plugin.opennap.net.msg.server.ErrorMessage;
47 import org.xnap.plugin.opennap.net.msg.server.InvalidMessageException;
48 import org.xnap.plugin.opennap.net.msg.server.InvalidNickMessage;
49 import org.xnap.plugin.opennap.net.msg.server.LoginAckMessage;
50 import org.xnap.plugin.opennap.net.msg.server.LoginErrorMessage;
51 import org.xnap.plugin.opennap.net.msg.server.NickAlreadyRegisteredMessage;
52 import org.xnap.plugin.opennap.net.msg.server.NickNotRegisteredMessage;
53 import org.xnap.plugin.opennap.net.msg.server.SearchResponseMessage;
54 import org.xnap.plugin.opennap.net.msg.server.ServerMessage;
55 import org.xnap.plugin.opennap.util.*;
56 import org.xnap.util.Preferences;
57 import org.xnap.util.PriorityQueue;
58 import org.xnap.util.State;
59 import org.xnap.util.StringHelper;
60
61 public class OpenNapServerRunner extends Thread {
62
63
64
65
66
67 public static final int CONNECT_TIMEOUT = 1 * 30 * 1000;
68
69 public static final int MAX_RETRY_ON_CONNECTION_REFUSED = 3;
70
71 public static final int SOCKET_TIMEOUT = 0;
72
73
74
75 private static Logger logger = Logger.getLogger(OpenNapServerRunner.class);
76
77 private Socket socket;
78 private InputStream in;
79 private OutputStream out;
80
81 private OpenNapServer server;
82
83 /***
84 * Remembers the last <code>ErrorMessage</code>.
85 */
86 private String lastError;
87
88 private OpenNapSearch currentSearch;
89
90 private OpenNapBrowse currentBrowse;
91
92 /***
93 * Queue of pending searches.
94 */
95 private PriorityQueue searchQueue = new PriorityQueue();
96
97 /***
98 * Queue of pending browses.
99 */
100 private LinkedList browseQueue = new LinkedList();
101
102 /***
103 * Queue of pending messages.
104 */
105 private SendQueue sendQueue = new SendQueue(this);
106 private Object sendLock = new Object();
107
108 /***
109 * If set to true this runner is to die.
110 */
111 private boolean die = false;
112
113 /***
114 * The state description why the runner has died.
115 */
116 private String dieReason;
117
118 private boolean useUTF8Encoding;
119
120
121
122 public OpenNapServerRunner(OpenNapServer server)
123 {
124 super("OpenNapServer " + server.toString());
125
126 this.server = server;
127 this.useUTF8Encoding
128 = OpenNapPlugin.getPreferences().getUseUTF8Encoding();
129 }
130
131
132
133 public synchronized void enqueue(OpenNapBrowse browse)
134 {
135 if (die) {
136 browse.getRequest().failed();
137 }
138 else {
139 browseQueue.add(browse);
140 allowBrowse();
141 }
142 }
143
144 /***
145 *
146 */
147 public synchronized void enqueue(OpenNapSearch search)
148 {
149 if (die) {
150 search.getRequest().failed();
151 }
152 else {
153 searchQueue.add(search, search.getPriority());
154 allowSearch();
155 }
156 }
157
158 public synchronized void dequeue(OpenNapBrowse browse)
159 {
160 browseQueue.remove(browse);
161 }
162
163 /***
164 * Invoked by {@link OpenNapSearch} when search did not take place or
165 * was aborted.
166 */
167 public synchronized void dequeue(OpenNapSearch search)
168 {
169 searchQueue.remove(search);
170 }
171
172 public synchronized void die(org.xnap.util.State newState, String description)
173 {
174 if (!die) {
175 die = true;
176 dieReason = description;
177 server.setState(newState, description);
178 }
179 else {
180
181 logger.error("Already dead");
182 }
183 }
184
185 public void disconnect()
186 {
187 die = true;
188
189 try {
190 socket.close();
191 }
192 catch (Exception e) {
193 }
194 this.interrupt();
195 }
196
197 private synchronized void close()
198 {
199 if (socket != null) {
200 try {
201 socket.close();
202 } catch (IOException e) {}
203 }
204 if (in != null) {
205 try {
206 in.close();
207 } catch (IOException e) {}
208 }
209 if (out != null) {
210 try {
211 out.close();
212 } catch (IOException e) {}
213 }
214
215 if (currentBrowse != null) {
216 currentBrowse.getRequest().failed();
217 }
218 for (Iterator i = browseQueue.iterator(); i.hasNext();) {
219 ((OpenNapBrowse)i.next()).getRequest().failed();
220 }
221
222 if (currentSearch != null) {
223 currentSearch.getRequest().failed();
224 }
225 for (Iterator i = searchQueue.iterator(); i.hasNext();) {
226 ((OpenNapSearch)i.next()).getRequest().failed();
227 }
228
229
230 }
231
232 public SendQueue getSendQueue()
233 {
234 return sendQueue;
235 }
236
237 private byte[] read(int length) throws IOException
238 {
239 byte[] textBuf = new byte[length];
240
241 int read = 0;
242 while (read < length) {
243 int c = in.read(textBuf, read, length - read);
244 if (c == -1) {
245 throw new IOException("Connection closed");
246 }
247 read += c;
248 }
249
250 return textBuf;
251 }
252
253 /***
254 * Receives the next message. Invoked by {@link #runQueue()}.
255 */
256 private ServerMessage recv()
257 {
258 Packet p = null;
259 while (!die) {
260 try {
261 p = recvPacket();
262 return MessageFactory.create(server, p.id, p.data);
263 }
264 catch (IOException e) {
265 logger.debug(server.getHost() + ":" + server.getPort()
266 + " recv ", e);
267 die(org.xnap.util.State.FAILED, (lastError != null)
268 ? lastError
269 : NetHelper.getErrorMessage(e));
270 }
271 catch (InvalidMessageException e) {
272 logger.error(server.getHost() + ":" + server.getPort()
273 + " recv: " + p, e);
274 }
275 }
276
277 return null;
278 }
279
280 private Packet recvPacket() throws IOException
281 {
282 byte[] header = read(4);
283
284
285 int lo = 0xFF & (int)header[0];
286 int hi = 0xFF & (int)header[1];
287 int length = 0xFFFF & (hi << 8 | lo);
288
289 lo = 0xFF & (int)header[2];
290 hi = 0xFF & (int)header[3];
291 int id = 0xFFFF & (hi << 8 | lo);
292
293 byte[] data = read(length);
294 String content = (length > 0)
295 ? (useUTF8Encoding)
296 ? StringHelper.toString(data)
297 : new String(data)
298 : "";
299
300 logger.debug("< " + server.getHost() + ":" + server.getPort()
301 + " (" + id + ") [" +
302 MessageFactory.getMessageName(id) + "] " + content);
303
304 return new Packet(id, content);
305 }
306
307 /***
308 * This method should only be invoked by the associated
309 * {@link SendQueue} object.
310 */
311 void send(ClientMessage msg) throws IOException
312 {
313 String data = msg.getData(server.getVersion());
314 sendPacket(new Packet(msg.getType(), data));
315 }
316
317 private void sendPacket(Packet p) throws IOException
318 {
319 logger.debug
320 ("> " + server.getHost() + ":" + server.getPort() + " "
321 + MessageStrings.getMessageName(p.id) + "(" + p.id + ") "
322 + p.data);
323
324 try {
325 if (out == null) {
326
327 throw new IOException(XNap.tr("Invalid stream state"));
328 }
329
330 byte[] content
331 = (useUTF8Encoding)
332 ? p.data.getBytes("UTF-8")
333 : p.data.getBytes();
334 int dataLength = content.length;
335 byte[] data = new byte[2 + 2 + dataLength];
336
337 int type = (short)p.id;
338 int len = (short)dataLength;
339
340 data[0] = (byte)len;
341 data[1] = (byte)(len >> 8);
342 data[2] = (byte)type;
343 data[3] = (byte)(type >> 8);
344
345 System.arraycopy(content, 0, data, 4, content.length);
346
347 synchronized (sendLock) {
348 out.write(data);
349 out.flush();
350 }
351 }
352 catch (IOException e) {
353 logger.debug(server.getHost() + ":" + server.getPort() +
354 " sendPacket " + p, e);
355 die(org.xnap.util.State.FAILED, NetHelper.getErrorMessage(e));
356 throw e;
357 }
358 }
359
360 private void connect()
361 {
362 try {
363 if (server.isRedirector()) {
364 if (fetchHost(server.getHost(), server.getPort())) {
365 connect(null, server.getRedirectedHost(),
366 server.getRedirectedPort());
367 }
368 else {
369 die(org.xnap.util.State.FAILED, "Invalid response");
370 }
371 }
372 else {
373 connect(server.getIP(), server.getHost(), server.getPort());
374 }
375 }
376 catch (ConnectException e) {
377
378 die(org.xnap.util.State.FAILED, NetHelper.getErrorMessage(e));
379 }
380 catch (IOException e) {
381
382 die(org.xnap.util.State.ERROR, NetHelper.getErrorMessage(e));
383
384 if (server.canAutoRemove()) {
385 OpenNapPlugin.getServerManager().remove(server);
386 }
387 }
388 }
389
390 /***
391 * Opens the socket to the server and connects the streams.
392 * If host1 != null, it is tried first.
393 *
394 * @param host1 the ip address of the server or null if unknown
395 * @param host2 the hostname of the server
396 * @param port the port of the server
397 */
398 private void connect(String host1, String host2, int port)
399 throws IOException
400 {
401 if (host1 != null) {
402 try {
403 socket = new Socket(host1, port);
404 }
405 catch (IOException e) {
406 server.setIP(null);
407 }
408 }
409 if (socket == null) {
410 for (int i = 0; i < MAX_RETRY_ON_CONNECTION_REFUSED
411 && socket == null; i++) {
412
413 try {
414 socket = new Socket(host2, port);
415 }
416 catch (ConnectException e) {
417 if (i == MAX_RETRY_ON_CONNECTION_REFUSED - 1) {
418 throw e;
419 }
420 try {
421 Thread.sleep(10);
422 }
423 catch (InterruptedException e2) {
424 }
425 }
426 }
427 }
428
429 try {
430 socket.setSoTimeout(CONNECT_TIMEOUT);
431 in = new BufferedInputStream(socket.getInputStream());
432 out = socket.getOutputStream();
433
434 server.setStateDescription(XNap.tr("Logging in") + "...");
435
436 if (server.isNewUser()) {
437 send(new NickCheckMessage(server.getNick()));
438 }
439 else {
440 login(false);
441 }
442 }
443 catch (IOException e) {
444 die(org.xnap.util.State.FAILED, XNap.tr("Login failed"));
445 }
446 }
447
448 /***
449 * Connects to a redirector server and reads the host information.
450 *
451 * @return true, if successful; false, if failed
452 */
453 public boolean fetchHost(String host, int port) throws IOException
454 {
455 Socket socket = null;
456 InputStream in = null;
457 try {
458 socket = new Socket(host, port);
459 in = new BufferedInputStream(socket.getInputStream());
460
461 byte[] b = new byte[1024];
462 int c = in.read(b, 0, 1024);
463 if (c > 0) {
464
465 String response = new String(b, 0, c - 1);
466 StringTokenizer t = new StringTokenizer(response, ":");
467 if (t.countTokens() == 2) {
468 try {
469 server.setRedirectedHost(t.nextToken());
470 server.setRedirectedPort
471 (Integer.parseInt(t.nextToken()));
472 return true;
473 } catch (NumberFormatException e) {}
474 }
475 }
476 }
477 finally {
478 if (socket != null) {
479 try {
480 socket.close();
481 } catch (IOException e) {}
482 }
483 if (in != null) {
484 try {
485 in.close();
486 } catch (IOException e) {}
487 }
488 }
489
490 return false;
491 }
492
493 /***
494 * Sends the login message to the server.
495 */
496 private void login(boolean newUser) throws IOException
497 {
498 Preferences prefs = Preferences.getInstance();
499
500 String nick = server.getNick();
501 String password = server.getPassword();
502 int port = server.getLocalPort();
503 String email = server.getEmail();
504 String info = OpenNapPlugin.getPreferences().getClientInfo();
505 int linkSpeed = OpenNapLinkType.getIndexOfType(prefs.getLinkSpeed());
506
507 if (newUser) {
508 send(new NewUserLoginMessage
509 (nick, password, port, info, linkSpeed, email));
510 }
511 else {
512 send(new LoginMessage(nick, password, port, info, linkSpeed));
513 }
514 }
515
516 /***
517 * Polls the input stream for messages and passes them to the
518 * {@link MessageHandler}.
519 */
520 private void runQueue()
521 {
522 while (!die) {
523 ServerMessage msg = recv();
524
525 if (die) {
526 return;
527 }
528
529 if (msg instanceof NickNotRegisteredMessage) {
530 try {
531 login(true);
532 }
533 catch (IOException e) {
534 die(org.xnap.util.State.FAILED, e.getLocalizedMessage());
535 }
536 }
537 else if (msg instanceof NickAlreadyRegisteredMessage) {
538 die(org.xnap.util.State.FAILED, XNap.tr("Nick already registered"));
539 }
540 else if (msg instanceof InvalidNickMessage) {
541 die(org.xnap.util.State.FAILED, XNap.tr("Invalid nick"));
542 }
543 else if (msg instanceof LoginAckMessage) {
544 server.setNewUser(false);
545 try {
546 socket.setSoTimeout(SOCKET_TIMEOUT);
547 }
548 catch (IOException e) {
549 logger.warn("Could not set socket timeout", e);
550 }
551 server.setState(org.xnap.util.State.CONNECTED);
552 }
553 else if (msg instanceof LoginErrorMessage) {
554 die(org.xnap.util.State.FAILED, ((LoginErrorMessage)msg).message);
555 }
556 else if (msg instanceof BrowseResponseMessage) {
557 if (currentBrowse != null) {
558 currentBrowse.received((BrowseResponseMessage)msg);
559 }
560 }
561 else if (msg instanceof EndBrowseMessage) {
562 if (currentBrowse != null) {
563 synchronized (this) {
564 currentBrowse.finished();
565 currentBrowse = null;
566 }
567 updateStatus();
568 }
569 }
570 else if (msg instanceof SearchResponseMessage) {
571 if (currentSearch != null) {
572 currentSearch.received((SearchResponseMessage)msg);
573 }
574 }
575 else if (msg instanceof EndSearchMessage) {
576 if (currentSearch != null) {
577 synchronized (this) {
578 currentSearch.finished();
579 currentSearch = null;
580 }
581 updateStatus();
582 }
583 }
584 else if (msg instanceof ErrorMessage) {
585 lastError = ((ErrorMessage)msg).message;
586 if (lastError.startsWith("You have been killed by server")
587 || lastError.startsWith("You were killed by server")) {
588
589 die(org.xnap.util.State.FAILED, lastError);
590 }
591
592 }
593
594 OpenNapPlugin.getMessageHandler().handle(msg);
595
596 allowSearch();
597 allowBrowse();
598 }
599 }
600
601 /***
602 *
603 */
604 public void run()
605 {
606 Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
607
608
609
610 connect();
611
612 runQueue();
613
614 server.setState(org.xnap.util.State.DISCONNECTED, dieReason);
615 logger.debug(server.toString() + " has died");
616 }
617
618 private void updateStatus()
619 {
620 StringBuffer sb = new StringBuffer(XNap.tr("connected"));
621
622 if (currentSearch != null) {
623 sb.append(", ");
624 sb.append(XNap.tr("searching"));
625 }
626 if (searchQueue.size() > 0) {
627 sb.append(", ");
628 sb.append(XNap.tr("{0} searches pending", searchQueue.size()));
629 }
630 if (currentBrowse != null) {
631 sb.append(", browsing");
632 }
633
634 server.setStateDescription(sb.toString());
635 }
636
637 private synchronized void allowBrowse()
638 {
639 if (currentBrowse != null || browseQueue.size() == 0) {
640 return;
641 }
642
643 currentBrowse = (OpenNapBrowse)browseQueue.removeFirst();
644 MessageHandler.send(server, currentBrowse.getRequest());
645
646 updateStatus();
647 }
648
649 private synchronized void allowSearch()
650 {
651 if (currentSearch != null || searchQueue.size() == 0) {
652 return;
653 }
654
655 currentSearch = (OpenNapSearch)searchQueue.pop();
656 MessageHandler.send(server, currentSearch.getRequest());
657
658 updateStatus();
659 }
660
661 }