临沂金属网:高并发之——从源码角度分析创建线程池究竟有哪些方式

时间:7个月前   阅读:46

写给Unity『开发者『的』』iOS内存调试指南

前言

“在”Java『‘『的』高并发领’域』,线程池一直是一个绕不开『的』话题。 有些童鞋一直“在”<〖使用〗>线程池[,“〖但是〗”,对于如何创「建线程池」仅仅停留“在”<〖使用〗>Executors工具( <【『(类)』】>[)「『的』方式」,「那么」,“创「建线程池」究竟存“在”哪几种方式呢”?‘就让【我们】一起「(从)」创「建线程池」『的』源码来深入分析究’竟有哪些方式可以创「建线程池」。

<〖使用〗>Executors工具《( <【『(类)』】>[)创》「建线程池」

“在”‘创「建线程池」时’,初学者用『的』最多『的』就是Executors 这个工具( <【『(类)』】>[), 而[<〖使用〗>这个工具《( <【『(类)』】>[)创》「建线程池」时非常简单『的』,“不需要关注太多『的』线程池细节”,只需要传入必要『的』参「数」即可。Executors 工具( <【『(类)』】>[)提供了几种创「建线程池」『的』方法,如下所示。

  • Executors.newCachedThreadPool:‘创建一’个可缓存『的』线程池,「如果线程池『的』大小超过了需要」,可以灵活回收空闲线程,‘如果没有可回收线程’,则新建线程
  • Executors.newFixedThreadPool:‘创建一’个定长『的』线程池,可以控制线程『的』最大并发「数」,{超出『的』线程会“在”队列中等待}
  • Executors.newScheduledThreadPool:‘创建一’个定长『的』线程池,支持定时、周期性『的』任务(执行)
  • Executors.newSingleThreadExecutor: <‘创建一’>个单线程化『的』线程池,<〖使用〗>一个唯一『的』工作线程(执行)任务,保证所有任务按照指定顺序({先入先出或者优}先级)(执行)
  • Executors.newSingleThreadScheduledExecutor:<‘创建一’>个单线程化『的』线程池,支持定时、周期性『的』任务(执行)
  • Executors.newWorkStealingPool:‘创建一’个具有并行级别『的』work-stealing线程池

《其中》,Executors.newWorkStealingPool方法是Java 8中新增『的』创「建线程池」『的』方法,<它能够「为」线程池设置并>行级别,具有更高『的』并发度和性能。除了此方法外,其他创「建线程池」『的』方法本质上调用『的』是ThreadPoolExecutor( <【『(类)』】>[)『『的』构造方法』。

例如[,【我们】可以<〖使用〗>如下代码创「建线程池」。

Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);

<〖使用〗>ThreadPoolExecutor《( <【『(类)』】>[)创》「建线程池」

<「(从)」代码结构上看>ThreadPoolExecutor『( <【『(类)』】>[)继承自』AbstractExecutorService,也就是说,ThreadPoolExecutor( <【『(类)』】>[)具有AbstractExecutorService( <【『(类)』】>[)『的』全部功能。

既然Executors工具( <【『(类)』】>[)中创「建线程池」大部分调用『的』都是ThreadPoolExecutor( <【『(类)』】>[)『『的』构造方法』,所以,【我们】也可以直接调用ThreadPoolExecutor(( <【『(类)』】>[)『『的』构造方法』来创「建线程池」), 而[不再<〖使用〗>Executors工具( <【『(类)』】>[)。〖接下来〗,『【我们】一起看下』ThreadPoolExecutor( <【『(类)』】>[)『『的』构造方法』。

ThreadPoolExecutor( <【『(类)』】>[)中『的』所有构造方法如下所示。

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                 BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                    ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     threadFactory, defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                    TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), handler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                    BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

由ThreadPoolExecutor( <【『(类)』】>[)『『的』构造方法』『的』源代码可知,创「建线程池」最终调用『『的』构造方法』如下。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
              long keepAliveTime, TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

“关”于此构造方法中各参「数」『的』含义和作用,如下所示。
【【注意】】:“「为」了更加深入『的』分析”ThreadPoolExecutor( <【『(类)』】>[)『『的』构造方法』,会适当调整参「数」『的』顺序进行解析,以便于大家更能深入『的』理解ThreadPoolExecutor〖构造方法中每个参「数」『的』作〗用。

<上述构造方法接收如下参「数」进行初始化>:

(1)corePoolSize:核心线程「数」量。

(2)maximumPoolSize:最大线程「数」。

(3)workQueue:阻塞队列,存储等待(执行)『的』任务,很重要,〖会对线程池运行过程产生重大影响〗。

《其中》,上述三个参「数」『的』关系如下所示:

  • 如果运行『的』线程「数」‘小于’corePoolSize,直接创建新线程处理任务,即使线程池中『的』其他线程是空闲『的』。
  • 如果运行『的』线程「数」大于等于corePoolSize,{(并且)}‘小于’maximumPoolSize,《《“此时”》》,只有当workQueue满时,才会创建新『的』线程处理任务。
  • 如果设置『的』corePoolSize「与」maximumPoolSize相同,「那么」创建『的』线程池大小是固定『的』,《《“此时”》》,「如果有新任务提交」,{(并且)}workQueue没有满时,{(就把请求放入到)}workQueue中,等待空闲『的』线程,「(从)」workQueue中取出任务进行处理。
  • 「如果运行『的』线程「数」量大于」maximumPoolSize,同时,workQueue已经满了,会通过拒绝策略参「数」rejectHandler来指定处理策略。

根据上述三个参「数」『的』配置,线程池会对任务进行如下处理方式:

当提交一个新『的』任务到线程池时,线程池会根据当前线程池中正“在”运行『的』线程「数」量来决定该任务『的』处理方式。{处理方式总共有三种}:直接切换、<〖使用〗>【无】限队列、<〖使用〗>有界队列。

  • 直接切换常用『的』队列就是SynchronousQueue。
  • <〖使用〗>【无】限队列就是<〖使用〗>基于链表『的』队列,《比如》:LinkedBlockingQueue,如果<〖使用〗>这种方式,线程池中创建『的』最大线程「数」就是corePoolSize,《《“此时”》》maximumPoolSize『不会起作用』。当线程池中所有『的』核心线程都是运行状态时,提交新任务,{就会放入等待队列中}。
  • <〖使用〗>有界队列<〖使用〗>『的』是ArrayBlockingQueue,<〖使用〗>这种方式可以将线程池『的』最大线程「数」量限制「为」maximumPoolSize,可以降低资源『的』消耗。“〖但是〗”,这种方式使得线程池对线程『的』调度更困难,<因「为」线程池和队列『的』容量都是有限『的』了>。

根据上面三个参「数」,【我们】可以简单得出如何降低系统资源消耗『的』一些措施:

Docker Compose【搭建】Redis<一主二>「(从)」三哨兵高可用集群

  • 如果想降低系统资源『的』消耗, 包括[CPU<〖使用〗>率,操作系统资源『的』消耗,上下文环境切换『的』开销等,可以设置一个较大『的』队列容量和较小『的』线程池容量。(这样),<『会降』>《低线程处理任务『的』吞吐量》。
  • 如果提交『的』任务经常发生阻塞,可以考虑调用设置最大线程「数」『的』方法,重新设置线程池最大线程「数」。如果队列『的』容量设置『的』较小,通常需要将线程池『的』容量设置『的』大一些,(这样),CPU『的』<〖使用〗>率会高些。如果线程池『的』容量设置『的』过大,并发量就会增加,“则需要考虑线程调度『的』”问题,反 而[可能『会降』低处理任务『的』吞吐量。

〖接下来〗,【我们】继续看ThreadPoolExecutor{『『的』构造方法』『的』参}「数」。

(4)keepAliveTime:‘线程没有任务’(执行)时最多保持多久时间终止
当线程池中『的』线程「数」量大于corePoolSize时,如果《《“此时”》》没有新『的』任务提交,核心线程外『的』线程不会立即销毁,需要等待,直到等待『的』时间超过了keepAliveTime“就会终止”。

(5)unit:keepAliveTime「『的』时间单位」

(6)threadFactory:线程工厂,‘用来创建线’程
默认会提供一个默认『的』工厂来创建线程,当<〖使用〗>默认『的』工厂来创建线程时,会使新创建『的』线程具有相同『的』优先级,{(并且)}是非守护『的』线程,同时也设置了线程『的』名称

(7)rejectHandler:拒绝处理任务时『的』策略

如果workQueue阻塞队列满了,{(并且)}没有空闲『的』线程池,《《“此时”》》,继续提交任务,〖需要采取一种策略来处理这个任务〗。
《线程池总共提》供了四种策略:

  • 直接抛出异常,〖这也是默认『的』策略〗。【 实现[( <【『(类)』】>[)「为」】AbortPolicy。
  • 【用调用者所】“在”『的』线程来(执行)任务。【 实现[( <【『(类)』】>[)「为」】CallerRunsPolicy。
  • 丢弃队列中最靠前『的』任务并(执行)当前任务。【 实现[( <【『(类)』】>[)「为」】DiscardOldestPolicy。
  • 直接丢弃当前任务。【 实现[( <【『(类)』】>[)「为」】DiscardPolicy。

大家可以自行调用ThreadPoolExecutor(( <【『(类)』】>[)『『的』构造方法』来创「建线程池」)。 例如[,【我们】可以<〖使用〗>如下形式创「建线程池」。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

<〖使用〗>ForkJoinPool《( <【『(类)』】>[)创》「建线程池」

“在”Java8『的』Executors工具( <【『(类)』】>[)中,“新增了如下创建线程”池「『的』方式」。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
 
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

「(从)」源代码可以可以,本质上调用『的』是ForkJoinPool( <【『(类)』】>[)『『的』构造方法』《( <【『(类)』】>[)创》「建线程池」, 而[「(从)」代码结构上来看ForkJoinPool『( <【『(类)』】>[)继承自』AbstractExecutorService抽象( <【『(类)』】>[)。〖接下来〗,【我们】看下ForkJoinPool( <【『(类)』】>[)『『的』构造方法』。

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
 public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
 
public ForkJoinPool(int parallelism,
                ForkJoinWorkerThreadFactory factory,
                UncaughtExceptionHandler handler,
                boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
 
private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

通过查看源代码得知,ForkJoinPool『『的』构造方法』,最终调用『的』是如下私有构造方法。

private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

《其中》,各参「数」『的』含义如下所示。

  • parallelism:并发级别。
  • factory:创建线程『的』工厂( <【『(类)』】>[)对象。
  • handler:〖当线程池中『的』线程抛出未捕获『的』异常时〗,统一<〖使用〗>UncaughtExceptionHandler对象处理。
  • mode:取值「为」FIFO_QUEUE或者LIFO_QUEUE。
  • workerNamePrefix:(执行)任务『的』线程名称『的』前缀。

【当】然,私有构造方法虽然是参「数」最多『的』一个方法,“〖但是〗”其不会直接对外方法,【我们】可以<〖使用〗>如下方式创「建线程池」。

new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

<〖使用〗>ScheduledThreadPoolExecutor《( <【『(类)』】>[)创》「建线程池」

“在”Executors工具( <【『(类)』】>[)中存“在”如下方法《( <【『(类)』】>[)创》「建线程池」。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
 
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}
 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
 
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

「(从)」源码来看,这几个方法本质上调用『的』都是ScheduledThreadPoolExecutor( <【『(类)』】>[)『『的』构造方法』,ScheduledThreadPoolExecutor中存“在”『『的』构造方法』如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

而[<「(从)」代码结构上看>,ScheduledThreadPoolExecutor『( <【『(类)』】>[)继承自』ThreadPoolExecutor( <【『(类)』】>[),本质上还是调用ThreadPoolExecutor( <【『(类)』】>[)『『的』构造方法』,只不过《《“此时”》》传递『的』队列「为」DelayedWorkQueue。{【我们】}可以直接调用ScheduledThreadPoolExecutor(( <【『(类)』】>[)『『的』构造方法』来创「建线程池」), 例如[以如下形式创「建线程池」。

new ScheduledThreadPoolExecutor(3)

最后,需要【【注意】】『的』是:ScheduledThreadPoolExecutor主要用来创建(执行)定时任务『的』线程池。

,

sunbet

Sunbet www.tggzfm.com展望2019『年』,将用完善『的』服务体系,创新『的』技术应用,‘雄厚『的』资’金实力,『贴心『的』服务品质』,【成「为」每位申博会员】、代理『的』首选平台。

上一篇:阳江景点:【开箱】《富豪辩护人》朱智勋扮情圣 金惠秀执宋慧乔二摊演三流律师

下一篇:鹤岗二手车:【自由副刊.蝙蝠通信】 川贝母/冬眠的母亲

网友评论