以文本方式查看主题

-  中文XML论坛 - 专业的XML技术讨论区  (http://bbs.xml.org.cn/index.asp)
--  『 Java/Eclipse 』  (http://bbs.xml.org.cn/list.asp?boardid=41)
----  简单线程池的实现  (http://bbs.xml.org.cn/dispbbs.asp?boardid=41&rootid=&id=84109)


--  作者:卷积内核
--  发布时间:4/2/2010 11:21:00 AM

--  简单线程池的实现
最近看了下JAVA线程相关的资料,顺便写了个自己的线程池的实现方式,记录一下。


首先定义一个简单的任务:
Java代码
/**  
* Task  
* Created on: 2008-9-29 上午10:29:18  
* Description:   
*/  
package cn.edu.ccnu.inc.ivan.thread;   
  
/**
 * A unit of work that can be queued and later run by a worker thread.
 *
 * @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
 */
public interface Task {

    /**
     * Runs the work represented by this task.
     */
    void execute();

    /**
     * Returns the number identifying this task.
     *
     * @return the task number
     */
    int getNum();
}

/**
* Task
* Created on: 2008-9-29 上午10:29:18
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;

/**
 * A unit of work that can be queued and later run by a worker thread.
 *
 * @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
 */
public interface Task {

    /**
     * Runs the work represented by this task.
     */
    void execute();

    /**
     * Returns the number identifying this task.
     *
     * @return the task number
     */
    int getNum();
}

其次定义工作线程,该线程用于执行任务:
Java代码
/**  
* WorkThread  
* Created on: 2008-9-29 上午10:30:06  
* Description:   
*/  
package cn.edu.ccnu.inc.ivan.thread;   
  
import java.util.Queue;   
  
/**  
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>  
*/  
public class WorkThread extends Thread {   
       
    /**  
     * 线程关闭的标识位  
     */  
    private boolean shutDown = false;   
       
    /**  
     * 线程池管理器  
     */  
    ThreadPoolManager mgr;   
       
    /**  
     * 任务队列  
     */  
    private Queue<Task> taskQueue;   
       
    public WorkThread(ThreadPoolManager mgr, Queue<Task> taskQueue, String name) {   
        super(name);   
        this.mgr = mgr;   
        this.taskQueue = taskQueue;   
    }   
       
    public void run() {   
        while(!shutDown) {   
            Task task;   
            // 如果任务队列不为空,则取出一个任务并开始执行,否则线程等等   
            if(!taskQueue.isEmpty()) {   
                synchronized(taskQueue) {   
                    task = taskQueue.poll();   
                }   
                task.execute();   
                // 任务执行完毕之后释放线程到空闲线程队列中   
                mgr.releaseThread(this);   
            } else {   
                try {   
                    synchronized(taskQueue) {   
                        taskQueue.wait();   
                    }   
                } catch (InterruptedException e) {   
                    e.printStackTrace();   
                }   
            }   
        }   
    }   
       
    public void shutDown() {   
        this.shutDown = true;   
    }   
}  

/**
* WorkThread
* Created on: 2008-9-29 上午10:30:06
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;

import java.util.Queue;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public class WorkThread extends Thread {
 
 /**
  * 线程关闭的标识位
  */
 private boolean shutDown = false;
 
 /**
  * 线程池管理器
  */
 ThreadPoolManager mgr;
 
 /**
  * 任务队列
  */
 private Queue<Task> taskQueue;
 
 public WorkThread(ThreadPoolManager mgr, Queue<Task> taskQueue, String name) {
  super(name);
  this.mgr = mgr;
  this.taskQueue = taskQueue;
 }
 
 public void run() {
  while(!shutDown) {
   Task task;
   // 如果任务队列不为空,则取出一个任务并开始执行,否则线程等等
   if(!taskQueue.isEmpty()) {
    synchronized(taskQueue) {
     task = taskQueue.poll();
    }
    task.execute();
    // 任务执行完毕之后释放线程到空闲线程队列中
    mgr.releaseThread(this);
   } else {
    try {
     synchronized(taskQueue) {
      taskQueue.wait();
     }
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }
 }
 
 public void shutDown() {
  this.shutDown = true;
 }
}


最后就是其核心类,线程池管理器:
Java代码
/**  
* ThreadPoolManager  
* Created on: 2008-9-29 上午10:34:09  
* Description:   
*/  
package cn.edu.ccnu.inc.ivan.thread;   
import java.util.Queue;   
import java.util.concurrent.ConcurrentLinkedQueue;   
  
/**  
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>  
*/  
public class ThreadPoolManager {   
  
    public static int DEFAULT_POOL_SIZE = 5;   
    public static int POOL_SIZE = 0;   
       
    /**  
     * 空闲线程  
     */  
    private Queue<WorkThread> idleThread;   
       
    /**  
     * 任务队列  
     */  
    private Queue<Task> taskQueue;   
       
    /**  
     * 线程池大小  
     */  
    private int poolSize;   
       
    public ThreadPoolManager() {   
        this(DEFAULT_POOL_SIZE);   
    }   
       
    public ThreadPoolManager(int poolSize) {   
        if(poolSize < 0) {   
            this.poolSize = DEFAULT_POOL_SIZE;   
        } else {   
            this.poolSize = poolSize;   
        }   
        idleThread = new ConcurrentLinkedQueue<WorkThread>();   
        taskQueue = new ConcurrentLinkedQueue<Task>();   
        init();   
    }   
       
    /**  
     * 初始化线程池,新建 N 个空闲线程  
     *  
     */  
    private void init() {   
        System.out.println("Start up thread pool...");   
        synchronized(taskQueue) {   
            for(int i=0; i < poolSize; i++) {   
                WorkThread workThread = new WorkThread(this, taskQueue, "Thread " + i);   
                idleThread.add(workThread);   
                POOL_SIZE++;   
                workThread.start();   
            }   
        }   
    }   
       
    /**  
     * 关闭线程池,关闭线程池中各个线程  
     * 在调用该方法后,线程并没有马上关闭,而是在线程任务执行完之后关闭  
     *  
     */  
    public void shutDown() {   
        System.out.println("Shut down all work thread...");   
        synchronized(taskQueue) {   
            for(WorkThread thread : idleThread) {   
                thread.shutDown();   
            }   
        }   
    }   
       
    /**  
     * 添加任务并唤醒各因无任务而等待的空闲线程  
     * @param task  
     * @throws Exception  
     */  
    public void addTask(Task task) throws Exception {   
        synchronized(taskQueue) {   
            taskQueue.add(task);   
            taskQueue.notifyAll();   
        }   
    }   
       
//  public void schedule() throws Exception {   
//      while(!shutDown) {   
//          if(!taskQueue.isEmpty()) {   
//              WorkThread workThread = getIdleThread();   
//              synchronized(workThread) {   
//                  workThread.notifyAll();   
//              }   
//          } else {   
//              taskQueue.wait();   
//          }   
//      }   
//  }   
       
    /**  
     * 获取空闲线程,当线程池内无空闲线程时等待  
     * @return  
     * @throws Exception  
     */  
    public WorkThread getIdleThread() throws Exception {   
        if(idleThread.isEmpty()) {   
            System.out.println("No idle thread in pool, please wait...");   
            idleThread.wait();   
        }   
        synchronized(idleThread) {   
            return idleThread.poll();   
        }   
    }   
       
    /**  
     * 释放线程  
     * @param thread  
     */  
    public void releaseThread(WorkThread thread) {   
        System.out.println("Release the thread [" + thread.getName() + "] to the pool...");   
        synchronized(idleThread) {   
            idleThread.add(thread);   
            idleThread.notifyAll();   
        }   
    }   
}  

/**
* ThreadPoolManager
* Created on: 2008-9-29 上午10:34:09
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public class ThreadPoolManager {

 public static int DEFAULT_POOL_SIZE = 5;
 public static int POOL_SIZE = 0;
 
 /**
  * 空闲线程
  */
 private Queue<WorkThread> idleThread;
 
 /**
  * 任务队列
  */
 private Queue<Task> taskQueue;
 
 /**
  * 线程池大小
  */
 private int poolSize;
 
 public ThreadPoolManager() {
  this(DEFAULT_POOL_SIZE);
 }
 
 public ThreadPoolManager(int poolSize) {
  if(poolSize < 0) {
   this.poolSize = DEFAULT_POOL_SIZE;
  } else {
   this.poolSize = poolSize;
  }
  idleThread = new ConcurrentLinkedQueue<WorkThread>();
  taskQueue = new ConcurrentLinkedQueue<Task>();
  init();
 }
 
 /**
  * 初始化线程池,新建 N 个空闲线程
  *
  */
 private void init() {
  System.out.println("Start up thread pool...");
  synchronized(taskQueue) {
   for(int i=0; i < poolSize; i++) {
    WorkThread workThread = new WorkThread(this, taskQueue, "Thread " + i);
    idleThread.add(workThread);
    POOL_SIZE++;
    workThread.start();
   }
  }
 }
 
 /**
  * 关闭线程池,关闭线程池中各个线程
  * 在调用该方法后,线程并没有马上关闭,而是在线程任务执行完之后关闭
  *
  */
 public void shutDown() {
  System.out.println("Shut down all work thread...");
  synchronized(taskQueue) {
   for(WorkThread thread : idleThread) {
    thread.shutDown();
   }
  }
 }
 
 /**
  * 添加任务并唤醒各因无任务而等待的空闲线程
  * @param task
  * @throws Exception
  */
 public void addTask(Task task) throws Exception {
  synchronized(taskQueue) {
   taskQueue.add(task);
   taskQueue.notifyAll();
  }
 }
 
// public void schedule() throws Exception {
//  while(!shutDown) {
//   if(!taskQueue.isEmpty()) {
//    WorkThread workThread = getIdleThread();
//    synchronized(workThread) {
//     workThread.notifyAll();
//    }
//   } else {
//    taskQueue.wait();
//   }
//  }
// }
 
 /**
  * 获取空闲线程,当线程池内无空闲线程时等待
  * @return
  * @throws Exception
  */
 public WorkThread getIdleThread() throws Exception {
  if(idleThread.isEmpty()) {
   System.out.println("No idle thread in pool, please wait...");
   idleThread.wait();
  }
  synchronized(idleThread) {
   return idleThread.poll();
  }
 }
 
 /**
  * 释放线程
  * @param thread
  */
 public void releaseThread(WorkThread thread) {
  System.out.println("Release the thread [" + thread.getName() + "] to the pool...");
  synchronized(idleThread) {
   idleThread.add(thread);
   idleThread.notifyAll();
  }
 }
}

测试起来比较简单:为了模拟线程的执行效果,每个任务只打印一行执行信息,然后让执行它的线程睡眠一小段随机时间;测试代码在提交完所有任务之后,让主线程睡眠一段时间(等待任务执行完毕),再关闭线程池:
Java代码
/**  
* SimpleTask  
* Created on: 2008-9-29 上午10:47:07  
* Description:   
*/  
package cn.edu.ccnu.inc.ivan.thread;   
  
import edu.ccnu.inc.ivan.util.DateUtils;   
  
/**  
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>  
*/  
public class SimpleTask implements Task {   
  
    int num = 0;   
       
    public SimpleTask(int num) {   
        this.num = num;   
    }   
  
    public void execute() {   
        try {   
            System.out.println("[" + DateUtils.getTimeNow() + "] Task[" + getNum() + "]:I have worked in thread [" + Thread.currentThread().getName() + "]");   
            Thread.sleep(Math.round(Math.random() * 1000));   
        } catch (InterruptedException e) {   
            e.printStackTrace();   
        }   
    }   
  
    public int getNum() {   
        return num;   
    }   
  
    public void setNum(int num) {   
        this.num = num;   
    }   
}  

/**
* SimpleTask
* Created on: 2008-9-29 上午10:47:07
* Description:
*/
package cn.edu.ccnu.inc.ivan.thread;

import edu.ccnu.inc.ivan.util.DateUtils;

/**
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>
*/
public class SimpleTask implements Task {

 int num = 0;
 
 public SimpleTask(int num) {
  this.num = num;
 }

 public void execute() {
  try {
   System.out.println("[" + DateUtils.getTimeNow() + "] Task[" + getNum() + "]:I have worked in thread [" + Thread.currentThread().getName() + "]");
   Thread.sleep(Math.round(Math.random() * 1000));
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }

 public int getNum() {
  return num;
 }

 public void setNum(int num) {
  this.num = num;
 }
}


Java代码
/**  
* TestThreadPoolManager  
* Created on: 2008-9-29 上午10:45:35  
* Description:   
*/  
package cn.edu.ccnu.inc.ivan.thread;   
  
import junit.framework.TestCase;   
  
/**  
* @author <a href="mailto:huangfengjing@gmail.com">Ivan</a>  
*/  
public class TestThreadPoolManager extends TestCase {   
  
    public void testManager() throws Exception {   
        ThreadPoolManager pool = new ThreadPoolManager();   
        for(int i = 0; i < 100; i++) {   
            pool.addTask(new SimpleTask(i));   
        }   
        Thread.sleep(1000 * 10);   
        pool.shutDown();   
    }   
  
}  


W 3 C h i n a ( since 2003 ) 旗 下 站 点
苏ICP备05006046号《全国人大常委会关于维护互联网安全的决定》《计算机信息网络国际联网安全保护管理办法》
79.102ms