View Javadoc

1   /*
2    *  XNap - A P2P framework and client.
3    *
4    *  See the file AUTHORS for copyright information.
5    *
6    *  This program is free software; you can redistribute it and/or modify
7    *  it under the terms of the GNU General Public License as published by
8    *  the Free Software Foundation.
9    *
10   *  This program is distributed in the hope that it will be useful,
11   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   *  GNU General Public License for more details.
14   *
15   *  You should have received a copy of the GNU General Public License
16   *  along with this program; if not, write to the Free Software
17   *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   */
19  
20  package org.xnap.plugin.opennap.net;
21  
22  import java.io.IOException;
23  
24  import org.apache.log4j.Logger;
25  import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
26  import org.xnap.util.PriorityQueue;
27  
28  /***
29   * Provides a message queue that is used by the {@link OpenNapServerRunner}
30   * class.
31   */
32  public class SendQueue {
33  
34      //--- Constant(s) ---
35  
36      //--- Data field(s) ---
37  
38      private static Logger logger = Logger.getLogger(SendQueue.class);
39  
40      public static final int STATE_EMPTY = 1;
41      public static final int STATE_WAITING = 2;
42      public static final int STATE_BUSY = 3;
43      
44      /***
45       * Queued messages.
46       */
47      private PriorityQueue queue = new PriorityQueue();
48  
49      /***
50       * Associated runner.
51       */
52      private OpenNapServerRunner runner;
53  
54      private int state = STATE_EMPTY;
55  
56      // do not flood server
57  	//      private int maxPacketsPerTick
58  	//  	= OpenNapPreferences.getInstance().getMaxPacketsPerTick();
59  	//      private long tickLength
60  	//  	= OpenNapPreferences.getInstance().getTickLength();
61  	//      private long writeTickPacketCount = 0;
62  	//      private long writeTickStart = 0;
63  
64      //--- Constructor(s) ---
65  
66      public SendQueue(OpenNapServerRunner runner)
67      {
68  		this.runner = runner;
69      }
70      
71      //--- Method(s) ---
72  
73      /***
74       * Returns true, if a send worker needs to be notified.
75       */
76      public synchronized void add(ClientMessage msg)
77      {
78  		queue.add(msg, msg.getPriority());
79      }
80  
81      /***
82       * Worker could not send message, because runner has died. 
83       * Remove all pending messages.
84       */
85      private synchronized void clear()
86      {
87  		logger.debug("removing all " + queue.size() 
88  					 + " messages for: " + runner);
89  		ClientMessage msg;
90  		while ((msg = (ClientMessage)queue.pop()) != null) {
91  			msg.failed();
92  		}
93  		queue.clear();
94      }
95  
96      /***
97       * Returns true, if a send worker is currently sending messages 
98       * from this queue.
99       */
100     public int getState()
101     {
102 		return state;
103     }
104 
105     public boolean isEmpty()
106 	{
107     	return queue.isEmpty();
108     }
109     
110     /***
111      * Removes the top message from the queue.
112      *
113      * @return if queue is not empty, top message; null otherwise
114      */
115     public synchronized ClientMessage pop()
116     {
117 		if (queue.isEmpty()) {
118 			return null;
119 		}
120 		return (ClientMessage)queue.pop();
121     }
122 
123     public boolean send(ClientMessage msg)
124     {
125 		try {
126 			runner.send(msg);
127 			return true;
128 		}
129 		catch (IOException e) {
130 			if (msg.listener != null) {
131 				msg.listener.exceptionThrown(e);
132 			}
133 			clear();
134 			return false;
135 		}
136     }
137 
138     public void setState(int state)
139     {
140 		this.state = state;
141     }
142 
143     /***
144      * Sleeps until ready to send.
145      */
146 	//      public void waitUntilReadyToSend() 
147 	//      {
148 	//  	long tickLeft
149 	//  	    = tickLength - (System.currentTimeMillis() - writeTickStart);
150 	//  	if (tickLeft <= 0) {
151 	//  	    writeTickStart = System.currentTimeMillis();
152 	//  	    writeTickPacketCount = 0;
153 	//  	}
154 	//  	else if (writeTickPacketCount >= maxPacketsPerTick) {
155 	//  	    //logger.debug("flood protection: stalling for " 
156 	//  	    //              + tickLeft + " ms");
157 	//  	    try {
158 	//  		Thread.currentThread().sleep(tickLeft);
159 	//  	    }
160 	//  	    catch (InterruptedException e) {
161 	//  	    }
162 	//  	}
163 	//  	else {
164 	//  	    writeTickPacketCount++;
165 	//  	}
166 	//      }
167     
168 }