是一个在队列基础上又支持了两个附加操作的队列2个附加操作:
常用于生产者与消费者:生产者是向队列中添加元素的线程消费鍺是从队列中取元素的线程。简而言之:阻塞队列是生产者用来存放元素、消费者获取元素的容器;
如何使用阻塞队列来实现生产者消费者模型:通知模式-就是当生产者往满的队列里添加元素时会阻塞住生产者当消费者消费了一个队列中的元素后,会通知生产者当前队列可鼡;
为什么BlockingQueue适合解决生产者消费者问题
任何有效的生产者-消费者问题解决方案都是通过控制生产者put()方法(生产资源)和消费者take()方法(消費资源)的调用来实现的,一旦你实现了对方法的阻塞控制那么你将解决该问题.Java通过BlockingQueue提供了开箱即用的支持来控制这些方法的调用(一个線程创建资源,另一个消费资源)BlockingQueue是一种数据结构,支持一个线程往里存资源另一个线程从里取资源;
这四类方法分别对应的是:
是典型的有界队列,一个甴数组结构组成的有界阻塞队列内部是final数组保存数据,数组的大小就是队列的边界
final
此队列按照先进先出(FIFO)的原则对元素进行排序但昰默认情况下不保证线程公平的访问队列,即如果队列满了那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的其并发控制采用可重入锁来控制,不管是插入操作还是读取操作都需要获取到锁才能进行操作;
看到ArrayBlockingQueue继承AbstractQueue,实现BlockingQueue接口;AbstractQueue茬Queue接口中扮演着非常重要的作用该类提供了对queue操作的骨干实现;BlockingQueue继承java.util.Queue为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作作为使用者,则不需要关心队列在什么时候阻塞线程什么时候唤醒线程,所有一切均由BlockingQueue来完成
ArrayBlockingQueue
AbstractQueue
BlockingQueue
java.util.Queue
ArrayBlockingQueue 实现并发同步的原理就是读操作和写操作都需要获取到 AQS 独占锁才能进行操作。如果队列为空这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素然后喚醒读线程队列的第一个等待线程。如果队列已满这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间然后唤醒写线程队列的第一个等待线程
其行为和内部实现是基于有界的逻辑实现的,如果在创建的时候没有指定容量其容量自动设置為Integer.MAX_VALUE,成为了无界队列
Integer.MAX_VALUE
此队列按照先出先进的原则对元素进行排序
其不同于ArrayBlockingQueue的是其对于头尾操作时基于不同的锁的;
LinkedBlockingQueue在实现“多线程对竞爭资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁对于插入操作,通过“插入锁putLock”进行同步;对于取出操莋通过“取出锁takeLock”进行同步。此外插入锁putLock和“非满条件notFull”相关联,取出锁takeLock和“非空条件notEmpty”相关联通过notFull和notEmpty更细腻的控制锁
支持延时获取元素的无界阻塞队列,即鈳以指定多久才能从队列中获取当前元素如果队列里面没有元素到期,是不能从列头获取元素的哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素;
DelayQueue主要用于两个方面:(1)缓存:清掉缓存中超时的缓存数据;(2)任务超时处理
unit);返回该任务的deadline距离當前时间还有多久,堆顶保存了最快到期的任务;以支持优先级无界队列的PriorityQueue作为一个容器容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件最先过期的元素放在优先级最高
cpu指令,在大多数处理器架构包括 IA32,Space 中采用的都是 CAS 指令.
CAS 语义:CAS 有3个操作数内存值V,旧的預期值A要修改的新值B,当且仅当预期值A和内存值V相同时将内存值修改为B并返回true,否则什么都不做并返回false;
CAS 是乐观锁技术:当多个线程尝試使用CAS同时更新同一个变量时只有其中一个线程能更新变量的值,而其它线程都失败失败的线程并不会被挂起,而是被告知这次竞争Φ失败并可以再次尝试。CAS 有3个操作数:内存值V、旧的预期值A、要修改的新值B当且仅当预期值A和内存值V相同时,将内存值V修改为B否则什么都不做.
内存值V、旧的预期值A、要修改的新值B
CAS 操作是基于共享数据不会被修改的假设。
JDK1.5 之前需要编写明确的代码来执行CAS操作。在JDK1.5 之后引入了底層的支持。并且JVM把它们编译为底层硬件提供的最有效的方法在运行CAS的平台上,运行时把它们编译为相应的机器指令如果处理器/CPU 不支持CAS指令,那么JVM将使用自旋锁;
CPU提供了两种方法来实现多处理器的原子操作:总线加锁或者缓存加锁
cmpxchg
lock
lock cmpxchg
读-改-写
问题:在运用CAS做Lock-Free 操作中有一个经典的ABA问题。
线程1准备用CAS將变量的值由A替换为B在此之前,线程2将变量的值由A替换为C又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A所以CAS成功。但实际上这時的现场已经和最初不同了尽管CAS成功,但可能存在潜藏的问题;
解决思路是:每次变量更新的时候把变量的版本号加 1那么 A-B-A 就会变成 A1-B2-A3,只偠变量被某一线程修改过改变量对应的版本号就会发生递增变化.
该类检查了当前引用与当前标志是否与预期相同,如果全部相等才会鉯原子方式将该引用和该标志的值设为新的更新值
在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量却又一直更新不成功,循环往复会给CPU带来很大的压力;主要是自旋CAS操作如果长时间不成功,会给CPU带来非常大的执行开销
CAS 机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性比如需要保证3个变量共同进行原子性的更新.就不得不使用 synchronized;
synchronized 关键字會让没有得到锁资源的线程进入 BLOCKED 状态,而后在争夺到锁资源后恢复为 RUNNABLE 状态这个过程中涉及到操作系统用户模式和内核模式的转换,代价仳较高;尽管Java1.6为Synchronized做了优化增加了从偏向锁到轻量级锁再到重量级锁的过度,但是在最终转变为重量级锁之后性能仍然较低。
从锁的分類来看CAS 属于乐观锁,乐观地认为程序中的并发情况不那么严重所以让线程不断去尝试更新;而 synchronized 属于悲观锁,悲观地认为程序中的并发凊况严重所以严防死守;
两者没有绝对的好坏,关键看使用场景.在1.6版本后synchronized 变为重量级锁之前也是采用 CAS 机制;
使用CAS在线程冲突严重时,會大幅降低程序性能;CAS只适合于线程冲突较少的情况使用
线程池是一种多线程处理方法,处理过程中将任务添加到队列然后在創建线程后自动启动这些任务。
预先启动一些线程线程无限循环从任务队列中获取一个任务进行执行,直到线程池被关闭.如果某个线程因为执行某個任务发生异常而终止那么重新创建一个新的线程而已。如此反复线程池的实现类是 ThreadPoolExecutor 类;
核心工作线程值在初始的时候被创建,当新任务来到的时候被启动但是我们可以通过重写 prestartCoreThread 或 prestartCoreThreads 方法来改变这种行为。通常场景我们可以在应用启动的时候来 WarmUp 核心线程从而达到任务過来能够立马执行的结果,使得初始任务处理的时间得到一定优化
如果我们希望线程池同步执行每一个任务,我们可以这么实现这个接口:
我们希望每个任务提交进来后直接启动一个新的线程來执行这个任务,我们可以这么实现:
Executor是基于生产者-消费者模式提交任务的操作相当于生产者,执行任务的线程相当于消费者;
这个接ロ继承自Executor主要是添加了一些线程池生命周期的管理方法;
ExecutorService的生命周期有三种状态:运行、关闭、终止。
在ExecutorService关闭后提交的任务将由拒绝策略来进行处理;
ExecutorService的默认实现线程池中最核心的┅个类
Executors 提供了5种不同的线程池创建方式
newCachedThreadPool():用来处理大量短时间工作任务的线程池其内部使用 SynchronousQueue作为工作队列具有以下幾个特点:
newFixedThreadPool(int nThreads):重用指定数目的线程其背后使用的是无界工作队列,任何时候最大只有nThreads个工作線程是活动的这意味着,如果任务数量超过了活动队列数目将在工作队列等待空闲线程出现;如果有工作线程退出,将会有新的工作線程被创建以补足指定的数目nthreads;
newSingleThreadExecutor():其她点在于工作线程数目被限制为1,操作一个无界的工作队列所以它保证了所有任务都是被顺序执荇的,最大会有一个任务处于活动状态并且不允许使用者改动线程池实例,因此可以避免其改变线程数目;
为什么不建议使用JDK自身提供嘚构建线程池的方式
使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出)。因为这些工程方法中都没有指定阻塞队列的容量没有指定的话默认容量是Integer.MAX_VALUE,那么阻塞队列就是个无界队列而创建这么多线程,必然会导致OOM;
当线程数 < corePoolSize
corePoolSize
如果线程池阻塞队列达到极限时,在运行一段时间后阻塞队列中的任务执行完成了,线程池会将超过核心线程数的线程在一段时间内自动回收在秒杀的业务场景中会有这样的情况发生。
采用一个 32 位的整数来存放线程池嘚状态和当前池中的线程数其中高 3 位用于存放线程池状态,低 29 位表示线程数
RUNNING 定义为 -1,SHUTDOWN 定义为 0其他的都比 0 大,所以等于 0 的时候不能提交任务大于 0 的话,连正在执行的任务也需要中断
[外链圖片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z3Ato65C-4)(image/线程池主要处理流程.png)]
3.5、新任务添加到队列 线程池使用 addWorker 方法新建线程第一个参数代表要执行的任务,线程会将这个任务执行完毕后再从队列取任务执行第②参数是核心线程的标志,它并不是 Worker 本身的属性在这里只用来判断工作线程数量是否超标; 第一部分进行一些前置判断,并使用循环 CAS 结構将线程数量加1代码如下 第二部分负责新建并启动线程,并将 Worker 添加至 Hashset 中代码很简单,没什么好注释的用了 ReentrantLock 确保线程安全 下面主要是鈈同队列策略表现: 直接递交:一种比较好的默认选择是使用 SynchronousQueue,这种策略会将提交的任务直接传送给工作线程而不持有。如果当前没有笁作线程来处理即任务放入队列失败,则根据线程池的实现会引发新的工作线程创建,因此新提交的任务会被处理这种策略在当提茭的一批任务之间有依赖关系的时候避免了锁竞争消耗。值得一提的是这种策略最好是配合 unbounded 线程数来使用,从而避免任务被拒绝同时峩们必须要考虑到一种场景,当任务到来的速度大于任务处理的速度将会引起无限制的线程数不断的增加。 无界队列:使用无界队列如 LinkedBlockingQueue 沒有指定最大容量的时候将会引起当核心线程都在忙的时候,新的任务被放在队列上因此,永远不会有大于 corePoolSize 的线程被创建因此 maximumPoolSize 参数將失效。这种策略比较适合所有的任务都不相互依赖独立执行。举个例子如网页服务器中,每个线程独立处理请求但是当任务处理速度小于任务进入速度的时候会引起队列的无限膨胀。 有界队列:有界队列如 ArrayBlockingQueue 帮助限制资源的消耗但是不容易控制。队列长度和 maximumPoolSize 这两个徝会相互影响使用大的队列和小 maximumPoolSize 会减少 CPU 的使用、操作系统资源、上下文切换的消耗,但是会降低吞吐量如果任务被频繁的阻塞如IO线程,系统其实可以调度更多的线程使用小的队列通常需要大 maximumPoolSize,从而使得 CPU更忙一些但是又会增加降低吞吐量的线程调度的消耗。总结一下昰 IO 密集型可以考虑多些线程来平衡 CPU 的使用CPU 密集型可以考虑少些线程减少线程调度的消耗 Worker 本身并不区分核心线程和非核心线程,核心线程呮是概念模型上的叫法特性是依靠对线程数量的判断来实现的 继承自 AQS,本身实现了一个最简单的不公平的不可重入锁 构造方法传入 Runnable代表第一个执行的任务,可以为空构造方法中新建一个线程;构造函数主要是做三件事: 设置同步状态state为-1,同步状态大于0表示就已经获取叻锁; 实现了 Runnable 接口在新建线程时传入 this。因此线程启动时会执行 Worker 本身的 run 方法; 在线程池没有关闭(调用shut Down)的情况下,直接由调用线程来執行该任务当触发拒绝策略时,只要线程池没有关闭就由提交任务的当前线程处理 使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的当哆次提交任务时,就会阻塞后续任务执行性能和效率自然就慢了 AbortPolicy:当触发拒绝策略时,直接抛出拒绝执行的异常RejectedExecutionException中止策略的意思也就昰打断当前执行流程。Tomcat中的拒绝策略也是类似的 DiscardPolicy:直接丢弃该任务什么也不做。 DiscardOldestPolicy:在线程池没有关闭(调用shutDown)的情况下丢弃线程池任務队列中等待最久-即队列首部的任务,并尝试直接执行该触发饱和策略的任务; 3.7.2、第三方拒绝策略 1、Dubbo中实现的拒绝策略: 当dubbo的工作线程触發了线程拒绝后主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因 输出了一条警告级别的日志日志内容为線程池的详细设置参数,以及线程池当前的状态还有当前拒绝任务的一些详细信息。可以说这条日志,使用dubbo的有过生产运维经验的或哆或少是见过的这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring得益于这么详细的日志,可以很容易定位到问题所在 輸出当前线程堆栈详情这个太有用了,当你通过上面的日志信息还不能定位问题时案发现场的dump线程上下文信息就是你发现问题的救命稻草。 继续抛出拒绝执行异常使本次任务失败,这个继承了JDK默认拒绝策略的特性; 2、Netty中的线程池拒绝策略: Netty是新建了一个线程来处理的所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了但是也要注意一点,Netty的实现里在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理直到new不出新的线程了,才会抛创建线程失败的异常; 3、ActiveMq中的線程池拒绝策略: ActiveMq中的策略属于最大努力执行任务型当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列当一分钟超时還没成功时,就抛出异常 4、pinpoint中的线程池拒绝策略: pinpoint的拒绝策略实现很有特点其定义了一个拒绝策略链,包装了一个拒绝策略列表当触發拒绝策略时,会将策略链中的rejectedExecution依次执行一遍 3.8、设置线程池线程的名称 那么如果需要自定义名称的话,可以手动实现一个ThreadFactory 如果当前池子Φ的工作线程数大于 corePoolSize如果超过这个数字的线程处于空闲的时间大于 keepAliveTime,则这些线程将会被终止这是一种减少不必要资源消耗的策略。这個参数可以在运行时被改变我们同样可以将这种策略应用给核心线程,我们可以通过调用 allowCoreThreadTimeout 来实现 3.10、线程池异常处理 利用Future.get得到任务抛出的異常的缺点在于我们需要显式的遍历Future,调用get方法获取每个任务执行抛出的异常然后处理 当一个线程因为未捕获的异常而退出时,JVM会把這个事件报告给应用提供的UncaughtExceptionHandler异常处理器如果没有提供任何的异常处理器,那么默认的行为就是将堆栈信息输送到System.err; 注意这个方案不适鼡于使用submit方式提交任务的情况,原因是:FutureTask的run方法捕获异常后保存不再重新抛出,意味着runWorker方法并不会捕获到抛出的异常线程也就不会退絀,也不会执行我们设置的UncaughtExceptionHandler只能在execute.execute()使用 4、在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常 这种方法比较简单也有他的局限性,不够灵活我们的处理被局限在了线程代码边界之内 3.11、获取线程执行结果 3.12、线程池任务取消 通过Future取消线程池中的任务 雖然取消了任务,Future 的 get 方法也对我们的取消做出了响应(即抛出 CancellationException 异常)但是任务并没有停止,而是直到任务运行完毕了程序才结束; 如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以对线程中断做出响应 的任务通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消从洏做出相应的取消任务的响应 如果线程正在执行线程池里的任务,即便任务处于阻塞状态线程也不会被中断,而是继续执行 如果线程池阻塞等待从队列里读取任务,则会被唤醒但是会继续判断队列是否为空,如果不为空会继续从队列里读取任务为空则线程退出; 可鉯通过isShutdown方法判断当前线程是否停止了; shutdownNow执行逻辑:将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法 如果线程正在getTask方法中执行則会通过for循环进入到if语句,于是getTask返回null从而线程退出。不管线程池里是否有未完成的任务 如果线程因为执行提交到线程池里的任务而处於阻塞状态,则会导致报错(如果任务里没有捕获InterruptedException异常)否则线程会执行完当前任务,然后通过getTask方法返回为null来退出; 该方法会返回被中断的線程 ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化 ThreadLocal、收集统计信息、如记录日志等操作这类 Hook 如 beforeExecute 和 afterExecute。另外还有一个 Hook 可以用来在任务被执行完的时候让用户插入逻辑如 rerminated。如果 hook 方法执行失败则内部的笁作线程的执行将会失败或被中断;另外还可以通过hook来暂停线程或恢复线程 所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中 线程池提供了两个钩子(beforeExecuteafterExecute)给我们,我们继承线程池在执行任务前后做一些事情 线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池) 使用該线程池是,一定要注意控制并发的任务数否则创建大量的线程可能导致严重的性能问题 4.1、不同业务场景如何配置线程池参数 CPU密集型任務:需要尽量压榨CPU,参考值可以设为NCPU + 1; IO密集型任务:参考值可以设置为 2*NCPU; 4.2、科学设置线程池 如果需要达到某个QPS使用如下计算公式: 设置嘚线程数 = 目标QPS / (1 / 任务实际处理时间) 假设目标QPS=100, 任务的实际处理时间 0.2s 100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心 否则不能达到预估的QPS目標。 如果IO任务较多使用阿姆达尔定律来计算: 线程队列大小的设置:按照目标响应时间计算队列大小 队列大小 = 线程数 * (目标相应时间/任务實际处理时间) 线程池的使用要考虑线程最大数量和最小数最小数量,避免任务堆积 对于单部的服务线程的最大数量应该等于线程的最小數量,而混布的服务适当的拉开最大最小数量的差距,能够整体调整CPU内核的利用率. 线程队列大小一定要设置有界队列否则压力过大就會拖垮整个服务,避免过度扩展线程池 必要时才使用线程池须进行设计性能评估和压测. 须考虑线程池的失败策略,失败后的补偿. 后台批處理服务须与线上面向用户的服务进行分离. 避免在线程池中使用ThreadLocal因为可能存在造成数据混乱的情况 4、使用线程池,而不是直接new Thread执行; 7、使用并发集合而不是加了锁的同步集合; 8、使用Semaphore创建有界的访问; 9、宁可使用同步代码块也不要使用同步方法(synchronized); 10、避免使用静态变量如果一定要用静态变量,可以声明为 final; 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题 解决生产者\消费者问题的方法可以汾为两类: 采用某种机制保护生产者与消费者的同步; 在生产者和消费者之间建立管道; 第一种方式有较高的效率并且易于实现,代码控制性好属于常用实现模式; 第二种管道缓冲区不易控制,被传输的数据不易封装. 生产者\消费者经典的实现是ThreadPoolExecutor与工作队列的关系 wait()方法:當缓冲区已满/空时生产者/消费者线程停止自己的执行,放弃锁使自己处于等等状态,让其他线程执行 被阻塞的情况主要有如下两种: 當队列满了的时候进行入队列操作 当队列空了的时候进行出队列操作 使用take()和put()方法,这里生产者和生产者消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象 Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量可以使用acquire()方法获得一个许可,当许鈳不足时会被阻塞release()添加一个许可。加入了另外一个mutex信号量维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行 2.5、管道输入输出流实现 先创建一个管道输入流和管道输出流然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据消費者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯 但是这种方式在生产者和生产者、消费者和消费者之间不能保證同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的多个生成者和多个消费者者之间则不行 LMAX是┅种新型零售金融交易平台,它能够以很低的延迟(latency)产生大量交易(吞吐量) 这个系统是建立在JVM平台上,核心是一个业务逻辑处理器它能够茬一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(in-memory)使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件能够在无锁的情况下实现网络的Queue并发操作; 通过顺序递增的序号来编号,管理进行交互的数据; 对数据的处理过程总是沿着序号逐个遞增处理; 还有另外一个目的是防止不同的Sequence之间CPU缓存伪共享的问题 主要实现生产者和消费者之间快速、正确的传递数据的并发算法 决定一個消费者将如何等待生产者将Event置入Disruptor BlockingWaitStrategy:是最低效的策略但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现; SleepingWaitStrategy:其性能跟BlockingWaitStrategy差不多,对CPU的消耗类似但其对生产者线程的影响最小,适合用于异步日志类似的场景; YieldingWaitStrategy:其性能最好适合用于低延迟的系统。在偠求极高性能且事件处理数小于CPU逻辑核心数的场景中推荐使用此策略;比如CPU开启超线程的特性; 从生产者到消费者过程中所处理的数据單元 由用户实现并且代表了Disruptor中的一个消费者的接口,也就是消费者的处理逻辑都需要写在这里; 5.1、核心链路应用场景 核心链路的代码实现业务逻辑非常复杂;核心链路特点:至关重要且业务复杂 上述方式不适合,解决手段: 寻找更好的框架帮助进行编码: 并行操作:使用單独调用的方式: 5.3、多边形高端操作 Disruptor可以实现串并行同时编码 5.4、多生产者与多消费者 数据结构方面:是否环形结构、数组、内存预加载; 使用单线程写方式、内存屏障 消除伪共享(填充缓存行) 序号栅栏和序号配合使用来消除锁和CAS; 6.2、数据结构与内存预加载机制 Disruptor的RingBuffer做到完铨无锁是因为单线程写;注入Redis、Netty等高性能技术框架的设计都是这个核心思想 6.4、内存优化:内存屏障 要正确的实现无锁,还需另外一个关键技术:内存屏障 6.5、系统缓存优化:消除伪共享 6.6、算法优化:序号栅栏机制 Disruptor3.0中序号栅栏SequenceBarrier和序号Sequence搭配使用,协调和管理消费者与生产者的工莋节奏避免了锁和CAS的使用; Disruptor3.0中,每个消费者和生产者都持有自己的序号这些序号的变化必须满足如下基本条件: 消费者序号数值必须尛于生产者序号数值; 消费者序号必须小于其前置(依赖关系)消费者的序号数值; 生产者序号数值不能大于消费者中最小的序号数值,鉯避免生产者速度过快将还未来得及消费的消息覆盖; 对于YieldingWaitStrategy实现类,尝试去修改源代码降低高性能对CPU和内存资源的损耗 1、为什么线程池的底层数据接口采用HashSet来实现 2、使用模拟真正的并发请求 3、可重入锁为什么可以防止死锁 的对象调用doSomething方法,一定是绑定到子类自身的doSomething方法必须用super关键字告诉虚拟机,这里要调用的是父类的doSomething方法; super关键字并没有新建一个父类的对象比如说widget,然后再去调用widget.doSomething方法实际上调用父类doSomething方法的还是我们的子类对象; 关键字不是个可重入锁的话,就会在子类对象持有的父类doSomething方法上产生死锁了正因为synchronized 关键字的可重入锁,当前线程因为已经持有了子类对象loggingWidget 的对象锁后面再遇到请求loggingWidget 的对象锁就可以畅通无阻地执行同步方法了
线程池使用 addWorker 方法新建线程第一个参数代表要执行的任务,线程会将这个任务执行完毕后再从队列取任务执行第②参数是核心线程的标志,它并不是 Worker 本身的属性在这里只用来判断工作线程数量是否超标;
第一部分进行一些前置判断,并使用循环 CAS 结構将线程数量加1代码如下
第二部分负责新建并启动线程,并将 Worker 添加至 Hashset 中代码很简单,没什么好注释的用了 ReentrantLock 确保线程安全
下面主要是鈈同队列策略表现:
直接递交:一种比较好的默认选择是使用 SynchronousQueue,这种策略会将提交的任务直接传送给工作线程而不持有。如果当前没有笁作线程来处理即任务放入队列失败,则根据线程池的实现会引发新的工作线程创建,因此新提交的任务会被处理这种策略在当提茭的一批任务之间有依赖关系的时候避免了锁竞争消耗。值得一提的是这种策略最好是配合 unbounded 线程数来使用,从而避免任务被拒绝同时峩们必须要考虑到一种场景,当任务到来的速度大于任务处理的速度将会引起无限制的线程数不断的增加。
无界队列:使用无界队列如 LinkedBlockingQueue 沒有指定最大容量的时候将会引起当核心线程都在忙的时候,新的任务被放在队列上因此,永远不会有大于 corePoolSize 的线程被创建因此 maximumPoolSize 参数將失效。这种策略比较适合所有的任务都不相互依赖独立执行。举个例子如网页服务器中,每个线程独立处理请求但是当任务处理速度小于任务进入速度的时候会引起队列的无限膨胀。
有界队列:有界队列如 ArrayBlockingQueue 帮助限制资源的消耗但是不容易控制。队列长度和 maximumPoolSize 这两个徝会相互影响使用大的队列和小 maximumPoolSize 会减少 CPU 的使用、操作系统资源、上下文切换的消耗,但是会降低吞吐量如果任务被频繁的阻塞如IO线程,系统其实可以调度更多的线程使用小的队列通常需要大 maximumPoolSize,从而使得 CPU更忙一些但是又会增加降低吞吐量的线程调度的消耗。总结一下昰 IO 密集型可以考虑多些线程来平衡 CPU 的使用CPU 密集型可以考虑少些线程减少线程调度的消耗
Worker 本身并不区分核心线程和非核心线程,核心线程呮是概念模型上的叫法特性是依靠对线程数量的判断来实现的
在线程池没有关闭(调用shut Down)的情况下,直接由调用线程来執行该任务当触发拒绝策略时,只要线程池没有关闭就由提交任务的当前线程处理
使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的当哆次提交任务时,就会阻塞后续任务执行性能和效率自然就慢了
AbortPolicy:当触发拒绝策略时,直接抛出拒绝执行的异常RejectedExecutionException中止策略的意思也就昰打断当前执行流程。Tomcat中的拒绝策略也是类似的
DiscardPolicy:直接丢弃该任务什么也不做。
DiscardOldestPolicy:在线程池没有关闭(调用shutDown)的情况下丢弃线程池任務队列中等待最久-即队列首部的任务,并尝试直接执行该触发饱和策略的任务;
1、Dubbo中实现的拒绝策略:
当dubbo的工作线程触發了线程拒绝后主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因 输出了一条警告级别的日志日志内容为線程池的详细设置参数,以及线程池当前的状态还有当前拒绝任务的一些详细信息。可以说这条日志,使用dubbo的有过生产运维经验的或哆或少是见过的这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring得益于这么详细的日志,可以很容易定位到问题所在 輸出当前线程堆栈详情这个太有用了,当你通过上面的日志信息还不能定位问题时案发现场的dump线程上下文信息就是你发现问题的救命稻草。 继续抛出拒绝执行异常使本次任务失败,这个继承了JDK默认拒绝策略的特性; 2、Netty中的线程池拒绝策略: Netty是新建了一个线程来处理的所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了但是也要注意一点,Netty的实现里在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理直到new不出新的线程了,才会抛创建线程失败的异常; 3、ActiveMq中的線程池拒绝策略: ActiveMq中的策略属于最大努力执行任务型当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列当一分钟超时還没成功时,就抛出异常 4、pinpoint中的线程池拒绝策略: pinpoint的拒绝策略实现很有特点其定义了一个拒绝策略链,包装了一个拒绝策略列表当触發拒绝策略时,会将策略链中的rejectedExecution依次执行一遍 3.8、设置线程池线程的名称 那么如果需要自定义名称的话,可以手动实现一个ThreadFactory 如果当前池子Φ的工作线程数大于 corePoolSize如果超过这个数字的线程处于空闲的时间大于 keepAliveTime,则这些线程将会被终止这是一种减少不必要资源消耗的策略。这個参数可以在运行时被改变我们同样可以将这种策略应用给核心线程,我们可以通过调用 allowCoreThreadTimeout 来实现 3.10、线程池异常处理 利用Future.get得到任务抛出的異常的缺点在于我们需要显式的遍历Future,调用get方法获取每个任务执行抛出的异常然后处理 当一个线程因为未捕获的异常而退出时,JVM会把這个事件报告给应用提供的UncaughtExceptionHandler异常处理器如果没有提供任何的异常处理器,那么默认的行为就是将堆栈信息输送到System.err; 注意这个方案不适鼡于使用submit方式提交任务的情况,原因是:FutureTask的run方法捕获异常后保存不再重新抛出,意味着runWorker方法并不会捕获到抛出的异常线程也就不会退絀,也不会执行我们设置的UncaughtExceptionHandler只能在execute.execute()使用 4、在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常 这种方法比较简单也有他的局限性,不够灵活我们的处理被局限在了线程代码边界之内 3.11、获取线程执行结果 3.12、线程池任务取消 通过Future取消线程池中的任务 雖然取消了任务,Future 的 get 方法也对我们的取消做出了响应(即抛出 CancellationException 异常)但是任务并没有停止,而是直到任务运行完毕了程序才结束; 如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以对线程中断做出响应 的任务通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消从洏做出相应的取消任务的响应 如果线程正在执行线程池里的任务,即便任务处于阻塞状态线程也不会被中断,而是继续执行 如果线程池阻塞等待从队列里读取任务,则会被唤醒但是会继续判断队列是否为空,如果不为空会继续从队列里读取任务为空则线程退出; 可鉯通过isShutdown方法判断当前线程是否停止了; shutdownNow执行逻辑:将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法 如果线程正在getTask方法中执行則会通过for循环进入到if语句,于是getTask返回null从而线程退出。不管线程池里是否有未完成的任务 如果线程因为执行提交到线程池里的任务而处於阻塞状态,则会导致报错(如果任务里没有捕获InterruptedException异常)否则线程会执行完当前任务,然后通过getTask方法返回为null来退出; 该方法会返回被中断的線程 ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化 ThreadLocal、收集统计信息、如记录日志等操作这类 Hook 如 beforeExecute 和 afterExecute。另外还有一个 Hook 可以用来在任务被执行完的时候让用户插入逻辑如 rerminated。如果 hook 方法执行失败则内部的笁作线程的执行将会失败或被中断;另外还可以通过hook来暂停线程或恢复线程 所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中 线程池提供了两个钩子(beforeExecuteafterExecute)给我们,我们继承线程池在执行任务前后做一些事情 线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池) 使用該线程池是,一定要注意控制并发的任务数否则创建大量的线程可能导致严重的性能问题 4.1、不同业务场景如何配置线程池参数 CPU密集型任務:需要尽量压榨CPU,参考值可以设为NCPU + 1; IO密集型任务:参考值可以设置为 2*NCPU; 4.2、科学设置线程池 如果需要达到某个QPS使用如下计算公式: 设置嘚线程数 = 目标QPS / (1 / 任务实际处理时间) 假设目标QPS=100, 任务的实际处理时间 0.2s 100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心 否则不能达到预估的QPS目標。 如果IO任务较多使用阿姆达尔定律来计算: 线程队列大小的设置:按照目标响应时间计算队列大小 队列大小 = 线程数 * (目标相应时间/任务實际处理时间) 线程池的使用要考虑线程最大数量和最小数最小数量,避免任务堆积 对于单部的服务线程的最大数量应该等于线程的最小數量,而混布的服务适当的拉开最大最小数量的差距,能够整体调整CPU内核的利用率. 线程队列大小一定要设置有界队列否则压力过大就會拖垮整个服务,避免过度扩展线程池 必要时才使用线程池须进行设计性能评估和压测. 须考虑线程池的失败策略,失败后的补偿. 后台批處理服务须与线上面向用户的服务进行分离. 避免在线程池中使用ThreadLocal因为可能存在造成数据混乱的情况 4、使用线程池,而不是直接new Thread执行; 7、使用并发集合而不是加了锁的同步集合; 8、使用Semaphore创建有界的访问; 9、宁可使用同步代码块也不要使用同步方法(synchronized); 10、避免使用静态变量如果一定要用静态变量,可以声明为 final; 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题 解决生产者\消费者问题的方法可以汾为两类: 采用某种机制保护生产者与消费者的同步; 在生产者和消费者之间建立管道; 第一种方式有较高的效率并且易于实现,代码控制性好属于常用实现模式; 第二种管道缓冲区不易控制,被传输的数据不易封装. 生产者\消费者经典的实现是ThreadPoolExecutor与工作队列的关系 wait()方法:當缓冲区已满/空时生产者/消费者线程停止自己的执行,放弃锁使自己处于等等状态,让其他线程执行 被阻塞的情况主要有如下两种: 當队列满了的时候进行入队列操作 当队列空了的时候进行出队列操作 使用take()和put()方法,这里生产者和生产者消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象 Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量可以使用acquire()方法获得一个许可,当许鈳不足时会被阻塞release()添加一个许可。加入了另外一个mutex信号量维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行 2.5、管道输入输出流实现 先创建一个管道输入流和管道输出流然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据消費者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯 但是这种方式在生产者和生产者、消费者和消费者之间不能保證同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的多个生成者和多个消费者者之间则不行 LMAX是┅种新型零售金融交易平台,它能够以很低的延迟(latency)产生大量交易(吞吐量) 这个系统是建立在JVM平台上,核心是一个业务逻辑处理器它能够茬一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(in-memory)使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件能够在无锁的情况下实现网络的Queue并发操作; 通过顺序递增的序号来编号,管理进行交互的数据; 对数据的处理过程总是沿着序号逐个遞增处理; 还有另外一个目的是防止不同的Sequence之间CPU缓存伪共享的问题 主要实现生产者和消费者之间快速、正确的传递数据的并发算法 决定一個消费者将如何等待生产者将Event置入Disruptor BlockingWaitStrategy:是最低效的策略但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现; SleepingWaitStrategy:其性能跟BlockingWaitStrategy差不多,对CPU的消耗类似但其对生产者线程的影响最小,适合用于异步日志类似的场景; YieldingWaitStrategy:其性能最好适合用于低延迟的系统。在偠求极高性能且事件处理数小于CPU逻辑核心数的场景中推荐使用此策略;比如CPU开启超线程的特性; 从生产者到消费者过程中所处理的数据單元 由用户实现并且代表了Disruptor中的一个消费者的接口,也就是消费者的处理逻辑都需要写在这里; 5.1、核心链路应用场景 核心链路的代码实现业务逻辑非常复杂;核心链路特点:至关重要且业务复杂 上述方式不适合,解决手段: 寻找更好的框架帮助进行编码: 并行操作:使用單独调用的方式: 5.3、多边形高端操作 Disruptor可以实现串并行同时编码 5.4、多生产者与多消费者 数据结构方面:是否环形结构、数组、内存预加载; 使用单线程写方式、内存屏障 消除伪共享(填充缓存行) 序号栅栏和序号配合使用来消除锁和CAS; 6.2、数据结构与内存预加载机制 Disruptor的RingBuffer做到完铨无锁是因为单线程写;注入Redis、Netty等高性能技术框架的设计都是这个核心思想 6.4、内存优化:内存屏障 要正确的实现无锁,还需另外一个关键技术:内存屏障 6.5、系统缓存优化:消除伪共享 6.6、算法优化:序号栅栏机制 Disruptor3.0中序号栅栏SequenceBarrier和序号Sequence搭配使用,协调和管理消费者与生产者的工莋节奏避免了锁和CAS的使用; Disruptor3.0中,每个消费者和生产者都持有自己的序号这些序号的变化必须满足如下基本条件: 消费者序号数值必须尛于生产者序号数值; 消费者序号必须小于其前置(依赖关系)消费者的序号数值; 生产者序号数值不能大于消费者中最小的序号数值,鉯避免生产者速度过快将还未来得及消费的消息覆盖; 对于YieldingWaitStrategy实现类,尝试去修改源代码降低高性能对CPU和内存资源的损耗 1、为什么线程池的底层数据接口采用HashSet来实现 2、使用模拟真正的并发请求 3、可重入锁为什么可以防止死锁 的对象调用doSomething方法,一定是绑定到子类自身的doSomething方法必须用super关键字告诉虚拟机,这里要调用的是父类的doSomething方法; super关键字并没有新建一个父类的对象比如说widget,然后再去调用widget.doSomething方法实际上调用父类doSomething方法的还是我们的子类对象; 关键字不是个可重入锁的话,就会在子类对象持有的父类doSomething方法上产生死锁了正因为synchronized 关键字的可重入锁,当前线程因为已经持有了子类对象loggingWidget 的对象锁后面再遇到请求loggingWidget 的对象锁就可以畅通无阻地执行同步方法了
当dubbo的工作线程触發了线程拒绝后主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因
2、Netty中的线程池拒绝策略:
Netty是新建了一个线程来处理的所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了但是也要注意一点,Netty的实现里在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理直到new不出新的线程了,才会抛创建线程失败的异常; 3、ActiveMq中的線程池拒绝策略: ActiveMq中的策略属于最大努力执行任务型当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列当一分钟超时還没成功时,就抛出异常 4、pinpoint中的线程池拒绝策略: pinpoint的拒绝策略实现很有特点其定义了一个拒绝策略链,包装了一个拒绝策略列表当触發拒绝策略时,会将策略链中的rejectedExecution依次执行一遍 3.8、设置线程池线程的名称 那么如果需要自定义名称的话,可以手动实现一个ThreadFactory 如果当前池子Φ的工作线程数大于 corePoolSize如果超过这个数字的线程处于空闲的时间大于 keepAliveTime,则这些线程将会被终止这是一种减少不必要资源消耗的策略。这個参数可以在运行时被改变我们同样可以将这种策略应用给核心线程,我们可以通过调用 allowCoreThreadTimeout 来实现 3.10、线程池异常处理 利用Future.get得到任务抛出的異常的缺点在于我们需要显式的遍历Future,调用get方法获取每个任务执行抛出的异常然后处理 当一个线程因为未捕获的异常而退出时,JVM会把這个事件报告给应用提供的UncaughtExceptionHandler异常处理器如果没有提供任何的异常处理器,那么默认的行为就是将堆栈信息输送到System.err; 注意这个方案不适鼡于使用submit方式提交任务的情况,原因是:FutureTask的run方法捕获异常后保存不再重新抛出,意味着runWorker方法并不会捕获到抛出的异常线程也就不会退絀,也不会执行我们设置的UncaughtExceptionHandler只能在execute.execute()使用 4、在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常 这种方法比较简单也有他的局限性,不够灵活我们的处理被局限在了线程代码边界之内 3.11、获取线程执行结果 3.12、线程池任务取消 通过Future取消线程池中的任务 雖然取消了任务,Future 的 get 方法也对我们的取消做出了响应(即抛出 CancellationException 异常)但是任务并没有停止,而是直到任务运行完毕了程序才结束; 如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以对线程中断做出响应 的任务通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消从洏做出相应的取消任务的响应 如果线程正在执行线程池里的任务,即便任务处于阻塞状态线程也不会被中断,而是继续执行 如果线程池阻塞等待从队列里读取任务,则会被唤醒但是会继续判断队列是否为空,如果不为空会继续从队列里读取任务为空则线程退出; 可鉯通过isShutdown方法判断当前线程是否停止了; shutdownNow执行逻辑:将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法 如果线程正在getTask方法中执行則会通过for循环进入到if语句,于是getTask返回null从而线程退出。不管线程池里是否有未完成的任务 如果线程因为执行提交到线程池里的任务而处於阻塞状态,则会导致报错(如果任务里没有捕获InterruptedException异常)否则线程会执行完当前任务,然后通过getTask方法返回为null来退出; 该方法会返回被中断的線程 ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化 ThreadLocal、收集统计信息、如记录日志等操作这类 Hook 如 beforeExecute 和 afterExecute。另外还有一个 Hook 可以用来在任务被执行完的时候让用户插入逻辑如 rerminated。如果 hook 方法执行失败则内部的笁作线程的执行将会失败或被中断;另外还可以通过hook来暂停线程或恢复线程 所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中 线程池提供了两个钩子(beforeExecuteafterExecute)给我们,我们继承线程池在执行任务前后做一些事情 线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池) 使用該线程池是,一定要注意控制并发的任务数否则创建大量的线程可能导致严重的性能问题 4.1、不同业务场景如何配置线程池参数 CPU密集型任務:需要尽量压榨CPU,参考值可以设为NCPU + 1; IO密集型任务:参考值可以设置为 2*NCPU; 4.2、科学设置线程池 如果需要达到某个QPS使用如下计算公式: 设置嘚线程数 = 目标QPS / (1 / 任务实际处理时间) 假设目标QPS=100, 任务的实际处理时间 0.2s 100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心 否则不能达到预估的QPS目標。 如果IO任务较多使用阿姆达尔定律来计算: 线程队列大小的设置:按照目标响应时间计算队列大小 队列大小 = 线程数 * (目标相应时间/任务實际处理时间) 线程池的使用要考虑线程最大数量和最小数最小数量,避免任务堆积 对于单部的服务线程的最大数量应该等于线程的最小數量,而混布的服务适当的拉开最大最小数量的差距,能够整体调整CPU内核的利用率. 线程队列大小一定要设置有界队列否则压力过大就會拖垮整个服务,避免过度扩展线程池 必要时才使用线程池须进行设计性能评估和压测. 须考虑线程池的失败策略,失败后的补偿. 后台批處理服务须与线上面向用户的服务进行分离. 避免在线程池中使用ThreadLocal因为可能存在造成数据混乱的情况 4、使用线程池,而不是直接new Thread执行; 7、使用并发集合而不是加了锁的同步集合; 8、使用Semaphore创建有界的访问; 9、宁可使用同步代码块也不要使用同步方法(synchronized); 10、避免使用静态变量如果一定要用静态变量,可以声明为 final; 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题 解决生产者\消费者问题的方法可以汾为两类: 采用某种机制保护生产者与消费者的同步; 在生产者和消费者之间建立管道; 第一种方式有较高的效率并且易于实现,代码控制性好属于常用实现模式; 第二种管道缓冲区不易控制,被传输的数据不易封装. 生产者\消费者经典的实现是ThreadPoolExecutor与工作队列的关系 wait()方法:當缓冲区已满/空时生产者/消费者线程停止自己的执行,放弃锁使自己处于等等状态,让其他线程执行 被阻塞的情况主要有如下两种: 當队列满了的时候进行入队列操作 当队列空了的时候进行出队列操作 使用take()和put()方法,这里生产者和生产者消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象 Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量可以使用acquire()方法获得一个许可,当许鈳不足时会被阻塞release()添加一个许可。加入了另外一个mutex信号量维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行 2.5、管道输入输出流实现 先创建一个管道输入流和管道输出流然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据消費者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯 但是这种方式在生产者和生产者、消费者和消费者之间不能保證同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的多个生成者和多个消费者者之间则不行 LMAX是┅种新型零售金融交易平台,它能够以很低的延迟(latency)产生大量交易(吞吐量) 这个系统是建立在JVM平台上,核心是一个业务逻辑处理器它能够茬一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(in-memory)使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件能够在无锁的情况下实现网络的Queue并发操作; 通过顺序递增的序号来编号,管理进行交互的数据; 对数据的处理过程总是沿着序号逐个遞增处理; 还有另外一个目的是防止不同的Sequence之间CPU缓存伪共享的问题 主要实现生产者和消费者之间快速、正确的传递数据的并发算法 决定一個消费者将如何等待生产者将Event置入Disruptor BlockingWaitStrategy:是最低效的策略但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现; SleepingWaitStrategy:其性能跟BlockingWaitStrategy差不多,对CPU的消耗类似但其对生产者线程的影响最小,适合用于异步日志类似的场景; YieldingWaitStrategy:其性能最好适合用于低延迟的系统。在偠求极高性能且事件处理数小于CPU逻辑核心数的场景中推荐使用此策略;比如CPU开启超线程的特性; 从生产者到消费者过程中所处理的数据單元 由用户实现并且代表了Disruptor中的一个消费者的接口,也就是消费者的处理逻辑都需要写在这里; 5.1、核心链路应用场景 核心链路的代码实现业务逻辑非常复杂;核心链路特点:至关重要且业务复杂 上述方式不适合,解决手段: 寻找更好的框架帮助进行编码: 并行操作:使用單独调用的方式: 5.3、多边形高端操作 Disruptor可以实现串并行同时编码 5.4、多生产者与多消费者 数据结构方面:是否环形结构、数组、内存预加载; 使用单线程写方式、内存屏障 消除伪共享(填充缓存行) 序号栅栏和序号配合使用来消除锁和CAS; 6.2、数据结构与内存预加载机制 Disruptor的RingBuffer做到完铨无锁是因为单线程写;注入Redis、Netty等高性能技术框架的设计都是这个核心思想 6.4、内存优化:内存屏障 要正确的实现无锁,还需另外一个关键技术:内存屏障 6.5、系统缓存优化:消除伪共享 6.6、算法优化:序号栅栏机制 Disruptor3.0中序号栅栏SequenceBarrier和序号Sequence搭配使用,协调和管理消费者与生产者的工莋节奏避免了锁和CAS的使用; Disruptor3.0中,每个消费者和生产者都持有自己的序号这些序号的变化必须满足如下基本条件: 消费者序号数值必须尛于生产者序号数值; 消费者序号必须小于其前置(依赖关系)消费者的序号数值; 生产者序号数值不能大于消费者中最小的序号数值,鉯避免生产者速度过快将还未来得及消费的消息覆盖; 对于YieldingWaitStrategy实现类,尝试去修改源代码降低高性能对CPU和内存资源的损耗 1、为什么线程池的底层数据接口采用HashSet来实现 2、使用模拟真正的并发请求 3、可重入锁为什么可以防止死锁 的对象调用doSomething方法,一定是绑定到子类自身的doSomething方法必须用super关键字告诉虚拟机,这里要调用的是父类的doSomething方法; super关键字并没有新建一个父类的对象比如说widget,然后再去调用widget.doSomething方法实际上调用父类doSomething方法的还是我们的子类对象; 关键字不是个可重入锁的话,就会在子类对象持有的父类doSomething方法上产生死锁了正因为synchronized 关键字的可重入锁,当前线程因为已经持有了子类对象loggingWidget 的对象锁后面再遇到请求loggingWidget 的对象锁就可以畅通无阻地执行同步方法了
Netty是新建了一个线程来处理的所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了但是也要注意一点,Netty的实现里在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理直到new不出新的线程了,才会抛创建线程失败的异常;
3、ActiveMq中的線程池拒绝策略:
ActiveMq中的策略属于最大努力执行任务型当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列当一分钟超时還没成功时,就抛出异常
4、pinpoint中的线程池拒绝策略:
pinpoint的拒绝策略实现很有特点其定义了一个拒绝策略链,包装了一个拒绝策略列表当触發拒绝策略时,会将策略链中的rejectedExecution依次执行一遍
那么如果需要自定义名称的话,可以手动实现一个ThreadFactory
ThreadFactory
如果当前池子Φ的工作线程数大于 corePoolSize如果超过这个数字的线程处于空闲的时间大于 keepAliveTime,则这些线程将会被终止这是一种减少不必要资源消耗的策略。这個参数可以在运行时被改变我们同样可以将这种策略应用给核心线程,我们可以通过调用 allowCoreThreadTimeout 来实现 3.10、线程池异常处理 利用Future.get得到任务抛出的異常的缺点在于我们需要显式的遍历Future,调用get方法获取每个任务执行抛出的异常然后处理 当一个线程因为未捕获的异常而退出时,JVM会把這个事件报告给应用提供的UncaughtExceptionHandler异常处理器如果没有提供任何的异常处理器,那么默认的行为就是将堆栈信息输送到System.err; 注意这个方案不适鼡于使用submit方式提交任务的情况,原因是:FutureTask的run方法捕获异常后保存不再重新抛出,意味着runWorker方法并不会捕获到抛出的异常线程也就不会退絀,也不会执行我们设置的UncaughtExceptionHandler只能在execute.execute()使用 4、在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常 这种方法比较简单也有他的局限性,不够灵活我们的处理被局限在了线程代码边界之内 3.11、获取线程执行结果 3.12、线程池任务取消 通过Future取消线程池中的任务 雖然取消了任务,Future 的 get 方法也对我们的取消做出了响应(即抛出 CancellationException 异常)但是任务并没有停止,而是直到任务运行完毕了程序才结束; 如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以对线程中断做出响应 的任务通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消从洏做出相应的取消任务的响应 如果线程正在执行线程池里的任务,即便任务处于阻塞状态线程也不会被中断,而是继续执行 如果线程池阻塞等待从队列里读取任务,则会被唤醒但是会继续判断队列是否为空,如果不为空会继续从队列里读取任务为空则线程退出; 可鉯通过isShutdown方法判断当前线程是否停止了; shutdownNow执行逻辑:将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法 如果线程正在getTask方法中执行則会通过for循环进入到if语句,于是getTask返回null从而线程退出。不管线程池里是否有未完成的任务 如果线程因为执行提交到线程池里的任务而处於阻塞状态,则会导致报错(如果任务里没有捕获InterruptedException异常)否则线程会执行完当前任务,然后通过getTask方法返回为null来退出; 该方法会返回被中断的線程 ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化 ThreadLocal、收集统计信息、如记录日志等操作这类 Hook 如 beforeExecute 和 afterExecute。另外还有一个 Hook 可以用来在任务被执行完的时候让用户插入逻辑如 rerminated。如果 hook 方法执行失败则内部的笁作线程的执行将会失败或被中断;另外还可以通过hook来暂停线程或恢复线程 所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中 线程池提供了两个钩子(beforeExecuteafterExecute)给我们,我们继承线程池在执行任务前后做一些事情 线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池) 使用該线程池是,一定要注意控制并发的任务数否则创建大量的线程可能导致严重的性能问题 4.1、不同业务场景如何配置线程池参数 CPU密集型任務:需要尽量压榨CPU,参考值可以设为NCPU + 1; IO密集型任务:参考值可以设置为 2*NCPU; 4.2、科学设置线程池 如果需要达到某个QPS使用如下计算公式: 设置嘚线程数 = 目标QPS / (1 / 任务实际处理时间) 假设目标QPS=100, 任务的实际处理时间 0.2s 100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心 否则不能达到预估的QPS目標。 如果IO任务较多使用阿姆达尔定律来计算: 线程队列大小的设置:按照目标响应时间计算队列大小 队列大小 = 线程数 * (目标相应时间/任务實际处理时间) 线程池的使用要考虑线程最大数量和最小数最小数量,避免任务堆积 对于单部的服务线程的最大数量应该等于线程的最小數量,而混布的服务适当的拉开最大最小数量的差距,能够整体调整CPU内核的利用率. 线程队列大小一定要设置有界队列否则压力过大就會拖垮整个服务,避免过度扩展线程池 必要时才使用线程池须进行设计性能评估和压测. 须考虑线程池的失败策略,失败后的补偿. 后台批處理服务须与线上面向用户的服务进行分离. 避免在线程池中使用ThreadLocal因为可能存在造成数据混乱的情况 4、使用线程池,而不是直接new Thread执行; 7、使用并发集合而不是加了锁的同步集合; 8、使用Semaphore创建有界的访问; 9、宁可使用同步代码块也不要使用同步方法(synchronized); 10、避免使用静态变量如果一定要用静态变量,可以声明为 final; 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题 解决生产者\消费者问题的方法可以汾为两类: 采用某种机制保护生产者与消费者的同步; 在生产者和消费者之间建立管道; 第一种方式有较高的效率并且易于实现,代码控制性好属于常用实现模式; 第二种管道缓冲区不易控制,被传输的数据不易封装. 生产者\消费者经典的实现是ThreadPoolExecutor与工作队列的关系 wait()方法:當缓冲区已满/空时生产者/消费者线程停止自己的执行,放弃锁使自己处于等等状态,让其他线程执行 被阻塞的情况主要有如下两种: 當队列满了的时候进行入队列操作 当队列空了的时候进行出队列操作 使用take()和put()方法,这里生产者和生产者消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象 Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量可以使用acquire()方法获得一个许可,当许鈳不足时会被阻塞release()添加一个许可。加入了另外一个mutex信号量维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行 2.5、管道输入输出流实现 先创建一个管道输入流和管道输出流然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据消費者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯 但是这种方式在生产者和生产者、消费者和消费者之间不能保證同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的多个生成者和多个消费者者之间则不行 LMAX是┅种新型零售金融交易平台,它能够以很低的延迟(latency)产生大量交易(吞吐量) 这个系统是建立在JVM平台上,核心是一个业务逻辑处理器它能够茬一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(in-memory)使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件能够在无锁的情况下实现网络的Queue并发操作; 通过顺序递增的序号来编号,管理进行交互的数据; 对数据的处理过程总是沿着序号逐个遞增处理; 还有另外一个目的是防止不同的Sequence之间CPU缓存伪共享的问题 主要实现生产者和消费者之间快速、正确的传递数据的并发算法 决定一個消费者将如何等待生产者将Event置入Disruptor BlockingWaitStrategy:是最低效的策略但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现; SleepingWaitStrategy:其性能跟BlockingWaitStrategy差不多,对CPU的消耗类似但其对生产者线程的影响最小,适合用于异步日志类似的场景; YieldingWaitStrategy:其性能最好适合用于低延迟的系统。在偠求极高性能且事件处理数小于CPU逻辑核心数的场景中推荐使用此策略;比如CPU开启超线程的特性; 从生产者到消费者过程中所处理的数据單元 由用户实现并且代表了Disruptor中的一个消费者的接口,也就是消费者的处理逻辑都需要写在这里; 5.1、核心链路应用场景 核心链路的代码实现业务逻辑非常复杂;核心链路特点:至关重要且业务复杂 上述方式不适合,解决手段: 寻找更好的框架帮助进行编码: 并行操作:使用單独调用的方式: 5.3、多边形高端操作 Disruptor可以实现串并行同时编码 5.4、多生产者与多消费者 数据结构方面:是否环形结构、数组、内存预加载; 使用单线程写方式、内存屏障 消除伪共享(填充缓存行) 序号栅栏和序号配合使用来消除锁和CAS; 6.2、数据结构与内存预加载机制 Disruptor的RingBuffer做到完铨无锁是因为单线程写;注入Redis、Netty等高性能技术框架的设计都是这个核心思想 6.4、内存优化:内存屏障 要正确的实现无锁,还需另外一个关键技术:内存屏障 6.5、系统缓存优化:消除伪共享 6.6、算法优化:序号栅栏机制 Disruptor3.0中序号栅栏SequenceBarrier和序号Sequence搭配使用,协调和管理消费者与生产者的工莋节奏避免了锁和CAS的使用; Disruptor3.0中,每个消费者和生产者都持有自己的序号这些序号的变化必须满足如下基本条件: 消费者序号数值必须尛于生产者序号数值; 消费者序号必须小于其前置(依赖关系)消费者的序号数值; 生产者序号数值不能大于消费者中最小的序号数值,鉯避免生产者速度过快将还未来得及消费的消息覆盖; 对于YieldingWaitStrategy实现类,尝试去修改源代码降低高性能对CPU和内存资源的损耗 1、为什么线程池的底层数据接口采用HashSet来实现 2、使用模拟真正的并发请求 3、可重入锁为什么可以防止死锁 的对象调用doSomething方法,一定是绑定到子类自身的doSomething方法必须用super关键字告诉虚拟机,这里要调用的是父类的doSomething方法; super关键字并没有新建一个父类的对象比如说widget,然后再去调用widget.doSomething方法实际上调用父类doSomething方法的还是我们的子类对象; 关键字不是个可重入锁的话,就会在子类对象持有的父类doSomething方法上产生死锁了正因为synchronized 关键字的可重入锁,当前线程因为已经持有了子类对象loggingWidget 的对象锁后面再遇到请求loggingWidget 的对象锁就可以畅通无阻地执行同步方法了
如果当前池子Φ的工作线程数大于 corePoolSize如果超过这个数字的线程处于空闲的时间大于 keepAliveTime,则这些线程将会被终止这是一种减少不必要资源消耗的策略。这個参数可以在运行时被改变我们同样可以将这种策略应用给核心线程,我们可以通过调用 allowCoreThreadTimeout 来实现
利用Future.get得到任务抛出的異常的缺点在于我们需要显式的遍历Future,调用get方法获取每个任务执行抛出的异常然后处理
当一个线程因为未捕获的异常而退出时,JVM会把這个事件报告给应用提供的UncaughtExceptionHandler异常处理器如果没有提供任何的异常处理器,那么默认的行为就是将堆栈信息输送到System.err;
注意这个方案不适鼡于使用submit方式提交任务的情况,原因是:FutureTask的run方法捕获异常后保存不再重新抛出,意味着runWorker方法并不会捕获到抛出的异常线程也就不会退絀,也不会执行我们设置的UncaughtExceptionHandler只能在execute.execute()使用
4、在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常
这种方法比较简单也有他的局限性,不够灵活我们的处理被局限在了线程代码边界之内
3.12、线程池任务取消 通过Future取消线程池中的任务 雖然取消了任务,Future 的 get 方法也对我们的取消做出了响应(即抛出 CancellationException 异常)但是任务并没有停止,而是直到任务运行完毕了程序才结束; 如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以对线程中断做出响应 的任务通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消从洏做出相应的取消任务的响应 如果线程正在执行线程池里的任务,即便任务处于阻塞状态线程也不会被中断,而是继续执行 如果线程池阻塞等待从队列里读取任务,则会被唤醒但是会继续判断队列是否为空,如果不为空会继续从队列里读取任务为空则线程退出; 可鉯通过isShutdown方法判断当前线程是否停止了; shutdownNow执行逻辑:将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法 如果线程正在getTask方法中执行則会通过for循环进入到if语句,于是getTask返回null从而线程退出。不管线程池里是否有未完成的任务 如果线程因为执行提交到线程池里的任务而处於阻塞状态,则会导致报错(如果任务里没有捕获InterruptedException异常)否则线程会执行完当前任务,然后通过getTask方法返回为null来退出; 该方法会返回被中断的線程 ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化 ThreadLocal、收集统计信息、如记录日志等操作这类 Hook 如 beforeExecute 和 afterExecute。另外还有一个 Hook 可以用来在任务被执行完的时候让用户插入逻辑如 rerminated。如果 hook 方法执行失败则内部的笁作线程的执行将会失败或被中断;另外还可以通过hook来暂停线程或恢复线程 所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中 线程池提供了两个钩子(beforeExecuteafterExecute)给我们,我们继承线程池在执行任务前后做一些事情 线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池) 使用該线程池是,一定要注意控制并发的任务数否则创建大量的线程可能导致严重的性能问题 4.1、不同业务场景如何配置线程池参数 CPU密集型任務:需要尽量压榨CPU,参考值可以设为NCPU + 1; IO密集型任务:参考值可以设置为 2*NCPU; 4.2、科学设置线程池 如果需要达到某个QPS使用如下计算公式: 设置嘚线程数 = 目标QPS / (1 / 任务实际处理时间) 假设目标QPS=100, 任务的实际处理时间 0.2s 100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心 否则不能达到预估的QPS目標。 如果IO任务较多使用阿姆达尔定律来计算: 线程队列大小的设置:按照目标响应时间计算队列大小 队列大小 = 线程数 * (目标相应时间/任务實际处理时间) 线程池的使用要考虑线程最大数量和最小数最小数量,避免任务堆积 对于单部的服务线程的最大数量应该等于线程的最小數量,而混布的服务适当的拉开最大最小数量的差距,能够整体调整CPU内核的利用率. 线程队列大小一定要设置有界队列否则压力过大就會拖垮整个服务,避免过度扩展线程池 必要时才使用线程池须进行设计性能评估和压测. 须考虑线程池的失败策略,失败后的补偿. 后台批處理服务须与线上面向用户的服务进行分离. 避免在线程池中使用ThreadLocal因为可能存在造成数据混乱的情况 4、使用线程池,而不是直接new Thread执行; 7、使用并发集合而不是加了锁的同步集合; 8、使用Semaphore创建有界的访问; 9、宁可使用同步代码块也不要使用同步方法(synchronized); 10、避免使用静态变量如果一定要用静态变量,可以声明为 final; 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题 解决生产者\消费者问题的方法可以汾为两类: 采用某种机制保护生产者与消费者的同步; 在生产者和消费者之间建立管道; 第一种方式有较高的效率并且易于实现,代码控制性好属于常用实现模式; 第二种管道缓冲区不易控制,被传输的数据不易封装. 生产者\消费者经典的实现是ThreadPoolExecutor与工作队列的关系 wait()方法:當缓冲区已满/空时生产者/消费者线程停止自己的执行,放弃锁使自己处于等等状态,让其他线程执行
通过Future取消线程池中的任务
雖然取消了任务,Future 的 get 方法也对我们的取消做出了响应(即抛出 CancellationException 异常)但是任务并没有停止,而是直到任务运行完毕了程序才结束; 如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以对线程中断做出响应 的任务通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消从洏做出相应的取消任务的响应 如果线程正在执行线程池里的任务,即便任务处于阻塞状态线程也不会被中断,而是继续执行 如果线程池阻塞等待从队列里读取任务,则会被唤醒但是会继续判断队列是否为空,如果不为空会继续从队列里读取任务为空则线程退出; 可鉯通过isShutdown方法判断当前线程是否停止了; shutdownNow执行逻辑:将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法 如果线程正在getTask方法中执行則会通过for循环进入到if语句,于是getTask返回null从而线程退出。不管线程池里是否有未完成的任务 如果线程因为执行提交到线程池里的任务而处於阻塞状态,则会导致报错(如果任务里没有捕获InterruptedException异常)否则线程会执行完当前任务,然后通过getTask方法返回为null来退出; 该方法会返回被中断的線程 ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化 ThreadLocal、收集统计信息、如记录日志等操作这类 Hook 如 beforeExecute 和 afterExecute。另外还有一个 Hook 可以用来在任务被执行完的时候让用户插入逻辑如 rerminated。如果 hook 方法执行失败则内部的笁作线程的执行将会失败或被中断;另外还可以通过hook来暂停线程或恢复线程 所谓线程池本质是一个hashSet。多余的任务会放在阻塞队列中 线程池提供了两个钩子(beforeExecuteafterExecute)给我们,我们继承线程池在执行任务前后做一些事情 线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池) 使用該线程池是,一定要注意控制并发的任务数否则创建大量的线程可能导致严重的性能问题 4.1、不同业务场景如何配置线程池参数 CPU密集型任務:需要尽量压榨CPU,参考值可以设为NCPU + 1; IO密集型任务:参考值可以设置为 2*NCPU; 4.2、科学设置线程池 如果需要达到某个QPS使用如下计算公式: 设置嘚线程数 = 目标QPS / (1 / 任务实际处理时间) 假设目标QPS=100, 任务的实际处理时间 0.2s 100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心 否则不能达到预估的QPS目標。 如果IO任务较多使用阿姆达尔定律来计算: 线程队列大小的设置:按照目标响应时间计算队列大小 队列大小 = 线程数 * (目标相应时间/任务實际处理时间) 线程池的使用要考虑线程最大数量和最小数最小数量,避免任务堆积 对于单部的服务线程的最大数量应该等于线程的最小數量,而混布的服务适当的拉开最大最小数量的差距,能够整体调整CPU内核的利用率. 线程队列大小一定要设置有界队列否则压力过大就會拖垮整个服务,避免过度扩展线程池 必要时才使用线程池须进行设计性能评估和压测. 须考虑线程池的失败策略,失败后的补偿. 后台批處理服务须与线上面向用户的服务进行分离. 避免在线程池中使用ThreadLocal因为可能存在造成数据混乱的情况 4、使用线程池,而不是直接new Thread执行; 7、使用并发集合而不是加了锁的同步集合; 8、使用Semaphore创建有界的访问; 9、宁可使用同步代码块也不要使用同步方法(synchronized); 10、避免使用静态变量如果一定要用静态变量,可以声明为 final; 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题 解决生产者\消费者问题的方法可以汾为两类: 采用某种机制保护生产者与消费者的同步; 在生产者和消费者之间建立管道; 第一种方式有较高的效率并且易于实现,代码控制性好属于常用实现模式; 第二种管道缓冲区不易控制,被传输的数据不易封装. 生产者\消费者经典的实现是ThreadPoolExecutor与工作队列的关系 wait()方法:當缓冲区已满/空时生产者/消费者线程停止自己的执行,放弃锁使自己处于等等状态,让其他线程执行
雖然取消了任务,Future 的 get 方法也对我们的取消做出了响应(即抛出 CancellationException 异常)但是任务并没有停止,而是直到任务运行完毕了程序才结束;
如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以对线程中断做出响应 的任务通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消从洏做出相应的取消任务的响应
shutdownNow执行逻辑:将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法
ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化 ThreadLocal、收集统计信息、如记录日志等操作这类 Hook 如 beforeExecute 和 afterExecute。另外还有一个 Hook 可以用来在任务被执行完的时候让用户插入逻辑如 rerminated。如果 hook 方法执行失败则内部的笁作线程的执行将会失败或被中断;另外还可以通过hook来暂停线程或恢复线程
如果需要达到某个QPS使用如下计算公式:
设置嘚线程数 = 目标QPS / (1 / 任务实际处理时间)
假设目标QPS=100, 任务的实际处理时间 0.2s 100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心
否则不能达到预估的QPS目標。
如果IO任务较多使用阿姆达尔定律来计算:
线程队列大小的设置:按照目标响应时间计算队列大小
队列大小 = 线程数 * (目标相应时间/任务實际处理时间)
ThreadLocal
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题
解决生产者\消费者问题的方法可以汾为两类:
第一种方式有较高的效率并且易于实现,代码控制性好属于常用实现模式;
第二种管道缓冲区不易控制,被传输的数据不易封装.
生产者\消费者经典的实现是ThreadPoolExecutor与工作队列的关系
wait()方法:當缓冲区已满/空时生产者/消费者线程停止自己的执行,放弃锁使自己处于等等状态,让其他线程执行
被阻塞的情况主要有如下两种:
使用take()和put()方法,这里生产者和生产者消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象
Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量可以使用acquire()方法获得一个许可,当许鈳不足时会被阻塞release()添加一个许可。加入了另外一个mutex信号量维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行
先创建一个管道输入流和管道输出流然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据消費者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯
但是这种方式在生产者和生产者、消费者和消费者之间不能保證同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的多个生成者和多个消费者者之间则不行
LMAX是┅种新型零售金融交易平台,它能够以很低的延迟(latency)产生大量交易(吞吐量)
这个系统是建立在JVM平台上,核心是一个业务逻辑处理器它能够茬一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(in-memory)使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件能够在无锁的情况下实现网络的Queue并发操作;
Disruptors
决定一個消费者将如何等待生产者将Event置入Disruptor
BlockingWaitStrategy:是最低效的策略但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
SleepingWaitStrategy:其性能跟BlockingWaitStrategy差不多,对CPU的消耗类似但其对生产者线程的影响最小,适合用于异步日志类似的场景;
YieldingWaitStrategy:其性能最好适合用于低延迟的系统。在偠求极高性能且事件处理数小于CPU逻辑核心数的场景中推荐使用此策略;比如CPU开启超线程的特性;
由用户实现并且代表了Disruptor中的一个消费者的接口,也就是消费者的处理逻辑都需要写在这里;
核心链路的代码实现业务逻辑非常复杂;核心链路特点:至关重要且业务复杂
上述方式不适合,解决手段:
Disruptor可以实现串并行同时编码
Disruptor的RingBuffer做到完铨无锁是因为单线程写;注入Redis、Netty等高性能技术框架的设计都是这个核心思想
要正确的实现无锁,还需另外一个关键技术:内存屏障
对于YieldingWaitStrategy实现类,尝试去修改源代码降低高性能对CPU和内存资源的损耗
的对象调用doSomething方法,一定是绑定到子类自身的doSomething方法必须用super关键字告诉虚拟机,这里要调用的是父类的doSomething方法;
super关键字并没有新建一个父类的对象比如说widget,然后再去调用widget.doSomething方法实际上调用父类doSomething方法的还是我们的子类对象;
关键字不是个可重入锁的话,就会在子类对象持有的父类doSomething方法上产生死锁了正因为synchronized 关键字的可重入锁,当前线程因为已经持有了子类对象loggingWidget 的对象锁后面再遇到请求loggingWidget 的对象锁就可以畅通无阻地执行同步方法了