1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.xnap.plugin.opennap.net;
21
22 import java.io.IOException;
23
24 import org.apache.log4j.Logger;
25 import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
26 import org.xnap.util.PriorityQueue;
27
28 /***
29 * Provides a message queue that is used by the {@link OpenNapServerRunner}
30 * class.
31 */
32 public class SendQueue {
33
34
35
36
37
38 private static Logger logger = Logger.getLogger(SendQueue.class);
39
40 public static final int STATE_EMPTY = 1;
41 public static final int STATE_WAITING = 2;
42 public static final int STATE_BUSY = 3;
43
44 /***
45 * Queued messages.
46 */
47 private PriorityQueue queue = new PriorityQueue();
48
49 /***
50 * Associated runner.
51 */
52 private OpenNapServerRunner runner;
53
54 private int state = STATE_EMPTY;
55
56
57
58
59
60
61
62
63
64
65
66 public SendQueue(OpenNapServerRunner runner)
67 {
68 this.runner = runner;
69 }
70
71
72
73 /***
74 * Returns true, if a send worker needs to be notified.
75 */
76 public synchronized void add(ClientMessage msg)
77 {
78 queue.add(msg, msg.getPriority());
79 }
80
81 /***
82 * Worker could not send message, because runner has died.
83 * Remove all pending messages.
84 */
85 private synchronized void clear()
86 {
87 logger.debug("removing all " + queue.size()
88 + " messages for: " + runner);
89 ClientMessage msg;
90 while ((msg = (ClientMessage)queue.pop()) != null) {
91 msg.failed();
92 }
93 queue.clear();
94 }
95
96 /***
97 * Returns true, if a send worker is currently sending messages
98 * from this queue.
99 */
100 public int getState()
101 {
102 return state;
103 }
104
105 public boolean isEmpty()
106 {
107 return queue.isEmpty();
108 }
109
110 /***
111 * Removes the top message from the queue.
112 *
113 * @return if queue is not empty, top message; null otherwise
114 */
115 public synchronized ClientMessage pop()
116 {
117 if (queue.isEmpty()) {
118 return null;
119 }
120 return (ClientMessage)queue.pop();
121 }
122
123 public boolean send(ClientMessage msg)
124 {
125 try {
126 runner.send(msg);
127 return true;
128 }
129 catch (IOException e) {
130 if (msg.listener != null) {
131 msg.listener.exceptionThrown(e);
132 }
133 clear();
134 return false;
135 }
136 }
137
138 public void setState(int state)
139 {
140 this.state = state;
141 }
142
143 /***
144 * Sleeps until ready to send.
145 */
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168 }