一个简单的ThreadPool实现,以及基于线程池的简单的Web服务器

原创
2016/09/05 12:00
阅读数 175

 一个简单的线程池

package com;

/**
 * Contract for a simple thread pool: callers submit jobs and may adjust the number of
 * worker threads that consume them.
 *
 * @param <Job> the type of task accepted by the pool; must be a {@link Runnable}
 */
public interface ThreadPool<Job extends Runnable> {
	// Execute a job.
	void execute(Job job);
	// Shut the thread pool down.
	void shutdown();
	// Add worker threads.
	void addWorkers(int num);
	// Remove worker threads.
	void removeWorker(int num);
	// Return the number of jobs that are waiting to be executed.
	int getJobSize();
}
package com;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A small thread pool built on {@code wait}/{@code notify}.
 *
 * <p>Submitted jobs are appended to a FIFO queue; each worker thread repeatedly takes the
 * oldest job and runs it, parking on the queue's monitor while the queue is empty. The
 * {@code jobs} list doubles as the monitor that guards the queue and the worker count.
 *
 * <p>Jobs submitted after {@link #shutdown()} are queued but never executed, and jobs still
 * queued when the pool is shut down are not drained.
 *
 * @param <Job> the type of task accepted by the pool
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
	/** Upper bound on the number of worker threads. */
	private static final int MAX_WORKER_NUM = 10;
	/** Lower bound on the number of worker threads. */
	private static final int MIN_WORKER_NUM = 1;
	/** Number of workers started by the no-argument constructor. */
	private static final int DEFAULT_WORKER_NUM = 5;
	/** Pending jobs in FIFO order; also the monitor guarding the queue and {@code workerNum}. */
	private final LinkedList<Job> jobs = new LinkedList<>();

	/** Number of workers owned by the pool; guarded by {@code jobs} after construction. */
	private int workerNum = DEFAULT_WORKER_NUM;
	/** Source of sequence numbers used in worker thread names. */
	private final AtomicLong threadNum = new AtomicLong();
	/** Workers currently owned by the pool. */
	private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());

	/** Creates a pool with the default number of workers. */
	public DefaultThreadPool() {
		initiallzeWorkers(DEFAULT_WORKER_NUM);
	}

	/**
	 * Creates a pool with the requested number of workers, clamped to the supported range.
	 *
	 * @param num desired worker count; values outside the minimum/maximum are clamped
	 */
	public DefaultThreadPool(int num) {
		workerNum = Math.max(MIN_WORKER_NUM, Math.min(MAX_WORKER_NUM, num));
		// Start exactly as many threads as were recorded, so the count and the list stay in step.
		initiallzeWorkers(workerNum);
	}

	/**
	 * Queues a job and wakes one waiting worker.
	 *
	 * @param job the job to run; {@code null} is silently ignored
	 */
	@Override
	public void execute(Job job) {
		if (job == null) {
			return;
		}
		synchronized (jobs) {
			jobs.addLast(job);
			jobs.notify();
		}
	}

	/**
	 * Asks every worker to stop. Idle workers are woken so they can exit; a worker that is
	 * running a job exits once that job returns.
	 */
	@Override
	public void shutdown() {
		// Copy first: iterating a synchronizedList requires holding its lock, and a snapshot
		// avoids holding that lock while Worker.shutdown() takes the jobs monitor.
		List<Worker> snapshot = new ArrayList<Worker>(workers);
		for (Worker worker : snapshot) {
			worker.shutdown();
		}
	}

	/**
	 * Starts additional workers, never exceeding the maximum pool size.
	 *
	 * @param num number of workers to add; non-positive values are a no-op
	 */
	@Override
	public void addWorkers(int num) {
		synchronized (jobs) {
			int toAdd = Math.min(num, MAX_WORKER_NUM - this.workerNum);
			if (toAdd <= 0) {
				// Nothing to start; in particular never let a negative value shrink the count.
				return;
			}
			initiallzeWorkers(toAdd);
			this.workerNum += toAdd;
		}
	}

	/**
	 * Stops and discards some workers.
	 *
	 * @param num number of workers to remove; must be smaller than the current worker count
	 * @throws IllegalArgumentException if {@code num} is not smaller than the current worker count
	 */
	@Override
	public void removeWorker(int num) {
		synchronized (jobs) {
			if (num >= this.workerNum) {
				throw new IllegalArgumentException("beyond workNum");
			}
			int removed = 0;
			// Always take the head: the list shrinks on every removal, so indexing by the
			// running count would skip workers and eventually run past the end.
			while (removed < num && !workers.isEmpty()) {
				Worker worker = workers.remove(0);
				worker.shutdown();
				removed++;
			}
			this.workerNum -= removed;
		}
	}

	/**
	 * Returns the number of jobs waiting to be executed.
	 *
	 * @return the current queue length
	 */
	@Override
	public int getJobSize() {
		// LinkedList is not thread-safe, so even a read has to hold the monitor.
		synchronized (jobs) {
			return jobs.size();
		}
	}

	/**
	 * Creates {@code num} workers, registers them and starts a thread for each.
	 *
	 * @param num number of workers to start; non-positive values start none
	 */
	private void initiallzeWorkers(int num) {
		for (int i = 0; i < num; i++) {
			Worker worker = new Worker();
			workers.add(worker);
			Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
			thread.start();
		}
	}

	/** Consumer that takes jobs from the queue and runs them until asked to stop. */
	class Worker implements Runnable {

		/** Cleared by {@link #shutdown()}; checked between jobs and while waiting. */
		private volatile boolean running = true;

		@Override
		public void run() {
			while (running) {
				Job job;
				synchronized (jobs) {
					while (running && jobs.isEmpty()) {
						try {
							jobs.wait();
						} catch (InterruptedException e) {
							// Treat an external interrupt as a request to stop; keep the flag set.
							Thread.currentThread().interrupt();
							return;
						}
					}
					if (!running) {
						// Woken by shutdown(): exit without taking another job.
						return;
					}
					job = jobs.removeFirst();
				}
				try {
					job.run();
				} catch (Exception ignored) {
					// Deliberately ignored so that one failing job does not kill the worker.
				}
			}
		}

		/** Marks this worker as stopped and wakes it if it is parked waiting for a job. */
		public void shutdown() {
			synchronized (jobs) {
				running = false;
				// notifyAll, not notify: a single notify could wake a different worker and
				// leave this one parked forever. Other workers simply re-check and wait again.
				jobs.notifyAll();
			}
		}
	}

}

