JUC学习笔记 学习来源:黑马程序员
学习时间:2021年11月3日,2023年6月26日复习
0 Java中的锁 0.1 框架
0.2 悲观锁 vs 乐观锁 乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同角度。在Java和数据库中都有此概念对应的实际应用。
先说概念。对于同一个数据的并发操作,悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。Java中,synchronized关键字和Lock的实现类都是悲观锁 。
而乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁 ,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作(例如报错或者自动重试)。
乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法 ,Java原子类中的递增操作就通过CAS自旋实现的。
根据从上面的概念描述我们可以发现:
悲观锁适合写操作多的场景,先加锁可以保证写操作时数据正确。
乐观锁适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。
调用方式示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public synchronized void testMethod () { } private ReentrantLock lock = new ReentrantLock(); public void modifyPublicResources () { lock.lock(); lock.unlock(); } private AtomicInteger atomicInteger = new AtomicInteger(); atomicInteger.incrementAndGet();
通过调用方式示例,我们可以发现悲观锁基本都是在显式的锁定之后再操作同步资源,而乐观锁则直接去操作同步资源。那么,为何乐观锁能够做到不锁定同步资源也可以正确的实现线程同步呢?我们通过介绍乐观锁的主要实现方式 “CAS” 的技术原理来为大家解惑。
CAS全称 Compare And Swap
(比较与交换),是一种无锁算法。在不使用锁(没有线程被阻塞 )的情况下实现多线程之间的变量同步。java.util.concurrent
包中的原子类就是通过CAS来实现了乐观锁。
CAS算法涉及到三个操作数:
需要读写的内存值 V
进行比较的值 A
要写入的新值 B
当且仅当 V 的值等于 A 时,CAS通过原子方式用新值 B 来更新 V 的值(“比较+更新”整体是一个原子操作),否则不会执行任何操作。一般情况下,“更新”是一个不断重试的操作。
之前提到java.util.concurrent
包中的原子类,就是通过CAS来实现了乐观锁,那么我们进入原子类AtomicInteger
的源码,看一下AtomicInteger
的定义:
根据定义我们可以看出各属性的作用:
unsafe: 获取并操作内存的数据。
valueOffset: 存储value在AtomicInteger中的偏移量。
value: 存储AtomicInteger的int值,该属性需要借助volatile关键字保证其在线程间是可见的。
接下来,我们查看AtomicInteger的自增函数incrementAndGet()的源码时,发现自增函数底层调用的是unsafe.getAndAddInt()。但是由于JDK本身只有Unsafe.class,只通过class文件中的参数名,并不能很好的了解方法的作用,所以我们通过OpenJDK 8 来查看Unsafe的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final int incrementAndGet () { return unsafe.getAndAddInt(this , valueOffset, 1 ) + 1 ; } public final int getAndAddInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } public final int getAndAddInt (Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }
根据OpenJDK 8的源码我们可以看出,getAndAddInt()循环获取给定对象o中的偏移量处的值v,然后判断内存值是否等于v。如果相等则将内存值设置为 v + delta,否则返回false,继续循环进行重试,直到设置成功才能退出循环,并且将旧值返回。整个“比较+更新”操作封装在compareAndSwapInt()中,在JNI里是借助于一个CPU指令完成的,属于原子操作,可以保证多个线程都能够看到同一个变量的修改值。
后续JDK通过CPU的cmpxchg指令 ,去比较寄存器中的 A 和 内存中的值 V。如果相等,就把要写入的新值 B 存入内存中。如果不相等,就将内存值 V 赋值给寄存器中的值 A。然后通过Java代码中的while循环再次调用cmpxchg指令进行重试,直到设置成功为止。
CAS虽然很高效,但是它也存在三大问题,这里也简单说一下:
ABA问题:CAS需要在操作值的时候检查内存值是否发生变化,没有发生变化才会更新内存值。但是如果内存值原来是A,后来变成了B,然后又变成了A,那么CAS进行检查时会发现值没有发生变化,但是实际上是有变化的。 ABA问题的解决思路就是在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。
JDK从1.5开始提供了AtomicStampedReference
类来解决ABA问题,具体操作封装在compareAndSet()中。compareAndSet()首先检查当前引用和当前标志与预期引用和预期标志是否相等,如果都相等,则以原子方式将引用值和标志的值设置为给定的更新值。
循环时间长开销大 。CAS操作如果长时间不成功,会导致其一直自旋 ,给CPU带来非常大的开销。
只能保证一个共享变量的原子操作 :对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。
Java从1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。
0.3 自旋锁 vs 适应性自旋锁 阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。
在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁 。
而为了让当前线程“稍等一下”,我们需让当前线程进行自旋(死循环检查锁的持有状态 ),如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。
自旋锁本身是有缺点的,它不能代替阻塞。自旋等待虽然避免了线程切换的开销,但它要占用处理器时间。如果锁被占用的时间很短,自旋等待的效果就会非常好。反之,如果锁被占用的时间很长,那么自旋的线程只会白浪费处理器资源。所以,自旋等待的时间必须要有一定的限度,如果自旋超过了限定次数(默认是10次,可以使用-XX:PreBlockSpin来更改)没有成功获得锁,就应当挂起线程。
自旋锁的实现原理同样也是CAS,AtomicInteger中调用unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改数值失败则通过循环来执行自旋,直至修改成功。
自旋锁在JDK1.4.2中引入,使用-XX:+UseSpinning来开启。JDK 6中变为默认开启,并且引入了自适应的自旋锁(适应性自旋锁)。
自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。
0.4 无锁 vs 偏向锁 vs 轻量级锁 vs 重量级锁 这四种锁是指锁的状态,专门针对synchronized 的。在介绍这四种锁状态之前还需要介绍一些额外的知识。
Java对象头
synchronized是悲观锁,在操作同步资源之前需要给同步资源先加锁,这把锁就是存在Java对象头里的,而Java对象头又是什么呢?
我们以Hotspot虚拟机为例,Hotspot的对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针)。
Mark Word :默认存储对象的HashCode,分代年龄和锁标志位信 息。这些信息都是与对象自身定义无关的数据,所以Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。
Klass Point :对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。
Monitor
Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁 。
Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。
现在话题回到synchronized,synchronized通过Monitor来实现线程同步,Monitor是依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的线程同步。
如同我们在自旋锁中提到的“阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长”。这种方式就是synchronized最初实现同步的方式,这就是JDK 6之前synchronized效率低的原因。这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”,JDK 6中为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。
所以目前锁一共有4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。锁状态只能升级不能降级。
通过上面的介绍,我们对synchronized的加锁机制以及相关知识有了一个了解,那么下面我们给出四种锁状态对应的的Mark Word内容,然后再分别讲解四种锁状态的思路以及特点:
锁状态
存储内容
存储内容
无锁
对象的hashCode、对象分代年龄、是否是偏向锁(0)
01
偏向锁
偏向线程ID、偏向时间戳、对象分代年龄、是否是偏向锁(1)
01
轻量级锁
指向栈中锁记录的指针
00
重量级锁
指向互斥量(重量级锁)的指针
10
无锁
无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。
无锁的特点就是修改操作在循环内进行 ,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。上面我们介绍的CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。
偏向锁
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。
在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。
当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID 。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。
偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
偏向锁在JDK 6及以后的JVM里是默认启用的。可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。
轻量级锁
是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能 。
在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。
拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。
如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。
如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。
若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。
重量级锁
升级为重量级锁时,锁标志的状态值变为“10”,此时Mark Word中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态 。
综上,偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。
0.5 公平锁 vs 非公平锁 公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死 。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待 。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。
如上图所示,假设有一口水井,有管理员看守,管理员有一把锁,只有拿到锁的人才能够打水,打完水要把锁还给管理员。每个过来打水的人都要管理员的允许并拿到锁之后才能去打水,如果前面有人正在打水,那么这个想要打水的人就必须排队。管理员会查看下一个要去打水的人是不是队伍里排最前面的人,如果是的话,才会给你锁让你去打水;如果你不是排第一的人,就必须去队尾排队,这就是公平锁。
但是对于非公平锁,管理员对打水的人没有要求。即使等待队伍里有排队等待的人,但如果在上一个人刚打完水把锁还给管理员而且管理员还没有允许等待队伍里下一个人去打水时,刚好来了一个插队的人,这个插队的人是可以直接从管理员那里拿到锁去打水,不需要排队,原本排队等待的人只能继续等待。如下图所示:
接下来我们通过ReentrantLock的源码来讲解公平锁和非公平锁。
根据代码可知,ReentrantLock里面有一个内部类Sync,Sync继承AQS(AbstractQueuedSynchronizer),添加锁和释放锁的大部分操作实际上都是在Sync中实现的。它有公平锁FairSync和非公平锁NonfairSync两个子类。ReentrantLock默认使用非公平锁 ,也可以通过构造器来显示的指定使用公平锁。
下面我们来看一下公平锁与非公平锁的加锁方法的源码:
通过上图中的源代码对比,我们可以明显的看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()。再进入hasQueuedPredecessors(),可以看到该方法主要做一件事情:主要是判断当前线程是否位于同步队列中的第一个。如果是则返回true,否则返回false。
综上,公平锁就是通过同步队列来实现多个线程按照申请锁的顺序来获取锁,从而实现公平的特性。非公平锁加锁时不考虑排队等待问题,直接尝试获取锁,所以存在后申请却先获得锁的情况。
0.6 可重入锁 vs 非可重入锁 可重入锁又名递归锁 ,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。下面用示例代码来进行分析:
1 2 3 4 5 6 7 8 9 10 public class Widget { public synchronized void doSomething () { System.out.println("方法1执行..." ); doOthers(); } public synchronized void doOthers () { System.out.println("方法2执行..." ); } }
在上面的代码中,类中的两个方法都是被内置锁synchronized修饰的,doSomething()方法中调用doOthers()方法。因为内置锁是可重入的,所以同一个线程在调用doOthers()时可以直接获得当前对象的锁,进入doOthers()进行操作。
如果是一个不可重入锁,那么当前线程在调用doOthers()之前需要将执行doSomething()时获取当前对象的锁释放掉,实际上该对象锁已被当前线程所持有,且无法释放。所以此时会出现死锁。
而为什么可重入锁就可以在嵌套调用时可以自动获得锁呢?我们通过图示和源码来分别解析一下。
还是打水的例子,有多个人在排队打水,此时管理员允许锁和同一个人的多个水桶绑定。这个人用多个水桶打水时,第一个水桶和锁绑定并打完水之后,第二个水桶也可以直接和锁绑定并开始打水,所有的水桶都打完水之后打水人才会将锁还给管理员。这个人的所有打水流程都能够成功执行,后续等待的人也能够打到水。这就是可重入锁。
但如果是非可重入锁的话,此时管理员只允许锁和同一个人的一个水桶绑定。第一个水桶和锁绑定打完水之后并不会释放锁,导致第二个水桶不能和锁绑定也无法打水。当前线程出现死锁,整个等待队列中的所有线程都无法被唤醒。
之前我们说过ReentrantLock和synchronized都是重入锁,那么我们通过重入锁ReentrantLock以及非可重入锁NonReentrantLock的源码来对比分析一下为什么非可重入锁在重复调用同步资源时会出现死锁。
首先ReentrantLock和NonReentrantLock都继承父类AQS,其父类AQS中维护了一个同步状态status来计数重入次数,status初始值为0。
当线程尝试获取锁时,可重入锁先尝试获取并更新status值,如果status == 0表示没有其他线程在执行同步代码,则把status置为1,当前线程开始执行。如果status != 0,则判断当前线程是否是获取到这个锁的线程,如果是的话执行status+1,且当前线程可以再次获取锁。而非可重入锁是直接去获取并尝试更新当前status的值,如果status != 0的话会导致其获取锁失败,当前线程阻塞。
释放锁时,可重入锁同样先获取当前status的值,在当前线程是持有锁的线程的前提下。如果status-1 == 0,则表示当前线程所有重复获取锁的操作都已经执行完毕,然后该线程才会真正释放锁。而非可重入锁则是在确定当前线程是持有锁的线程之后,直接将status置为0,将锁释放。
0.7 独享锁 vs 共享锁 独享锁和共享锁同样是一种概念。我们先介绍一下具体的概念,然后通过ReentrantLock和ReentrantReadWriteLock的源码来介绍独享锁和共享锁。
独享锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程T对数据A加上排它锁后,则其他线程不能再对A加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。JDK中的synchronized和JUC中Lock的实现类就是互斥锁。
共享锁是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据 。
独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。
下图为ReentrantReadWriteLock的部分源码:
我们看到ReentrantReadWriteLock有两把锁:ReadLock和WriteLock,由词知意,一个读锁一个写锁,合称“读写锁”。再进一步观察可以发现ReadLock和WriteLock是靠内部类Sync实现的锁。Sync是AQS的一个子类,这种结构在CountDownLatch、ReentrantLock、Semaphore里面也都存在。
在ReentrantReadWriteLock里面,读锁和写锁的锁主体都是Sync,但读锁和写锁的加锁方式不一样。读锁是共享锁,写锁是独享锁。读锁的共享锁可保证并发读非常高效,而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以ReentrantReadWriteLock的并发性相比一般的互斥锁有了很大提升。
那读锁和写锁的具体加锁方式有什么区别呢?在了解源码之前我们需要回顾一下其他知识。 在最开始提及AQS的时候我们也提到了state字段(int类型,32位),该字段用来描述有多少线程获持有锁。
在独享锁中这个值通常是0或者1(如果是重入锁的话state值就是重入的次数),在共享锁中state就是持有锁的数量。但是在ReentrantReadWriteLock中有读、写两把锁,所以需要在一个整型变量state上分别描述读锁和写锁的数量(或者也可以叫状态)。于是将state变量“按位切割”切分成了两个部分,高16位表示读锁状态(读锁个数),低16位表示写锁状态(写锁个数)。如下图所示:
了解了概念之后我们再来看代码,先看写锁的加锁源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
这段代码首先取到当前锁的个数c,然后再通过c来获取写锁的个数w。因为写锁是低16位,所以取低16位的最大值与当前的c做与运算( int w = exclusiveCount©; ),高16位和0与运算后是0,剩下的就是低位运算的值,同时也是持有写锁的线程数目。
在取到写锁线程的数目后,首先判断是否已经有线程持有了锁。如果已经有线程持有了锁(c!=0),则查看当前写锁线程的数目,如果写线程数为0(即此时存在读锁)或者持有锁的线程不是当前线程就返回失败(涉及到公平锁和非公平锁的实现)。
如果写入锁的数量大于最大数(65535,2的16次方-1)就抛出一个Error。
如果当且写线程数为0(那么读线程也应该为0,因为上面已经处理c!=0的情况),并且当前线程需要阻塞那么就返回失败;如果通过CAS增加写线程数失败也返回失败。
如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者,返回成功!
tryAcquire()除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:必须确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。
因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,然后等待的读写线程才能够继续访问读写锁,同时前次写线程的修改对后续的读写线程可见。
接着是读锁的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
可以看到在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是“1<<16”。所以读写锁才能实现读读的过程共享,而读写、写读、写写的过程互斥。
1 基本概念 1.1 进程与线程
进程
程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的。
当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
线程
一个进程之内可以分为一到多个线程。
一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行 。
Java 中,线程作为小调度单位,进程作为资源分配的小单位。 在 windows 中进程是不活动的,只是作为线程的容器
二者对比
进程基本上相互独立的,而线程存在于进程内,是进程的一个子集进程拥有共享的资源,如内存空间等,供其内部的线程共享
进程间通信较为复杂,同一台计算机的进程通信称为 IPC(Inter-process communication)
不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量,线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
1.2 进程和线程的切换 上下文切换
内核为每一个进程维持一个上下文。上下文就是内核重新启动一个被抢占的进程所需的状态。 包括以下内容:
通用目的寄存器
浮点寄存器
程序计数器
用户栈
状态寄存器
内核栈
各种内核数据结构:比如描绘地址空间的页表 ,包含有关当前进程信息的进程表 ,以及包含进程已打开文件的信息的文件表
进程切换和线程切换的主要区别
最主要的一个区别在于进程切换涉及虚拟地址空间的切换而线程不会 。因为每个进程都有自己的虚拟地址空间,而线程是共享所在进程的虚拟地址空间的 ,因此同一个进程中的线程进行线程切换时不涉及虚拟地址空间的转换
页表查找是一个很慢的过程,因此通常使用cache来缓存常用的地址映射,这样可以加速页表查找,这个cache就是快表TLB(translation Lookaside Buffer,用来加速页表查找)。由于每个进程都有自己的虚拟地址空间,那么显然每个进程都有自己的页表,那么当进程切换后页表也要进行切换,页表切换后TLB就失效了 ,cache失效导致命中率降低,那么虚拟地址转换为物理地址就会变慢,表现出来的就是程序运行会变慢,而线程切换则不会导致TLB失效,因为线程线程无需切换地址空间,因此我们通常说线程切换要比较进程切换快
而且还可能出现缺页中断 ,这就需要操作系统将需要的内容调入内存中,若内存已满则还需要将不用的内容调出内存,这也需要花费时间
为什么TLB能加快访问速度
快表可以避免每次都对页号进行地址的有效性判断。快表中保存了对应的物理块号,可以直接计算出物理地址,无需再进行有效性检查
1.3 并发与并行 并发是一个CPU在不同的时间去不同线程中执行指令。
并行是多个CPU同时处理不同的线程。
引用 Rob Pike 的一段描述:
并发(concurrent)是同一时间应对 (dealing with)多件事情的能力
并行(parallel)是同一时间动手做 (doing)多件事情的能力
1.4 应用之异步调用(案例1) 以调用方角度来讲,如果
需要等待结果返回,才能继续运行就是同步
不需要等待结果返回,就能继续运行就是异步
1) 设计 多线程可以让方法执行变为异步的(即不要巴巴干等着)比如说读取磁盘文件时,假设读取操作花费了 5 秒钟,如 果没有线程调度机制,这 5 秒 cpu 什么都做不了,其它代码都得暂停… 2) 结论
比如在项目中,视频文件需要转换格式等操作比较费时,这时开一个新线程处理视频转换,避免阻塞主线程
tomcat 的异步 servlet 也是类似的目的,让用户线程处理耗时较长的操作,避免阻塞
tomcat 的工作线程 ui 程序中,开线程进行其他操作,避免阻塞 ui 线程
结论
单核 cpu 下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换,不同线程轮流使用 cpu ,不至于一个线程总占用 cpu,别的线程没法干活
多核 cpu 可以并行跑多个线程,但能否提高程序运行效率还是要分情况的
有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率。但不是所有计算任 务都能拆分(参考后文的【阿姆达尔定律】)
也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义
IO 操作不占用 cpu,只是我们一般拷贝文件使用的是【阻塞 IO】,这时相当于线程虽然不用 cpu,但需要一直等待 IO 结束,没能充分利用线程。所以才有后面的【非阻塞 IO】和【异步 IO】优化
2 线程的创建 2.1 创建一个线程(非主线程) 2.1.1 通过继承Thread类创建线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class CreateThread { public static void main (String[] args) { Thread myThread = new MyThread(); myThread.start(); } } class MyThread extends Thread { @Override public void run () { System.out.println("my thread running..." ); } }
使用继承方式的好处是,在run()
方法内获取当前线程直接使用this就可以了,无须使用Thread.currentThread()
方法;不好的地方是Java不支持多继承,如果继承了Thread类,那么就不能再继承其他类。另外任务与代码没有分离,当多个线程执行一样的任务时需要多份任务代码。
或者使用匿名内部类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j public class Test1 { public static void main (String[] args) { Thread t = new Thread(){ @Override public void run () { log.debug("running" ); } }; t.setName("t1" ); t.start(); log.debug("running" ); } }
运行结果:
1 2 2021-11-03 20:03:05 [ main:0 ] - [ DEBUG ] running 2021-11-03 20:03:05 [ t1:0 ] - [ DEBUG ] running
2.1.2 使用Runnable配合Thread(推荐) 把【线程】和【任务】(要执行的代码)分开:
Thread 代表线程
Runnable 可执行的任务(线程要执行的代码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class CreateThread2 { private static class MyRunnable implements Runnable { @Override public void run () { System.out.println("my runnable running..." ); } } public static void main (String[] args) { MyRunnable myRunnable = new MyRunnable(); Thread thread = new Thread(myRunnable); thread.start(); } }
或者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j public class Test2 { public static void main (String[] args) { Runnable r = new Runnable(){ @Override public void run () { log.debug("running" ); } }; Thread t = new Thread(r, "t2" ); t.start(); } }
执行结果:
1 2021-11-03 20:03:38 [ t2:0 ] - [ DEBUG ] running
方法二的简化版——lamda表达式:
当一个接口带有@FunctionalInterface
注解时,是可以使用lambda来简化操作的
所以方法二中的代码可以被简化为:
1 2 3 4 5 6 7 8 9 10 @Slf4j public class Test2 { public static void main (String[] args) { Thread t = new Thread(() -> { log.debug("running" ); }, "t2" ); t.start(); } }
原理之 Thread 与 Runnable 的关系
分析 Thread 的源码,理清它与 Runnable 的关系
小结
方法1 是把线程和任务合并在了一起
方法2 是把线程和任务分开了
用 Runnable 更容易与线程池等高级 API 配合,用 Runnable 让任务类脱离了 Thread 继承体系,更灵活
2.1.3 使用FutureTask与Callable结合 使用FutureTask可以用泛型指定线程的返回值类型(Runnable的run方法没有返回值)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Slf4j public class Test3 { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> task = new FutureTask<>(new Callable<Integer>() { @Override public Integer call () throws Exception { log.debug("running..." ); Thread.sleep(1000 ); return 100 ; } }); Thread t = new Thread(task, "t1" ); t.start(); log.debug("执行结果为:{}" , task.get()); } }
执行结果:
1 2 2021-11-03 20:21:41 [ t1:0 ] - [ DEBUG ] running... 2021-11-03 20:21:42 [ main:1002 ] - [ DEBUG ] 执行结果为:100
2.1.4 总结 使用继承方式的好处是方便传参 ,你可以在子类里面添加成员变量,通过set方法设置参数或者通过构造函数进行传递,而如果使用Runnable方式,则只能使用主线程里面被声明为final的变量。不好的地方是Java不支持多继承 ,如果继承了Thread类,那么子类不能再继承其他类,而Runable则没有这个限制。前两种方式都没办法拿到任务的返回结果,但是Futuretask方式可以 。
2.2 线程执行的原理 2.2.1 栈与栈帧 Java Virtual Machine Stacks (Java 虚拟机栈) 我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?
其实就是线程,每个线程启动后,虚拟机就会为其分配一块栈内存
每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
2.2.2 线程上下文切换 因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码
线程的 cpu 时间片用完
垃圾回收 有更高优先级的线程需要运行
线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
Context Switch 频繁发生会影响性能
2.3 常见方法 2.3.1 start()和run() 被创建的Thread对象直接调用重写的run方法时, run方法是在主线程 中被执行的,而不是在我们所创建的线程中执行。所以如果想要在所创建的线程中执行run方法,需要使用Thread对象的start方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Slf4j public class Test4 { public static void main (String[] args) { Thread t1 = new Thread("t1" ) { @Override public void run () { log.debug("running" ); } }; t1.run(); } }
执行结果:
1 2021-11-03 20:36:08 [ main:1 ] - [ DEBUG ] running
注意这里的线程是主线程。
2.3.2 sleep()和yield()
sleep (使线程阻塞)
调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞) ,可通过state()方法查看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf4j public class Test5 { public static void main (String[] args) { Thread t1 = new Thread("t1" ) { @Override public void run () { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }; t1.start(); log.debug("t1的状态是:{}" , t1.getState()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("t1的状态是:{}" , t1.getState()); } }
1 2 2021-11-04 14:26:34 [ main:0 ] - [ DEBUG ] t1的状态是:RUNNABLE 2021-11-04 14:26:35 [ main:1002 ] - [ DEBUG ] t1的状态是:TIMED_WAITING
其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf4j public class Test6 { public static void main (String[] args) { Thread t1 = new Thread("t1" ) { @Override public void run () { log.debug("enter sleep..." ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { log.debug("wake up..." ); e.printStackTrace(); } } }; t1.start(); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("interrupt..." ); t1.interrupt(); } }
1 2 3 4 5 6 2021-11-04 14:30:57 [ t1:0 ] - [ DEBUG ] enter sleep... 2021-11-04 14:30:58 [ main:1001 ] - [ DEBUG ] interrupt... 2021-11-04 14:30:58 [ t1:1001 ] - [ DEBUG ] wake up... java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at com.hongyi.c1.Test6$1.run(Test6.java:13)
睡眠结束后的线程未必会立刻得到执行
建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性 。
1 2 3 4 TimeUnit.SECONDS.sleep(1 ); TimeUnit.MINUTES.sleep(1 );
yield (让出当前线程)
调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态 (仍然有可能被执行),然后调度执行其它线程
具体的实现依赖于操作系统的任务调度器
线程优先级
2.3.3 join() 用于等待某个线程结束。哪个线程内调用join()方法,就等待哪个线程结束,然后再去执行其他线程。
如在主线程中调用t1.join()
,则是主线程等待t1
线程结束。
程序题
下列程序r的输出为?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j public class Test7 { static int r = 0 ; public static void main (String[] args) { test1(); } private static void test1 () { log.debug("开始" ); Thread t1 = new Thread(() -> { log.debug("开始" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } r = 10 ; log.debug("结束" ); }, "t1" ); t1.start(); log.debug("结果为: {}" , r); log.debug("结束" ); } }
答案:可能为0或10,一种可能的情况:
1 2 3 4 5 2021-11-04 14:58:15 [ main:0 ] - [ DEBUG ] 开始 2021-11-04 14:58:15 [ t1:1 ] - [ DEBUG ] 开始 2021-11-04 14:58:15 [ main:3 ] - [ DEBUG ] 结果为: 0 2021-11-04 14:58:15 [ main:3 ] - [ DEBUG ] 结束 2021-11-04 14:58:15 [ t1:5 ] - [ DEBUG ] 结束
要使得r输出为10,可以利用join:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j public class Test7 { static int r = 0 ; public static void main (String[] args) throws InterruptedException { test1(); } private static void test1 () throws InterruptedException { log.debug("开始" ); Thread t1 = new Thread(() -> { log.debug("开始" ); try { TimeUnit.SECONDS.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } r = 10 ; log.debug("结束" ); }, "t1" ); t1.start(); t1.join(); log.debug("结果为: {}" , r); log.debug("结束" ); } }
执行结果:
1 2 3 4 5 2021-11-04 15:03:59 [ main:0 ] - [ DEBUG ] 开始 2021-11-04 15:03:59 [ t1:2 ] - [ DEBUG ] 开始 2021-11-04 15:04:09 [ t1:10003 ] - [ DEBUG ] 结束 2021-11-04 15:04:09 [ main:10004 ] - [ DEBUG ] 结果为: 10 2021-11-04 15:04:09 [ main:10004 ] - [ DEBUG ] 结束
同步和异步
以调用方 的角度来讲,如果
需要等待结果返回,才能继续运行,则为同步
不需要等待结果返回就能继续运行,则为异步
带参数的join(long n)
2.3.4 interrupt() 用于打断阻塞 (sleep wait join…)的线程。 处于阻塞状态的线程,CPU不会给其分配时间片。
如果一个线程在在运行中被打断,打断标记会被置为true。
如果是打断因sleep wait join方法而被阻塞的线程,会将打断标记置为false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j public class Test9 { public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { log.debug("sleep..." ); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1" ); t1.start(); TimeUnit.SECONDS.sleep(1 ); log.debug("interrupt..." ); t1.interrupt(); } }
1 2 3 4 5 6 7 8 9 10 11 2021-11-06 10:28:50 [ t1:0 ] - [ DEBUG ] sleep... 2021-11-06 10:28:51 [ main:1000 ] - [ DEBUG ] interrupt... 2021-11-06 10:28:51 [ main:1001 ] - [ DEBUG ] 打断标记为: false java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:337) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.hongyi.c1.Test9.lambda$main$0(Test9.java:13) at java.base/java.lang.Thread.run(Thread.java:832) Process finished with exit code 0
正常运行的线程在被打断后,不会停止 ,会继续执行。如果要让线程在被打断后停下来,需要使用打断标记来判断 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public class Test10 { public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { while (true ) { boolean interrupted = Thread.currentThread().isInterrupted(); if (interrupted){ log.debug("被打断了,退出循环" ); break ; } } }, "t1" ); t1.start(); TimeUnit.SECONDS.sleep(1 ); log.debug("interrupt..." ); t1.interrupt(); } }
1 2 2021-11-06 10:34:06 [ main:0 ] - [ DEBUG ] interrupt... 2021-11-06 10:34:06 [ t1:0 ] - [ DEBUG ] 被打断了,退出循环
两阶段终止模式
当我们在执行线程一时,想要终止线程二,这是就需要使用interrupt方法来优雅 的停止线程二。“优雅”是指给t2一个料理后事的机会或时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Slf4j public class Test11 { public static void main (String[] args) throws InterruptedException { Monitor monitor = new Monitor(); monitor.start(); TimeUnit.SECONDS.sleep(4 ); monitor.stop(); } } @Slf4j class Monitor { private Thread monitor; public void start () { monitor = new Thread() { @Override public void run () { while (true ) { if (Thread.currentThread().isInterrupted()) { log.debug("处理后续任务" ); break ; } log.debug("监控器运行中..." ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } } }; monitor.start(); } public void stop () { monitor.interrupt(); } }
执行结果:
1 2 3 4 5 6 7 8 9 10 2021-11-06 10:48:45 [ Thread-0:0 ] - [ DEBUG ] 监控器运行中... 2021-11-06 10:48:46 [ Thread-0:1000 ] - [ DEBUG ] 监控器运行中... 2021-11-06 10:48:47 [ Thread-0:2001 ] - [ DEBUG ] 监控器运行中... 2021-11-06 10:48:48 [ Thread-0:3002 ] - [ DEBUG ] 监控器运行中... java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:337) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.hongyi.c1.Monitor$1.run(Test11.java:44) 2021-11-06 10:48:49 [ Thread-0:4001 ] - [ DEBUG ] 处理后续任务
2.3.5 过时方法
stop方法 停止线程运行(可能造成共享资源无法被释放,其他线程无法使用这些共享资源)
suspend(暂停线程)/resume(恢复线程)方法
2.3.6 守护线程 当JAVA进程中有多个线程在执行时,只有当所有非守护线程都执行完毕后,JAVA进程才会结束。但当非守护线程全部执行完毕后,守护线程无论是否执行完毕,也会一同结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j public class Test8 { public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { while (true ) { if (Thread.currentThread().isInterrupted()) { break ; } } log.debug("结束" ); }, "t1" ); t1.setDaemon(true ); t1.start(); TimeUnit.SECONDS.sleep(1 ); log.debug("结束" ); } }
1 2 3 2021-11-04 15:36:40 [ main:0 ] - [ DEBUG ] 结束 Process finished with exit code 0
注意到t1线程是一个死循环,但它是一个守护线程,当非守护线程main线程结束后,守护线程t1也会被强制结束。
守护线程的应用
垃圾回收器线程就是一种守护线程
Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等待它们处理完当前请求
2.4 线程的状态 2.4.1 五种状态 这是从 操作系统 层面来描述的
【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联(例如线程调用了start方法)
【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
【运行状态】指获取了 CPU 时间片运行中的状态
当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
【阻塞状态】
如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入 【阻塞状态】
等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态
2.4.2 六种状态 这是从 Java API 层面来描述的,根据 Thread.State 枚举,分为六种状态:
NEW 线程刚被创建,但是还没有调用 start() 方法
RUNNABLE 当调用了 start() 方法之后,注意,Java API 层面的 RUNNABLE 状态涵盖了操作系统层面的 【可运行状态】、【运行状态】和【阻塞状态】(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为 是可运行)
BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面 对【阻塞状态】的细分
sleep为TIMED_WAITING
join,wait为WAITING状态,join底层调用了wait[[Java并发编程学习笔记#3.7.3 join原理]]
当上锁被阻塞时为BLOCK
TERMINATED 当线程代码运行结束
3 管程Monitor 3.1 共享带来的问题
临界区
一个程序运行多个线程本身是没有问题的
问题出在多个线程访问共享资源
多个线程读共享资源其实也没有问题
在多个线程对共享资源读写操作时发生指令交错,就会出现问题
一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区 ,例如,下面代码中的临界区
1 2 3 4 5 6 7 8 9 10 11 12 13 static int counter = 0 ; static void increment () { counter++; } static void decrement () { counter--; }
竞争条件
多个线程在临界区 内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
3.2 synchronized 解决方案 3.2.1 解决手段 为了避免临界区的竞态条件发生,有多种手段可以达到目的。
阻塞式的解决方案:synchronized,Lock(悲观锁)
非阻塞式的解决方案:原子变量(乐观锁)
本次课使用阻塞式的解决方案:synchronized ,来解决上述问题,即俗称的【对象锁】 ,它采用互斥的方式让同一时刻至多只有一个线程 能持有【对象锁】,其它线程再想获取这个【对象锁】时就会阻塞住(blocked)。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。
3.2.2 synchronized语法
示例程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Slf4j public class Test1 { static int counter = 0 ; static Object lock = new Object(); public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0 ; i < 5000 ; i++) { synchronized (lock){ counter++; } } }, "t1" ); Thread t2 = new Thread(() -> { for (int i = 0 ; i < 5000 ; i++) { synchronized (lock){ counter--; } } }, "t2" ); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("{}" , counter); } }
1 2021-11-07 15:22:30 [ main:0 ] - [ DEBUG ] 0
原理
synchronized实际上利用对象保证了临界区代码的原子性,临界区内的代码在外界看来是不可分割的,不会被线程切换所打断。
锁对象面向对象改进
用对象作为锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Slf4j public class Test1 { public static void main (String[] args) throws InterruptedException { Room room = new Room(); Thread t1 = new Thread(() -> { for (int i = 0 ; i < 5000 ; i++) { room.increment(); } }, "t1" ); Thread t2 = new Thread(() -> { for (int i = 0 ; i < 5000 ; i++) { room.decrement(); } }, "t2" ); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("{}" , room.getCounter()); } } class Room { private int counter = 0 ; public void increment () { synchronized (this ){ counter++; } } public void decrement () { synchronized (this ){ counter--; } } public int getCounter () { synchronized (this ){ return counter; } } }
3.2.3 synchronized加在方法上
1 2 3 4 5 6 7 8 9 10 11 12 public class Demo { public synchronized void test () { } public void test () { synchronized (this ) { } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Demo { public synchronized static void test () { } public void test () { synchronized (Demo.class) { } } }
3.2.4 “线程八锁”问题
习题1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf4j public class Test2 { public static void main (String[] args) { Number n1 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n1.b(); }).start(); } } @Slf4j class Number { public synchronized void a () { log.debug("1" ); } public synchronized void b () { log.debug("2" ); } }
结果为1 2或2 1
1 2 3 4 2021-11-07 15:45:24 [ Thread-0:0 ] - [ DEBUG ] begin 2021-11-07 15:45:24 [ Thread-1:0 ] - [ DEBUG ] begin 2021-11-07 15:45:24 [ Thread-0:0 ] - [ DEBUG ] 1 2021-11-07 15:45:24 [ Thread-1:0 ] - [ DEBUG ] 2
习题2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j public class Test2 { public static void main (String[] args) { Number n1 = new Number(); new Thread(() -> { log.debug("begin" ); try { n1.a(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { log.debug("begin" ); n1.b(); }).start(); } } @Slf4j class Number { public synchronized void a () throws InterruptedException { TimeUnit.SECONDS.sleep(1 ); log.debug("1" ); } public synchronized void b () { log.debug("2" ); } }
结果同习题1
习题3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Slf4j class Number1 { public synchronized void a () throws InterruptedException { TimeUnit.SECONDS.sleep(1 ); log.debug("1" ); } public synchronized void b () { log.debug("2" ); } public void c () { log.debug("3" ); } } @Slf4j public class Test3 { public static void main (String[] args) { Number1 n1 = new Number1(); new Thread(() -> { log.debug("begin" ); try { n1.a(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { log.debug("begin" ); n1.b(); }).start(); new Thread(() -> { log.debug("begin" ); n1.c(); }).start(); } }
1 2 3 4 5 6 7 8 9 10 11 2021-11-07 15:51:22 [ Thread-0:0 ] - [ DEBUG ] begin 2021-11-07 15:51:22 [ Thread-2:0 ] - [ DEBUG ] begin 2021-11-07 15:51:22 [ Thread-2:1 ] - [ DEBUG ] 3 2021-11-07 15:51:22 [ Thread-1:0 ] - [ DEBUG ] begin 2021-11-07 15:51:23 [ Thread-0:1003 ] - [ DEBUG ] 1 2021-11-07 15:51:23 [ Thread-1:1003 ] - [ DEBUG ] 2 结果可能为: 3 1s 1 2 2 3 1s 1 3 2 1s 1
习题4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Slf4j class Number1 { public synchronized void a () throws InterruptedException { TimeUnit.SECONDS.sleep(1 ); log.debug("1" ); } public synchronized void b () { log.debug("2" ); } } @Slf4j public class Test3 { public static void main (String[] args) { Number1 n1 = new Number1(); Number1 n2 = new Number1(); new Thread(() -> { log.debug("begin" ); try { n1.a(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { log.debug("begin" ); n2.b(); }).start(); } }
1 2 3 4 5 2021-11-07 15:54:36 [ Thread-0:0 ] - [ DEBUG ] begin 2021-11-07 15:54:36 [ Thread-1:0 ] - [ DEBUG ] begin 2021-11-07 15:54:36 [ Thread-1:0 ] - [ DEBUG ] 2 2021-11-07 15:54:37 [ Thread-0:1001 ] - [ DEBUG ] 1 唯一结果:2 1s 1
3.3 变量的线程安全分析 3.3.1 成员变量和静态变量是否线程安全?
如果它们没有共享,则线程安全
如果它们被共享了,根据它们的状态是否能够改变,又分两种情况
如果只有读操作,则线程安全
如果有读写操作,则这段代码是临界区,需要考虑线程安全
3.3.2 局部变量是否线程安全?
局部变量是线程安全的
但局部变量引用的对象则未必 (要看该对象是否被共享且被执行了读写操作):
如果该对象没有逃离方法的作用范围,它是线程安全的
如果该对象逃离方法的作用范围,需要考虑线程安全
线程安全的情况
局部变量【局部变量被初始化为基本数据类型】是安全的,示例如下:
1 2 3 4 public static void test1 () { int i = 10 ; i++; }
每个线程调用 test1() 方法时局部变量 i,会在每个线程的栈帧内存中被创建多份,因此不存在共享。
线程不安全的情况
如果局部变量引用的对象逃离方法的范围,那么要考虑线程安全的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Test { public static void main (String[] args) { UnsafeTest unsafeTest = new UnsafeTest(); for (int i = 0 ; i < 100 ; i++){ new Thread(()->{ unsafeTest.method1(); },"线程" + i).start(); } } } class UnsafeTest { ArrayList<String> arrayList = new ArrayList<>(); public void method1 () { for (int i = 0 ; i < 100 ; i++) { method2(); method3(); } } private void method2 () { arrayList.add("1" ); } private void method3 () { arrayList.remove(0 ); } }
可以将arrayList修改成局部变量,那么就不会有上述问题了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class safeTest { public void method1 () { ArrayList<String> arrayList = new ArrayList<>(); for (int i = 0 ; i < 100 ; i++) { method2(arrayList); method3(arrayList); } } private void method2 (ArrayList arrayList) { arrayList.add("1" ); } private void method3 (ArrayList arrayList) { arrayList.remove(0 ); } }
3.3.3 常见线程安全类
String:不可变类,因此线程安全
Integer:同上
StringBuffer
Random
Vector (List的线程安全实现类)
Hashtable (Hash的线程安全实现类)
java.util.concurrent 包下的类
这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时 ,是线程安全的
它们的每个方法是原子的(都被加上了synchronized)
但注意它们多个方法的组合不是原子的 ,所以可能会出现线程安全问题
1 2 3 4 5 Hashtable table = new Hashtable(); if (table.get("key" ) == null ) { table.put("key" , value); }
1 2 3 4 5 6 7 8 sequenceDiagram participant t1 as 线程1 participant t2 as 线程2 participant table t1 ->> table : get("key") == null t2 ->> table : get("key") == null t2 ->> table : put("key", v2) t1 ->> table : put("key", v1)
不可变类线程安全性
String、Integer 等都是不可变类 ,因为其内部的状态不可以改变,因此它们的方法都是线程安全的。
有同学或许有疑问,String 有 replace,substring 等方法【可以】改变值啊,那么这些方法又是如何保证线程安全的呢?
这是因为这些方法的返回值都创建了一个新的对象 ,而不是直接改变String、Integer对象本身。
3.4 Monitor 3.4.1 Java头(对象头) 在JVM中需要大量存储对象,存储时为了实现一些额外的功能,需要在对象中添加一些标记字段用于增强对象功能,这些标记字段组成了对象头(Object Header) 。
在HotSpot虚拟机中,对象在内存中存储的布局可以分为3块区域:
对象头(Header):包含Mark Word
和Klass Word
两部分
实例数据(Instance Data)
对齐填充(Padding)
以 32 位虚拟机为例,普通对象的对象头结构如下,其中的Klass Word为指针,指向对应的Class对象;
数组对象:
其中Mark Word
的结构为:
解释:
标志位lock
占用2位,图中的一行为一种状态。每种状态的每个字段不同。
biased_lock
:对象是否启用偏向锁标记,只占1个二进制位。为1时表示对象启用偏向锁,为0时表示对象没有偏向锁。lock和biased_lock共同表示对象处于什么锁状态。
age
:4位的Java对象年龄。在GC中,如果对象在Survivor区复制一次,年龄增加1。当对象达到设定的阈值时,将会晋升到老年代。默认情况下,并行GC的年龄阈值为15,并发GC的年龄阈值为6。由于age只有4位,所以最大值为15。
identity_hashcode
:31位的对象标识hashCode
thread
:持有偏向锁的线程ID。
epoch
:偏向锁的时间戳。
ptr_to_lock_record
:轻量级锁状态下,指向栈中锁记录lock_record
的指针。
ptr_to_heavyweight_monitor
:重量级锁状态下,指向对象监视器Monitor的指针
3.4.2 Monitor概念 Monitor被翻译为监视器 或者说管程 。
结构:
WaitSet:曾持有锁,但因为某些条件不满足而暂时释放锁(调用wait)的线程集合,状态:WAITING
EntryList:尝试获取锁但失败的线程集合,状态:BLOCK
Owner:锁的持有线程,有且仅有一个
每个java对象都可以关联一个Monitor。如果某个线程使用synchronized
给对象上锁(重量级),该对象头的Mark Word
中的ptr_to_heavyweight_monitor
就被设置为指向Monitor对象的指针。
当Thread-2 执行synchronized(obj){}代码时就会将Monitor的所有者Owner设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner
当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj){}代码,就会进入EntryList 中变成BLOCKED状态 (Java中三种阻塞状态中的一种)
Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的
图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入WAITING状态的线程,后面讲wait-notify时会分析
注意:
synchronized 必须是进入同一个对象 的 monitor 才有上述的效果,不加 synchronized 的对象不会关联监视器,不遵从以上规则。
对象在线程使用了synchronized后与Monitor绑定时,会将对象头中的Mark Word 置为Monitor指针。
3.4.3 从字节码角度分析Synchronized 如下所示的代码:
1 2 3 4 5 6 7 static final Object lock=new Object();static int counter = 0 ;public static void main (String[] args) { synchronized (lock) { counter++; } }
反编译得到的部分字节码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 0 getstatic 3 dup 4 astore_1 5 monitorenter 6 getstatic 9 iconst_1 10 iadd 11 putstatic 14 aload_1 15 monitorexit 16 goto 24 (+8) 19 astore_2 20 aload_1 21 monitorexit 22 aload_2 23 athrow 24 return
注意:方法级别的 synchronized 不会在字节码指令中有所体现
3.5 Synchronized原理进阶 3.5.1 轻量级锁(优化重量级锁)
轻量级锁使用场景: 当一个对象被多个线程所访问,但访问的时间是错开的(不存在竞争) ,此时就可以使用轻量级锁 来优化。此时被上锁的对象不会和Monitor关联,而是直接和线程关联。
轻量级锁对使用者是透明的,即语法仍然是synchronized
,假设有两个方法同步块,利用同一个对象加锁:
1 2 3 4 5 6 7 8 9 10 11 12 static final Object obj = new Object();public static void method1 () { synchronized ( obj ) { method2(); } } public static void method2 () { synchronized ( obj ) { } }
创建锁记录 (Lock Record)对象,每个线程的栈帧都会包含一个锁记录对象,内部可以存储锁定对象的mark word (不再一开始就使用Monitor)
让锁记录中的Object reference指向锁对象(Object),并尝试用cas(compare and sweep)去替换Object中的mark word,将此mark word放入lock record中保存
如果cas替换成功,则将Object的对象头替换为锁记录的地址 和状态 00(轻量级锁状态) ,并由该线程给对象加锁
当线程退出synchronized代码块的时候,如果获取的锁记录取值不为 null,那么使用cas将Mark Word的值恢复给对象
成功则解锁成功
失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
3.5.2 锁膨胀(轻量级锁->重量级锁)
如果在尝试加轻量级锁的过程中,cas操作无法成功,有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。
当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
这时 Thread-1 加轻量级锁失败,进入锁膨胀流程,会给对象加上重量级锁(使用Monitor)
将对象头的Mark Word改为Monitor的地址,并且状态改为10(重量级锁)
并且 Thread-1 放入EntryList中,并进入阻塞状态(blocked)
当Thread-0 退出synchronized同步块时,使用cas将Mark Word的值恢复给对象头,失败(因为Mark Word已经被Thread-1修改为重量级锁),那么会进入重量级锁的解锁过程,即按照Monitor的地址找到Monitor对象,将Owner设置为null,唤醒EntryList 中的Thread-1线程
3.5.3 自旋优化 重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁。
自旋成功的情况
自旋失败的情况
自旋会占用 CPU 时间 ,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。
Java 7 之后不能控制是否开启自旋功能
3.5.4 偏向锁(优化轻量级锁)
轻量级锁在没有竞争时,每次重入 (该线程执行的方法中再次锁住该对象)操作仍需要cas替换操作,这样是会使性能降低的。
所以Java 6引入了偏向锁 对性能进行优化:在第一次 cas时会将线程的ID 写入对象的Mark Word中。此后发现这个线程ID就是自己的,就表示没有竞争,就不需要再次cas,以后只要不发生竞争,这个对象就归该线程所有。
偏向状态
偏向状态
Normal:一般状态,没有加任何锁,前面62位保存的是对象的信息,最后2位为状态(01),倒数第三位表示是否使用偏向锁(未使用:0)
Biased:偏向状态,使用偏向锁,前面54位保存的当前线程的ID,最后2位为状态(01),倒数第三位表示是否使用偏向锁(使用:1)
Lightweight:使用轻量级锁,前62位保存的是锁记录的指针,最后两位为状态(00)
Heavyweight:使用重量级锁,前62位保存的是Monitor的地址指针,后两位为状态(10)
一个对象的创建过程
如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这时它的Thread,epoch,age都是0,在加锁的时候进行设置这些的值.
偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟:-XX:BiasedLockingStartupDelay=0
来禁用延迟
注意:处于偏向锁的对象解锁后,线程 id 仍存储于对象头中
撤销偏向
以下几种情况会使对象的偏向锁失效
调用对象的hashCode方法
多个线程使用该对象
调用了wait/notify方法 (调用wait方法会导致锁膨胀而使用重量级锁 )
批量重偏向
如果对象虽然被多个线程访问,但是线程间不存在竞争,这时偏向T1的对象仍有机会重新偏向T2
当撤销超过20次后(超过阈值),JVM会觉得是不是偏向错了,这时会在给对象加锁时,重新偏向至加锁线程。
批量撤销
当撤销偏向锁的阈值超过40以后,就会将整个类的对象都改为不可偏向的 。
3.6 Wait & Notify 该函数由锁对象调用,只有当对象被锁以后,线程才能调用wait和notify方法 。线程调用后,会释放对象锁,并进入Monitor
的WAITING
集,处于WAITING
状态,直到另一个线程调用notify
被唤醒为止。
3.6.1 原理
锁对象调用wait方法(obj.wait)
,就会使当前线程进入WaitSet中,变为WAITING
状态。
处于BLOCKED和WAITING状态的线程都为阻塞状态,CPU都不会分给他们时间片。但是有所区别:
BLOCKED状态的线程是在竞争对象时,发现Monitor的Owner已经是别的线程了,此时就会进入EntryList中,并处于BLOCKED状态
WAITING状态的线程是获得了对象的锁 ,但是自身因为某些原因需要进入阻塞状态时,锁对象调用了wait方法而进入了WaitSet中,处于WAITING状态
BLOCKED状态的线程会在锁被释放的时候被唤醒,但是处于WAITING状态的线程只有被锁对象调用了notify
方法(obj.notify/obj.notifyAll)
,才会被唤醒。
3.6.2 常用API
obj.wait()
:让进入 object 监视器的线程到 waitSet 等待
obj.notify()
:在 object 上正在 waitSet 等待的线程中挑一个唤醒
obj.notifyAll()
:全部唤醒
注:只有当对象被锁以后,才能调用wait和notify方法,此外,调用notify方法后不会立即释放锁 。notify并不释放锁,只是告诉调用过wait方法的线程可以去参与获得锁的竞争了,但不是马上得到锁,因为锁还在调用notify方法的线程的手里还没释放。
示例程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Slf4j public class TestWaitNotify { final static Object obj = new Object(); public static void main (String[] args) throws InterruptedException { new Thread(() -> { synchronized (obj){ log.debug("执行" ); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其他代码" ); } }, "t1" ).start(); new Thread(() -> { synchronized (obj){ log.debug("执行" ); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其他代码" ); } }, "t2" ).start(); TimeUnit.SECONDS.sleep(2 ); log.debug("唤醒其他线程..." ); synchronized (obj){ obj.notify(); } } }
notify执行结果:
1 2 3 4 2021-11-09 15:12:19 [ t1:0 ] - [ DEBUG ] 执行 2021-11-09 15:12:19 [ t2:0 ] - [ DEBUG ] 执行 2021-11-09 15:12:21 [ main:2002 ] - [ DEBUG ] 唤醒其他线程... 2021-11-09 15:12:21 [ t1:2002 ] - [ DEBUG ] 其他代码
notifyAll执行结果:
1 2 3 4 5 2021-11-09 15:14:58 [ t1:0 ] - [ DEBUG ] 执行 2021-11-09 15:14:58 [ t2:1 ] - [ DEBUG ] 执行 2021-11-09 15:15:00 [ main:2001 ] - [ DEBUG ] 唤醒其他线程... 2021-11-09 15:15:00 [ t1:2001 ] - [ DEBUG ] 其他代码 2021-11-09 15:15:00 [ t2:2001 ] - [ DEBUG ] 其他代码
3.6.3 sleep(long n)
和wait(long n)
的区别 不同点
Sleep是Thread类的静态方法,Wait是Object的方法,Object又是所有类的父类,所以所有类都有Wait方法。
Sleep在阻塞的时候不会释放锁,而Wait在阻塞的时候会释放锁
Sleep不需要与synchronized一起使用,而Wait需要与synchronized一起使用(对象被锁以后才能使用)
相同点
3.6.4 优雅地使用wait/notify 什么时候适合使用wait
当线程不满足某些条件 ,需要暂停运行时,可以使用wait。这样会将对象的锁释放 ,让其他线程能够继续运行。如果此时使用sleep,会导致所有线程都进入阻塞,导致所有线程都没法运行,直到当前线程sleep结束后,运行完毕,才能得到执行。
使用wait/notify需要注意什么
当有多个 线程在运行时,对象调用了wait方法,此时这些线程都会进入WaitSet中等待。如果这时使用了notify 方法,可能会造成虚假唤醒 (唤醒的不是满足条件的等待线程),这时就需要使用notifyAll 方法,唤醒所有的线程,对于满足条件的线程,可以跳出循环执行后面的代码,不满足条件的继续阻塞。
1 2 3 4 5 6 7 8 9 10 11 synchronized (LOCK) { while ( LOCK.wait(); } } synchronized (LOCK) { LOCK.notifyAll(); }
3.7 同步模式之保护性暂停 3.7.1 定义 即 Guarded Suspension,用在一个线程等待另一个线程的执行结果,要点:
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见3.8节
生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是此模式
因为要等待另一方的结果,因此归类到同步模式
3.7.2 实现 工具代码
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Downloader { public static List<String> download () throws IOException { HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/" ).openConnection(); List<String> lines = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))){ String line; while ((line = reader.readLine()) != null ) { lines.add(line); } } return lines; } }
正片:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Slf4j public class Test4 { public static void main (String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { log.debug("等待结果" ); List<String> list = (List<String>) guardedObject.get(); log.debug("结果是: {}" ,list.size()); }, "t1" ).start(); new Thread(() -> { log.debug("执行下载" ); try { List<String> list = Downloader.download(); guardedObject.complete(list); } catch (IOException e) { e.printStackTrace(); } }, "t2" ).start(); } } class GuardedObject { private Object response; public Object get () { synchronized (this ){ while (response == null ) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete (Object response) { synchronized (this ){ this .response = response; this .notifyAll(); } } }
执行结果:
1 2 3 2021-11-09 15:48:10 [ t1:0 ] - [ DEBUG ] 等待结果 2021-11-09 15:48:10 [ t2:0 ] - [ DEBUG ] 执行下载 2021-11-09 15:48:10 [ t1:588 ] - [ DEBUG ] 结果是: 3
带超时判断的暂停wait
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public Object get (long timeout) { synchronized (this ){ long begin = System.currentTimeMillis(); long passedTime = 0 ; while (response == null ) { long waitTime = timeout - passedTime; if (waitTime < 0 ) { break ; } try { this .wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } }
3.7.3 join原理 join底层调用了wait方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final synchronized void join (long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0 ; if (millis < 0 ) { throw new IllegalArgumentException("timeout value is negative" ); } if (millis == 0 ) { while (isAlive()) { wait(0 ); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0 ) { break ; } wait(delay); now = System.currentTimeMillis() - base; } } }
3.7.4 扩展实现——多任务版GuardedObject 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个生产者和消费者之间是一一对应的关系,但是生产者消费者模式并不是。rpc框架 的调用中就使用到了这种模式。
3.8 异步模式之生产者/消费者 3.8.1 定义
与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK 中各种阻塞队列,采用的就是这种模式
“异步”的意思就是生产者产生消息之后消息没有被立刻消费,而“同步模式”中,消息在产生之后被立刻消费了。
3.8.2 实现和测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 @Slf4j public class Test5 { public static void main (String[] args) { MessageQueue queue = new MessageQueue(2 ); for (int i = 0 ; i < 3 ; i++) { int id = i; new Thread(() ->{ queue.put(new Message(id, "值" + id)); }, "生产者" + i).start(); } new Thread(() -> { while (true ) { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } queue.take(); } }, "消费者" ).start(); } } @Slf4j class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); private int capacity; public MessageQueue (int capacity) { this .capacity = capacity; } public Message take () { synchronized (list){ while (list.isEmpty()) { try { log.debug("队列为空,消费者等待" ); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); log.debug("消费一个消息 {}" , message); list.notifyAll(); return message; } } public void put (Message message) { synchronized (list){ while (list.size() == capacity) { try { log.debug("队列为满,生产者等待" ); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(message); log.debug("生产一个消息 {}" , message); list.notifyAll(); } } } final class Message { private int id; private Object value; public Message (int id, Object value) { this .id = id; this .value = value; } public int getId () { return id; } public Object getValue () { return value; } @Override public String toString () { return "Message{" + "id=" + id + ", value=" + value + '}' ; } }
执行结果
1 2 3 4 5 6 7 8 2021-11-10 10:08:58 [ 生产者1:0 ] - [ DEBUG ] 生产一个消息 Message{id=1, value=值1} 2021-11-10 10:08:58 [ 生产者0:1 ] - [ DEBUG ] 生产一个消息 Message{id=0, value=值0} 2021-11-10 10:08:58 [ 生产者2:1 ] - [ DEBUG ] 队列为满,生产者等待 2021-11-10 10:08:59 [ 消费者:994 ] - [ DEBUG ] 消费一个消息 Message{id=1, value=值1} 2021-11-10 10:08:59 [ 生产者2:994 ] - [ DEBUG ] 生产一个消息 Message{id=2, value=值2} 2021-11-10 10:09:00 [ 消费者:1995 ] - [ DEBUG ] 消费一个消息 Message{id=0, value=值0} 2021-11-10 10:09:01 [ 消费者:2996 ] - [ DEBUG ] 消费一个消息 Message{id=2, value=值2} 2021-11-10 10:09:02 [ 消费者:3997 ] - [ DEBUG ] 队列为空,消费者等待
3.9 Park和Unpark 3.9.1 基本使用 park/unpark都是LockSupport类中的静态方法
1 2 3 4 5 LockSupport.park; LockSupport.unpark(thread);
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Slf4j public class TestPark { public static void main (String[] args) throws InterruptedException { Thread thread = new Thread(()-> { log.debug("park" ); LockSupport.park(); log.debug("resume" ); }, "t1" ); thread.start(); Thread.sleep(1000 ); log.debug("unpark" ); LockSupport.unpark(thread); } }
1 2 3 2021-11-10 10:14:40 [ t1:0 ] - [ DEBUG ] park 2021-11-10 10:14:41 [ main:1000 ] - [ DEBUG ] unpark 2021-11-10 10:14:41 [ t1:1000 ] - [ DEBUG ] resume
3.9.2 特点 与wait/notify的区别
wait,notify 和 notifyAll 必须配合Object Monitor 一起使用,而park,unpark不必
park ,unpark 是以线程为单位 来阻塞 和唤醒 线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么精确
park & unpark 可以先 unpark ,而 wait & notify 不能先 notify
park不会释放锁 ,而wait会释放锁
3.9.3 原理 每个线程都有一个自己的Park对象 ,并且该对象_counter, _cond,__mutex 组成
先调用park再调用unpark时
先调用park
线程运行时,会将Park对象中的_counter的值设为0 ;
调用park时,会先查看counter的值是否为0,如果为0,则将线程放入阻塞队列cond中
放入阻塞队列中后,会再次 将counter设置为0
然后调用unpark
调用unpark方法后,会将counter的值设置为1
去唤醒阻塞队列cond中的线程
线程继续运行并将counter的值设为0
先调用unpark,再调用park
调用unpark
调用park方法
查看counter是否为0
因为unpark已经把counter设置为1,所以此时将counter设置为0,但不放入 阻塞队列cond中
3.10 线程中的状态转换
情况一:NEW –> RUNNABLE
当调用了t.start()方法时,由 NEW –> RUNNABLE
情况二: RUNNABLE <–> WAITING(条件等待)
当调用了t 线程用 synchronized(obj) 获取了对象锁后
调用 obj.wait() 方法时,t 线程从 RUNNABLE –> WAITING
其他线程调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
竞争锁成功,t 线程从 WAITING –> RUNNABLE
竞争锁失败,t 线程从 WAITING –> BLOCKED(阻塞)
情况三:RUNNABLE <–> WAITING
当前线程调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING
注意是当前线程 在t 线程对象的监视器上等待(join底层调用了wait)
t 线程运行结束 ,或调用了当前线程 的 interrupt() 时,当前线程从 WAITING –> RUNNABLE
情况四: RUNNABLE <–> WAITING
当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE
情况五: RUNNABLE <–> TIMED_WAITING
t 线程用 synchronized(obj) 获取了对象锁后
调用 obj.wait(long n ) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING
t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE
竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED
情况六:RUNNABLE <–> TIMED_WAITING
当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING
当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING –> RUNNABLE
情况七:RUNNABLE <–> TIMED_WAITING
当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING
当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE
情况八:RUNNABLE <–> TIMED_WAITING
当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE
情况九:RUNNABLE <–> BLOCKED
t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败 ,从 RUNNABLE –> BLOCKED
持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败 的线程仍然 BLOCKED
情况十: RUNNABLE <–> TERMINATED
当前线程所有代码运行完毕 ,进入 TERMINATED
3.11 多把锁 将锁的粒度细分
1 2 3 4 5 class BigRoom { private final Object studyRoom = new Object(); private final Object bedRoom = new Object(); }
例如,对于一个大房间,有两个人要使用,一个用来学习,一个用来睡觉。可以将锁的粒度细分,使得一个大房间可同时对两个不同使用方法的人有效。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Test1 { public static void main (String[] args) { BigRoom bigRoom = new BigRoom(); new Thread(() -> { bigRoom.study(); }, "t1" ).start(); new Thread(() -> { bigRoom.sleep(); }, "t2" ).start(); } } @Slf4j class BigRoom { private final Object studyRoom = new Object(); private final Object bedRoom = new Object(); public void sleep () { synchronized (bedRoom){ log.debug("sleep 2h" ); try { TimeUnit.SECONDS.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } } public void study () { synchronized (studyRoom){ log.debug("study 2h" ); try { TimeUnit.SECONDS.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
1 2 2021-11-11 14:44:38 [ t1:0 ] - [ DEBUG ] study 2h 2021-11-11 14:44:38 [ t2:0 ] - [ DEBUG ] sleep 2h
3.12 活跃性
定义
因为某种原因,使得代码一直无法执行完毕,这样的现象叫做活跃性。有死锁 、活锁 和饥饿 三种情况。活跃性相关的一系列问题都可以用ReentrantLock
进行解决。
3.12.1 死锁 有这样的情况:一个线程需要同时获取多把锁 ,这时就容易发生死锁
如:t1线程获得A对象 锁,接下来想获取B对象的锁,t2线程获得B对象锁,接下来想获取A对象的锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void main (String[] args) { final Object A = new Object(); final Object B = new Object(); new Thread(()->{ synchronized (A) { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (B) { } } }).start(); new Thread(()->{ synchronized (B) { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (A) { } } }).start(); }
发生死锁的必要条件 :四个
互斥条件
请求和保持条件
进程已经拥有了至少一种资源,同时又去申请其他资源。因为其他资源被别的进程所使用,该进程进入阻塞状态,并且不释放自己已有的资源
不可抢占条件
进程对已获得的资源在未使用完成前不能被强占,只能在进程使用完后自己释放
循环等待条件
避免死锁
在线程使用锁对象时,顺序加锁 即可避免死锁
3.12.2 活锁 活锁出现在两个线程互相改变对方的结束条件后 ,谁也无法结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class Test3 { static volatile int count = 10 ; static final Object lock = new Object(); public static void main (String[] args) { new Thread(() -> { while (count > 0 ) { try { Thread.sleep(200 ); } catch (InterruptedException e) { e.printStackTrace(); } count--; log.debug("count: {}" , count); } }, "t1" ).start(); new Thread(() -> { while (count < 20 ) { try { Thread.sleep(200 ); } catch (InterruptedException e) { e.printStackTrace(); } count++; log.debug("count: {}" , count); } }, "t2" ).start(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 2021-11-11 14:59:11 [ t1:0 ] - [ DEBUG ] count: 9 2021-11-11 14:59:11 [ t2:0 ] - [ DEBUG ] count: 10 2021-11-11 14:59:11 [ t1:201 ] - [ DEBUG ] count: 11 2021-11-11 14:59:11 [ t2:201 ] - [ DEBUG ] count: 11 2021-11-11 14:59:11 [ t1:401 ] - [ DEBUG ] count: 11 2021-11-11 14:59:11 [ t2:401 ] - [ DEBUG ] count: 12 2021-11-11 14:59:11 [ t1:602 ] - [ DEBUG ] count: 11 2021-11-11 14:59:11 [ t2:602 ] - [ DEBUG ] count: 11 2021-11-11 14:59:11 [ t2:802 ] - [ DEBUG ] count: 12 2021-11-11 14:59:11 [ t1:804 ] - [ DEBUG ] count: 11 2021-11-11 14:59:12 [ t2:1004 ] - [ DEBUG ] count: 12 2021-11-11 14:59:12 [ t1:1006 ] - [ DEBUG ] count: 11 2021-11-11 14:59:12 [ t2:1204 ] - [ DEBUG ] count: 12 2021-11-11 14:59:12 [ t1:1206 ] - [ DEBUG ] count: 11 ...
避免活锁的方法
在线程执行时,中途给予不同的间隔时间 即可。
死锁与活锁的区别
死锁是因为线程互相持有对象想要的锁,并且都不释放,最后到时线程阻塞 ,停止运行 的现象。
活锁是因为线程间修改了对方的结束条件,而导致代码一直在运行 ,却一直运行不完 的现象。
3.12.3 饥饿 某些线程因为优先级太低,导致一直无法获得资源的现象。在使用顺序加锁时,可能会出现饥饿现象。
1 2 3 4 5 6 7 8 9 10 11 sequenceDiagram participant t1 as 线程1 participant t2 as 线程2 participant a as 对象A participant b as 对象B t1 -->> a :尝试获取锁 Note over t1,a:拥有锁 t2 -->> b :尝试获取锁 Note over t2,b:拥有锁 t1 --x b :尝试获取锁 t2 --x a :尝试获取锁
3.13 ReentrantLock ReentrantLock类是Lock接口的重要实现类。
和synchronized相比具有的的特点 (四点)
可中断lockInterruptibly
:synchronized是不可中断,处于阻塞状态的线程会一直等待锁。
可以设置超时时间tryLock
:synchronized不可设置超时时间
可以设置为公平锁 (先到先得 );Synchronized 是一种非公平锁,因为它不能保证先请求锁的线程先获取锁。它选择要获得锁的线程是随机的,而不考虑等待时间的长短。这就意味着一个线程可能会连续获取到锁,而其他线程需要一直等待,这就造成了线程的不公平竞争。
支持多个条件变量( 具有多个 waitset);Synchronized 只支持一个waiset
3.13.1 基本语法 1 2 3 4 5 6 7 8 9 10 private ReentrantLock lock = new ReentrantLock();lock.lock(); try { }finally { lock.unlock(); }
3.13.2 可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
Synchronized 也可以重入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j public class Test4 { private static ReentrantLock lock = new ReentrantLock(); public static void main (String[] args) { lock.lock(); try { log.debug("enter main" ); m1(); } finally { lock.unlock(); } } public static void m1 () { lock.lock(); try { log.debug("enter m1" ); m2(); } finally { lock.unlock(); } } public static void m2 () { lock.lock(); try { log.debug("enter m2" ); } finally { lock.unlock(); } } }
1 2 3 2021-11-11 15:28:54 [ main:0 ] - [ DEBUG ] enter main 2021-11-11 15:28:54 [ main:0 ] - [ DEBUG ] enter m1 2021-11-11 15:28:54 [ main:0 ] - [ DEBUG ] enter m2
3.13.3 可打断 调用lock.lockInterruptibly()
可以进行加锁:
和synchronized一样,如果加锁失败会阻塞,但不同的是,其他线程可以调用其t.interrupt()
方法让t停止阻塞,直接停止运行。
简而言之 就是:处于阻塞状态的线程,被打断了就不用阻塞了,直接停止运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j public class Test5 { private static ReentrantLock lock = new ReentrantLock(); public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { try { log.debug("尝试获得锁" ); lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("没有获得锁,返回" ); return ; } try { log.debug("获得到锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); t1.start(); TimeUnit.SECONDS.sleep(1 ); log.debug("打断 t1" ); t1.interrupt(); } }
1 2 3 4 5 6 7 8 9 2021-11-11 15:37:17 [ t1:0 ] - [ DEBUG ] 尝试获得锁 2021-11-11 15:37:18 [ main:1000 ] - [ DEBUG ] 打断 t1 2021-11-11 15:37:18 [ t1:1000 ] - [ DEBUG ] 没有获得锁 java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:958) at java.base/java.util.concurrent.locks.ReentrantLock$Sync.lockInterruptibly(ReentrantLock.java:161) at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:372) at com.hongyi.c3.Test5.lambda$main$0(Test5.java:18) at java.base/java.lang.Thread.run(Thread.java:832)
3.13.4 锁超时 使用lock.tryLock
方法会返回获取锁是否成功。如果成功则返回true,反之则返回false。
并且tryLock方法可以指定等待时间 ,参数为:tryLock(long timeout, TimeUnit unit)
,其中timeout为最长等待时间,TimeUnit为时间单位
简而言之 就是:获取失败了、获取超时了或者被打断了,不再阻塞,直接停止运行
lock.tryLock()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j public class Test6 { private static ReentrantLock lock = new ReentrantLock(); public static void main (String[] args) { Thread t1 = new Thread(() -> { log.debug("t1尝试获得锁" ); if (!lock.tryLock()) { log.debug("t1获取不到锁" ); return ; } try { log.debug("t1获得到锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("主线程获得到锁" ); t1.start(); } }
1 2 3 2021-11-11 15:45:57 [ main:0 ] - [ DEBUG ] 主线程获得到锁 2021-11-11 15:45:57 [ t1:1 ] - [ DEBUG ] t1尝试获得锁 2021-11-11 15:45:57 [ t1:1 ] - [ DEBUG ] t1获取不到锁
lock.tryLock(long, TimeUnit)
情况1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Slf4j public class Test6 { private static ReentrantLock lock = new ReentrantLock(); public static void main (String[] args) { Thread t1 = new Thread(() -> { log.debug("t1尝试获得锁" ); try { if (!lock.tryLock(1 , TimeUnit.SECONDS)) { log.debug("t1获取不到锁" ); return ; } } catch (InterruptedException e) { e.printStackTrace(); log.debug("t1获取不到锁" ); return ; } try { log.debug("t1获得到锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("主线程获得到锁" ); t1.start(); } }
1 2 3 4 2021-11-11 15:47:50 [ main:0 ] - [ DEBUG ] 主线程获得到锁 2021-11-11 15:47:50 [ t1:1 ] - [ DEBUG ] t1尝试获得锁 2021-11-11 15:47:51 [ t1:1003 ] - [ DEBUG ] t1获取不到锁 最后一条信息是等待了1s后打印的
情况2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j public class Test6 { private static ReentrantLock lock = new ReentrantLock(); public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { log.debug("t1尝试获得锁" ); try { if (!lock.tryLock(5 , TimeUnit.SECONDS)) { log.debug("t1获取不到锁" ); return ; } } catch (InterruptedException e) { e.printStackTrace(); log.debug("t1获取不到锁" ); return ; } try { log.debug("t1获得到锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("主线程获得到锁" ); t1.start(); TimeUnit.SECONDS.sleep(2 ); lock.unlock(); log.debug("主线程释放了锁" ); } }
1 2 3 4 2021-11-11 15:50:17 [ main:0 ] - [ DEBUG ] 主线程获得到锁 2021-11-11 15:50:17 [ t1:1 ] - [ DEBUG ] t1尝试获得锁 2021-11-11 15:50:19 [ main:2002 ] - [ DEBUG ] 主线程释放了锁 2021-11-11 15:50:19 [ t1:2002 ] - [ DEBUG ] t1获得到锁
3.13.5 公平锁 在线程获取锁失败,进入阻塞队列时,先进入 的会在锁被释放后先获得 锁。这样的获取方式就是公平 的(先进先出,队列)。
1 2 ReentrantLock lock = new ReentrantLock(true );
3.13.6 条件变量 synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待。ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个 条件变量的,这就好比:
synchronized 是那些不满足条件的线程都在一间休息室等消息
而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
使用要点:
await 前需要获得锁
await 执行后,会释放锁 ,进入 conditionObject 等待
await 的线程被唤醒(或打断、或超时)去重新竞争 lock 锁
竞争 lock 锁成功后,从 await 后继续执
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Slf4j public class Test7 { private static ReentrantLock lock = new ReentrantLock(); private static Boolean judge = false ; public static void main (String[] args) { Condition condition = lock.newCondition(); new Thread(()->{ lock.lock(); try { while (!judge) { log.debug("不满足条件,等待..." ); condition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("执行完毕!" ); lock.unlock(); } }, "t1" ).start(); new Thread(()->{ lock.lock(); try { Thread.sleep(1 ); judge = true ; log.debug("唤醒在condition上等待的线程" ); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }, "t2" ).start(); } }
1 2 3 2021-11-12 09:34:50 [ t1:0 ] - [ DEBUG ] 不满足条件,等待... 2021-11-12 09:34:50 [ t2:3 ] - [ DEBUG ] 唤醒在condition上等待的线程 2021-11-12 09:34:50 [ t1:3 ] - [ DEBUG ] 执行完毕!
3.14 同步模式之顺序控制 3.14.1 固定顺序输出 要求:先打印2,后打印1。
wait/notify版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class Test8 { private static final Object lock = new Object(); private static boolean flag = false ; public static void main (String[] args) { Thread t1 = new Thread(() -> { synchronized (lock){ while (!flag) { try { log.debug("不满足条件,t1阻塞" ); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("1" ); } }, "t1" ); Thread t2 = new Thread(() -> { synchronized (lock){ log.debug("2" ); log.debug("t2唤醒线程" ); lock.notifyAll(); flag = true ; } }, "t2" ); t1.start(); t2.start(); } }
1 2 3 4 2021-11-12 09:44:27 [ t1:0 ] - [ DEBUG ] 不满足条件,t1阻塞 2021-11-12 09:44:27 [ t2:1 ] - [ DEBUG ] 2 2021-11-12 09:44:27 [ t2:1 ] - [ DEBUG ] t2唤醒线程 2021-11-12 09:44:27 [ t1:1 ] - [ DEBUG ] 1
park/unpark版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Slf4j public class Test9 { public static void main (String[] args) { Thread t1 = new Thread(() -> { LockSupport.park(); log.debug("1" ); }, "t1" ); Thread t2 = new Thread(() -> { LockSupport.unpark(t1); log.debug("2" ); }, "t2" ); t1.start(); t2.start(); } }
1 2 2021-11-12 09:49:10 [ t2:0 ] - [ DEBUG ] 2 2021-11-12 09:49:10 [ t1:0 ] - [ DEBUG ] 1
3.14.2 交替输出 要求:t1输出a,t2输出b,t3输出c,要求以abc循环5次的形式输出。
wait/notify版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Slf4j public class Test1 { public static void main (String[] args) { WaitNotify waitNotify = new WaitNotify(1 , 5 ); new Thread(() -> { waitNotify.print("a" , 1 , 2 ); }, "t1" ).start(); new Thread(() -> { waitNotify.print("b" , 2 , 3 ); }, "t2" ).start(); new Thread(() -> { waitNotify.print("c" , 3 , 1 ); }, "t3" ).start(); } } class WaitNotify { private int flag; private int loopNumber; public void print (String str, int waitFlag, int nextFlag) { for (int i = 0 ; i < loopNumber; i++) { synchronized (this ){ while (flag != waitFlag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); flag = nextFlag; this .notifyAll(); } } } public WaitNotify (int flag, int loopNumber) { this .flag = flag; this .loopNumber = loopNumber; } }
1 2 abcabcabcabcabc Process finished with exit code 0
await/signal版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @Slf4j public class Test2 { public static void main (String[] args) throws InterruptedException { AwaitSignal awaitSignal = new AwaitSignal(5 ); Condition a = awaitSignal.newCondition(); Condition b = awaitSignal.newCondition(); Condition c = awaitSignal.newCondition(); new Thread(() -> { awaitSignal.print("a" , a, b); }).start(); new Thread(() -> { awaitSignal.print("b" , b, c); }).start(); new Thread(() -> { awaitSignal.print("c" , c, a); }).start(); TimeUnit.SECONDS.sleep(1 ); awaitSignal.lock(); try { log.debug("由主线程发起开始命令" ); a.signal(); } finally { awaitSignal.unlock(); } } } class AwaitSignal extends ReentrantLock { private int loopNumber; public void print (String str, Condition current, Condition next) { for (int i = 0 ; i < loopNumber; i++) { lock(); try { current.await(); System.out.print(str); next.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } } public AwaitSignal (int loopNumber) { this .loopNumber = loopNumber; } }
1 2 2021-11-12 10:16:22 [ main:0 ] - [ DEBUG ] 由主线程发起开始命令 abcabcabcabcabc
park/unpark版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Slf4j public class Test3 { static Thread t1; static Thread t2; static Thread t3; public static void main (String[] args) { ParkUnpark parkUnpark = new ParkUnpark(5 ); t1 = new Thread(() -> { parkUnpark.print("a" , t2); }); t2 = new Thread(() -> { parkUnpark.print("b" , t3); }); t3 = new Thread(() -> { parkUnpark.print("b" , t1); }); t1.start(); t2.start(); t3.start(); LockSupport.unpark(t1); } } class ParkUnpark { private int loopNumber; public ParkUnpark (int loopNumber) { this .loopNumber = loopNumber; } public void print (String str, Thread next) { for (int i = 0 ; i < loopNumber; i++) { LockSupport.park(); System.out.print(str); LockSupport.unpark(next); } } }
1 2 abbabbabbabbabb Process finished with exit code 0
3.15 小结 本章我们需要重点掌握的是:
分析多线程访问共享资源时,哪些代码片段属于临界区
使用 synchronized 互斥解决临界区的线程安全问题
掌握 synchronized 锁对象语法
掌握 synchronzied 加载成员方法和静态方法语法
掌握 wait/notify 同步方法
使用 lock 互斥解决临界区的线程安全问题 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
学会分析变量的线程安全性、掌握常见线程安全类的使用
了解线程活跃性问题:死锁、活锁、饥饿
应用方面
互斥:使用 synchronized 或 Lock 达到共享资源互斥效果,实现原子性效果,保证线程安全。
同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果。
原理方面
monitor、synchronized 、wait/notify 原理
synchronized 进阶原理
park & unpark 原理
模式方面
同步模式之保护性暂停
异步模式之生产者消费者
同步模式之顺序控制
4 共享内存 4.1 Java内存模型(JMM) JMM 即 Java Memory Model,它定义了主存(共享内存)、工作内存(线程私有) 抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
JMM规定了多线程在执行时的一些重要规则和原则,如原子性、可见性和有序性:
原子性 - 保证指令不会受到线程上下文切换的影响;JMM保证了对共享变量的读取和写入可以被视为原子操作,即线程要么完全看到共享变量的修改结果,要么不看到。
可见性 - 保证指令不会受 cpu 缓存的影响;当一个线程对共享变量进行修改后,在刷新到主内存之前,其他线程不一定能立即看到这个修改。为了确保可见性,需要通过volatile
关键字或者使用synchronized或Lock等同步机制来进行同步操作。
有序性 - 保证指令不会受 cpu 指令并行优化的影响;JMM保证了线程内的操作按照程序的顺序执行,但不保证不同线程的操作顺序。为了保证有序性,需要使用volatile关键字、synchronized或Lock等同步机制或者使用显式的内存屏障。
JMM定义了Java 虚拟机(JVM)在计算机内存(RAM)中的工作方式。JVM是整个计算机虚拟模型,所以JMM是隶属于JVM的。
JMM是一种抽象的概念,并不真实存在,它描述的一组规则或者规范。通过这些规则、规范定义了程序中各个变量的访问方式。jvm运行的程序的实体是线程,而每个线程运行时,都会创建一个工作内存(也叫栈空间) ,来保存线程所有的私有变量。而JMM内存模型规范中规定所有的变量都存储在主内存 中,而主内存中的变量是所有的线程都可以共享的,而对主内存中的变量进行操作时,必须在线程的工作内存进行操作,首先将主内存的变量copy到工作内存,进行操作后,再将变量刷回到主内存中。所有线程只有通过主内存来进行通信 。
主内存:所有线程创建的实例对象都存放在主内存中,不管该实例对象是成员变量还是方法中的本地变量(也称局部变量)
本地内存:每个线程都有一个私有的本地内存来存储共享变量的副本,并且,每个线程只能访问自己的本地内存,无法访问其他线程的本地内存。本地内存是 JMM 抽象出来的一个概念,存储了主内存中的共享变量副本。
JVM和JMM的关系
jmm中的主内存、工作内存与jvm中的Java堆、栈、方法区等并不是同一个层次的内存划分,这两者基本上是没有关系 的,如果两者一定要勉强对应起来,那从变量、主内存、工作内存的定义来看,主内存主要对应于Java堆中的对象实例数据部分,而工作内存则对应于虚拟机栈中的部分区域。从更低层次上说,主内存就直接对应于物理硬件的内存,而为了获取更好的运行速度,虚拟机(甚至是硬件系统本身的优化措施)可能会让工作内存优先存储于寄存器和高速缓存中,因为程序运行时主要访问读写的是工作内存。
4.2 可见性 4.2.1 引例 以下的循环并不会退出:
1 2 3 4 5 6 7 8 9 10 11 12 static Boolean run = true ;public static void main (String[] args) throws InterruptedException { new Thread(()->{ while (run) { } }).start(); Thread.sleep(1000 ); System.out.println("改变run的值为false" ); run = false ; }
无法退出的原因
初始状态, t 线程刚开始从主内存 读取了 run 的值到工作内存 。
因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存 中的高速缓存中, 减少对主存中 run 的访问,提高效率
1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量 的值,结果永远是旧值
解决方法
使用volatile 易变关键字:它可以用来修饰成员变量 和静态成员变量 (放在主存中的变量),他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值 ,线程操作 volatile 变量都是直接操作主存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Slf4j public class Test4 { volatile static boolean run = true ; public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { while (run) { } }, "t1" ); t1.start(); TimeUnit.SECONDS.sleep(1 ); log.debug("停止t1" ); run = false ; } }
1 2 2021-11-12 14:57:10 [ main:0 ] - [ DEBUG ] 停止t1 Process finished with exit code 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j public class Test4 { static boolean run = true ; final static Object lock = new Object(); public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { while (true ) { synchronized (lock){ if (!run) { break ; } } } }, "t1" ); t1.start(); TimeUnit.SECONDS.sleep(1 ); log.debug("停止t1" ); synchronized (lock){ run = false ; } } }
4.2.2 可见性和原子性 前面例子体现的实际就是可见性 ,它保证的是在多个线程之间,一个线程对volatile变量 的修改对另一个线程可见, 不能 保证原子性,仅用在一个写 线程,多个读 线程的情况。
从字节码的角度分析为:
1 2 3 4 5 6 getstatic run getstatic run getstatic run getstatic run putstatic run getstatic run
注意 synchronized 语句块既可以保证代码块的原子性 ,也同时保证代码块内变量的可见性 。
但缺点是 synchronized 是属于重量级 操作,性能相对更低。
如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了,想一想为什么?
1 2 3 4 5 6 7 public void println (String x) { synchronized (this ) { print(x); newLine(); } }
4.2.3 两阶终止模式优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Slf4j public class Test5 { public static void main (String[] args) throws InterruptedException { Monitor monitor = new Monitor(); monitor.start(); TimeUnit.SECONDS.sleep(4 ); monitor.stop(); } } @Slf4j class Monitor { private Thread monitor; private volatile boolean stop = false ; public void start () { monitor = new Thread() { @Override public void run () { while (true ) { if (stop) { log.debug("处理后续任务" ); break ; } log.debug("监控器运行中..." ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }; monitor.start(); } public void stop () { stop = true ; } }
1 2 3 4 5 2021-11-12 15:47:55 [ Thread-0:0 ] - [ DEBUG ] 监控器运行中... 2021-11-12 15:47:56 [ Thread-0:1003 ] - [ DEBUG ] 监控器运行中... 2021-11-12 15:47:57 [ Thread-0:2004 ] - [ DEBUG ] 监控器运行中... 2021-11-12 15:47:58 [ Thread-0:3004 ] - [ DEBUG ] 监控器运行中... 2021-11-12 15:47:59 [ Thread-0:4006 ] - [ DEBUG ] 处理后续任务
4.2.4 同步模式之犹豫模式 Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同 的事,那么本线程就无需再做 了,直接结束返回 。
用一个标记来判断该任务是否已经被执行过了
需要避免线程安全问题
4.3 有序性 4.3.1 指令重排
JVM 会在不影响正确性 的前提下,可以调整 语句的执行顺序
1 2 3 4 5 static int i;static int j;i = ...; j = ...;
可以看到,至于先执行i还是先执行j,对最终的结果不会产生影响,所以上面的代码在实际执行时会有两个可能的执行顺序。
这种特性称之为『指令重排 』,多线程下『指令重排』会影响正确性 。
指令重排:
计算机在执行程序时候,为了提高代码、指令的执行效率,编译器和处理器会对指令进行重新排序,一般分为编译器对于指令的重新排序、指令并行之间的优化、以及内存指令的优化。
这么多优化都是保证在单线程的情况下 ,执行的结果是不变的,下图就是描述整个的指令重排的优化的过程:
指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。
4.3.2 CPU级别的指令重排序优化
事实上,现代处理器会设计为一个时钟周期完成一条执行时间长的 CPU 指令。为什么这么做呢?可以想到指令还可以再划分成一个个更小的阶段,例如,每条指令都可以分为: 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 这5 个阶段。
在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序 和组合 来实现指令级并行
指令重排的前提是,重排指令不能影响结果 ,例如
1 2 3 4 5 6 7 8 int a = 10 ; int b = 20 ; System.out.println( a + b ); int a = 10 ;int b = a - 5 ;
4.3.3 支持流水线的处理器 现代 CPU 支持多级指令流水线 ,例如支持同时 执行 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 的处理器,就可以称之为五级指令流水线。这时 CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段(相当于一 条执行时间长的复杂指令),IPC = 1,本质上,流水线技术并不能缩短单条指令的执行时间,但它变相地提高了指令的吞吐率 。
在多线程环境下,指令重排序可能导致出现意料之外的结果 。
4.3.4 Java层面的指令重排排序优化
示例程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 int num = 0 ;boolean ready = false ;public void actor1 (I_Result r) { if (ready){ r.r1 = num + num; } else { r.r1 = 1 ; } } public void actor2 () { num = 2 ; ready = true ; }
问r.r1
的结果会有多少种情况?有0(指令重排) ,1,4。
0的情况:15,16行代码可能因为指令重排导致执行顺序颠倒(因为对线程2来说,两个赋值语句交换顺序并不会影响最终结果),而可能出现0的情况。
利用压力测试工具Jcstress测试结果如下:
可以看到测试次数达五千万次中,有1568次的输出结果为0
4.3.5 禁用重排序 volatile 修饰的变量,可以禁用 指令重排
禁止的是加volatile关键字变量之前 的代码被重排序
以上面的程序为例,加上易变关键字后的压力测试结果如下:
4.4 volatile原理 volatile能保证可见性和有序性(禁止JVM内存重排序),不能保证原子性。
4.4.1 内存屏障 内存屏障(Memory Barrier)是一种计算机硬件或软件机制,用于控制计算机处理器和内存之间的数据同步和可见性。它负责确保在多线程或多核处理器系统中,不同线程之间的内存操作具有正确的顺序和一致的结果。
内存屏障有以下几个主要作用:
解释:
可见性
写屏障 (sfence)保证在该屏障之前 的,对共享变量的改动,都同步到主存当中
读屏障 (lfence)保证在该屏障之后 ,对共享变量的读取,加载的是主存中新数据
有序性
写屏障会确保指令重排序时,不会将写屏障之前 的代码排在写屏障之后
读屏障会确保指令重排序时,不会将读屏障之后 的代码排在读屏障之前
volatile的底层实现原理是内存屏障 ,Memory Barrier(Memory Fence)
对 volatile 变量的写指令后 会加入写屏障
对 volatile 变量的读指令前 会加入读屏障
总结:写前读后
4.4.2 如何保证可见性
写屏障(sfence)保证在该屏障之前 的,对共享变量的改动,都同步到主存当中:
1 2 3 4 5 public void actor2 () { num = 2 ; ready = true ; }
读屏障(lfence)保证在该屏障之后 ,对共享变量的读取,加载的是主存中新数据:
1 2 3 4 5 6 7 8 9 public void actor1 (I_Result r) { if (ready){ r.r1 = num + num; } else { r.r1 = 1 ; } }
1 2 3 4 5 6 7 8 9 10 11 sequenceDiagram participant t1 as t1 线程 participant num as num=0 participant ready as volatile ready=false participant t2 as t2 线程 t1 -->> t1 : num=2 t1 ->> ready : ready=true Note over t1,ready: 写屏障 Note over num,t2:读屏障 t2 ->> ready : 读取ready=true t2 ->> num : 读取num=2
4.4.3 如何保证有序性
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
但是不能解决指令交错问题 (线程间的代码互相交错)
写屏障仅仅是保证之后的读能够读到新的结果,但不能保证读跑到它前面去
而有序性的保证也只是保证了本线程内 相关代码不被重排序
4.4.4 单例模式中双重检查锁问题 考虑一个单例模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public final class Singleton { private Singleton () {}; private static Singleton INSTANCE = null ; public static Singleton getInstance () { if (INSTANCE == null ) { synchronized (Singleton.class){ if (INSTANCE == null ) { INSTANCE = new Singleton(); } } } return INSTANCE; } }
以上实现的特点是:
懒惰实例化
首次使用getInstance()才使用synchronized加锁,后续使用时无需加锁
有隐含但很关键的一点:第一个if使用了INSTANCE变量,是在同步块之外
问题分析
略,详见[[Java设计模式学习笔记新#4.1.2 实现]]
问题解决
INSTANCE变量加上volatile关键字即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public final class Singleton { private Singleton () {}; private static volatile Singleton INSTANCE = null ; public static Singleton getInstance () { if (INSTANCE == null ) { synchronized (Singleton.class){ if (INSTANCE == null ) { INSTANCE = new Singleton(); } } } return INSTANCE; } }
在X86处理器下通过工具获取JIT编译器生成的汇编指令来查看对volatile进行写操作时:
1 instance = new Singleton();
对应的汇编代码是
有volatile变量修饰的共享变量进行写操作的时候会多出第二行汇编代码,通过查IA-32架构软件开发者手册可知,Lock前缀 的指令在多核处理器下会引发了两件事
Lock前缀指令会引起处理器缓存回写到内存
Lock前缀指令导致在执行指令期间,声言处理器的LOCK#信号。在多处理器环境中,LOCK#
信号确保在声言该信号期间,处理器可以独占任何共享内存。但是,在最近的处理器里,LOCK#
信号一般不锁总线,而是锁缓存 ,毕竟锁总线开销的比较大。使用缓存一致性机制来确保修改的原子性,此操作被称为“缓存锁定”,缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据
一个处理器的缓存回写到内存会导致其他处理器的缓存无效
在多核处理器系统中进行操作的时候,IA-32和Intel 64处理器能嗅探其他处理器访问系统内存和它们的内部缓存 。处理器使用嗅探技术保证它的内部缓存、系统内存和其他处理器的缓存的数据在总线上保持一致
5 无锁 无锁的特点参见0.4节
,无锁是乐观锁的一种实现方式。
5.1 无锁解决线程安全问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public class TestAccount { public static void main (String[] args) { Account account1 = new AccountUnsafe(10000 ); Account.demo(account1); Account account2 = new AccountCas(10000 ); Account.demo(account2); } } class AccountCas implements Account { private AtomicInteger balance; public AccountCas (int balance) { this .balance = new AtomicInteger(balance); } @Override public Integer getBalance () { return balance.get(); } @Override public void withdraw (Integer account) { while (true ) { int prev = balance.get(); int next = prev - account; if (balance.compareAndSet(prev, next)) { break ; } } } } class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe (Integer balance) { this .balance = balance; } @Override public Integer getBalance () { return this .balance; } @Override public void withdraw (Integer account) { this .balance -= account; } } interface Account { Integer getBalance () ; void withdraw (Integer account) ; static void demo (Account account) { List<Thread> ts = new ArrayList<>(); for (int i = 0 ; i < 1000 ; i++) { ts.add(new Thread(() -> { account.withdraw(10 ); })); } long start = System.nanoTime(); ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e){ e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost:" + (end-start)/1000_000 + " ms" ); } }
执行结果:
1 2 3 240 cost:53 ms0 cost:46 ms
5.2 CAS与volatile 5.2.1 工作流程 前面看到的 AtomicInteger
的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
1 2 3 4 5 6 7 8 9 10 11 12 public void withdraw (Integer account) { while (true ) { int prev = balance.get(); int next = prev - account; if (balance.compareAndSet(prev, next)) { break ; } } }
其中的关键是 compareAndSet (比较并设置值),它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作 。
1 XXX.compareAndSet(A, B);
CAS有3个操作数,内存值V,旧的预期值A(之前从内存中取出的值),要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B ,否则什么都不做。
当一个线程要去修改Account对象中的值时,先获取值pre(调用get方法),然后再将其设置为新的值next(调用cas方法)。在调用cas方法时,会将pre与Account中的余额进行比较。
如果两者相等 ,就说明该值还未被其他线程修改,此时便可以进行修改操作。
如果两者不相等 ,就不设置值,重新获取值pre(调用get方法),然后再将其设置为新的值next(调用cas方法),直到修改成功为止。
注意
其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性 。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线 。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
5.2.2 volatile 获取共享变量时,为了保证该变量的可见性 ,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
注意:volatile
仅仅保证了共享变量的可见性,让其它线程能够看到新值,但不能解决指令交错问题(不能保证原子性)
CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换】的效果。如原子整数的value值就是volatile修饰的。
5.2.3 效率问题 一般情况下(多核cpu,线程数少于cpu核心数,可以让所有的线程不断自旋重试),使用无锁比使用加锁的效率更高。
5.2.4 CAS特点 结合 CAS 和 volatile 可以实现无锁并发 ,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁 的思想:乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
synchronized 是基于悲观锁 的思想:悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发 ,请仔细体会这两句话的意思
因为没有使用 synchronized,所以线程不会陷入阻塞 ,这是效率提升的因素之一
但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
5.3 原子整数 J.U.C(Java.Util.Concurrent)
并发包提供了
AtomicBoolean
AtomicInteger
AtomicLong
以AtomicInteger为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 AtomicInteger i = new AtomicInteger(0 ); System.out.println(i.getAndDecrement()); System.out.println(i.getAndAdd(5 )); System.out.println(i.addAndGet(-5 )); System.out.println(i.getAndUpdate(p -> p - 2 )); System.out.println(i.updateAndGet(p -> p + 2 )); final System.out.println(i.getAndAccumulate(10 , (p, x) -> p + x)); System.out.println(i.accumulateAndGet(-10 , (p, x) -> p + x));
5.4 原子引用 原子引用类型共有三类:
AtomicReference
AtomicMarkableReference
AtomicStampedReference
5.4.1 AtomicReference 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public class TestAccount1 { public static void main (String[] args) { DecimalAccount.demo(new DecimalAccountCas(new BigDecimal("10000" ))); } } class DecimalAccountCas implements DecimalAccount { private AtomicReference<BigDecimal> balance; public DecimalAccountCas (BigDecimal balance) { this .balance = new AtomicReference<>(balance); } @Override public BigDecimal getBalance () { return balance.get(); } @Override public void withdraw (BigDecimal account) { while (true ) { BigDecimal prev = balance.get(); BigDecimal next = prev.subtract(account); if (balance.compareAndSet(prev, next)) { break ; } } } } interface DecimalAccount { BigDecimal getBalance () ; void withdraw (BigDecimal account) ; static void demo (DecimalAccount account) { List<Thread> ts = new ArrayList<>(); for (int i = 0 ; i < 1000 ; i++) { ts.add(new Thread(() -> { account.withdraw(BigDecimal.TEN); })); } long start = System.nanoTime(); ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e){ e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost:" + (end-start)/1000_000 + " ms" ); } }
5.4.2 ABA问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j public class Test5 { static AtomicReference<String> ref = new AtomicReference<>("A" ); public static void main (String[] args) throws InterruptedException { log.debug("main start..." ); String prev = ref.get(); other(); TimeUnit.SECONDS.sleep(1 ); log.debug("change A->C {}" , ref.compareAndSet(prev, "C" )); } private static void other () throws InterruptedException { new Thread(() -> { log.debug("change A->B {}" , ref.compareAndSet(ref.get(), "B" )); }).start(); TimeUnit.SECONDS.sleep(1 ); new Thread(() -> { log.debug("change B->A {}" , ref.compareAndSet(ref.get(), "A" )); }).start(); } }
1 2 3 4 2021-11-15 10:16:37 [ main:0 ] - [ DEBUG ] main start... 2021-11-15 10:16:37 [ Thread-0:3 ] - [ DEBUG ] change A->B true 2021-11-15 10:16:38 [ Thread-1:1007 ] - [ DEBUG ] change B->A true 2021-11-15 10:16:39 [ main:2021 ] - [ DEBUG ] change A->C true
主线程仅能判断出共享变量的值与初值 A 是否相同 ,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望: 只要有其它线程【动过了 】共享变量,那么自己的 cas 就算失败 ,这时,仅比较值是不够的,需要再加一个版本号
5.4.3 AtomicStampedReference 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class Test5 { static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A" , 0 ); public static void main (String[] args) throws InterruptedException { log.debug("main start..." ); String prev = ref.getReference(); int stamp = ref.getStamp(); log.debug("{}" , stamp); other(); TimeUnit.SECONDS.sleep(1 ); log.debug("{}" , ref.getStamp()); log.debug("change A->C {}" , ref.compareAndSet(prev, "C" , stamp, stamp + 1 )); } private static void other () throws InterruptedException { new Thread(() -> { log.debug("change A->B {}" , ref.compareAndSet(ref.getReference(), "B" , ref.getStamp(), ref.getStamp() + 1 )); }).start(); TimeUnit.SECONDS.sleep(1 ); new Thread(() -> { log.debug("change B->A {}" , ref.compareAndSet(ref.getReference(), "A" , ref.getStamp(), ref.getStamp() + 1 )); }).start(); } }
1 2 3 4 5 6 2021-11-15 10:22:18 [ main:0 ] - [ DEBUG ] main start... 2021-11-15 10:22:18 [ main:2 ] - [ DEBUG ] 0 2021-11-15 10:22:18 [ Thread-0:3 ] - [ DEBUG ] change A->B true 2021-11-15 10:22:19 [ Thread-1:1011 ] - [ DEBUG ] change B->A true 2021-11-15 10:22:20 [ main:2022 ] - [ DEBUG ] 2 2021-11-15 10:22:20 [ main:2022 ] - [ DEBUG ] change A->C false
5.4.4 AtomicMarkableReference AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过 ,所以就有了 AtomicMarkableReference
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Demo4 { static AtomicMarkableReference<String> str = new AtomicMarkableReference<>("A" , true ); public static void main (String[] args) { new Thread(() -> { String pre = str.getReference(); System.out.println("change" ); try { other(); } catch (InterruptedException e) { e.printStackTrace(); } try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("change A->C mark " + str.compareAndSet(pre, "C" , true , false )); }).start(); } static void other () throws InterruptedException { new Thread(() -> { System.out.println("change A->A mark " + str.compareAndSet("A" , "A" , true , false )); }).start(); } }
二者区别
AtomicStampedReference 需要我们传入整型变量 作为版本号,来判定是否被更改过
AtomicMarkableReference 需要我们传入布尔变量 作为标记,来判断是否被更改过
5.5 原子数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
5.6 原子更新器
AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdate
原子更新器用于帮助我们改变某个对象中的某个属性,只能配合volatile修饰的字段使用,否则会出现异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf4j public class Test6 { public static void main (String[] args) { Student stu = new Student(); AtomicReferenceFieldUpdater<Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name" ); System.out.println(updater.compareAndSet(stu, null , "Mark" )); System.out.println(stu); } } class Student { volatile String name; @Override public String toString () { return "Student{" + "name='" + name + '\'' + '}' ; } }
1 2 true Student{name='Mark'}
5.7 原子累加器 略
5.8 LongAdder原理 略
5.9 Unsafe对象 Unsafe
对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用(private修饰的单例对象),只能通过反射 获得。
5.9.1 unsafe对象的获取 1 2 3 4 5 6 7 8 9 10 public class Test7 { public static void main (String[] args) throws NoSuchFieldException, IllegalAccessException { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe" ); theUnsafe.setAccessible(true ); Unsafe unsafe = (Unsafe) theUnsafe.get(null ); System.out.println(unsafe); } }
1 sun.misc.Unsafe@2f4d3709
5.9.2 unsafe实现cas相关方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Test7 { public static void main (String[] args) throws NoSuchFieldException, IllegalAccessException { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe" ); theUnsafe.setAccessible(true ); Unsafe unsafe = (Unsafe) theUnsafe.get(null ); System.out.println(unsafe); Teacher t = new Teacher(); long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id" )); long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name" )); unsafe.compareAndSwapInt(t, idOffset, 0 , 1 ); unsafe.compareAndSwapObject(t, nameOffset, null , "Mark" ); System.out.println(t); } } @Data class Teacher { volatile int id; volatile String name; }
6 不可变 6.1 问题的提出和解决
日期转换的问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf4j public class Test1 { public static void main (String[] args) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd" ); for (int i = 0 ; i < 10 ; i++) { new Thread(() -> { try { log.debug("{}" , sdf.parse("1951-04-21" )); } catch (Exception e){ log.error("{}" , e); } }).start(); } } }
执行结果:
解决方法
方法一:加锁即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Slf4j public class Test1 { public static void main (String[] args) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd" ); for (int i = 0 ; i < 10 ; i++) { new Thread(() -> { synchronized (sdf){ try { log.debug("{}" , sdf.parse("1951-04-21" )); } catch (Exception e){ log.error("{}" , e); } } }).start(); } } }
执行结果:
1 2 3 4 5 6 7 8 9 10 2021-11-18 20:40:38 [ Thread-0:0 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-9:0 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-8:0 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-7:0 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-6:0 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-5:2 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-4:2 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-3:2 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-2:2 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951 2021-11-18 20:40:38 [ Thread-1:2 ] - [ DEBUG ] Sat Apr 21 00:00:00 CST 1951
方法二:使用线程安全类DateTimeFormatter
1 2 3 4 5 6 7 8 9 10 11 12 @Slf4j public class Test1 { public static void main (String[] args) { DateTimeFormatter stf = DateTimeFormatter.ofPattern("yyyy-MM-dd" ); for (int i = 0 ; i < 10 ; i++) { new Thread(() -> { TemporalAccessor parse = stf.parse("1951-04-21" ); log.debug("{}" , parse); }).start(); } } }
6.2 不可变的定义和设计
定义
如果一个对象在不能够修 改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改。
6.2.1 String类中不可变的体现 String类源码:
1 2 3 4 5 6 7 8 9 10 11 public final class String implements java .io .Serializable , Comparable <String >, CharSequence { private final char value[]; private int hash; } }
final的使用
发现该类、类中所有属性都是final的
属性用final修饰保证了该属性是只读的,不能修改
类用final修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
保护性拷贝
但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是如何实现的,就以 substring 为例。
1 2 3 4 5 6 7 8 9 10 11 public String substring (int beginIndex) { if (beginIndex < 0 ) { throw new StringIndexOutOfBoundsException(beginIndex); } int subLen = value.length - beginIndex; if (subLen < 0 ) { throw new StringIndexOutOfBoundsException(subLen); } return (beginIndex == 0 ) ? this : new String(value, beginIndex, subLen); }
发现其内部是调用 String 的构造方法创建了一个新字符串
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public String (char value[], int offset, int count) { if (offset < 0 ) { throw new StringIndexOutOfBoundsException(offset); } if (count <= 0 ) { if (count < 0 ) { throw new StringIndexOutOfBoundsException(count); } if (offset <= value.length) { this .value = "" .value; return ; } } if (offset > value.length - count) { throw new StringIndexOutOfBoundsException(offset + count); } this .value = Arrays.copyOfRange(value, offset, offset+count); }
构造新字符串对象时,会生成新的 char[] value
,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝 (defensive copy)】
6.3 final原理 6.3.1 设置final变量 1 2 3 4 5 public class Test2 { public static void main (String[] args) { final int a = 20 ; } }
反编译后的字节码指令:
1 2 3 4 5 6 7 0 : aload_01 : invokespecial #1 4 : aload_05 : bipush 20 7 : putfield #2 <-- 写屏障 10 : return
发现final变量的赋值也会通过putfield指令来完成,同样在这条指令之后会加入写屏障,保证在其他线程读到它时不会出现为0的情况
6.3.2 获取final变量 略
7 工具类 7.1 线程池 池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
使用线程池的好处:
降低资源消耗 。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度 。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
提高线程的可管理性 。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
7.1.1 自定义线程池
阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
主线程类似于生产者 ,产生任务并放入阻塞队列中
线程池类似于消费者 ,得到阻塞队列中已有的任务并执行
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 @Slf4j public class Test3 { public static void main (String[] args) { ThreadPool threadPool = new ThreadPool(1 , 1000 , TimeUnit.MILLISECONDS, 1 , ((queue, task) -> { })); for (int i = 0 ; i < 10 ; i++) { threadPool.execute(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } @FunctionalInterface interface RejectPolicy <T > { void reject (BlockingQueue<T> queue, T task) ; } @Slf4j class ThreadPool { private BlockingQueue<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet<Worker>(); private int coreSize; private long timeout; private TimeUnit unit; private RejectPolicy<Runnable> rejectPolicy; public void execute (Runnable task) { synchronized (workers){ if (workers.size() < coreSize) { Worker worker = new Worker(task); log.debug("新增 worker {}, {}" , worker, task); workers.add(worker); worker.start(); } else { taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool (int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .timeout = timeout; this .unit = unit; this .taskQueue = new BlockingQueue<>(queueCapacity); this .rejectPolicy = rejectPolicy; } class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout, unit)) != null ) { try { log.debug("正在执行...{}" , task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { log.debug("执行完毕...{}" , task); task = null ; } } synchronized (workers) { log.debug("worker被移除{}" , this ); workers.remove(this ); } } } } @Slf4j class BlockingQueue <T > { private Deque<T> queue = new ArrayDeque<>(); private ReentrantLock lock = new ReentrantLock(); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); private int capacity; public BlockingQueue (int capacity) { this .capacity = capacity; } public T take () { lock.lock(); try { while (queue.isEmpty()) { emptyWaitSet.await(); } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T poll (long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { if (nanos <= 0 ){ return null ; } nanos = emptyWaitSet.awaitNanos(nanos); } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void put (T task) { lock.lock(); try { while (queue.size() == capacity) { log.debug("等待加入任务队列... {}" , task); fullWaitSet.await(); } log.debug("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public boolean offer (T task, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capacity) { log.debug("等待加入任务队列... {}" , task); if (nanos <= 0 ){ return false ; } nanos = fullWaitSet.awaitNanos(nanos); } log.debug("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); return true ; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return false ; } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capacity) { rejectPolicy.reject(this , task); } else { log.debug("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
实现了一个简单的线程池
阻塞队列BlockingQueue用于暂存来不及被线程执行的任务
也可以说是平衡生产者和消费者执行速度上的差异
里面的获取任务和放入任务用到了生产者消费者模式
线程池中对线程Thread进行了再次的封装,封装为了Worker
在调用任务的run方法时,线程会去执行该任务,执行完毕后还会到阻塞队列中获取新任务来执行
线程池中执行任务的主要方法为execute方法
7.1.2 ThreadPoolExecutor例子 从JDK 5开始,把工作单元与执行机制分离开来,工作单元包括Runnable
和Callable
,而执行机制由Executor
框架提供。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class WorkerThread implements Runnable { private String command; public WorkerThread (String s) { this .command=s; } @Override public void run () { System.out.println(Thread.currentThread().getName() + " Start. Command = " + command); processCommand(); System.out.println(Thread.currentThread().getName() + " End." ); } private void processCommand () { try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString () { return this .command; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class SimpleThreadPool { public static void main (String[] args) { ExecutorService executor = Executors.newFixedThreadPool(5 ); for (int i = 0 ; i < 10 ; i++) { Runnable worker = new WorkerThread("" + i); executor.execute(worker); } executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads" ); } }
isTerminated
当调用shutdown()方法后,并且所有提交的任务完成后返回为true
。
程序中我们创建了固定大小为五个工作线程的线程池。然后分配给线程池十个工作,因为线程池大小为五,它将启动五个工作线程先处理五个工作,其他的工作则处于等待状态,一旦有工作完成,空闲下来工作线程就会捡取等待队列里的其他工作进行执行。
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 pool-1-thread-2 Start.command = 1 pool-1-thread-3 Start.command = 2 pool-1-thread-1 Start.command = 0 pool-1-thread-5 Start.command = 4 pool-1-thread-4 Start.command = 3 pool-1-thread-1 End. pool-1-thread-4 End. pool-1-thread-3 End. pool-1-thread-5 End. pool-1-thread-2 End. pool-1-thread-5 Start.command = 5 pool-1-thread-2 Start.command = 6 pool-1-thread-4 Start.command = 7 pool-1-thread-3 Start.command = 9 pool-1-thread-1 Start.command = 8 pool-1-thread-4 End. pool-1-thread-5 End. pool-1-thread-2 End. pool-1-thread-3 End. pool-1-thread-1 End. Finished all threads Process finished with exit code 0
输出表明线程池中至始至终只有五个名为 “pool-1-thread-1” 到 “pool-1-thread-5” 的五个线程,这五个线程不随着工作的完成而消亡,会一直存在,并负责执行分配给线程池的任务,直到线程池消亡。
Executors 类提供了使用了 ThreadPoolExecutor 的简单的 ExecutorService 实现,但是 ThreadPoolExecutor 提供的功能远不止于此。我们可以在创建 ThreadPoolExecutor 实例时指定活动线程的数量,我们也可以限制线程池的大小并且创建我们自己的 RejectedExecutionHandler 实现来处理不能适应工作队列的工作。
这里是我们自定义的 RejectedExecutionHandler 接口的实现。
RejectedExecutionHandlerImpl.java
1 2 3 4 5 6 7 8 9 10 11 import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution (Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected" ); } }
ThreadPoolExecutor 提供了一些方法,我们可以使用这些方法来查询 executor 的当前状态,线程池大小,活动线程数量以及任务数量。因此我是用来一个监控线程在特定的时间间隔内打印 executor 信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run = true ; public MyMonitorThread (ThreadPoolExecutor executor, int delay) { this .executor = executor; this .seconds = delay; } public void shutdown () { this .run = false ; } @Override public void run () { while (run){ System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s" , this .executor.getPoolSize(), this .executor.getCorePoolSize(), this .executor.getActiveCount(), this .executor.getCompletedTaskCount(), this .executor.getTaskCount(), this .executor.isShutdown(), this .executor.isTerminated())); try { Thread.sleep(seconds*1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class WorkerPool { public static void main (String[] args) throws InterruptedException { RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); ThreadFactory threadFactory = Executors.defaultThreadFactory(); ThreadPoolExecutor executorPool = new ThreadPoolExecutor( 2 , 4 , 10 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(2 ), threadFactory, rejectionHandler); MyMonitorThread monitor = new MyMonitorThread(executorPool, 3 ); Thread monitorThread = new Thread(monitor); monitorThread.start(); for (int i = 0 ; i < 10 ; i++){ executorPool.execute(new WorkerThread("cmd" + i)); } Thread.sleep(30000 ); executorPool.shutdown(); Thread.sleep(5000 ); monitor.shutdown(); } }
注意在初始化 ThreadPoolExecutor 时,我们保持初始池大小为 2,最大池大小为 4 而工作队列大小为 2。因此如果已经有四个正在执行的任务而此时分配来更多任务的话,工作队列将仅仅保留他们(新任务)中的两个,其他的将会被 RejectedExecutionHandlerImpl 处理。
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 pool-1-thread-1 Start. Command = cmd0 pool-1-thread-4 Start. Command = cmd5 cmd6 is rejected pool-1-thread-3 Start. Command = cmd4 pool-1-thread-2 Start. Command = cmd1 cmd7 is rejected cmd8 is rejected cmd9 is rejected [monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-4 End. pool-1-thread-1 End. pool-1-thread-2 End. pool-1-thread-3 End. pool-1-thread-1 Start. Command = cmd3 pool-1-thread-4 Start. Command = cmd2 [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-1 End. pool-1-thread-4 End. [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
7.1.2 ThreadPoolExecutor 1) 继承关系 顶层接口:Executor
,ExecutorService
实现类:ThreadPoolExecutor
,ScheduledThreadPoolExecutor
Executor 框架结构(主要由三大部分组成):
任务(Runnable
/Callable
接口及其实现类)
执行任务需要实现的 Runnable接口 或 Callable接口。Runnable接口或 Callable 接口实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
任务的执行(Executor
接口及其实现类)
如上图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor
接口的 ExecutorService
接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
异步计算的结果(Future
接口及其实现类)
Future
接口以及 Future
接口的实现类 FutureTask
类都可以代表异步计算 的结果。
当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit()
方法时会返回一个 FutureTask 对象,能够获取返回值,而execute()
方法没有返回值)
Executor 框架的使用示意图
主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
把创建完成的实现 Runnable/Callable接口的对象直接交给 ExecutorService 执行:
ExecutorService.execute(Runnable command)
ExecutorService.submit(Runnable task)
或 ExecutorService.submit(Callable <T> task)
获取返回值。
最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行
2) 线程池状态 ThreadPoolExecutor 使用 int 的高三位来表示线程池状态,低29位表示线程数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
状态名称
高3位的值
描述
RUNNING
111
接收新任务,同时处理任务队列中的任务
SHUTDOWN
000
不接受新任务,但是处理任务队列中的任务
STOP
001
中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING
010
任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED
011
终结状态
线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示
使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值
1 2 3 4 5 6 7 8 9 10 11 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;
获取线程池状态、线程数量以及合并两个值的操作
1 2 3 4 5 6 7 8 9 10 11 private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
3) *ThreadPoolExecutor构造方法 1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize
:核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize,即使有其他空闲线程能够执行新来的任务,也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()
方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
:最大线程数 = 核心线程数 + 救急线程数
maximumPoolSize - corePoolSize = 救急线程数
keepAliveTime
:救急线程空闲时的最大生存时间
unit
:时间单位 - 针对救急线程
workQueue
:阻塞队列(存放任务)
有界阻塞队列 ArrayBlockingQueue
— 配合救急线程
无界阻塞队列 LinkedBlockingQueue
— 不存在救急线程,此时最大线程数参数无意义
最多只有一个同步元素的 SynchronousQueue
优先队列 PriorityBlockingQueue
threadFactory
:线程工厂(用于创建线程)
handler
:拒绝策略
根据这个构造方法,JDK Executors 类中提供了众多工厂方法 来创建各种用途的线程池
工作方式
当一个任务传给线程池以后,可能有以下几种可能
当前线程数小于核心线程数,则新创建一个线程来执行该任务
当前线程数等于核心线程数,但有空闲线程,则使用空闲线程来执行该任务
当前线程数等于核心线程数,且核心线程都在执行任务,将任务放到阻塞队列workQueue中等待被执行
阻塞队列满了,新建救急线程来执行任务
救急线程用完以后,超过生存时间(keepAliveTime)后会被释放
当提交的任务总数大于了 最大线程数(maximumPoolSize)与阻塞队列容量的最大值(workQueue.capacity) ,使用拒绝策略
拒绝策略
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略 。拒绝策略 jdk 提供了 4 种实现:
AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy:让调用者运行任务
DiscardPolicy:放弃本次任务
DiscardOldestPolicy:放弃队列中最靠前的任务,本任务取而代之
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Demo1 { static AtomicInteger threadId = new AtomicInteger(0 ); public static void main (String[] args) { ArrayBlockingQueue<Runnable> runnable = new ArrayBlockingQueue<Runnable>(10 ); ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread (Runnable r) { Thread thread = new Thread(r, "working_thread_" + threadId.getAndIncrement()); return thread; } }; ThreadPoolExecutor executor = new ThreadPoolExecutor(5 , 7 , 10 , TimeUnit.SECONDS, runnable, threadFactory); for (int i = 0 ; i < 20 ; i++) { executor.execute(new Runnable() { @Override public void run () { System.out.println(Thread.currentThread()); try { Thread.sleep(100000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
4) newFixedThreadPool 这个是Executors类提供的工厂方法 来创建线程池,Executors 是Executor 框架的工具类。
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
通过源码可以看到 new ThreadPoolExecutor(xxx)方法其实是是调用了之前说的完整参数的构造方法,使用了默认的线程工厂和拒绝策略。
FixedThreadPool是固定大小的线程池,可以传入两个参数:
核心线程数:nThreads
线程工厂:threadFactory
特点:
没有救急线程,因此核心线程数 = 最大线程数
阻塞队列是无界的,可以存放任意数量的任务
适用于任务量已知,相对耗时的任务
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf4j public class Test3 { public static void main (String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2 , new ThreadFactory() { private AtomicInteger t = new AtomicInteger(1 ); @Override public Thread newThread (Runnable r) { return new Thread(r, "mypool_t" + t.getAndIncrement()); } }); pool.execute(() -> { log.debug("task1" ); }); pool.execute(() -> { log.debug("task2" ); }); pool.execute(() -> { log.debug("task3" ); }); } }
5) newCachedThreadPool 1 2 3 4 5 public static ExecutorService new CachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
1 ExecutorService executorService = Executors.newCachedThreadPool();
特点:
没有核心线程,最大线程数为Integer.MAX_VALUE,所有创建的线程都是救急线程 ,空闲时生存时间为60秒
阻塞队列使用的是SynchronousQueue
SynchronousQueue是一种特殊的队列
没有容量 ,没有线程来取是放不进去的
只有当线程取任务时,才会将任务放入该阻塞队列中(一手交钱,一手交货)
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲一分钟释放线程
适合任务数比较密集,但每个任务执行时间较短的情况
SynchronousQueue特性示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public class Test4 { public static void main (String[] args) throws InterruptedException { SynchronousQueue<Integer> integers = new SynchronousQueue<>(); new Thread(() -> { try { log.debug("putting {}" , 1 ); integers.put(1 ); log.debug("{} putted..." , 1 ); log.debug("putting {}" , 2 ); integers.put(2 ); log.debug("{} putted..." , 2 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1" ).start(); TimeUnit.SECONDS.sleep(1 ); new Thread(() -> { try { log.debug("taking {}" , 1 ); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2" ).start(); TimeUnit.SECONDS.sleep(1 ); new Thread(() -> { try { log.debug("taking {}" , 2 ); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t3" ).start(); } }
1 2 3 4 5 6 2021-11-22 15:50:38 [ t1:0 ] - [ DEBUG ] putting 1 2021-11-22 15:50:39 [ t2:999 ] - [ DEBUG ] taking 1 2021-11-22 15:50:39 [ t1:999 ] - [ DEBUG ] 1 putted... 2021-11-22 15:50:39 [ t1:999 ] - [ DEBUG ] putting 2 2021-11-22 15:50:40 [ t3:2003 ] - [ DEBUG ] taking 2 2021-11-22 15:50:40 [ t1:2003 ] - [ DEBUG ] 2 putted...
6) newSingleThreadExecutor 1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
内部调用了new ThreadPoolExecutor 的构造方法,传入的corePoolSize和maximumPoolSize都为1。然后将该对象传给了FinalizableDelegatedExecutorService。该类修饰了ThreadPoolExecutor,让外部无法调用ThreadPoolExecutor内部的某些方法来修改所创建的线程池的大小。
特点
只有1个核心线程
任务队列采用无界队列
使用场景:希望多个任务排队执行
注意
SingleThread和自己创建一个线程来运行多个任务的区别
当线程正在执行的任务发生错误时,如果是自己创建的线程,该任务和剩余的任务就无法再继续运行下去。而SingleThread会创建一个新线程,继续执行任务队列中剩余的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j public class Test5 { public static void main (String[] args) { test(); } public static void test () { ExecutorService pool = Executors.newSingleThreadExecutor(); pool.execute(() -> { log.debug("1" ); int i = 1 / 0 ; }); pool.execute(() -> { log.debug("2" ); }); pool.execute(() -> { log.debug("3" ); }); } }
这里的线程1因除零操作抛出异常,但线程池又新建一个线程2继续执行任务,不会因线程1的报错而终止任务运行。
SingleThread和newFixedThreadPool(1)的区别
newFixedThreadPool(1)传值为1,可以将FixedThreadPool强转为ThreadPoolExecutor,然后通过setCorePoolSize改变核心线程数
而SingleThread无法修改核心线程数
1 2 3 4 ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1 ); threadPool.setCorePoolSize(2 );
7) execute()方法
相关方法
1 2 3 4 5 6 7 8 9 10 11 12 void execute (Runnable command) ;<T> Future<T> submit (Callable<T> task) ; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException ; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ;
传入一个Runnable对象,执行其中的run方法
1 execute(Runnable command)
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
其中调用了addWoker() 方法,再看看看这个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
8) submit()方法 传入一个Callable对象,用Future来捕获返回值
1 Future<T> submit (Callable<T> task)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j public class Test6 { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2 ); Future<String> future = pool.submit(() -> { log.debug("running" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return "ok" ; }); log.debug("{}" , future.get()); } }
1 2 2021-11-23 10:59:37 [ pool-1-thread-1:0 ] - [ DEBUG ] running 2021-11-23 10:59:38 [ main:1003 ] - [ DEBUG ] ok
submit和execute的区别
提交的任务类型不同
execute只能提交Runnable类型的任务
submit既能提交Runnable类型任务也能提交Callable类型任务
异常
execute会直接抛出任务执行时的异常,可以用try、catch来捕获,和普通线程的处理方式完全一致
submit会吃掉异常,可通过Future的get方法将任务执行时的异常重新抛出。
返回值
execute()没有返回值
submit有返回值,所以需要返回值的时候必须使用submit
9) invokeAll()方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class Test6 { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2 ); List<Future<String>> futures = pool.invokeAll(Arrays.asList( () -> { log.debug("begin..." ); TimeUnit.SECONDS.sleep(1 ); return "1" ; }, () -> { log.debug("begin..." ); TimeUnit.SECONDS.sleep(3 ); return "2" ; }, () -> { log.debug("begin..." ); TimeUnit.SECONDS.sleep(2 ); return "3" ; } )); futures.forEach( f -> { try { log.debug("{}" , f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); } }
1 2 3 4 5 6 2021-11-24 10:17:47 [ pool-1-thread-2:0 ] - [ DEBUG ] begin... 2021-11-24 10:17:47 [ pool-1-thread-1:0 ] - [ DEBUG ] begin... 2021-11-24 10:17:48 [ pool-1-thread-1:1002 ] - [ DEBUG ] begin... 2021-11-24 10:17:50 [ main:3003 ] - [ DEBUG ] 1 2021-11-24 10:17:50 [ main:3004 ] - [ DEBUG ] 2 2021-11-24 10:17:50 [ main:3004 ] - [ DEBUG ] 3
10) 关闭线程池
shutdown()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow()
1 2 3 4 5 6 7 List<Runnable> shutdownNow () ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
其他方法
1 2 3 4 5 6 boolean isShutdown () ;boolean isTerminated () ;boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException ;
代码用例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Slf4j public class Test3 { public static void main (String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2 ); Future<Integer> res1 = pool.submit(() -> { log.debug("task1 running..." ); TimeUnit.SECONDS.sleep(1 ); log.debug("task1 finished..." ); return 1 ; }); Future<Integer> res2 = pool.submit(() -> { log.debug("task2 running..." ); TimeUnit.SECONDS.sleep(1 ); log.debug("task2 finished..." ); return 2 ; }); Future<Integer> res3 = pool.submit(() -> { log.debug("task3 running..." ); TimeUnit.SECONDS.sleep(1 ); log.debug("task3 finished..." ); return 3 ; }); log.debug("shutdown..." ); pool.shutdown(); } }
1 2 3 4 5 6 7 2021 -11 -24 10 :29 :30 [ pool-1 -thread-1 :0 ] - [ DEBUG ] task1 running...2021 -11 -24 10 :29 :30 [ pool-1 -thread-2 :0 ] - [ DEBUG ] task2 running...2021 -11 -24 10 :29 :30 [ main:0 ] - [ DEBUG ] shutdown...2021 -11 -24 10 :29 :31 [ pool-1 -thread-1 :1000 ] - [ DEBUG ] task1 finished...2021 -11 -24 10 :29 :31 [ pool-1 -thread-1 :1000 ] - [ DEBUG ] task3 running...2021 -11 -24 10 :29 :31 [ pool-1 -thread-2 :1002 ] - [ DEBUG ] task2 finished...2021 -11 -24 10 :29 :32 [ pool-1 -thread-1 :2002 ] - [ DEBUG ] task3 finished...
1 2 3 2021 -11 -24 10 :30 :40 [ pool-1 -thread-1 :0 ] - [ DEBUG ] task1 running...2021 -11 -24 10 :30 :40 [ main:0 ] - [ DEBUG ] shutdown...2021 -11 -24 10 :30 :40 [ pool-1 -thread-2 :0 ] - [ DEBUG ] task2 running...
11) 异步模式之工作线程
定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池 ,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那 么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)。
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率 。
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型。B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工。
线程池的大小
过小会导致程序不能充分地利用系统资源、容易导致饥饿,过大会导致更多的线程上下文切换,占用更多内存
CPU 密集型运算 通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算 CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下:线程数 = 核数 期望 CPU 利用率 总时间(CPU计算时间+等待时间) / CPU 计算时间 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 4 100% 100% / 50% = 8 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式 4 100% 100% / 10% = 40
12) 任务调度线程池 在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer
的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行 执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
Timer使用——出现的问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf4j public class Test4 { public static void main (String[] args) { Timer timer = new Timer(); TimerTask task1 = new TimerTask() { @SneakyThrows @Override public void run () { log.debug("task1" ); TimeUnit.SECONDS.sleep(2 ); } }; TimerTask task2 = new TimerTask() { @Override public void run () { log.debug("task2" ); } }; log.debug("start..." ); timer.schedule(task1, 1000 ); timer.schedule(task2, 1000 ); } }
1 2 3 2021-11-24 10:54:01 [ main:0 ] - [ DEBUG ] start... 2021-11-24 10:54:02 [ Timer-0:1000 ] - [ DEBUG ] task1 2021-11-24 10:54:04 [ Timer-0:3001 ] - [ DEBUG ] task2
可以看出,01秒后过1s,task1和task2开始执行,但是task1执行需要2s,因此task2这时并未开始执行,而是等到task1执行完后(01 + 3s = 04)才开始执行。
若将task1的run方法中代码改为:
1 2 3 4 public void run () { log.debug("task1" ); int i = 1 / 0 ; }
则task2会因为task1的报错而无法执行。
ScheduledExecutorService改写——延时执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j public class Test5 { public static void main (String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(2 ); pool.schedule(() -> { log.debug("task1" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } }, 1 , TimeUnit.SECONDS); pool.schedule(() -> { log.debug("task2" ); }, 1 , TimeUnit.SECONDS); } }
1 2 2021-11-24 11:03:06 [ pool-1-thread-2:0 ] - [ DEBUG ] task2 2021-11-24 11:03:06 [ pool-1-thread-1:0 ] - [ DEBUG ] task1
可见task1的执行并不会影响task2的执行。
ScheduledExecutorService改写——定时执行
1 2 3 4 5 6 7 8 9 10 11 @Slf4j public class Test6 { public static void main (String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1 ); log.debug("start..." ); pool.scheduleAtFixedRate(() -> { log.debug("running..." ); }, 1 , 1 , TimeUnit.SECONDS); } }
1 2 3 4 5 6 7 2021-11-24 11:08:46 [ main:1 ] - [ DEBUG ] start... 2021-11-24 11:08:47 [ pool-1-thread-1:1004 ] - [ DEBUG ] running... 2021-11-24 11:08:48 [ pool-1-thread-1:2003 ] - [ DEBUG ] running... 2021-11-24 11:08:49 [ pool-1-thread-1:3004 ] - [ DEBUG ] running... 2021-11-24 11:08:50 [ pool-1-thread-1:4003 ] - [ DEBUG ] running... 2021-11-24 11:08:51 [ pool-1-thread-1:5004 ] - [ DEBUG ] running... 2021-11-24 11:08:52 [ pool-1-thread-1:6003 ] - [ DEBUG ] running...
应用——定时任务
暂略
13) 正确处理执行任务异常 如果线程池中的线程执行任务时,如果任务抛出了异常,默认是中断执行该任务而不是抛出异常或者打印异常信息。
主动捉异常
1 2 3 4 5 6 7 8 9 ExecutorService pool = Executors.newFixedThreadPool(1 ); pool.submit(() -> { try { log.debug("task1" ); int i = 1 / 0 ; } catch (Exception e) { log.error("error:" , e); } });
使用 Future,错误信息都被封装进submit方法的返回方法中!
1 2 3 4 5 6 7 ExecutorService pool = Executors.newFixedThreadPool(1 ); Future<Boolean> f = pool.submit(() -> { log.debug("task1" ); int i = 1 / 0 ; return true ; }); log.debug("result:{}" , f.get());
14) tomcat线程池
LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
Acceptor 只负责【接收新的 socket 连接】
Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
Executor 线程池中的工作线程最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同:如果总线程数达到 maximumPoolSize,这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void execute (Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super .execute(command); } catch (RejectedExecutionException rx) { if (super .getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super .getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full." ); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
配置项
7.1.3 Fork/Join 1) 概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
2) 使用 提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下 面定义了一个对 1~n 之间的整数求和的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j public class Test7 { public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool(4 ); System.out.println(pool.invoke(new MyTask(5 ))); } } class MyTask extends RecursiveTask <Integer > { private int n; public MyTask (int n) { this .n = n; } @Override protected Integer compute () { if (n == 1 ) { return 1 ; } MyTask t1 = new MyTask(n - 1 ); t1.fork(); int result = n + t1.join(); return result; } }
7.2 JUC 7.2.1 AQS原理 1) 概述 全称是 AbstractQueuedSynchronizer
,是阻塞式锁 和相关的同步器工具的框架。AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…。
特点:
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
1 2 3 4 5 6 7 8 9 10 11 if (!tryAcquire(arg)) { } if (tryRelease(arg)) { }
2) 自定义同步器使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 @Slf4j public class Test1 { public static void main (String[] args) { MyLock lock = new MyLock(); new Thread(() -> { lock.lock(); try { log.debug("t1加锁成功" ); TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("t1解锁" ); lock.unlock(); } }, "t1" ).start(); new Thread(() -> { lock.lock(); try { log.debug("t2加锁成功" ); } finally { log.debug("t2解锁" ); lock.unlock(); } }, "t2" ).start(); } } class MyLock implements Lock { class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } public Condition newCondition () { return new ConditionObject(); } } private MySync mySync = new MySync(); @Override public void lock () { mySync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { mySync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return mySync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return mySync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { mySync.release(1 ); } @Override public Condition newCondition () { return mySync.newCondition(); } }
1 2 3 4 2021-12-02 16:11:29 [ t1:0 ] - [ DEBUG ] t1加锁成功 2021-12-02 16:11:30 [ t1:1001 ] - [ DEBUG ] t1解锁 2021-12-02 16:11:30 [ t2:1001 ] - [ DEBUG ] t2加锁成功 2021-12-02 16:11:30 [ t2:1001 ] - [ DEBUG ] t2解锁
3) 概述补充 AQS定义两种资源共享方式:Exclusive (独占,只有一个线程能执行,如ReentrantLock)和Share (共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可 ,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1 。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock 。
4) 结点状态waitStatus 这里我们说下Node。Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
CANCELLED (1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL (-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
CONDITION (-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中 ,等待获取同步锁。
PROPAGATE (-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0 :新结点入队时的默认状态。
注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常 。
5) acquire(int) 此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止 ,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
函数流程如下:
tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁 ,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
addWaiter()将该线程加入等待队列的尾部,并标记为独占模式 Node.EXCLUSIVE;
acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
tryAcquire(int)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:
1 2 3 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); }
什么?直接throw异常?说好的功能呢?好吧,还记得概述里讲的AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现吗? 就是这里了!!!AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。
addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。还是上源码吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
end(Node)
此方法用于将node加入队尾。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued(Node, int)
通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了 。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回 。这个函数非常关键,还是上源码吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire(Node, Node)
:此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态)
parkAndCheckInterrupt()
:如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
6) release(int) 此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:
1 2 3 4 5 6 7 8 9 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点
tryRelease(int)
此方法尝试去释放指定量的资源。下面是tryRelease()的源码:
1 2 3 protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); }
跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了! 所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
unparkSuccessor(Node)
此方法用于唤醒等待队列中下一个线程。下面是源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
7.2.2 ReentrantLock原理 可以看到ReentrantLock提供了两个同步器,实现公平锁和非公平锁,默认是非公平锁!
1)非公平锁实现原理
加锁流程
加锁解锁流程,先从构造器开始看,默认为非公平锁实现
1 2 3 public ReentrantLock () { sync = new NonfairSync(); }
第一个竞争出现时,查看源码的NonfairSync的lock方法:
1 2 3 4 5 6 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } }
Thread-1 执行了
lock方法中CAS 尝试将 state 由 0 改为 1,结果失败
lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败
接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
Node 的创建是懒惰的
其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquire方法的 acquireQueued 逻辑
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败
进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)
再次有多个线程经历上述过程竞争失败,变成这个样子
释放锁流程
Thread-0 调用unlock方法里的release方法释放锁,进入tryRelease(使用ctrl+alt+b查看tryRelease方法的具体ReentrantLock
实现) 流程,如果成功,设置 exclusiveOwnerThread 为 null,state = 0
unlock方法里的release方法方法中,如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程: unparkSuccessor中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)
exclusiveOwnerThread 为 Thread-1,state = 1
head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) { tail = head; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) { return true ; } if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } }
解锁源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 static final class NonfairSync extends Sync { public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if ( h != null && h.waitStatus != 0 ) { unparkSuccessor(h); } return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) { compareAndSetWaitStatus(node, ws, 0 ); } Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } }
2)可重入原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static final class NonfairSync extends Sync { final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } }
3)可打断原理 不可打断模式:在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 static final class NonfairSync extends Sync { private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } static void selfInterrupt () { Thread.currentThread().interrupt(); } }
可打断模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 static final class NonfairSync extends Sync { public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException(); } } } finally { if (failed) cancelAcquire(node); } } }
4)公平锁实现原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ( (s = h.next) == null || s.thread != Thread.currentThread() ); } }
5)条件变量实现原理
图解流程
每个条件变量其实就对应着一个等待队列 ,其实现类是 ConditionObject
await 流程:开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁:
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功:
park 阻塞 Thread-0:
signal流程:假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node:
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1:
Thread-1 释放锁,进入 unlock 流程,略。
源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 public class ConditionObject implements Condition , java .io .Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject () { } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) { lastWaiter = null ; } first.nextWaiter = null ; } while ( !transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if ( ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) ) { LockSupport.unpark(node.thread); } return true ; } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } private void unlinkCancelledWaiters () { } public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); } public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignalAll(first); } public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } private static final int REINTERRUPT = 1 ; private static final int THROW_IE = -1 ; private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await () throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } public final long awaitNanos (long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil (Date deadline) throws InterruptedException { } public final boolean await (long time, TimeUnit unit) throws InterruptedException { } }
7.2.3 读写锁 1) ReentrantReadWriteLock 当读操作远远高于写操作时,这时候使用读写锁 让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!
提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Slf4j public class TestReadWriteLock { public static void main (String[] args) { DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1" ).start(); new Thread(() -> { dataContainer.read(); }, "t2" ).start(); } } @Slf4j class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read () { log.debug("获取读锁..." ); r.lock(); try { log.debug("读取" ); TimeUnit.SECONDS.sleep(1 ); return data; } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("释放读锁..." ); r.unlock(); } return data; } public void write () { log.debug("获取写锁..." ); w.lock(); try { log.debug("写入" ); } finally { log.debug("释放写锁..." ); w.unlock(); } } }
1 2 3 4 5 6 2021-12-10 11:01:55 [ t1:0 ] - [ DEBUG ] 获取读锁... 2021-12-10 11:01:55 [ t2:0 ] - [ DEBUG ] 获取读锁... 2021-12-10 11:01:55 [ t1:1 ] - [ DEBUG ] 读取 2021-12-10 11:01:55 [ t2:1 ] - [ DEBUG ] 读取 2021-12-10 11:01:56 [ t1:1003 ] - [ DEBUG ] 释放读锁... 2021-12-10 11:01:56 [ t2:1003 ] - [ DEBUG ] 释放读锁...
可见一起读取线程没有阻塞。
注意事项
读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
1 2 3 4 5 6 7 8 9 10 11 12 r.lock(); try { w.lock(); try { } finally { w.unlock(); } } finally { r.unlock(); }
重入时降级支持:即持有写锁的情况下去获取读锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData () { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true ; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
2) 读写锁原理 读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个 下面执行:t1 w.lock,t2 r.lock
图解流程
t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位
t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1)
流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示
-1 表示失败
0 表示成功,但后继节点不会继续唤醒
正数表示成功,而且数值是还有几个后继节点需要唤醒,我们这里的读写锁返回 1
这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park
又继续执行:t3 r.lock,t4 w.lock 这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子
继续执行t1 w.unlock 这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子
接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行,图中的t2从黑色变成了蓝色(注意这里只是恢复运行而已,并没有获取到锁!) 这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一
这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行
这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一
这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点。再继续执行t2 r.unlock,t3 r.unlock t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零
t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束
源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 static final class NonfairSync extends Sync { public void lock () { sync.acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if ( w == 0 || current != getExclusiveOwnerThread() ) { return false ; } if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if ( writerShouldBlock() || !compareAndSetState(c, c + acquires) ) { return false ; } setExclusiveOwnerThread(current); return true ; } final boolean writerShouldBlock () { return false ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static final class NonfairSync extends Sync { public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) { setExclusiveOwnerThread(null ); } setState(nextc); return free; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 static final class NonfairSync extends Sync { public void lock () { sync.acquireShared(1 ); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) { doAcquireShared(arg); } } protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if ( exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current ) { return -1 ; } int r = sharedCount(c); if ( !readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT) ) { return 1 ; } return fullTryAcquireShared(current); } final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive(); } final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { return 1 ; } } } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 static final class NonfairSync extends Sync { public void unlock () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int unused) { for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { return nextc == 0 ; } } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } }
3) StampedLock 该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁
1 2 long stamp = lock.readLock();lock.unlockRead(stamp);
加解写锁
1 2 long stamp = lock.writeLock();lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次戳校验。如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1 2 3 4 5 long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)){}
注意:
StampedLock 不支持条件变量
StampedLock 不支持可重入
代码示例
提供一个数据容器类。内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Slf4j class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock(); public DataContainerStamped (int data) { this .data = data; } public int read (int readTime) throws InterruptedException { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}" , stamp); Thread.sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}" , stamp); return data; } log.debug("updating to read lock...{}" , stamp); try { stamp = lock.readLock(); log.debug("read lock {}" , stamp); Thread.sleep(readTime); log.debug("read finish...{}" , stamp); return data; } finally { log.debug("read lock...{}" , stamp); lock.unlockRead(stamp); } } public void write (int newData) { long stamp = lock.writeLock(); log.debug("write lock {}" , stamp); try { TimeUnit.SECONDS.sleep(1 ); this .data = newData; } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("write unlock {}" , stamp); lock.unlockWrite(stamp); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf4j public class TestStampedLock { public static void main (String[] args) throws InterruptedException { DataContainerStamped dataContainer = new DataContainerStamped(1 ); new Thread(() -> { try { dataContainer.read(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1" ).start(); Thread.sleep(500 ); new Thread(() -> { try { dataContainer.read(0 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2" ).start(); } }
1 2 3 4 2021-12-14 16:01:21 [ t1:0 ] - [ DEBUG ] optimistic read locking...256 2021-12-14 16:01:21 [ t2:501 ] - [ DEBUG ] optimistic read locking...256 2021-12-14 16:01:21 [ t2:501 ] - [ DEBUG ] read finish...256 2021-12-14 16:01:22 [ t1:1002 ] - [ DEBUG ] read finish...256
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j public class TestStampedLock { public static void main (String[] args) throws InterruptedException { DataContainerStamped dataContainer = new DataContainerStamped(1 ); new Thread(() -> { try { dataContainer.read(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1" ).start(); Thread.sleep(1000 ); new Thread(() -> { dataContainer.write(1000 ); }, "t2" ).start(); } }
1 2 3 4 5 6 7 2021-12-14 16:05:36 [ t1:0 ] - [ DEBUG ] optimistic read locking...256 2021-12-14 16:05:37 [ t2:999 ] - [ DEBUG ] write lock 384 2021-12-14 16:05:38 [ t2:2002 ] - [ DEBUG ] write unlock 384 2021-12-14 16:05:38 [ t1:2002 ] - [ DEBUG ] updating to read lock...256 2021-12-14 16:05:38 [ t1:2002 ] - [ DEBUG ] read lock 513 2021-12-14 16:05:40 [ t1:4002 ] - [ DEBUG ] read finish...513 2021-12-14 16:05:40 [ t1:4002 ] - [ DEBUG ] read unlock...513
t1读取后,t2修改了数据,t1验戳失败,乐观读升级为读锁,重新上锁,再次读取数据。
7.2.4 Semaphore 1) 基本使用 信号量,用来限制能同时访问共享资源的线程上限。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Slf4j public class TestSemaphore { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); for (int i = 0 ; i < 10 ; i++) { new Thread(() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..." ); TimeUnit.SECONDS.sleep(1 ); log.debug("end" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 2021-12-14 16:18:26 [ Thread-0:0 ] - [ DEBUG ] running... 2021-12-14 16:18:26 [ Thread-2:0 ] - [ DEBUG ] running... 2021-12-14 16:18:26 [ Thread-1:0 ] - [ DEBUG ] running... 2021-12-14 16:18:27 [ Thread-1:1002 ] - [ DEBUG ] end 2021-12-14 16:18:27 [ Thread-2:1002 ] - [ DEBUG ] end 2021-12-14 16:18:27 [ Thread-0:1002 ] - [ DEBUG ] end 2021-12-14 16:18:27 [ Thread-3:1002 ] - [ DEBUG ] running... 2021-12-14 16:18:27 [ Thread-5:1002 ] - [ DEBUG ] running... 2021-12-14 16:18:27 [ Thread-4:1002 ] - [ DEBUG ] running... 2021-12-14 16:18:28 [ Thread-4:2005 ] - [ DEBUG ] end 2021-12-14 16:18:28 [ Thread-5:2005 ] - [ DEBUG ] end 2021-12-14 16:18:28 [ Thread-3:2005 ] - [ DEBUG ] end 2021-12-14 16:18:28 [ Thread-7:2005 ] - [ DEBUG ] running... 2021-12-14 16:18:28 [ Thread-8:2005 ] - [ DEBUG ] running... 2021-12-14 16:18:28 [ Thread-6:2005 ] - [ DEBUG ] running... 2021-12-14 16:18:29 [ Thread-8:3006 ] - [ DEBUG ] end 2021-12-14 16:18:29 [ Thread-6:3006 ] - [ DEBUG ] end 2021-12-14 16:18:29 [ Thread-7:3006 ] - [ DEBUG ] end 2021-12-14 16:18:29 [ Thread-9:3006 ] - [ DEBUG ] running... 2021-12-14 16:18:30 [ Thread-9:4007 ] - [ DEBUG ] end Process finished with exit code 0
2) 图解流程 Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源:
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
这时 Thread-4 释放了 permits,状态如下:
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态:
3) 源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits) { super (permits); } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if ( remaining < 0 || compareAndSetState(available, remaining) ) { return remaining; } } } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } public void release () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } }
7.2.5 CountDownLatch CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch是共享锁 的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state == 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。
CountDownLatch用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一。
1) 使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Slf4j public class TestCountdownLatch { public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3 ); new Thread(() -> { try { log.debug("begin..." ); TimeUnit.SECONDS.sleep(1 ); latch.countDown(); log.debug("end..." ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { log.debug("begin..." ); TimeUnit.SECONDS.sleep(2 ); latch.countDown(); log.debug("end..." ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { log.debug("begin..." ); TimeUnit.SECONDS.sleep(1 ); latch.countDown(); log.debug("end..." ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); log.debug("waiting..." ); latch.await(); log.debug("end..." ); } }
1 2 3 4 5 6 7 8 2021-12-15 11:10:50 [ Thread-0:0 ] - [ DEBUG ] begin... 2021-12-15 11:10:50 [ Thread-2:1 ] - [ DEBUG ] begin... 2021-12-15 11:10:50 [ main:1 ] - [ DEBUG ] waiting... 2021-12-15 11:10:50 [ Thread-1:0 ] - [ DEBUG ] begin... 2021-12-15 11:10:51 [ Thread-0:1002 ] - [ DEBUG ] end... 2021-12-15 11:10:51 [ Thread-2:1002 ] - [ DEBUG ] end... 2021-12-15 11:10:52 [ Thread-1:2002 ] - [ DEBUG ] end... 2021-12-15 11:10:52 [ main:2002 ] - [ DEBUG ] end...
2) 改进—配合线程池使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Slf4j public class TestCountdownLatch { public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3 ); ExecutorService service = Executors.newFixedThreadPool(4 ); service.submit(() -> { try { log.debug("begin..." ); TimeUnit.SECONDS.sleep(1 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); } catch (InterruptedException e) { e.printStackTrace(); } }); service.submit(() -> { try { log.debug("begin..." ); TimeUnit.SECONDS.sleep(2 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); } catch (InterruptedException e) { e.printStackTrace(); } }); service.submit(() -> { try { log.debug("begin..." ); TimeUnit.SECONDS.sleep(3 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); } catch (InterruptedException e) { e.printStackTrace(); } }); service.submit(() -> { try { log.debug("waiting..." ); latch.await(); log.debug("wait end..." ); } catch (InterruptedException e) { e.printStackTrace(); } }); service.shutdown(); } }
1 2 3 4 5 6 7 8 2021-12-15 14:28:16 [ pool-1-thread-1:0 ] - [ DEBUG ] begin... 2021-12-15 14:28:16 [ pool-1-thread-3:1 ] - [ DEBUG ] begin... 2021-12-15 14:28:16 [ pool-1-thread-4:1 ] - [ DEBUG ] waiting... 2021-12-15 14:28:16 [ pool-1-thread-2:0 ] - [ DEBUG ] begin... 2021-12-15 14:28:17 [ pool-1-thread-1:1005 ] - [ DEBUG ] end...2 2021-12-15 14:28:18 [ pool-1-thread-2:2003 ] - [ DEBUG ] end...1 2021-12-15 14:28:19 [ pool-1-thread-3:3002 ] - [ DEBUG ] end...0 2021-12-15 14:28:19 [ pool-1-thread-4:3002 ] - [ DEBUG ] wait end...
3) 应用
等待多线程准备完毕
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j public class TestCountDownLatchApp { public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(10 ); ExecutorService service = Executors.newFixedThreadPool(10 ); String[] all = new String[10 ]; Random r = new Random(); for (int j = 0 ; j < 10 ; j++) { int k = j; service.submit(() -> { for (int i = 0 ; i <= 100 ; i++) { try { Thread.sleep(r.nextInt(100 )); } catch (InterruptedException e) { e.printStackTrace(); } all[k] = i + "%" ; System.out.print("\r" + Arrays.toString(all)); } latch.countDown(); }); } latch.await(); System.out.println("\nGame Start!" ); service.shutdown(); } }
等待多个远程调用结束
略,见分布式课程
7.2.6 CyclicBarri CyclicBarri[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。
构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。跟CountdownLatch一样,但这个可以重用 。
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j public class TestCyclicBarrier { public static void main (String[] args) { ExecutorService service = Executors.newFixedThreadPool(2 ); CyclicBarrier barrier = new CyclicBarrier(2 , () ->{ log.debug("task1 task2 finish..." ); }); for (int i = 0 ; i < 3 ; i++) { service.submit(() -> { log.debug("task1 begin..." ); try { TimeUnit.SECONDS.sleep(1 ); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); service.submit(() -> { log.debug("task2 begin..." ); try { TimeUnit.SECONDS.sleep(2 ); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } service.shutdown(); } }
1 2 3 4 5 6 7 8 9 10 11 2021-12-15 14:15:44 [ pool-1-thread-2:0 ] - [ DEBUG ] task2 begin... 2021-12-15 14:15:44 [ pool-1-thread-1:0 ] - [ DEBUG ] task1 begin... 2021-12-15 14:15:46 [ pool-1-thread-2:2002 ] - [ DEBUG ] task1 task2 finish... 2021-12-15 14:15:46 [ pool-1-thread-2:2002 ] - [ DEBUG ] task1 begin... 2021-12-15 14:15:46 [ pool-1-thread-1:2002 ] - [ DEBUG ] task2 begin... 2021-12-15 14:15:48 [ pool-1-thread-1:4004 ] - [ DEBUG ] task1 task2 finish... 2021-12-15 14:15:48 [ pool-1-thread-1:4004 ] - [ DEBUG ] task1 begin... 2021-12-15 14:15:48 [ pool-1-thread-2:4004 ] - [ DEBUG ] task2 begin... 2021-12-15 14:15:50 [ pool-1-thread-2:6007 ] - [ DEBUG ] task1 task2 finish... Process finished with exit code 0
7.3 ThreadLocal 7.3.1 简介 ThreadLocal叫做线程变量 ,意思是ThreadLocal中填充的变量属于当前线程,该变量对其他线程而言是隔离的,也就是说该变量是当前线程独有的变量。ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。
因为每个 Thread 内有自己的实例副本,且该副本只能由当前 Thread 使用。这是也是 ThreadLocal 命名的由来。
既然每个 Thread 有自己的实例副本,且其它 Thread 不可访问,那就不存在多线程间共享的问题。
当一个线程结束时,它所使用的所有 ThreadLocal 相对的实例副本都可被回收。
总的来说,ThreadLocal 适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景。
7.3.2 简单使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class ThreadLocalTest { private final static ThreadLocal<String> localVar = new ThreadLocal<>(); static void print (String str) { System.out.println("线程[" + Thread.currentThread().getId() + "]" + str + " :" + localVar.get()); localVar.remove(); } public static void main (String[] args) throws InterruptedException { new Thread(() -> { ThreadLocalTest.localVar.set("local_A" ); print("A" ); System.out.println("After remove: " + "线程[" + Thread.currentThread().getId() + "]" + localVar.get()); },"A" ).start(); Thread.sleep(1000 ); new Thread(() -> { ThreadLocalTest.localVar.set("local_B" ); print("B" ); System.out.println("After remove: " + "线程[" + Thread.currentThread().getId() + "]" + localVar.get()); },"B" ).start(); } }
打印结果:
1 2 3 4 线程[14]A :local_A After remove: 线程[14]null 线程[15]B :local_B After remove: 线程[15]null
从这个示例中我们可以看到,两个线程分别获取了自己线程存放的变量,他们之间变量的获取并不会错乱。
7.3.3 源码分析 ① ThreadLocal.set 位于ThreadLocal类中:
1 2 3 4 5 6 7 8 9 10 11 12 public void set (T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null ) map.set(this , value); else createMap(t, value); }
从上面的代码可以看出,赋值的时候首先会获取当前线程thread,并获取thread线程中的ThreadLocalMap
属性。如果map属性不为空,则直接更新value值,如果map为空,则实例化threadLocalMap,并将value值初始化。
其中ThreadLocalMap
类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static class ThreadLocalMap { static class Entry extends WeakReference <ThreadLocal <?>> { Object value; Entry(ThreadLocal<?> k, Object v) { super (k); value = v; } } ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1 ); table[i] = new Entry(firstKey, firstValue); size = 1 ; setThreshold(INITIAL_CAPACITY); } }
ThreadLocalMap是ThreadLocal的内部静态类 ,而它的构成主要是用Entry
来保存数据 ,而且还是继承的弱引用。在Entry内部使用ThreadLocal作为key,使用我们设置的value作为value。
getMap
函数:
1 2 3 ThreadLocalMap getMap (Thread t) { return t.threadLocals; }
threadLocals
位于Thread类中:
1 ThreadLocal.ThreadLocalMap threadLocals = null ;
createMap
函数:
1 2 3 4 5 6 void createMap (Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this , firstValue); }
② ThreadLocal.get 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public T get () { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null ) { ThreadLocalMap.Entry e = map.getEntry(this ); if (e != null ) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
③ ThreadLocal.remove 1 2 3 4 5 6 public void remove () { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null ) { m.remove(this ); } }
remove方法,直接将ThrealLocal 对应的值从当前Thread中的ThreadLocalMap中删除。为什么要删除,这涉及到内存泄露的问题。
7.3.4 ThreadLocal与Thread和ThreadLocalMap的关系
7.3.5 常见使用场景
每个线程需要有自己单独的实例
实例需要在多个方法中共享,但不希望被多线程共享
8 线程安全集合类 8.1 ConcurrentHashMap 8.1.1 为什么hashtable慢 Hashtable之所以效率低下主要是因为其实现使用了synchronized关键字对put等操作进行加锁,而synchronized关键字加锁是对整个对象进行加锁,也就是说在进行put等修改Hash表的操作时,锁住了整个Hash表,从而使得其表现的效率低下。
8.1.2 JDK1.7版本 在JDK1.5~1.7版本,Java使用了分段锁 机制实现ConcurrentHashMap。
简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。
Segment 内部是由 数组+链表
组成的。
8.1.3 JDK1.8版本 在JDK1.7之前,ConcurrentHashMap是通过分段锁机制来实现的,所以其最大并发度受Segment的个数限制。因此,在JDK1.8中,ConcurrentHashMap的实现原理摒弃了这种设计,而是选择了与HashMap类似的数组+链表+红黑树的方式实现,而加锁则采用CAS和synchronized实现 。
8.2 CopyOnWriteArrayList CopyOnWriteArrayList是ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的拷贝来实现的。
8.3 BlockingQueue BlockingQueue是一个接口。java.util.concurrent.BlockingQueue;
8.3.1 定义 BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。 负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
8.3.2 方法 BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常
特定值
阻塞
超时
插入
add(o)
offer(o)
put(o)
offer(o, timeout, timeunit)
移除
remove()
poll()
take()
poll(timeout, timeunit)
检查
element()
peek()
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Producer implements Runnable { protected BlockingQueue queue = null ; public Producer (BlockingQueue queue) { this .queue = queue; } @Override public void run () { try { queue.put("1" ); Thread.sleep(1000 ); queue.put("2" ); Thread.sleep(1000 ); queue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Consumer implements Runnable { protected BlockingQueue queue = null ; public Consumer (BlockingQueue queue) { this .queue = queue; } public void run () { try { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 public class BlockingQueueTest { public static void main (String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue(1024 ); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); Thread.sleep(4000 ); } }
8.3.3 实现类 ① ArrayBlockingQueue ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。 以下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:
1 2 3 BlockingQueue queue = new ArrayBlockingQueue(1024 ); queue.put("1" ); Object object = queue.take();
或者实现泛型:
1 2 3 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024 ); queue.put("1" ); String string = queue.take();
② LinkedBlockingQueue LinkedBlockingQueue 内部以一个链式结构对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE
作为上限。同样以先进先出的顺序进行存储。
③ SynchronousQueue SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素 。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
④ PriorityBlockingQueue PriorityBlockingQueue 是一个无界 的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable
接口。