多并发中的锁

为什么需要锁?

在单线程环境下,不论怎么操作数据都不会出现什么致命的操作,但是在这个多并发的时代,我们更多的要考虑多并发,并保证多并发环境的安全性。

在并发环境下,多个线程争抢操作同一份数据,这样可能会导致数据异常,这个时候就需要一种机制来保证这一份数据的安全性。

锁是如何实现的?

在java中,每一个Object都有一把锁,这把锁放在对象的对象头中,锁中记录了当前对象被哪个线程所占用。

内存中的对象结构

对齐填充字节是为了满足java对象必须是8比特的倍数这一条件所设计的

实例数据是为了保存对象属性和方法的

对象头是为了保存对象的运行时数据,对象头包含了两部分,一个是Mark Word 一个是Class Pointer

  • Class Pointer 指向Class对象

  • Mark Word (32 bit)存放了很多和当前对象运行时状态有关的数据

    • hashCode
    • 锁状态

synchronized实现原理

synchronized被编译后会生成monitorenter和monitorexit两个字节码指令,依赖这两个指令就可以实现锁的机制,从而实现线程的同步

这是未上锁的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.TimeUnit;

public class TestSync {
private int count;
public void add(){
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":===>"+count++);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
TestSync ts = new TestSync();
new Thread(()->{
for (int i = 0; i < 1000; i++) {
ts.add();
}
}).start();
new Thread(()->{
for (int i = 0; i < 1000; i++) {
ts.add();
}
}).start();
new Thread(()->{
for (int i = 0; i < 1000; i++) {
ts.add();
}
}).start();
}

我们来加一下synchronized代码块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.lizhi;

import java.util.concurrent.TimeUnit;

public class TestSync {
private int count;
public void add(){
synchronized(this){
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":===>"+count++);
}
}
}

可以发现结果十分正确,我们来反编译一下,得到以下字节码文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
javap -c .\TestSync.class
Compiled from "TestSync.java"
public class com.lizhi.TestSync {
public com.lizhi.TestSync();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return

public void add();
Code:
0: aload_0
1: dup
2: astore_1
3: monitorenter
4: getstatic #2 // Field java/util/concurrent/TimeUnit.MILLISECONDS:Ljava/util/concurrent/TimeUnit;
7: ldc2_w #3 // long 30l
10: invokevirtual #5 // Method java/util/concurrent/TimeUnit.sleep:(J)V
13: goto 21
16: astore_2
17: aload_2
18: invokevirtual #7 // Method java/lang/InterruptedException.printStackTrace:()V
21: getstatic #8 // Field java/lang/System.out:Ljava/io/PrintStream;
24: invokestatic #9 // Method java/lang/Thread.currentThread:()Ljava/lang/Thread;
27: invokevirtual #10 // Method java/lang/Thread.getName:()Ljava/lang/String;
30: aload_0
31: dup
32: getfield #11 // Field count:I
35: dup_x1
36: iconst_1
37: iadd
38: putfield #11 // Field count:I
41: invokedynamic #12, 0 // InvokeDynamic #0:makeConcatWithConstants:(Ljava/lang/String;I)Ljava/lang/String;
46: invokevirtual #13 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
49: aload_1
50: monitorexit
51: goto 59
54: astore_3
55: aload_1
56: monitorexit
57: aload_3
58: athrow
59: return
Exceptio n table:
from to target type
4 13 16 Class java/lang/InterruptedException
4 51 54 any
54 57 54 any
}

可以看到在第3行和第50行出现了monitorenter和monitorexit

Monitor

该单词的意思是监视器,可以将其想成一间宾馆的客房,把各个线程想像成客人。如果客人未退房,在这之后的客人则无法入住,只能等待这个客人退房后方可入住。

monitor是依赖于操作系统的mutex lock来实现的

java线程实际上是对操作系统线程的映射,每当挂起或者唤醒一个线程都要切换

操作系统内核态,这种操作是比较重量级的,在一些情况下甚至切换时间会超过任务本身执行的时间,这样的情况下,使用synchronized的会对系统的性能产生很严重的影响,但是从java6开始,synchronized进行了优化,引入了偏向锁和轻量级锁。

锁的四种状态

由低到高分别是:无锁、偏向锁、轻量级锁、重量级锁。

这就与Mark Word中的四种状态对应上了

注意:锁只能升级不能降级

无锁

无锁即没有对自身资源上锁,所有线程都可以拿到资源并操作

这就可能出现两种情况

  1. 某个对象不会出现在多线程环境下,或者说即使出现在多线程环境下也不会出现竞争的情况
  2. 资源会被竞争,但是我不想对资源锁定,不过这样也不行,会导致一些问题,就想通过一种其他机制来控制多线程,

比如说,如果有多个进程想修改同一个值,但是只能有一个线程修改成功,其他失败的线程需要不断尝试,直到修改成功 ,这就是CAS(Compare And Swap),CAS在操作系统中通过一条指令来实现,所以其就可以保证原子性,有了这种机制,我们就可以实现无锁编程

偏向锁

假如一个对象被加锁了,那么在实际运行过程中只有一个线程会获得这个对象锁,并只有当线程执行完任务,才会将锁释放。

我们设想的是,最好这个对象能够认识这个线程,当这个线程过来,那么这个对象就把自己的锁给他,这可以说这个对象偏爱这个线程,这就是偏向锁的概念

那么这个认识的人,被这个对象记在哪里呢?那就是在对象头中MarkWord的线程ID中,假如情况发生了变化,这个对象发现,不止有一个线程在争抢锁,那么该偏向锁会直接升级成轻量级锁

