1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.xnap.plugin.opennap.net.msg;
21
22 import java.util.LinkedList;
23
24 import org.apache.log4j.Logger;
25 import org.xnap.plugin.opennap.OpenNapPlugin;
26 import org.xnap.plugin.opennap.net.OpenNapServer;
27 import org.xnap.plugin.opennap.net.OpenNapServerRunner;
28 import org.xnap.plugin.opennap.net.SendQueue;
29 import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
30
31 public class MessageSender {
32
33
34
35 /***
36 * Start with this many send worker threads.
37 */
38 public static final int SEND_WORKER_COUNT = 5;
39
40
41
42 private static Logger logger = Logger.getLogger(MessageSender.class);
43
44 boolean died = false;
45
46 /***
47 * List of send queues that are waiting to be handled by a
48 * {@link SendWorker} thread. Uses the FIFO principle.
49 */
50 private LinkedList queue = new LinkedList();
51
52 private LinkedList sendWorkers = new LinkedList();
53
54
55
56 public MessageSender()
57 {
58 for (int i = 0; i < SEND_WORKER_COUNT; i++) {
59 startNewSendWorker();
60 }
61 }
62
63
64
65 public synchronized void die()
66 {
67 died = true;
68 notifyAll();
69 }
70
71 /***
72 * This should be called every once in a while. Spawns a new SendWorker
73 * if needed. We always want to have at least one SendWorker ready to
74 * rumble.
75 */
76
77
78
79
80
81
82
83
84 /***
85 * Asynchronously sends <code>msg</code> to <code>server</code>.
86 */
87 public void send(OpenNapServer server, ClientMessage msg)
88 {
89 add(server, msg);
90 }
91
92 /***
93 * Asynchronously Sends <code>msg</code> to all connected servers.
94 */
95 public void send(ClientMessage msg)
96 {
97 OpenNapServer[] servers
98 = OpenNapPlugin.getServerManager().getConnectedServers();
99 for (int i = 0; i < servers.length; i++) {
100 add(servers[i], msg);
101 }
102 }
103
104
105
106
107
108
109
110
111
112
113
114 private synchronized void add(OpenNapServer server, ClientMessage msg)
115 {
116 OpenNapServerRunner r = server.getRunner();
117 SendQueue q = (r != null) ? r.getSendQueue() : null;
118 if (q != null) {
119 q.add(msg);
120 if (q.getState() == SendQueue.STATE_EMPTY) {
121 q.setState(SendQueue.STATE_WAITING);
122 queue.addLast(q);
123 notify();
124 }
125 }
126 else {
127 msg.failed();
128 }
129 }
130
131 boolean hasDied()
132 {
133 return died;
134 }
135
136 synchronized void enqueue(SendQueue q)
137 {
138 if (q.isEmpty()) {
139 q.setState(SendQueue.STATE_EMPTY);
140 }
141 else {
142 q.setState(SendQueue.STATE_WAITING);
143 queue.addLast(q);
144 notify();
145 }
146 }
147
148 /***
149 * Called by {@link SendWorker} threads to fetch the next message queue
150 * that is awaiting service.
151 *
152 * @return null, if no message queue is awaiting service; the message queue, otherwise
153 */
154 synchronized SendQueue pop()
155 {
156
157 if (!queue.isEmpty()) {
158 SendQueue q = (SendQueue)queue.removeFirst();
159 q.setState(SendQueue.STATE_BUSY);
160 return q;
161 }
162 return null;
163 }
164
165 private void startNewSendWorker()
166 {
167 String threadName = "SendWorker " + (sendWorkers.size() + 1);
168 SendWorker worker = new SendWorker(threadName, this);
169 sendWorkers.add(worker);
170
171 logger.info(sendWorkers.size() + " SendWorkers running");
172 }
173
174 }