View Javadoc

1   /*
2    *  XNap - A P2P framework and client.
3    *
4    *  See the file AUTHORS for copyright information.
5    *
6    *  This program is free software; you can redistribute it and/or modify
7    *  it under the terms of the GNU General Public License as published by
8    *  the Free Software Foundation.
9    *
10   *  This program is distributed in the hope that it will be useful,
11   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   *  GNU General Public License for more details.
14   *
15   *  You should have received a copy of the GNU General Public License
16   *  along with this program; if not, write to the Free Software
17   *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   */
19  
20  package org.xnap.plugin.opennap.net.msg;
21  
22  import org.xnap.plugin.opennap.net.SendQueue;
23  import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
24  
25  public class SendWorker implements Runnable
26  {
27  
28      //--- Constant(s) ---
29  
30      /***
31       * Spent that much time on a queue per run.
32       */
33      public static long MAX_TIME_PER_SERVER = 2 * 1000;
34  
35      public static long STALL_INTERVAL = 200;
36  
37      //--- Data field(s) ---
38  
39      //private static Logger logger = Logger.getLogger(SendWorker.class);
40  
41      private MessageSender parent;
42  
43      //--- Constructor(s) ---
44  
45      public SendWorker(String name, MessageSender parent)
46      {
47  		this.parent = parent;
48  
49  		Thread t = new Thread(this, name);
50  		t.start();
51      }
52      
53      //--- Method(s) ---
54  
55      public void run()
56      {
57  		Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
58  
59  		while(!parent.hasDied()) {
60  			SendQueue q = parent.pop();    
61  			if (q != null) {
62  				sendMessages(q);
63  				// make sure it is requeued in case if queue still has messages
64  				parent.enqueue(q);
65  			}
66  			else {
67  				synchronized (parent) {
68  					try {
69  						parent.wait();
70  					}
71  					catch (InterruptedException e) {
72  					}
73  				}
74  				//logger.debug("got notified");
75  			}
76  		}
77      }
78  
79  	/***
80  	 * Sends the message from q until timeout is expired.
81  	 *
82  	 * @return true, if q has more messages, meaning it should be requed
83  	 */
84      public boolean sendMessages(SendQueue q)
85      {
86  		long start = System.currentTimeMillis();
87  
88  		int stallCount = 0;
89  		while (System.currentTimeMillis() - start < MAX_TIME_PER_SERVER) {
90  			//q.waitUntilReadyToSend();
91  			ClientMessage msg = q.pop();
92  			if (msg == null) {
93  				if (stallCount == 3) {
94  					return false;
95  				}
96  				else {
97  					stallCount++;
98  					try {
99  						Thread.sleep(STALL_INTERVAL);
100 					}
101 					catch (InterruptedException e) {
102 					}
103 				}
104 			}
105 			else {
106 				if (!q.send(msg)) {
107 					return false;
108 				}
109 				else if (msg.getPriority() != ClientMessage.PRIORITY_LOW) {
110 					stallCount = 3;
111 				}
112 			}
113 		}
114 
115 		return true;
116     }
117 
118 }