`

Java 线程池的原理与实现

    博客分类:
  • Java
阅读更多

 



建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用法。
关于我对同步的认识,要缘于大三年的一本书,书名好像是 Java 实战,这本书写得实在太妙了,真正的从理论到实践,从截图分析到.class字节码分析。哇,我想市场上很难买到这么精致的书了。作为一个Java爱好者,我觉得绝对值得一读。
我对此书印象最深之一的就是:equal()方法,由浅入深,经典!
还有就是同步了,其中提到了我的几个编程误区,以前如何使用同步提高性能等等,通过学习,使我对同步的认识进一步加深。


简单介绍

    创建线程有两种方式:继承Thread或实现Runnable。Thread实现了Runnable接口,提供了一个空的run()方法,所以不论是继承Thread还是实现Runnable,都要有自己的run()方法。
    一个线程创建后就存在,调用start()方法就开始运行(执行run()方法),调用wait进入等待或调用sleep进入休眠期,顺利运行完毕或休眠被中断或运行过程中出现异常而退出。

wait和sleep比较:
      sleep方法有:sleep(long millis),sleep(long millis, long nanos),调用sleep方法后,当前线程进入休眠期,暂停执行,但该线程继续拥有监视资源的所有权。到达休眠时间后线程将继续执行,直到完成。若在休眠期另一线程中断该线程,则该线程退出。
      wait方法有:wait(),wait(long timeout),wait(long timeout, long nanos),调用wait方法后,该线程放弃监视资源的所有权进入等待状态;
      wait():等待有其它的线程调用notify()或notifyAll()进入调度状态,与其它线程共同争夺监视。wait()相当于wait(0),wait(0, 0)。
      wait(long timeout):当其它线程调用notify()或notifyAll(),或时间到达timeout亳秒,或有其它某线程中断该线程,则该线程进入调度状态。
      wait(long timeout, long nanos):相当于wait(1000000*timeout + nanos),只不过时间单位为纳秒。



线程池:
    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
    
    假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
    
    如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
                一个线程池包括以下四个基本组成部分:
                1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
                
    线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

    线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

    假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。


/** 线程池类,工作线程作为其内部类 **/

package org.ymcn.util;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;

/**
* 线程池
* 创建线程池,销毁线程池,添加新任务
* 
* @author obullxl
*/
public final class ThreadPool {
    private static Logger logger = Logger.getLogger(ThreadPool.class);
    private static Logger taskLogger = Logger.getLogger("TaskLogger");

    private static boolean debug = taskLogger.isDebugEnabled();
    // private static boolean debug = taskLogger.isInfoEnabled();
    /* 单例 */
    private static ThreadPool instance = ThreadPool.getInstance();

    public static final int SYSTEM_BUSY_TASK_COUNT = 150;
    /* 默认池中线程数 */
    public static int worker_num = 5;
    /* 已经处理的任务数 */
    private static int taskCounter = 0;

    public static boolean systemIsBusy = false;

    private static List<Task> taskQueue = Collections
            .synchronizedList(new LinkedList<Task>());
    /* 池中的所有线程 */
    public PoolWorker[] workers;

    private ThreadPool() {
        workers = new PoolWorker[5];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }

    private ThreadPool(int pool_worker_num) {
        worker_num = pool_worker_num;
        workers = new PoolWorker[worker_num];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }

    public static synchronized ThreadPool getInstance() {
        if (instance == null)
            return new ThreadPool();
        return instance;
    }
    /**
    * 增加新的任务
    * 每增加一个新任务,都要唤醒任务队列
    * @param newTask
    */
    public void addTask(Task newTask) {
        synchronized (taskQueue) {
            newTask.setTaskId(++taskCounter);
            newTask.setSubmitTime(new Date());
            taskQueue.add(newTask);
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        logger.info("Submit Task<" + newTask.getTaskId() + ">: "
                + newTask.info());
    }
    /**
    * 批量增加新任务
    * @param taskes
    */
    public void batchAddTask(Task[] taskes) {
        if (taskes == null || taskes.length == 0) {
            return;
        }
        synchronized (taskQueue) {
            for (int i = 0; i < taskes.length; i++) {
                if (taskes[i] == null) {
                    continue;
                }
                taskes[i].setTaskId(++taskCounter);
                taskes[i].setSubmitTime(new Date());
                taskQueue.add(taskes[i]);
            }
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        for (int i = 0; i < taskes.length; i++) {
            if (taskes[i] == null) {
                continue;
            }
            logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
                    + taskes[i].info());
        }
    }
    /**
    * 线程池信息
    * @return
    */
    public String getInfo() {
        StringBuffer sb = new StringBuffer();
        sb.append("\nTask Queue Size:" + taskQueue.size());
        for (int i = 0; i < workers.length; i++) {
            sb.append("\nWorker " + i + " is "
                    + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
        }
        return sb.toString();
    }
    /**
    * 销毁线程池
    */
    public synchronized void destroy() {
        for (int i = 0; i < worker_num; i++) {
            workers[i].stopWorker();
            workers[i] = null;
        }
        taskQueue.clear();
    }

    /**
    * 池中工作线程
    * 
    * @author obullxl
    */
    private class PoolWorker extends Thread {
        private int index = -1;
        /* 该工作线程是否有效 */
        private boolean isRunning = true;
        /* 该工作线程是否可以执行新任务 */
        private boolean isWaiting = true;

        public PoolWorker(int index) {
            this.index = index;
            start();
        }

        public void stopWorker() {
            this.isRunning = false;
        }

        public boolean isWaiting() {
            return this.isWaiting;
        }
        /**
        * 循环执行任务
        * 这也许是线程池的关键所在
        */
        public void run() {
            while (isRunning) {
                Task r = null;
                synchronized (taskQueue) {
                    while (taskQueue.isEmpty()) {
                        try {
                            /* 任务队列为空,则等待有新任务加入从而被唤醒 */
                            taskQueue.wait(20);
                        } catch (InterruptedException ie) {
                            logger.error(ie);
                        }
                    }
                    /* 取出任务执行 */
                    r = (Task) taskQueue.remove(0);
                }
                if (r != null) {
                    isWaiting = false;
                    try {
                        if (debug) {
                            r.setBeginExceuteTime(new Date());
                            taskLogger.debug("Worker<" + index
                                    + "> start execute Task<" + r.getTaskId() + ">");
                            if (r.getBeginExceuteTime().getTime()
                                    - r.getSubmitTime().getTime() > 1000)
                                taskLogger.debug("longer waiting time. "
                                        + r.info() + ",<" + index + ">,time:"
                                        + (r.getFinishTime().getTime() - r
                                                .getBeginExceuteTime().getTime()));
                        }
                        /* 该任务是否需要立即执行 */
                        if (r.needExecuteImmediate()) {
                            new Thread(r).start();
                        } else {
                            r.run();
                        }
                        if (debug) {
                            r.setFinishTime(new Date());
                            taskLogger.debug("Worker<" + index
                                    + "> finish task<" + r.getTaskId() + ">");
                            if (r.getFinishTime().getTime()
                                    - r.getBeginExceuteTime().getTime() > 1000)
                                taskLogger.debug("longer execution time. "
                                        + r.info() + ",<" + index + ">,time:"
                                        + (r.getFinishTime().getTime() - r
                                                .getBeginExceuteTime().getTime()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        logger.error(e);
                    }
                    isWaiting = true;
                    r = null;
                }
            }
        }
    }
}

/** 任务接口类 **/

package org.ymcn.util;

import java.util.Date;

/**
* 所有任务接口
* 其他任务必须继承访类
* 
* @author obullxl
*/
public abstract class Task implements Runnable {
    // private static Logger logger = Logger.getLogger(Task.class);
    /* 产生时间 */
    private Date generateTime = null;
    /* 提交执行时间 */
    private Date submitTime = null;
    /* 开始执行时间 */
    private Date beginExceuteTime = null;
    /* 执行完成时间 */
    private Date finishTime = null;

    private long taskId;

    public Task() {
        this.generateTime = new Date();
    }

    /**
    * 任务执行入口
    */
    public void run() {
        /**
        * 相关执行代码
        * 
        * beginTransaction();
        * 
        * 执行过程中可能产生新的任务 subtask = taskCore();
        * 
        * commitTransaction();
        * 
        * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
        */
    }

    /**
    * 所有任务的核心 所以特别的业务逻辑执行之处
    * 
    * @throws Exception
    */
    public abstract Task[] taskCore() throws Exception;

    /**
    * 是否用到数据库
    * 
    * @return
    */
    protected abstract boolean useDb();

    /**
    * 是否需要立即执行
    * 
    * @return
    */
    protected abstract boolean needExecuteImmediate();

    /**
    * 任务信息
    * 
    * @return String
    */
    public abstract String info();

    public Date getGenerateTime() {
        return generateTime;
    }

    public Date getBeginExceuteTime() {
        return beginExceuteTime;
    }

    public void setBeginExceuteTime(Date beginExceuteTime) {
        this.beginExceuteTime = beginExceuteTime;
    }

    public Date getFinishTime() {
        return finishTime;
    }

    public void setFinishTime(Date finishTime) {
        this.finishTime = finishTime;
    }

    public Date getSubmitTime() {
        return submitTime;
    }

    public void setSubmitTime(Date submitTime) {
        this.submitTime = submitTime;
    }

    public long getTaskId() {
        return taskId;
    }

    public void setTaskId(long taskId) {
        this.taskId = taskId;
    }

}
分享到:
评论

相关推荐

    Java线程池的原理与实现

    Java线程池的原理与实现 Java线程池的原理与实现

    JAVA线程池的原理与实现.pdf

    JAVA线程池的原理与实现.pdf

    java线程池

    介绍了java线程池的基本实现原理并由详细的实例

    Java线程池FutureTask实现原理详解

    主要介绍了Java线程池FutureTask实现原理详解,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下

    基于Java线程池技术实现Knock Knock游戏项目.zip

    Java语言游戏项目实战资源包 内容概览: 这次分享为你带来了丰富的Java语言游戏项目实战资源,让你在实践中深入...持续学习与探索:Java语言和游戏开发技术都在不断更新,建议你在实践中持续学习新的技术和工具,不

    Java concurrency线程池之线程池原理(四)_动力节点Java学院整理

    主要为大家详细介绍了Java concurrency线程池之线程池原理,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

    高并发技术之Java线程池源码解析1

    2.线程池的运作原理 3.调度线程池的运作原理 4.调度线程池怎么实现FixRate,FixDelay 5.怎么取消的

    Java线程池浅析1

    }关于FutureTask这个类的实现,我在前面的JAVA LOCK代码浅析有讲过其实现原理,主要的思想就是关注任务完成与未完成的状态,任务提交线程get()结

    java并发包&线程池原理分析&锁的深度化

    2.Vector与ArrayList一样,也是通过数组实现的,不同的是它支持线程的同步,即某一时刻只有一个线程能够写Vector,避免多线程同时写而引起的不一致性,但实现同步需要很高的花费,因此,访问它比访问ArrayList慢 ...

    深入理解Java编程线程池的实现原理

    主要介绍了深入理解Java编程线程池的实现原理,涉及ThreadPoolExecutor类,线程池实现原理及示例等相关内容,具有一定参考价值,需要的朋友可以了解下。

    java基于线程池和反射机制实现定时任务完整实例

    主要介绍了java基于线程池和反射机制实现定时任务的方法,以完整实例形式较为详细的分析了Java定时任务的功能原理与实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下

    Java并发编程原理与实战

    线程之间通信之join应用与实现原理剖析.mp4 ThreadLocal 使用及实现原理.mp4 并发工具类CountDownLatch详解.mp4 并发工具类CyclicBarrier 详解.mp4 并发工具类Semaphore详解.mp4 并发工具类Exchanger详解.mp4 ...

    Java中的线程与线程池.pptx

    本文档对Java中的线程和线程池进行了简单介绍。首先,阐述了为什么需要线程、...然后,介绍了为什么需要线程池,JDK自带的线程池实现方式ThreadPoolExecutor的使用及其原理,最后强调ThreadPoolExecutor应用的注意点。

    龙果 java并发编程原理实战

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    JAVA并发编程实践-线程池-学习笔记

    本节主要关注在配置和调整线程池时用的高级选项,讲述了任务执行框架的过程中需 ... 这就是线程池的实现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获 取下一个 Runnable 之前可以是阻塞的。

    java多线程、并发及线程池介绍收藏的几篇文档

    里面是自己收藏的几篇关于线程的文档资料,大家可以看看哈

    Java并发编程学习笔记

    3、ThreadLocal 的底层实现与使用 4、ReentrantLock底层实现和如何使用 5、Condition源码分析 6、ReentrantReadWriteLock底层实现原理 7、并发工具类CountDownLatch 、CyclicBarrier和Semaphore底层实现原理 8、...

    3.1.8.线程池的实现原理分析1

    1. 降低创建线程和销毁线程的性能开销 2. 提高响应速度,当有新任务需要执行是不需要等待线程创建就可以立马执行 3. 合理的设置线程池大小可以避免因为线程数超

    深入理解Java之线程池

    在前面的文章中,我们...我们来详细讲解一下Java的线程池,首先我们从核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,后讨论了一下如何合理配置线程池的大小。  以下是

Global site tag (gtag.js) - Google Analytics