轻量级锁

那当锁升级为轻量级锁的时候,如何判断线程和所之间的绑定关系呢?可以再看下那张markword的表,这里已经不再使用线程ID这个字段了,而是将30bit变为指向线程栈中的锁记录(Lock Record)的指针

当一个线程想要获得对象的锁的时候,加入看到锁的标志位为00那么就知道它是轻量级锁,这时候线程会在自己的虚拟机栈中开辟一块被称为Lock Record的空间(线程私有的哦),该控件用来存放对象头中的Mark word的副本以及owner指针,线程通过CAS去尝试获得锁,一旦获取那么将会复制该对象头中的markword到本线程的Lock record中并且将Locker record中的owner指针指向该对象,并且对象的前三十位将会生成一个指针,指向线程虚拟机栈中的Lock Record,这样一来就实现了线程和对象锁的绑定。

当线程已经被抢占了,那其他想抢占的线程会自旋等待(可以理解为轮询),不断尝试拿到锁,这种操作区别于被操作系统挂起阻塞,如果对象的锁很快就被释放的话,自旋就不需要进行系统中断和重新恢复,所以它的效率会更高。

自旋相当于CPU空转,如果CPU空转时间过长,性能也会产生损耗,然后就有了一种优化,就是适应性自旋。简单来说,就是自旋的时间会根据上一次在同一个锁上自旋的时间以及锁的状态来决定的

重量级锁

如果同时有多个线程要获得这个对象锁,也就是说一旦自旋等待的线程超过一个,则会升级为重量级锁,那么这个时候,这就需要使用monitor来对线程进行控制,此时将会完全锁定资源,对线程的管控也更为严格

无锁编程

java中的synchronized就是悲观锁(Pessimistic Concurrency Control),什么叫悲观锁呢?

简单来说就是操作系统会悲观地认为,如果不严格同步线程调用,那么一定会产生异常,所以互斥锁会锁定资源,直供一个线程使用,而阻塞其他线程,因此叫悲观锁,但是悲观锁不是万能的,如果大部分操作都是读的操作,那么就没有必要在每次调用的时候都锁定资源。可能会出现同步代码块执行的耗时远远小于线程切换的耗时。使用悲观锁,会导致性能十分不容乐观,比如大量用户需要同时读取同一份资源,如果使用的是悲观锁,则同一时间只能有一个用户可以查看,其他用户都在一直白屏,但是!其实对于读的操作,即使是多个线程同时操作该数据,也不会对该数据造成影响,我们能不能在不锁定资源的情况下,也可以对线程调用进行一些协调呢?这时候就出现了乐观锁这种机制,实现原理就是CAS(Compare And Swap)

CAS

我们来举个栗子:人就是线程,厕所就是对象,该对象有一个值来表示是否被占用occupy 0表示空闲 1表示有人

很多人要去抢厕所,然后线程A(Old Value = 0 ,new Value =1)、B(Old Value = 0,new Value = 1)这两个人冲到了最前面,然后A率先和厕所的occupy比较(Compare)发现和oldValue一致,则将oldValue与厕所的occupy交换(And Swap),这时候B一看自己的oldValue与厕所的occupy(1)不一致,他很不甘心,但也只能在厕所外踱步(自旋),通常在等了一段时间后(设置最大自旋次数),骂出一句国粹后就离去了

这时候可能就出现了问题,因为Compare和Swap是两个操作,没有原子性,这就有可能出现,当A即将将值改为1的那一刹那,B抢到了时间片,将其改为了1,然后A也将其改为1,这就很尴尬了,两个人一起进入了厕所。

那么如何保证CAS的原子性呢?难道还是锁来实现同步吗?这不就是一个鸡生蛋蛋生鸡的问题了吗?

其实啊,各种不同架构的CPU都提供了指令级别的CAS原子操作,

比如在X86架构下,通过cmpxchg指令可以支持CAS

在ARM下,通过LL/SC来实现CS……

也就是说不通过操作系统的同步原语(比如 mutex),CPU已经实现了CS,上层只需要调用即可,这样我们就可以不依赖锁来进行线程同步,但是这也不意味着无锁可以完全代替有锁

利用CAS特性来进行无锁编程

需求:使用三条线程,将一个值,从0累加到1000

1、错误

1
2
3
4
5
6
7
8
9
10
11
12
public class Application2 {
static int count =0 ;
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(()->{
while(count<1000){
System.out.println(Thread.currentThread().getName()+":==>"+count++);
}
}).start();
}
}
}

2、使用synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Application2 {
static int count =0 ;
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(()->{
while(count<1000){
synchronized (Application2.class){
System.out.println(Thread.currentThread().getName()+":==>"+count++);
}
}
}).start();
}
}
}

3、无锁

1
2
3
4
5
6
7
8
9
10
11
12
public class Application2 {
static AtomicInteger count =new AtomicInteger(0) ;
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(()->{
while(count.get()<1000){
System.out.println(Thread.currentThread().getName()+":==>"+count.getAndIncrement());
}
}).start();
}
}
}

探究源码

AtomicInteger的getAndIncrement方法

1
2
3
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}

U的getAndAddInt方法

1
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();

U是一个操作底层的类

1
2
3
4
5
6
7
8
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
1
2
3
4
5
6
@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset,
int expected,
int x) {
return compareAndSetInt(o, offset, expected, x);
}
1
2
3
4
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,
int expected,
int x);

启动次数可以通过启动参数来配置,默认是10,所以不会出现死循环

Unsafe

给作者买杯咖啡吧~~~