View Javadoc

1   /*
2    *  XNap - A P2P framework and client.
3    *
4    *  See the file AUTHORS for copyright information.
5    *
6    *  This program is free software; you can redistribute it and/or modify
7    *  it under the terms of the GNU General Public License as published by
8    *  the Free Software Foundation.
9    *
10   *  This program is distributed in the hope that it will be useful,
11   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   *  GNU General Public License for more details.
14   *
15   *  You should have received a copy of the GNU General Public License
16   *  along with this program; if not, write to the Free Software
17   *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   */
19  
20  package org.xnap.plugin.opennap.net.msg;
21  
22  import java.util.LinkedList;
23  
24  import org.apache.log4j.Logger;
25  import org.xnap.plugin.opennap.OpenNapPlugin;
26  import org.xnap.plugin.opennap.net.OpenNapServer;
27  import org.xnap.plugin.opennap.net.OpenNapServerRunner;
28  import org.xnap.plugin.opennap.net.SendQueue;
29  import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
30  
31  public class MessageSender {
32  
33      //--- Constant(s) ---
34  
35      /***
36       * Start with this many send worker threads.
37       */
38      public static final int SEND_WORKER_COUNT = 5;
39  
40      //--- Data field(s) ---
41  
42      private static Logger logger = Logger.getLogger(MessageSender.class);
43  
44      boolean died = false;
45  
46      /***
47       * List of send queues that are waiting to be handled by a 
48       * {@link SendWorker} thread. Uses the FIFO principle.
49       */
50      private LinkedList queue = new LinkedList();
51  
52      private LinkedList sendWorkers = new LinkedList();
53  
54      //--- Constructor(s) ---
55  
56      public MessageSender()
57      {
58  		for (int i = 0; i < SEND_WORKER_COUNT; i++) {
59  			startNewSendWorker();
60  		}
61      }
62      
63      //--- Method(s) ---
64  
65      public synchronized void die()
66      {
67  		died = true;
68  		notifyAll();
69      }
70  
71      /***
72       * This should be called every once in a while. Spawns a new SendWorker 
73       * if needed. We always want to have at least one SendWorker ready to 
74       * rumble.
75       */
76  	//      public static void ensureLiveness()
77  	//      {
78  	//  	if (getInstance().lockedServers.size() 
79  	//  	    >= getInstance().sendWorkers.size()) {
80  	//  	    getInstance().startNewSendWorker();
81  	//  	}
82  	//      }
83  
84      /***
85       * Asynchronously sends <code>msg</code> to <code>server</code>.
86       */
87      public void send(OpenNapServer server, ClientMessage msg) 
88      {
89  		add(server, msg);
90      }
91  
92      /***
93       * Asynchronously Sends <code>msg</code> to all connected servers.
94       */
95      public void send(ClientMessage msg)
96      {
97  		OpenNapServer[] servers 
98  			= OpenNapPlugin.getServerManager().getConnectedServers();
99  		for (int i = 0; i < servers.length; i++) {
100 			add(servers[i], msg);
101 		}
102     }
103 
104 	//      public static void sendLater(Server server, OvernetClientMessage msg)
105 	//      {
106 	//  	getInstance().add(server, msg, true, false);
107 	//      }
108 
109 	//      public static void sendPending(Server server)
110 	//      {
111 	//  	getInstance().notifyServer(server);
112 	//      }
113 
114     private synchronized void add(OpenNapServer server, ClientMessage msg) 
115     {
116 		OpenNapServerRunner r = server.getRunner();
117 		SendQueue q = (r != null) ?  r.getSendQueue() : null;
118 		if (q != null) {
119 			q.add(msg);
120 			if (q.getState() == SendQueue.STATE_EMPTY) {
121 	    		q.setState(SendQueue.STATE_WAITING);
122 	    		queue.addLast(q);
123 	    		notify();
124 			}
125 		}
126 		else {
127 			msg.failed();
128 		}
129     }
130 
131     boolean hasDied()
132     {
133 		return died;
134     }
135 
136     synchronized void enqueue(SendQueue q)
137     {
138     	if (q.isEmpty()) {
139     		q.setState(SendQueue.STATE_EMPTY);
140     	}
141     	else {
142     		q.setState(SendQueue.STATE_WAITING);
143     		queue.addLast(q);
144     		notify();
145     	}
146     }
147 
148     /***
149      * Called by {@link SendWorker} threads to fetch the next message queue 
150      * that is awaiting service.
151      * 
152      * @return null, if no message queue is awaiting service; the message queue, otherwise 
153      */
154     synchronized SendQueue pop()
155     {
156 		//logger.debug("next: queue size: " + queue.size());
157     	if (!queue.isEmpty()) {
158     		SendQueue q = (SendQueue)queue.removeFirst();
159     		q.setState(SendQueue.STATE_BUSY);
160     		return q;
161     	}
162 		return null; 
163     }
164 
165     private void startNewSendWorker()
166     {
167 		String threadName = "SendWorker " + (sendWorkers.size() + 1);
168 		SendWorker worker = new SendWorker(threadName, this);
169 		sendWorkers.add(worker);
170 
171 		logger.info(sendWorkers.size() + " SendWorkers running");
172     }
173 
174 }