java Socket I/O读写异步分离

原创
2015/09/21 11:39
阅读数 7.3K

参考:Java UDP 广播与多播

在Android开发中,基于Socket的通信必须是非阻塞式的,因此需要Reader和Writer异步分离,此外需要监听Socket网络状态、

监听接口

package com.io.sockets;

import java.io.IOException;
import java.net.Socket;

public interface SocketStatusListener
{
	public static final int STATUS_OPEN = 0x01<<0;
	public static final int STATUS_CLOSE = 0x01<<1;
	public static final int STATUS_RESET = 0x01<<2;
	public static final int STATUS_PIP_BROKEN = 0x01<<3;
	public static final int STATUS_UNKOWN = 0x01<<4;
		
	public void onSocketStatusChanged(Socket socket,int status,IOException e);
}

Handler读写处理控制

package com.io.sockets;

import java.io.IOException;
import java.net.Socket;

public  class SocketHandler implements SocketStatusListener {

		private Socket socket=null;
		
		private ReaderTask reader;

		private WriterTask writer;
		
		public SocketHandler(Socket socket) throws IOException {
			
			this.socket = socket;
			this.socket.setTcpNoDelay(true);
			reader = new ReaderTask(socket);
			writer = new WriterTask(socket);
			onSocketStatusChanged(socket, STATUS_OPEN, null);
		}
		
		
		/**
		 * sendMessage:(这里用一句话描述这个方法的作用). <br/>
		 * TODO(这里描述这个方法适用条件 – 可选).<br/>
		 */
		public  void sendMessage(String msg) {
			writer.send(msg);
		}
		
		public void listen(boolean isListen)
		{
			reader.startListener(this);
			
		}
		
		public void shutDown() {
			
			if(!socket.isClosed() &&socket.isConnected())
			{
				try {
					writer.finish();
					reader.finish();
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
					onSocketStatusChanged(socket, STATUS_CLOSE, e);
				}finally{
					reader = null;
					writer = null;
					System.out.println("Socket连接已关闭!!");
				}
			}
			
		}

		@Override
		public void onSocketStatusChanged(Socket socket,int status, IOException e) {
			
			switch (status) {
			
				case SocketStatusListener.STATUS_CLOSE:
				case SocketStatusListener.STATUS_RESET:
				case SocketStatusListener.STATUS_PIP_BROKEN:
					shutDown();
				break;

			default:
				break;
			}
		}
		
	}

读取任务

package com.io.sockets;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketException;

public  class ReaderTask extends Thread{

		private SocketStatusListener socketStatusListener;
		
		private BufferedReader bufferedReader;
		
		private Socket socket;

		private boolean listening;
		
		public ReaderTask(Socket socket) throws IOException
		{
			bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			this.socket = socket;
		}
		
		/**
		 * finish:(这里用一句话描述这个方法的作用). <br/>
		 * TODO(这里描述这个方法适用条件 – 可选).<br/>
		 * @throws IOException 
		 *
		 */
		public void finish() throws IOException
		{
			listening = false;
			interrupt();
			if(bufferedReader!=null && socket!=null)
			{
				if(socket.isInputShutdown())
				{
					socket.shutdownInput();
				}
				bufferedReader.close();
			}
		}
		
		/* (non-Javadoc)
		 * @see java.lang.Runnable#run()
		 */
		@Override
		public synchronized  void run() 
		{		
				while (listening) 
				{
					String readStr = null;
					try {
						while((readStr=bufferedReader.readLine())!=null)
						{
							System.err.println("[Server]:"+readStr);
						}
					} catch (IOException e) {
						listening = false;
						if(socketStatusListener!=null)
						{
							int status = parseSocketStatus(e);
							socketStatusListener.onSocketStatusChanged(socket, status, e);
						}
						e.printStackTrace();
						return;//终止线程继续运行,这里也可以使用continue
					}
							
				}
		}
		
