<>线程池 详解 吐血整理

数哥:你们用多线程吗?

我:用啊

数哥:我们根本不需要,厉害吧

我:。。。。。。厉害

数哥:你们怎么用的啊?

我:一般都是用线程池,不会直接创建线程

数哥:线程池??? 干什么的?

我:。。。。。。 !!!!!!

<>1、使用线程池的好处

*
我们知道创建

*
减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

*
可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下

<>2、线程池创建、执行

<>2.1线程池的执行流程

<>2.2线程池创建及使用

<>2.2.1task源码
class Task implements Callable { private int num1; private int num2; public
Task(){} public Task(int num1,int num2){ this.num1=num1; this.num2=num2; }
@Override public Integer call() throws Exception { int sum=0; for(int i=num1;i<=
num2;i++){ sum+=i; } return sum; } }
<>2.2.2线程池使用
/** * 计算 400 数 和 * 使用线程池 */ @Test public void pooltest() throws
ExecutionException, InterruptedException { int corePoolSize = 4; int
maximumPoolSize= 4; long keepAliveTime = 1000; TimeUnit unit = TimeUnit.
MICROSECONDS; BlockingQueue a = new SynchronousQueue(); // ArrayBlockingQueue
DelayQueue LinkedBlockingQueue PriorityBlockingQueue ThreadFactory threadFactory
= Executors.defaultThreadFactory(); RejectedExecutionHandler handler = new
ThreadPoolExecutor.AbortPolicy(); 创建线程池 ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, a,
threadFactory, handler); // 创建 任务 Task t1=new Task(1,100); Task t2=new Task(101,
200); Task t3=new Task(201,300); Task t4=new Task(301,400); //
让线程池自主选择一条线程执行线程任务 Future<Integer> f1=threadPoolExecutor.submit(t1); Future<
Integer> f2=threadPoolExecutor.submit(t2); Future<Integer> f3=threadPoolExecutor
.submit(t3); Future<Integer> f4=threadPoolExecutor.submit(t4);
threadPoolExecutor.shutdown(); int sum1=f1.get(); int sum2=f2.get(); int sum3=f3
.get(); int sum4=f4.get(); System.out.println(sum1+sum2+sum3+sum4); }
<>3、线程池各参数 详解
/** * Creates a new {@code ThreadPoolExecutor} with the given initial *
parameters. * * @param corePoolSize the number of threads to keep in the pool,
even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param
corePoolSize 是一直存活在线程池中的线程数,即使这些线程被闲置,除非设置参数 * {@code allowCoreThreadTimeOut} *
* @param maximumPoolSize the maximum number of threads to allow in the * pool *
@param maximumPoolSize 在线程池中能存活的 最大线程数 * * @param keepAliveTime when the number
of threads is greater than * the core, this is the maximum time that excess
idle threads * will wait for new tasks before terminating. * @param
keepAliveTime 如果线程池中的 线程数比 核心线程数(corePoolSize)大 * 此参数是 闲置线程 等待新任务执行的 最长 时间 * *
@param unit the time unit for the {@code keepAliveTime} argument * @param unit
{@code keepAliveTime} 参数的 时间单位 * * @param workQueue the queue to use for
holding tasks before they are * executed. This queue will hold only the {@code
Runnable} * tasks submitted by the {@code execute} method. * @param workQueue
此队列用于存储未执行的任务(任务实现了Runnable接口,并且通过excute方法提交) * * @param threadFactory the
factory to use when the executor * creates a new thread * @param threadFactory
线程池用来创建线程的 工厂 * * @param handler the handler to use when execution is blocked *
because the thread bounds and queue capacities are reached * @param handler 处理
因为达到了线程边界和队列容量而导致 执行阻塞的 策略 * * @throws IllegalArgumentException if one of the
following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime <
0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize <
corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code
threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int
corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0
|| maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new
IllegalArgumentException(); if (workQueue == null || threadFactory == null ||
handler== null) throw new NullPointerException(); this.corePoolSize =
corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue
; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory =
threadFactory; this.handler = handler; }
<>3.1阻塞队列:BlockingQueue

ArrayBlockingQueue`, `DelayQueue`, `LinkedBlockingDeque`,
`LinkedBlockingQueue`, `LinkedTransferQueue`, `PriorityBlockingQueue`,
`SynchronousQueue
3.1.1无界队列

队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue
中,导致内存飙升服务器挂掉。

3.1.2有界队列

常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

3.1.3同步移交队列

如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

<>3.2饱和策略

<>3.2.1ThreadPoolExecutor.AbortPolicy
<>描述
​ 直接抛异常,不处理
<>源码 /** * A handler for rejected tasks that throws a * {@link
RejectedExecutionException}. * * This is the default handler for {@link
ThreadPoolExecutor} and * {@link ScheduledThreadPoolExecutor}. */ public static
class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code
AbortPolicy}. */ public AbortPolicy() { } /** * Always throws
RejectedExecutionException. * * @param r the runnable task requested to be
executed * @param e the executor attempting to execute this task * @throws
RejectedExecutionException always */ public void rejectedExecution(Runnable r,
ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " +
r.toString() + " rejected from " + e.toString()); } }
<>3.2.2ThreadPoolExecutor.CallerRunsPolicy
<>描述
​ 它直接在{@code execute}方法的调用线程中运行被拒绝的任务,除非执行程序已经关闭,在这种情况下任务将被丢弃
<>源码 /** * A handler for rejected tasks that runs the rejected task *
directly in the calling thread of the {@code execute} method, * unless the
executor has been shut down, in which case the task * is discarded. */ public
static class CallerRunsPolicy implements RejectedExecutionHandler { /** *
Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** *
Executes task r in the caller's thread, unless the executor * has been shut
down, in which case the task is discarded. * * @param r the runnable task
requested to be executed * @param e the executor attempting to execute this
task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if
(!e.isShutdown()) { r.run(); } } }
<>3.2.3ThreadPoolExecutor.DiscardOldestPolicy
<>描述
​ 丢弃队列最前面的任务,然后重新尝试执行任务(不适合工作队列为优先队列场景)
<>源码 /** * A handler for rejected tasks that discards the oldest unhandled *
request and then retries {@code execute}, unless the executor * is shut down,
in which case the task is discarded. */ public static class DiscardOldestPolicy
implements RejectedExecutionHandler { /** * Creates a {@code
DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { }
/** * Obtains and ignores the next task that the executor * would otherwise
execute, if one is immediately available, * and then retries execution of task
r, unless the executor * is shut down, in which case task r is instead
discarded. * * @param r the runnable task requested to be executed * @param e
the executor attempting to execute this task */ public void rejectedExecution(
Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll();
e.execute(r); } } }
<>3.2.4ThreadPoolExecutor.DiscardPolicy
<>描述
​ 新提交的任务被抛弃,但是不抛出异常
<>源码 /** * A handler for rejected tasks that silently discards the * rejected
task. */ public static class DiscardPolicy implements RejectedExecutionHandler {
/** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does
nothing, which has the effect of discarding task r. * * @param r the runnable
task requested to be executed * @param e the executor attempting to execute
this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}
<>4、JDK提供线程池

<>4.1Executors.newScheduledThreadPool
/** * Creates a thread pool that can schedule commands to run after a * given
delay, or to execute periodically. * @param corePoolSize the number of threads
to keep in the pool, * even if they are idle * @param threadFactory the factory
to use when the executor * creates a new thread * @return the newly created
scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize
< 0} * @throws NullPointerException if threadFactory is null */ public static
ScheduledExecutorServicenewScheduledThreadPool( int corePoolSize, ThreadFactory
threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize,
threadFactory); } --------------------------------------------------------------
------------------------------------ /** * Creates a new {@code
ScheduledThreadPoolExecutor} with the * given initial parameters. * * @param
corePoolSize the number of threads to keep in the pool, even * if they are
idle, unless {@code allowCoreThreadTimeOut} is set * @param threadFactory the
factory to use when the executor * creates a new thread * @throws
IllegalArgumentException if {@code corePoolSize < 0} * @throws
NullPointerException if {@code threadFactory} is null */ public
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory); }
<>4.2Executors.newCachedThreadPool
/** * Creates a thread pool that creates new threads as needed, but * will
reuse previously constructed threads when they are * available, and uses the
provided * ThreadFactory to create new threads when needed. * * @param
threadFactory the factory to use when creating new threads * @return the newly
created thread pool * @throws NullPointerException if threadFactory is null */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>(), threadFactory); }
<>4.3Executors.newFixedThreadPool
/** * Creates a thread pool that reuses a fixed number of threads * operating
off a shared unbounded queue, using the provided * ThreadFactory to create new
threads when needed. At any point, * at most {@code nThreads} threads will be
active processing * tasks. If additional tasks are submitted when all threads
are * active, they will wait in the queue until a thread is * available. If any
thread terminates due to a failure during * execution prior to shutdown, a new
one will take its place if * needed to execute subsequent tasks. The threads in
the pool will * exist until it is explicitly {@link ExecutorService#shutdown *
shutdown}. * * @param nThreads the number of threads in the pool * @param
threadFactory the factory to use when creating new threads * @return the newly
created thread pool * @throws NullPointerException if threadFactory is null *
@throws IllegalArgumentException if {@code nThreads <= 0} */ public static
ExecutorServicenewFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>(), threadFactory); }
<>4.4Executors.newSingleThreadExecutor
/** * Creates an Executor that uses a single worker thread operating * off an
unbounded queue, and uses the provided ThreadFactory to * create a new thread
when needed. Unlike the otherwise * equivalent {@code newFixedThreadPool(1,
threadFactory)} the * returned executor is guaranteed not to be reconfigurable
to use * additional threads. * * @param threadFactory the factory to use when
creating new threads * @return the newly created single-threaded Executor *
@throws NullPointerException if threadFactory is null */ public static
ExecutorServicenewSingleThreadExecutor(ThreadFactory threadFactory) { return new
FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.
MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
<>4.5Executors.newSingleThreadScheduledExecutor
/** * Creates a single-threaded executor that can schedule commands * to run
after a given delay, or to execute periodically. (Note * however that if this
single thread terminates due to a failure * during execution prior to shutdown,
a new one will take its * place if needed to execute subsequent tasks.) Tasks
are * guaranteed to execute sequentially, and no more than one task * will be
active at any given time. Unlike the otherwise * equivalent {@code
newScheduledThreadPool(1, threadFactory)} * the returned executor is guaranteed
not to be reconfigurable to * use additional threads. * * @param threadFactory
the factory to use when creating new threads * @return the newly created
scheduled executor * @throws NullPointerException if threadFactory is null */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(
ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new
ScheduledThreadPoolExecutor(1, threadFactory)); }
<>4.6Executors.newWorkStealingPool
/** * Creates a thread pool that maintains enough threads to support * the
given parallelism level, and may use multiple queues to * reduce contention.
The parallelism level corresponds to the * maximum number of threads actively
engaged in, or available to * engage in, task processing. The actual number of
threads may * grow and shrink dynamically. A work-stealing pool makes no *
guarantees about the order in which submitted tasks are * executed. * * @param
parallelism the targeted parallelism level * @return the newly created thread
pool * @throws IllegalArgumentException if {@code parallelism <= 0} * @since
1.8 */ public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool (parallelism, ForkJoinPool.
defaultForkJoinWorkerThreadFactory, null, true); } /** * Creates a
work-stealing thread pool using the number of * {@linkplain
Runtime#availableProcessors available processors} * as its target parallelism
level. * * @return the newly created thread pool * @see
#newWorkStealingPool(int) * @since 1.8 */ public static ExecutorService
newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().
availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null,
true); }
<>Alibaba 编码规约

一 -(六)- 4

【强制】 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这
样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。

如有问题,欢迎大家留言指正,会虚心接纳,补充学习
请大家,点赞 收藏,谢谢

技术
©2019-2020 Toolsou All rights reserved,
王者荣耀背景故事整合痴心妄想随机森林篇 R语言实现用C++跟你聊聊“原型模式” (复制/拷贝构造函数)再见!经典版Edge!PYTHON入门期末复习汇总2021年1月程序员工资统计,平均14915元详解ubuntu14.04如何设置静态IP胡润:中国600万资产“富裕家庭”数量首次突破500万户苹果与日产对话暂停,Apple Car进展如何?