从线程池的实现中可以看出,当客户端调用 execute(Job) 方法时,会不断地向任务列表 jobs 中添加 Job,而每个工作者线程则不断地从 jobs 中取出一个 Job 来执行;当 jobs 为空时,工作者线程进入等待状态。

一个基于线程池技术的简单Web服务器

package com;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * A minimal static-file HTTP server: every accepted connection is handed to a thread pool,
 * which reads the request line, serves the matching file below {@link #basePath} and then
 * closes the connection.
 */
public class SimpleHttpServer {

	/** Thread pool that runs one {@link HttpRequestHandler} per accepted connection. */
	static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1);

	/** Root directory of the served files; {@code null} until {@link #setBasePath} accepts a value. */
	static String basePath;
	/** Listening socket, created by {@link #start()}. */
	static ServerSocket serverSocket;
	/** Port to listen on. */
	static int port = 8080;

	/**
	 * Sets the listening port.
	 *
	 * @param port the port to use; non-positive values are ignored
	 */
	public static void setPort(int port) {
		if (port > 0) {
			SimpleHttpServer.port = port;
		}
	}

	/**
	 * Sets the root directory of the served files.
	 *
	 * @param basePath path of an existing directory; anything else is ignored
	 */
	public static void setBasePath(String basePath) {
		// isDirectory() is false for a non-existent path, so no separate exists() check is needed.
		if (basePath != null && new File(basePath).isDirectory()) {
			SimpleHttpServer.basePath = basePath;
		}
	}

	/**
	 * Opens the server socket and dispatches connections to the thread pool until accepting fails.
	 *
	 * @throws IOException if the socket cannot be opened or accepting a connection fails
	 */
	public static void start() throws IOException {
		serverSocket = new ServerSocket(port);
		try {
			Socket socket;
			while ((socket = serverSocket.accept()) != null) {
				threadPool.execute(new HttpRequestHandler(socket));
			}
		} finally {
			// accept() leaves the loop by throwing, so the socket must be released here.
			serverSocket.close();
		}
	}

	public static void main(String[] args) {
		try {
			start();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/** Handles a single HTTP connection: one request, one response, then close. */
	static class HttpRequestHandler implements Runnable {

		private Socket socket;

		public HttpRequestHandler(Socket socket) {
			this.socket = socket;
		}

		@Override
		public void run() {
			BufferedReader reader = null;
			PrintWriter out = null;
			// Set once any response bytes may have been written, so the error path does not
			// append a second status line to a partially sent response.
			boolean responseStarted = false;

			try {
				reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
				out = new PrintWriter(socket.getOutputStream());

				String requestPath = parseRequestPath(reader.readLine());
				if (requestPath == null) {
					responseStarted = true;
					sendStatus(out, "400 Bad Request");
					return;
				}

				File target = resolve(requestPath);
				if (target == null || !target.isFile()) {
					responseStarted = true;
					sendStatus(out, "404 Not Found");
					return;
				}

				// Read the whole file before sending anything, so a read failure can still
				// be reported with a clean 500 response.
				byte[] body = readFully(target);
				responseStarted = true;
				sendFile(out, contentTypeFor(target.getPath()), body);
			} catch (IOException e) {
				// out is still null when the failure happened while opening the streams.
				if (out != null && !responseStarted) {
					sendStatus(out, "500 Internal Server Error");
				}
			} finally {
				close(out, reader, socket);
			}
		}

		/**
		 * Extracts the request target from a request line such as {@code GET /index.html HTTP/1.1}.
		 *
		 * @return the target, or {@code null} if the line is missing or malformed
		 */
		private static String parseRequestPath(String requestLine) {
			if (requestLine == null) {
				// The client closed the connection without sending a request.
				return null;
			}
			String[] parts = requestLine.split(" ");
			if (parts.length < 2 || !parts[1].startsWith("/")) {
				return null;
			}
			return parts[1];
		}

		/**
		 * Maps a request target to a file below the base path.
		 *
		 * @return the canonical file, or {@code null} if no base path is configured or the
		 *         target would escape the base directory (for example via {@code ..})
		 * @throws IOException if the canonical path cannot be determined
		 */
		private static File resolve(String requestPath) throws IOException {
			String root = basePath;
			if (root == null) {
				return null;
			}
			String rootPath = new File(root).getCanonicalPath();
			File target = new File(root + requestPath).getCanonicalFile();
			String targetPath = target.getPath();
			String rootPrefix = rootPath.endsWith(File.separator) ? rootPath : rootPath + File.separator;
			if (!targetPath.equals(rootPath) && !targetPath.startsWith(rootPrefix)) {
				return null;
			}
			return target;
		}

		/** Chooses the Content-Type header value from the file name suffix. */
		private static String contentTypeFor(String filePath) {
			if (filePath.endsWith("jpg")) {
				return "image/jpeg";
			}
			if (filePath.endsWith("ico")) {
				return "image/x-icon";
			}
			return "text/html; charset=UTF-8";
		}

		/** Reads the complete content of a file into memory. */
		private static byte[] readFully(File file) throws IOException {
			try (InputStream in = new FileInputStream(file)) {
				ByteArrayOutputStream buffer = new ByteArrayOutputStream();
				byte[] chunk = new byte[8192];
				int read;
				while ((read = in.read(chunk)) != -1) {
					buffer.write(chunk, 0, read);
				}
				return buffer.toByteArray();
			}
		}

		/** Sends a 200 response: headers through the writer, then the raw body bytes. */
		private void sendFile(PrintWriter out, String contentType, byte[] body) throws IOException {
			out.print("HTTP/1.1 200 OK\r\n");
			out.print("Server: TOMCAT\r\n");
			out.print("Content-Type: " + contentType + "\r\n");
			out.print("Content-Length: " + body.length + "\r\n");
			out.print("Connection: close\r\n");
			out.print("\r\n");
			// The writer buffers; flush so the headers reach the socket before the body,
			// which bypasses the writer and goes straight to the underlying stream.
			out.flush();
			socket.getOutputStream().write(body, 0, body.length);
			socket.getOutputStream().flush();
		}

		/** Sends a body-less response with the given status, e.g. {@code "404 Not Found"}. */
		private static void sendStatus(PrintWriter out, String status) {
			out.print("HTTP/1.1 " + status + "\r\n");
			out.print("Server: TOMCAT\r\n");
			out.print("Content-Length: 0\r\n");
			out.print("Connection: close\r\n");
			out.print("\r\n");
			out.flush();
		}

		/** Closes every non-null resource, printing (but otherwise ignoring) close failures. */
		private static void close(Closeable... closeables) {
			if (closeables != null) {
				for (Closeable closeable : closeables) {
					if (closeable != null) {
						try {
							closeable.close();
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
				}
			}
		}

	}
}

 

展开阅读全文
打赏
0
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
0
分享
返回顶部
顶部