/*
 *  XNap - A P2P framework and client.
 *
 *  See the file AUTHORS for copyright information.
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
19  
20  package org.xnap.plugin.opennap.net;
21  
22  import java.io.BufferedInputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.net.ConnectException;
27  import java.net.Socket;
28  import java.util.Iterator;
29  import java.util.LinkedList;
30  import java.util.StringTokenizer;
31  
32  import org.apache.log4j.Logger;
33  import org.xnap.XNap;
34  import org.xnap.net.NetHelper;
35  import org.xnap.plugin.opennap.OpenNapPlugin;
36  import org.xnap.plugin.opennap.net.msg.MessageFactory;
37  import org.xnap.plugin.opennap.net.msg.MessageHandler;
38  import org.xnap.plugin.opennap.net.msg.MessageStrings;
39  import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
40  import org.xnap.plugin.opennap.net.msg.client.LoginMessage;
41  import org.xnap.plugin.opennap.net.msg.client.NewUserLoginMessage;
42  import org.xnap.plugin.opennap.net.msg.client.NickCheckMessage;
43  import org.xnap.plugin.opennap.net.msg.server.BrowseResponseMessage;
44  import org.xnap.plugin.opennap.net.msg.server.EndBrowseMessage;
45  import org.xnap.plugin.opennap.net.msg.server.EndSearchMessage;
46  import org.xnap.plugin.opennap.net.msg.server.ErrorMessage;
47  import org.xnap.plugin.opennap.net.msg.server.InvalidMessageException;
48  import org.xnap.plugin.opennap.net.msg.server.InvalidNickMessage;
49  import org.xnap.plugin.opennap.net.msg.server.LoginAckMessage;
50  import org.xnap.plugin.opennap.net.msg.server.LoginErrorMessage;
51  import org.xnap.plugin.opennap.net.msg.server.NickAlreadyRegisteredMessage;
52  import org.xnap.plugin.opennap.net.msg.server.NickNotRegisteredMessage;
53  import org.xnap.plugin.opennap.net.msg.server.SearchResponseMessage;
54  import org.xnap.plugin.opennap.net.msg.server.ServerMessage;
55  import org.xnap.plugin.opennap.util.*;
56  import org.xnap.util.Preferences;
57  import org.xnap.util.PriorityQueue;
58  import org.xnap.util.State;
59  import org.xnap.util.StringHelper;
60  
61  public class OpenNapServerRunner extends Thread {
62  
63      //--- Constant(s) ---
64  
65      // wait 30 seconds for login ack
66      //public static final int LOGIN_TIMEOUT = 30 * 1000;
67      public static final int CONNECT_TIMEOUT = 1 * 30 * 1000;
68  
69  	public static final int MAX_RETRY_ON_CONNECTION_REFUSED = 3;
70  
71      public static final int SOCKET_TIMEOUT = 0;
72  
73      //--- Data field(s) ---
74  
75      private static Logger logger = Logger.getLogger(OpenNapServerRunner.class);
76  
77      private Socket socket;
78      private InputStream in;
79      private OutputStream out;
80  
81      private OpenNapServer server;
82  
83      /***
84       * Remembers the last <code>ErrorMessage</code>.
85       */
86      private String lastError;
87  
88      private OpenNapSearch currentSearch;
89  
90  	private OpenNapBrowse currentBrowse;
91  
92      /***
93       * Queue of pending searches.
94       */
95      private PriorityQueue searchQueue = new PriorityQueue();
96  
97      /***
98       * Queue of pending browses.
99       */
100     private LinkedList browseQueue = new LinkedList();
101 
102     /***
103      * Queue of pending messages.
104      */
105     private SendQueue sendQueue = new SendQueue(this);
106 	private Object sendLock = new Object();
107 
108     /***
109      * If set to true this runner is to die.
110      */
111     private boolean die = false;
112 
113     /***
114      * The state description why the runner has died.
115      */
116     private String dieReason;
117 
118 	private boolean useUTF8Encoding;
119 
120     //--- Constructor(s) ---
121 
122     public OpenNapServerRunner(OpenNapServer server)
123     {
124 		super("OpenNapServer " + server.toString());
125 
126 		this.server = server;
127 		this.useUTF8Encoding 
128 			= OpenNapPlugin.getPreferences().getUseUTF8Encoding();
129     }
130 
131     //--- Method(s) ---
132 
133 	public synchronized void enqueue(OpenNapBrowse browse)
134 	{
135 	  	if (die) {
136 			browse.getRequest().failed();
137 	  	}
138 		else {
139 			browseQueue.add(browse);
140 			allowBrowse();
141 		}
142 	}
143 
144     /***
145      * 
146      */
147     public synchronized void enqueue(OpenNapSearch search)
148     {
149 		if (die) {
150 			search.getRequest().failed();
151 		}
152 		else {
153 			searchQueue.add(search, search.getPriority());
154 			allowSearch();
155 		}
156     }
157 
158 	public synchronized void dequeue(OpenNapBrowse browse)
159 	{
160 	  	browseQueue.remove(browse);
161 	}
162 
163     /***
164      * Invoked by {@link OpenNapSearch} when search did not take place or
165      * was aborted.
166      */
167     public synchronized void dequeue(OpenNapSearch search)
168     {
169 		searchQueue.remove(search);
170     }
171 
172     public synchronized void die(org.xnap.util.State newState, String description)
173     {
174 		if (!die) {
175 			die = true;
176 			dieReason = description;
177 			server.setState(newState, description);
178 		}
179 		else {
180 			// ups looks like we died multiple times
181 			logger.error("Already dead");
182 		}
183     }
184 
185     public void disconnect()
186     {
187 		die = true;
188 		// FIX: ugly work around to force close immediatelly
189 		try {
190 			socket.close();
191 		}
192 		catch (Exception e) {
193 		}
194 		this.interrupt();
195     }
196 
197     private synchronized void close()
198     {
199 		if (socket != null) {
200 			try {
201 				socket.close();
202 			} catch (IOException e) {}
203 		}
204 		if (in != null) {
205 			try {
206 				in.close();
207 			} catch (IOException e) {}
208 		}
209 		if (out != null) {
210 			try {
211 				out.close();
212 			} catch (IOException e) {}
213 		}
214 
215 		if (currentBrowse != null) {
216 			currentBrowse.getRequest().failed();
217 		}
218 		for (Iterator i = browseQueue.iterator(); i.hasNext();) {
219 			((OpenNapBrowse)i.next()).getRequest().failed();
220 		}
221 
222 		if (currentSearch != null) {
223 			currentSearch.getRequest().failed();
224 		}
225 		for (Iterator i = searchQueue.iterator(); i.hasNext();) {
226 			((OpenNapSearch)i.next()).getRequest().failed();
227 		}
228 
229 		// notify all peers
230     }
231 
232     public SendQueue getSendQueue()
233     {
234 		return sendQueue;
235     }
236 
237     private byte[] read(int length) throws IOException
238     {
239 		byte[] textBuf = new byte[length];
240 	
241 		int read = 0;
242 		while (read < length) {
243 			int c = in.read(textBuf, read, length - read);
244 			if (c == -1) {
245 				throw new IOException("Connection closed");
246 			}
247 			read += c;
248 		}
249 
250 		return textBuf;
251     }
252 
253     /***
254      * Receives the next message. Invoked by {@link #runQueue()}.
255      */
256     private ServerMessage recv() 
257     {
258 		Packet p = null;
259 		while (!die) {
260 			try {
261 				p = recvPacket();
262 				return MessageFactory.create(server, p.id, p.data);
263 			}
264 			catch (IOException e) {
265 				logger.debug(server.getHost() + ":" + server.getPort() 
266 							 + " recv ", e);
267 				die(org.xnap.util.State.FAILED, (lastError != null) 
268 					? lastError 
269 					: NetHelper.getErrorMessage(e));
270 			}
271 			catch (InvalidMessageException e) {
272 				logger.error(server.getHost() + ":" + server.getPort() 
273 							 + " recv: " + p, e);
274 			}
275 		}
276 
277 		return null;
278     }
279 
280     private Packet recvPacket() throws IOException
281     {
282 		byte[] header = read(4);
283 
284 		// byte types are signed...
285 		int lo = 0xFF & (int)header[0];
286 		int hi = 0xFF & (int)header[1];
287 		int length = 0xFFFF & (hi << 8 | lo);
288 
289 		lo = 0xFF & (int)header[2];
290 		hi = 0xFF & (int)header[3];
291 		int id = 0xFFFF & (hi << 8 | lo);
292 	
293 		byte[] data = read(length);
294 		String content = (length > 0) 
295 			? (useUTF8Encoding)
296 			? StringHelper.toString(data)
297 			: new String(data)
298 			: "";
299 
300  		logger.debug("< " + server.getHost() + ":" + server.getPort()
301  					 + " (" + id + ") [" +
302  					 MessageFactory.getMessageName(id) + "] " + content);
303 
304 		return new Packet(id, content);
305     }
306 
307     /***
308      * This method should only be invoked by the associated 
309      * {@link SendQueue} object.
310      */
311     void send(ClientMessage msg) throws IOException
312     {
313 		String data = msg.getData(server.getVersion());
314         sendPacket(new Packet(msg.getType(), data));
315     }
316 
317     private void sendPacket(Packet p) throws IOException
318     {
319  		logger.debug
320  			("> " + server.getHost() + ":" + server.getPort() + " "
321  			 + MessageStrings.getMessageName(p.id) + "(" + p.id + ") " 
322  			 + p.data);
323 		
324 		try {
325 			if (out == null) {
326 				// FIX: should not happen
327 				throw new IOException(XNap.tr("Invalid stream state"));
328 			}
329 
330 			byte[] content
331 				= (useUTF8Encoding) 
332 				? p.data.getBytes("UTF-8") 
333 				: p.data.getBytes();
334 			int dataLength = content.length;
335 			byte[] data = new byte[2 + 2 + dataLength];
336 			
337 			int type = (short)p.id;
338 			int len = (short)dataLength;
339 			
340 			data[0] = (byte)len;
341 			data[1] = (byte)(len >> 8);
342 			data[2] = (byte)type;
343 			data[3] = (byte)(type >> 8);
344 			
345 			System.arraycopy(content, 0, data, 4, content.length);
346 				
347 			synchronized (sendLock) {
348 				out.write(data);
349 				out.flush();
350 			}
351 		} 
352 		catch (IOException e) {
353 			logger.debug(server.getHost() + ":" + server.getPort() +
354 						 " sendPacket " + p, e);
355 			die(org.xnap.util.State.FAILED, NetHelper.getErrorMessage(e));
356 			throw e;
357 		}
358     }
359 
360     private void connect()
361     {
362 		try {
363 			if (server.isRedirector()) {
364 				if (fetchHost(server.getHost(), server.getPort())) {
365 					connect(null, server.getRedirectedHost(), 
366 							server.getRedirectedPort());
367 				}
368 				else {
369 					die(org.xnap.util.State.FAILED, "Invalid response");
370 				}
371 			}
372 			else {
373 				connect(server.getIP(), server.getHost(), server.getPort());
374 			}
375 		} 
376 		catch (ConnectException e) {
377 			// connection refused, we will try again later
378 			die(org.xnap.util.State.FAILED, NetHelper.getErrorMessage(e));
379 		}
380 		catch (IOException e) {
381 			// unrecoverable error
382 			die(org.xnap.util.State.ERROR, NetHelper.getErrorMessage(e));
383 			
384 			if (server.canAutoRemove()) {
385 				OpenNapPlugin.getServerManager().remove(server);
386 			}
387 		}
388     }
389 
390     /***
391      * Opens the socket to the server and connects the streams.
392      * If host1 != null, it is tried first.
393      *
394      * @param host1 the ip address of the server or null if unknown
395      * @param host2 the hostname of the server 
396      * @param port the port of the server 
397      */
398     private void connect(String host1, String host2, int port)
399 		throws IOException
400     {
401 		if (host1 != null) {
402 			try {
403 				socket = new Socket(host1, port);
404 			}
405 			catch (IOException e) {
406 				server.setIP(null);
407 			}
408 		}
409 		if (socket == null) {
410 			for (int i = 0; i < MAX_RETRY_ON_CONNECTION_REFUSED
411 					 && socket == null; i++) {
412 
413 				try {
414 					socket = new Socket(host2, port);
415 				}
416 				catch (ConnectException e) {
417 					if (i == MAX_RETRY_ON_CONNECTION_REFUSED - 1) {
418 						throw e;
419 					}
420 					try {
421 						Thread.sleep(10);
422 					}
423 					catch (InterruptedException e2) {
424 					}
425 				}
426 			}
427 		}
428 
429 		try {
430 			socket.setSoTimeout(CONNECT_TIMEOUT);
431 			in = new BufferedInputStream(socket.getInputStream());
432 			out = socket.getOutputStream();
433 			
434 			server.setStateDescription(XNap.tr("Logging in") + "...");
435 
436 			if (server.isNewUser()) {
437 				send(new NickCheckMessage(server.getNick()));
438 			} 
439 			else {
440 				login(false);
441 			}
442 		}
443 		catch (IOException e) {
444 			die(org.xnap.util.State.FAILED, XNap.tr("Login failed"));
445 		}
446     }
447 
448     /***
449      * Connects to a redirector server and reads the host information.
450      *
451      * @return true, if successful; false, if failed
452      */
453     public boolean fetchHost(String host, int port) throws IOException
454     {
455 		Socket socket = null;
456 		InputStream in = null;
457 		try {
458 			socket = new Socket(host, port);
459 			in = new BufferedInputStream(socket.getInputStream());
460 
461 			byte[] b = new byte[1024];
462 			int c = in.read(b, 0, 1024);
463 			if (c > 0) {
464 				// chop the \n
465 				String response = new String(b, 0, c - 1);
466 				StringTokenizer t = new StringTokenizer(response, ":");
467 				if (t.countTokens() == 2) {
468 					try {
469 						server.setRedirectedHost(t.nextToken());
470 						server.setRedirectedPort
471 							(Integer.parseInt(t.nextToken()));
472 						return true;
473 					} catch (NumberFormatException e) {}
474 				}
475 			}
476 		}
477 		finally {
478 			if (socket != null) {
479 				try {
480 					socket.close();
481 				} catch (IOException e) {}
482 			}
483 			if (in != null) {
484 				try {
485 					in.close();
486 				} catch (IOException e) {}
487 			}
488 		}
489 
490 		return false;
491     }
492 
493     /***
494      * Sends the login message to the server.
495      */
496     private void login(boolean newUser) throws IOException
497     {
498 		Preferences prefs = Preferences.getInstance();
499 	
500 		String nick = server.getNick();
501 		String password = server.getPassword();
502 		int port = server.getLocalPort();
503 		String email = server.getEmail();
504 		String info = OpenNapPlugin.getPreferences().getClientInfo();
505 		int linkSpeed = OpenNapLinkType.getIndexOfType(prefs.getLinkSpeed());
506 	
507 		if (newUser) {
508 			send(new NewUserLoginMessage
509 				(nick, password, port, info, linkSpeed, email));
510 		}
511 		else {
512 			send(new LoginMessage(nick, password, port, info, linkSpeed));
513 		}
514     }
515 
516     /***
517      * Polls the input stream for messages and passes them to the
518      * {@link MessageHandler}.
519      */
520     private void runQueue() 
521     {
522 		while (!die) {
523 			ServerMessage msg = recv();
524 	    
525 			if (die) {
526 				return;
527 			}
528 
529 			if (msg instanceof NickNotRegisteredMessage) {
530 				try {
531 					login(true);
532 				}
533 				catch (IOException e) {
534 					die(org.xnap.util.State.FAILED, e.getLocalizedMessage());
535 				}
536 			}
537 			else if (msg instanceof NickAlreadyRegisteredMessage) {
538 				die(org.xnap.util.State.FAILED, XNap.tr("Nick already registered"));
539 			}
540 			else if (msg instanceof InvalidNickMessage) {
541 				die(org.xnap.util.State.FAILED, XNap.tr("Invalid nick"));
542 			}
543 			else if (msg instanceof LoginAckMessage) {
544 				server.setNewUser(false);
545 				try {
546 					socket.setSoTimeout(SOCKET_TIMEOUT);
547 				}
548 				catch (IOException e) {
549 					logger.warn("Could not set socket timeout", e);
550 				}
551 				server.setState(org.xnap.util.State.CONNECTED);
552 			}
553 			else if (msg instanceof LoginErrorMessage) {
554 				die(org.xnap.util.State.FAILED, ((LoginErrorMessage)msg).message);
555 			}
556 			else if (msg instanceof BrowseResponseMessage) {
557 				if (currentBrowse != null) {
558 					currentBrowse.received((BrowseResponseMessage)msg);
559 				}
560 			}
561 			else if (msg instanceof EndBrowseMessage) {
562 				if (currentBrowse != null) {
563 					synchronized (this) {
564 						currentBrowse.finished();
565 						currentBrowse = null;
566 					}
567 					updateStatus();
568 				}
569 			}
570 			else if (msg instanceof SearchResponseMessage) {
571 				if (currentSearch != null) {
572 					currentSearch.received((SearchResponseMessage)msg);
573 				}
574 			}
575 			else if (msg instanceof EndSearchMessage) {
576 				if (currentSearch != null) {
577 					synchronized (this) {
578 						currentSearch.finished();
579 						currentSearch = null;
580 					}
581 					updateStatus();
582 				}
583 			}
584 			else if (msg instanceof ErrorMessage) {
585 				lastError = ((ErrorMessage)msg).message;
586 				if (lastError.startsWith("You have been killed by server") 
587 					|| lastError.startsWith("You were killed by server")) {
588 					// handle server kicks
589 					die(org.xnap.util.State.FAILED, lastError);
590 				}
591 
592 			}
593 
594 			OpenNapPlugin.getMessageHandler().handle(msg);
595 
596 			allowSearch();
597 			allowBrowse();
598 		}
599     }
600 	
601     /***
602      * 
603      */
604     public void run() 
605     {
606 		Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
607 	
608 		//userByNick.clear();
609 	
610 		connect();
611 
612 		runQueue();
613 
614 		server.setState(org.xnap.util.State.DISCONNECTED, dieReason);
615 		logger.debug(server.toString() + " has died");
616     }
617 
618     private void updateStatus()
619     {
620 		StringBuffer sb = new StringBuffer(XNap.tr("connected"));
621 
622 		if (currentSearch != null) {
623 			sb.append(", ");
624 			sb.append(XNap.tr("searching"));
625 		}
626 		if (searchQueue.size() > 0) {
627 			sb.append(", ");
628 			sb.append(XNap.tr("{0} searches pending", searchQueue.size()));
629 		}
630 		if (currentBrowse != null) {
631 			sb.append(", browsing");
632 		}
633 		
634 		server.setStateDescription(sb.toString());
635     }
636 
637 	private synchronized void allowBrowse()
638 	{
639 	  	if (currentBrowse != null || browseQueue.size() == 0) {
640 	  	    return;
641 	  	}
642 		
643 	  	currentBrowse = (OpenNapBrowse)browseQueue.removeFirst();
644 		MessageHandler.send(server, currentBrowse.getRequest());
645 
646 	  	updateStatus();
647 	}
648 
649     private synchronized void allowSearch()
650     {
651 		if (currentSearch != null || searchQueue.size() == 0) {
652 			return;
653 		}
654 
655 		currentSearch = (OpenNapSearch)searchQueue.pop();
656 		MessageHandler.send(server, currentSearch.getRequest());
657 	
658 		updateStatus();
659     }
660 
661 }