View Javadoc

1   /*
2    *  XNap - A P2P framework and client.
3    *
4    *  See the file AUTHORS for copyright information.
5    *
6    *  This program is free software; you can redistribute it and/or modify
7    *  it under the terms of the GNU General Public License as published by
8    *  the Free Software Foundation.
9    *
10   *  This program is distributed in the hope that it will be useful,
11   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   *  GNU General Public License for more details.
14   *
15   *  You should have received a copy of the GNU General Public License
16   *  along with this program; if not, write to the Free Software
17   *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   */
19  
20  package org.xnap.plugin.opennap.net;
21  
22  import java.io.BufferedInputStream;
23  import java.io.EOFException;
24  import java.io.FileOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.io.*;
29  import java.io.RandomAccessFile;
30  import java.net.*;
31  
32  import org.apache.log4j.Logger;
33  import org.xnap.XNap;
34  import org.xnap.net.NetHelper;
35  import org.xnap.plugin.opennap.net.msg.MessageHandler;
36  import org.xnap.plugin.opennap.net.msg.client.*;
37  import org.xnap.plugin.opennap.net.msg.client.DownloadingFileMessage;
38  import org.xnap.plugin.opennap.user.OpenNapUser;
39  import org.xnap.util.IllegalOperationException;
40  import org.xnap.util.State;
41  
42  /***
43   * Takes care of the socket communication to the other peer. Spawned
44   * by {@link OpenNapDownload} once an incoming socket was received (for
45   * firewalled downloads) or a DownloadAckMessage was received.
46   */
47  public class OpenNapDownloadRunner implements Runnable {
48  
49  	//--- Constant(s) ---
50  
51      /***
52       * Socket timeout during connect.
53       */
54      public static final int CONNECT_TIMEOUT = 1 * 30 * 1000;
55  
56      /***
57       * Abort transfer if stalled for this long.
58       */
59      public static final int TRANSFER_TIMEOUT = 2 * 60 * 1000;
60  
61      /***
62       * Maximum time to wait for segment.
63       */
64      public static final int SEGMENT_MAX_WAIT = 3 * 60 * 1000;
65  
66  	//--- Data field(s) ---
67  
68  	private static Logger logger = Logger.getLogger(OpenNapDownloadRunner.class);
69  
70  	private OpenNapDownload parent;
71  	private boolean die;
72  
73      private Socket socket;
74      private InputStream in;
75      private OutputStream out;
76  	private OpenNapSegment segment;
77  
78  	//--- Constructor(s) ---
79  	
80  	public OpenNapDownloadRunner(OpenNapDownload parent, DownloadSocket d) 
81  	{
82  		this.parent = parent;
83  
84  		if (d != null) {
85  			this.socket = d.socket;
86  			this.in = d.in;
87  		}
88  
89  	}
90  
91  	//--- Method(s) ---
92  
93  	public void run()
94  	{
95  		try {
96  			try {
97  				try {
98  					download();
99  					parent.setState(State.SUCCEEDED);
100 				}
101 				finally {
102 					parent.getParent().done(parent);
103 				}
104 			}
105 			catch (FileNotFoundException e) {
106 				logger.debug("download failed", e);
107 				parent.setState(State.FAILED, 
108 								XNap.tr("Could not create incomplete file"));
109 			}
110 			catch (IOException e) {
111 				logger.debug("download failed", e);
112 				parent.setState(State.FAILED, NetHelper.getErrorMessage(e));
113 			}
114 			catch (InterruptedException e) {
115 				logger.debug("download aborted");
116 				parent.setState(State.NOT_STARTED, parent.getStatus());
117 			}
118 		}
119 		catch (IllegalOperationException e) {
120 			logger.error("unexpected state", e);
121 		}
122 	}
123 
124 	public void stop()
125 	{
126 		die = true;
127 	}
128 
129     private void close() 
130     {
131         try {
132 			if (socket != null)
133 				socket.close();
134 			if (in != null)
135 				in.close();
136 			if (out != null)
137 				out.close();
138         } 
139 		catch (IOException e) {
140         }
141     }
142 
143     /***
144      * Opens socket and requests file.
145      */
146     private void connect(String ip, int port) 
147 		throws IOException, InterruptedException
148     {
149 		logger.debug("opening socket to " + ip + ":" + port);
150 
151 		//socket = new Socket(ip, port);
152 		try {
153 			socket = NetHelper.connect(ip, port, CONNECT_TIMEOUT);
154 		}
155 		catch (SocketException e) {
156 			MessageHandler.send
157 				(parent.getServer(), 
158 				 new DataPortErrorMessage(parent.getPeer().getName()));
159 			throw e;
160 		}
161 		socket.setSoTimeout(CONNECT_TIMEOUT);
162 
163 		out = socket.getOutputStream();
164 		in = new BufferedInputStream(socket.getInputStream());
165 
166 		parent.setStateDescription(XNap.tr("Negotiating") + "...");
167 
168 		// read magic number '1'
169 		logger.debug("reading magic number");
170 		char c = (char)in.read();
171 		if (c != '1') {
172 			throw new IOException(XNap.tr("Invalid request"));
173 		}
174 
175 		parent.setStateDescription(XNap.tr("Sending get request") + "...");
176 	
177 		// get request needs to be split over 2 packets
178 		String message = "GET";
179 	
180 		out.write(message.getBytes());
181 		out.flush();
182 
183 		parent.setStateDescription(XNap.tr("Waiting for segment") + "...");
184 		requestSegment();
185 
186 		parent.setStateDescription(XNap.tr("Sending file request") + "...");
187 	
188 		message = parent.getServer().getLocalPeer().getName() + " " 
189 			+ "\"" + parent.getResult().getFilename() + "\"" + " " 
190 			+ segment.getDownloadOffset();
191 	
192 		logger.debug("sending request: " + message);
193 
194 		out.write(message.getBytes());
195 		out.flush();
196 	
197 		String expected = segment.getTotal() + "";
198 		StringBuffer sb = new StringBuffer();
199 		while (sb.length() < expected.length()) {
200 			int b = in.read();
201 	    
202 			if (b == -1) {
203 				throw new IOException(XNap.tr("Socket error"));
204 			}
205 
206 			c = (char)b;
207 			if (Character.isDigit(c)) {
208 				// ignore leading zeros
209 				if (segment.getTotal() == 0 || sb.length() != 0 
210 					|| c != '0') {
211 					sb.append(c);
212 				}
213 			}
214 			else if (c == 'F') {
215 				throw new IOException(XNap.tr("File not shared"));
216 			}
217 			else {
218 				throw new IOException(XNap.tr("Invalid request"));
219 			}
220 		}
221 
222 		logger.debug("file length: " + sb.toString());
223 	
224 		if (Long.parseLong(sb.toString()) != segment.getTotal()) {
225 			throw new IOException(XNap.tr("Filesizes did not match"));
226 		}
227     }
228 
229 	private void connect() throws IOException, InterruptedException
230 	{
231 		socket.setSoTimeout(CONNECT_TIMEOUT);
232 		out = socket.getOutputStream();
233 		
234 		requestSegment();
235 
236 		// write offset
237 		Long offset = new Long(segment.getDownloadOffset());
238 		out.write(offset.toString().getBytes());
239 		out.flush();
240 	}
241 
242 	private void requestSegment() throws IOException, InterruptedException
243 	{
244 		// spin lock for segment
245 		socket.setSoTimeout(SEGMENT_MAX_WAIT + 30 * 1000);
246 
247 		long startTime = System.currentTimeMillis();
248 		while (segment == null 
249 			   && System.currentTimeMillis() - startTime < SEGMENT_MAX_WAIT) {
250 			// request segment for offset
251 			segment = parent.requestSegment();
252 			if (segment != null) {
253 				return;
254 			}
255 
256 			Thread.sleep(500);
257 		}
258 
259 		throw new IOException(XNap.tr("No segment available"));
260 	}
261 
262 	private void download() throws IOException, InterruptedException
263 	{
264 		boolean messageSent = false;
265 		try {
266 			if (socket == null) {
267 				OpenNapUser user = (OpenNapUser)parent.getPeer();
268 				connect(user.getHost(), user.getPort());
269 			}
270 			else {
271 				connect();
272 			}
273 
274 			if (segment.getFile() == null) {
275 				segment.setFile(parent.createIncompleteFile());
276 			}
277 
278 			parent.setState(State.DOWNLOADING);
279 			
280 			MessageHandler.send(new DownloadingFileMessage());
281 			messageSent = true;
282 
283 			// compare a few bytes
284 			int byteCount = segment.getOverlap();
285 			if (byteCount > 0) {
286 				match(segment.getFileOffset(), byteCount);
287 			}
288 
289 			downloadToFile();
290 		}
291 		catch (IOException e) {
292 			try {
293 				if (segment != null) {
294 					parent.returnSegment(segment);
295 				}
296 			}
297 			catch (IOException e2) {
298 				logger.debug("segment merge failed", e2);
299 			}
300 			
301 			// we propagate the first exception
302 			throw e;
303 		}
304 		catch (InterruptedException e) {
305 			try {
306 				if (segment != null) {
307 					parent.returnSegment(segment);
308 				}
309 			}
310 			catch (IOException e2) {
311 				logger.debug("segment merge failed", e2);
312 			}
313 			
314 			// we propagate the first exception
315 			throw e;
316 		}
317 		finally {
318 			close();
319 
320 			if (messageSent) {
321 				MessageHandler.send(new DownloadCompleteMessage());
322 			}
323 		}
324 
325 		parent.returnSegment(segment);
326 	}
327 
328 	private void match(long offset, int byteCount) 
329 		throws IOException, InterruptedException
330 	{
331 		RandomAccessFile fileIn = null;
332 		try {
333 			fileIn = new RandomAccessFile(segment.getFile(), "r");
334 			
335 			byte[] fileData = new byte[byteCount];
336 			fileIn.seek(offset);
337 			fileIn.readFully(fileData);
338 			
339 			byte[] downloadData = new byte[byteCount];
340 			readFully(downloadData, downloadData.length);
341 				
342 			for (int i = 0; i < byteCount; i++) {
343 				if (fileData[i] != downloadData[i]) {
344 					throw new IOException(XNap.tr("resume failed at {0}",
345 												  new Integer(byteCount)));
346 				}
347 			}
348 		}
349 		finally {
350 			if (fileIn != null) {
351 				try {
352 					fileIn.close();
353 				}
354 				catch (IOException e) {
355 				}
356 			}
357 		}
358 	}
359 
360 
361 	/***
362 	 *
363 	 * <p>PRECONDITION<br>
364 	 * socket != null, in != null, out != null
365 	 */
366 	private void downloadToFile() throws IOException, InterruptedException
367 	{
368 		FileOutputStream fileOut = null;
369 		try {
370 			fileOut = new FileOutputStream
371 				(segment.getFile().getAbsolutePath(), true);
372 	
373 			// we need to catch aborts for slow connects quickly
374 			byte[] data = new byte[512];
375 			
376 			long transferred = 0;
377 			long toTransfer = segment.getEnd() - segment.getStart();
378 			while (toTransfer > 0) {
379 				int len = (int)Math.min(toTransfer, data.length);
380 				readFully(data, len);
381 				
382 				len = segment.commitToFile(len);
383 				parent.commit(len);
384 				try {
385 					fileOut.write(data, 0, len);
386 				}
387 				catch (IOException e) {
388 					// FIX: how many bytes were acctually written?
389 					segment.commitToFile(-len);
390 					throw e;
391 				}
392 
393 				toTransfer = segment.getRemaining();
394 			}
395 		}
396 		finally {
397 			if (fileOut != null) {
398 				try {
399 					fileOut.close();
400 				}
401 				catch (IOException e) {
402 				}
403 			}
404 		}
405 	}
406 
407 	private void readFully(byte[] data, int len) 
408 		throws IOException, InterruptedException
409 	{
410 		int byteCount = 0;
411 		long startTime = System.currentTimeMillis();
412 		while (byteCount < len) {
413 			if (die) {
414 				throw new InterruptedException();
415 			}
416 			else if (byteCount == 0 && (System.currentTimeMillis() - startTime
417 										> TRANSFER_TIMEOUT)) {
418 				throw (new IOException(XNap.tr("socket timeout")));
419 			}
420 
421 			try {
422 				// blocks for at most SOCKET_TIMEOUT
423 				int read = in.read(data, byteCount, len - byteCount);
424 				if (read == -1) {
425 					throw new EOFException();
426 				}
427 				byteCount += read;
428 			}
429 			catch (InterruptedIOException e) {
430 			}
431 		} 
432 	}
433 
434 }
435