		private int parseSocketStatus(IOException e)
		{
			if(SocketException.class.isInstance(e))
			{
				String msg = e.getLocalizedMessage().trim();
				if("Connection reset".equalsIgnoreCase(msg))
				{
					return SocketStatusListener.STATUS_RESET;
				}
				else if("Socket is closed".equalsIgnoreCase(msg))
				{
					return SocketStatusListener.STATUS_CLOSE;
				}
				else if("Broken pipe".equalsIgnoreCase(msg))
				{
					return SocketStatusListener.STATUS_PIP_BROKEN;
				}
				
			}
			return SocketStatusListener.STATUS_UNKOWN;
		}
		
		/**
		 * listen:(这里用一句话描述这个方法的作用). <br/>
		 * TODO(这里描述这个方法适用条件 – 可选).<br/>
		 *
		 */
		public void startListener(SocketStatusListener ssl) {
			listening = true;
			this.socketStatusListener = ssl;
			start();
		}
		
	}

写入任务

package com.io.sockets;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;

public  class WriterTask extends Thread{

		private BufferedWriter bufferedWriter;
		
		private String msg = null;
		
		private Socket socket = null;
	
		public WriterTask(Socket socket) throws IOException {
			this.bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
			this.socket = socket;
		}
		
		/**
		 * finishTask:(这里用一句话描述这个方法的作用). <br/>
		 * TODO(这里描述这个方法适用条件 – 可选).<br/>
		 * @throws IOException 
		 *
		 */
		public void finish() throws IOException {
			if(bufferedWriter!=null && socket!=null)
			{
				if(!socket.isOutputShutdown())
				{
					socket.shutdownOutput();
				}
				bufferedWriter.close();
			}
		}
		
		/* (non-Javadoc)
		 * @see java.lang.Runnable#run()
		 */
		@Override
		public synchronized void run() {
			try {
				bufferedWriter.write(msg);
				bufferedWriter.flush();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
		public void send(String msg){
			this.msg = msg;
			new Thread(this).start();
		}
		
	}


Server端测试

public class TcpSocketServer {

	public static void main(String[] args) {
		
		List<SocketHandler> serverHandlers = new CopyOnWriteArrayList<SocketHandler>();
		ServerSocket serverSocket = null;
		try {
			serverSocket = new ServerSocket(8090, 5);
			while(true)
			{
			Socket clientSocket = serverSocket.accept();
			if(clientSocket.isConnected()) 
			{
				SocketHandler serverHandler = new SocketHandler(clientSocket);
				serverHandlers.add(serverHandler);
				serverHandler.listen(true);
				
				serverHandler.sendMessage("Host:"+serverSocket.getInetAddress().getHostAddress()+"\r\n");
				
				/*while (true) 
				{
					Scanner sc = new Scanner(System.in);
					String next = sc.nextLine()+"\r\n";
					for (SocketHandler scItem : serverHandlers) {
						scItem.sendMessage(next);
					}
				}*/
			}
			
			}
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			try {
				for (SocketHandler serverHandler : serverHandlers) 
				{
					serverHandler.shutDown();
				}
				serverHandlers.clear();
				serverSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
	}
	
	
	
}

客户端测试

public class TcpSocketClient {

		public static void main(String[] args) {
			
			SocketHandler clientHandler = null;
			try {
				Socket clientSocket = new Socket("localhost", 8090);
				clientSocket.setKeepAlive(true);
				clientSocket.setTcpNoDelay(true);
				
				if(clientSocket.isConnected()) 
				{
					 clientHandler = new SocketHandler(clientSocket);
					 clientHandler.listen(true);
					 
					 while (true) 
					 {
						Scanner sc = new Scanner(System.in);
						String next = sc.nextLine()+"\r\n";
						if(!clientSocket.isClosed())
						{
							clientHandler.sendMessage(next);
						}else{
							break;
						}
					}
				}
				
			} catch (IOException e) {
				e.printStackTrace();
			}finally{
				clientHandler.shutDown();
			}
			
		}
		
		
	}


展开阅读全文
加载中

作者的其它热门文章

打赏
1
1 收藏
分享
打赏
0 评论
1 收藏
1
分享
返回顶部
顶部