0%

AQS原理与Lock

AQS学习

AQS介绍

  • AQS的全称是 AbstractQueuedSychronizer抽象队列同步器,位于juc.locks包下

    • 整个juc.locks.lock接口的实现类基本上除了文章末尾补充的StampedLock之外,其余都是借助AQS实现同步功能
  • AQS 是一个用来构建锁和同步器的框架,比如常见的 ReentrantLock,Semaphore(信号量),其他的诸如ReentrantReadWriteLock,CountDownLatch,FutureTask(jdk1.7) 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器,只要子类实现它的几个protected方法就可以了

    • 所谓同步器就是为了实现线程同步而设计的组件,锁,信号量都可以视为同步器;线程同步的概念就是:线程之间按照一定的顺序执行
    • 并发编程的两大问题
      • 线程通信(数据交互)
      • 线程同步(线程的执行顺序)
    • FutureTask在jdk1.7时依赖AQS构建,在JDK1.8后改为使用CAS+state变量的维护+WaitNode类型的链表来维护等待的线程—-可以说继承了AQS的思想
  • AQS的原理:

    • AQS 核心思想是,使用CAS更改state状态变量来申请锁,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将state变量设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

      CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现线程的等待以及后续的唤醒和锁的分配机制

      实际上还维护了一个条件队列,专门为Condition服务的

      对列的操作以及节点状态的更新也使用到了CAS进行更新

      image-20210127091233916
    • AQS的特征

      • 抽象类,所以说其是一个用来实现锁和同步器的框架
      • 使用队列维持锁的分配机制
        • 有一点需要搞清楚的是:线程等待队列起到的作用绝不仅仅是让线程排队等待,更重要的是其通过维持一个线程队列来实现的后续的线程的唤醒与锁分配的机制,这个的这一套逻辑是作为模板实现在了AQS类中,其余的锁状态的维护,获得与释放锁的执行都交给子类调用模板方法去实现了
      • 使用此框架可以构建线程同步器
    • 使用了AQS的同步器(因为AQS是抽象类,可以从继承关系中得出哪些同步器使用了AQS的架构)

      • ReentrantLock
      • ReentrantReadWriteLock
      • Semaphore
      • CountDownLatch
      • Worker in ThreadPoolExecutor

AQS作用机制

  • 使用内部定义的volatile变量state作为资源状态标识符

    • 从这里也可以瞥见基于juc.locks.Lock接口实现的锁与synchronzized的一个很大的不同:synchronized使用的锁对象是一个Java对象,线程对于锁的竞争实际上是对对象监视器的竞争,而在基于AQS实现的同步器中,或者说基于juc.locks.Lock实现的锁都是竞争更改(CAS)这个state变量的值来实现对于锁的获取与释放

      • 因为state作为一个int类型的变量,其取值是很灵活的,所以Synchronized只能实现独占锁,但是AQS可以实现独占锁(只能请求一个资源),也可以方便的实现可重入(state作为计数器);或者是共享锁(一次性申请多个资源(state>1),使多个线程并行)
    • 提供三个fina protected原子方法来实现资源状态的修改

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      private static final Unsafe unsafe = Unsafe.getUnsafe();
      private static final long stateOffset;
      private volatile int state;


      static {
      try {
      stateOffset = unsafe.objectFieldOffset
      (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
      headOffset = unsafe.objectFieldOffset
      (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
      tailOffset = unsafe.objectFieldOffset
      (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
      waitStatusOffset = unsafe.objectFieldOffset
      (Node.class.getDeclaredField("waitStatus"));
      nextOffset = unsafe.objectFieldOffset
      (Node.class.getDeclaredField("next"));

      } catch (Exception ex) { throw new Error(ex); }
      }



      // 只有一句volatile变量的读,是原子方法
      protected final int getState() {
      return state;
      }
      // 只有一句volatile变量的写,是原子方法
      // 在可重入特性的设计中使用
      protected final void setState(int newState) {
      state = newState;
      }
      // 使用CAS原子更新
      protected final boolean compareAndSetState(int expect, int update) {

      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
      }
      • setStatecompareAndSetState都是原子方法,二者的区别在于后者的写是有条件的,或者说,setState是在当前线程就是锁拥有者时设置state的值(比方说ReentrantLock中设置可重入特性时的代码),而compareAndSetState是在当前线程不是锁拥有者时尝试设置state时使用
  • 作为一个典型的模板方法设计模式的使用案例,AQS只负责实现排队和阻塞的机制,比如线程等待队列的维护(如获取资源失败入队/唤醒出队等);资源状态(锁状态)的维护的部分交给子类去实现——-包括公平性与非公平性、是否可重入,都在子类的实现中得以表达

    • AQS中的几个需要子类实现的核心方法
      • isHeldExclusively():该线程是否正在独占资源。只有用到Condition时才需要去实现它。
      • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
      • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
      • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源(相当于返回剩余的可分配锁资源的线程的数量)。
      • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
    • 方法默认的实现是抛出一个UnsupportedOperationException,所以如果子类不实现并直接使用的话会直接抛出异常
      • 没有使用abstract修饰的原因在于,子类并不一定需要实现全部的方法,只需关注自己要实现的功能去实现即可,比如 Semaphore 只需要实现 tryAcquire 方法而不用实现其余不需要用到的模版方法
    • 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞(暂时不知晓这样的原因)
    • AQS除了上述的方法外其余的都是final方法,是子类直接用的方法,不允许复写,只复写以上的几个关键方法足矣
  • 使用AQS构建的同步器实际上主要实现以上的5个方法即可(即资源的状态维护),至于具体的线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了

    • 上述5个方法并不是锁获取与释放的入口方法(入口方法定义在AQS中),但是会被入口方法直接调用,子类的同步器的特性包括公平性与是否重入的特性,都在上述方法中进行设计

AQS中的资源共享模式

  • 独占模式

    • 资源独占,资源(锁)一次只能由一个线程获取,典型的锁都是这样的,比如ReentrantLockThreadLocalPool中的Worker等等
    • 又可分为公平锁和非公平锁(后边有以ReentrantLock介绍实现方式)
      • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
      • 非公平锁:当线程要获取锁时,无视已经在排队的线程的队列顺序直接去抢锁,谁抢到就是谁的
    • 也可以分为重入锁与不可重入锁(后文以ReentrantLock与ThreadPoolExecutor中的Worker对比观察具体实现
  • 共享模式

    • 同时可以被多个线程获取,具体的资源个数(支持并发线程的数量)可以通过参数指定。如SemaphoreCountDownLatchCyclicBarrier
  • 混合模式

    • 也就是独占模式与共享模式的混合,比如ReentrantReadWriteLock
      • 读写锁中的读锁可以同时被多个线程占据(共享模式),写锁同时只能被一个线程持有(独占模式)

线程队列的实现

等待队列

  • 队列的实现就是其队列节点类Node的实现(等待的线程被包装成Node存储在CLH队列中)中,查看其源码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    static final class Node {

    /** Marker to indicate a node is waiting in shared mode */
    // 标记一个结点(对应的线程)在共享模式下等待
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 标记一个结点(对应的线程)在独占模式下等待
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    // waitStatus的值,表示该结点(对应的线程)已被取消
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // waitStatus的值,表示后继结点(对应的线程)需要被唤醒
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // waitStatus的值,表示该结点(对应的线程)在等待某一条件
    static final int CONDITION = -2;
    /**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
    /*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
    // 自己理解是,在共享模式下,链式的唤醒后续的节点来获得资源
    static final int PROPAGATE = -3;

    /**
    * Status field, taking on only the values:
    * SIGNAL: The successor of this node is (or will soon be)
    * blocked (via park), so the current node must
    * unpark its successor when it releases or
    * cancels. To avoid races, acquire methods must
    * first indicate they need a signal,
    * then retry the atomic acquire, and then,
    * on failure, block.
    * CANCELLED: This node is cancelled due to timeout or interrupt.
    * Nodes never leave this state. In particular,
    * a thread with cancelled node never again blocks.
    * CONDITION: This node is currently on a condition queue.
    * It will not be used as a sync queue node
    * until transferred, at which time the status
    * will be set to 0. (Use of this value here has
    * nothing to do with the other uses of the
    * field, but simplifies mechanics.)
    * PROPAGATE: A releaseShared should be propagated to other
    * nodes. This is set (for head node only) in
    * doReleaseShared to ensure propagation
    * continues, even if other operations have
    * since intervened.
    * 0: None of the above
    *
    * The values are arranged numerically to simplify use.
    * Non-negative values mean that a node doesn't need to
    * signal. So, most code doesn't need to check for particular
    * values, just for sign.
    *
    * The field is initialized to 0 for normal sync nodes, and
    * CONDITION for condition nodes. It is modified using CAS
    * (or when possible, unconditional volatile writes).
    */
    // 当前节点对应线程的等待状态,取值范围,-3,-2,-1,0,1
    // 正常的一般节点,此值为0
    volatile int waitStatus;

    /**
    * Link to predecessor node that current node/thread relies on
    * for checking waitStatus. Assigned during enqueuing, and nulled
    * out (for sake of GC) only upon dequeuing. Also, upon
    * cancellation of a predecessor, we short-circuit while
    * finding a non-cancelled one, which will always exist
    * because the head node is never cancelled: A node becomes
    * head only as a result of successful acquire. A
    * cancelled thread never succeeds in acquiring, and a thread only
    * cancels itself, not any other node.
    */
    // 前驱结点
    volatile Node prev;

    /**
    * Link to the successor node that the current node/thread
    * unparks upon release. Assigned during enqueuing, adjusted
    * when bypassing cancelled predecessors, and nulled out (for
    * sake of GC) when dequeued. The enq operation does not
    * assign next field of a predecessor until after attachment,
    * so seeing a null next field does not necessarily mean that
    * node is at end of queue. However, if a next field appears
    * to be null, we can scan prev's from the tail to
    * double-check. The next field of cancelled nodes is set to
    * point to the node itself instead of null, to make life
    * easier for isOnSyncQueue.
    */
    // 后继结点
    volatile Node next;

    /**
    * The thread that enqueued this node. Initialized on
    * construction and nulled out after use.
    */

    // 结点对应的线程
    volatile Thread thread;

    /**
    * Link to next node waiting on condition, or the special
    * value SHARED. Because condition queues are accessed only
    * when holding in exclusive mode, we just need a simple
    * linked queue to hold nodes while they are waiting on
    * conditions. They are then transferred to the queue to
    * re-acquire. And because conditions can only be exclusive,
    * we save a field by using special value to indicate shared
    * mode.
    */
    // 等待队列里下一个结点,SHARED用来标识当前等待的节点是处于共享模式,EXCLUSIVE标识当前等待的节点处于独占模式
    // 此属性也会在Condition条件队列中使用,其值一般是一个普通的Node节点
    Node nextWaiter;

    /**
    * Returns true if node is waiting in shared mode.
    */
    // 判断模式
    final boolean isShared() {
    return nextWaiter == SHARED;
    }

    /**
    * Returns previous node, or throws NullPointerException if null.
    * Use when predecessor cannot be null. The null check could
    * be elided, but is present to help the VM.
    *
    * @return the predecessor of this node
    */


    // 返回前驱节点,如果为空会报异常
    final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
    throw new NullPointerException();
    else
    return p;
    }
    // Used to establish initial head or SHARED marker
    Node() {
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
    }
    }
    • 结尾的三个构造函数分别用在不同的地方

      • 空参数的用在初始化列表的头

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        /**
        * 1.把node插入到列表尾部
        * 2.初始化列表
        */
        private Node enq(final Node node) {
        for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
        if (compareAndSetHead(new Node()))
        tail = head;
        } else {
        node.prev = t;
        if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
        }
        }
        }
        }
      • 第二个用于addWaiter

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        // 在等待队列中向尾部添加节点,可以指定资源共享模式
        private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
        }
        }
        enq(node);
        return node;
        }
      • 第三个用于条件队列中

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
        firstWaiter = node;
        else
        t.nextWaiter = node;
        lastWaiter = node;
        return node;
        }

条件队列

  • 通过内部类ConditionObject封装 维护的是条件队列 (单向队列),复用Node节点类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
    * Creates a new {@code ConditionObject} instance.
    */
    public ConditionObject() { }

    // ...
    }
    • 同样维护了头结点与尾节点,但实际上是单向队列,使用Node种的nextWaiter属性做后向指针

队列总结

  • AQS中实际维护了两个队列
    • CLH队列,维护的是等待队列,单纯是因为没有竞争到锁而阻塞在队列中,需要意识到的是,无论是共享模式还是独占模式的节点都存储在一个等待队列中,以nextWaiter属性作为其模式的区分
      • 内部维护了头尾两个节点指针
      • 队列成员不是线程实例,而是用线程实例包装成的Node(静态内部类)对象
        • Node对象
          • 维护了资源共享模式
          • 维护了对应线程的状态
          • 维护了前驱与后继节点(双向队列)
    • 通过内部类ConditionObject封装 维护的是条件队列 (单向队列)
      • nextWaiter属性在此单向队列中被用作为后继指针
      • 最典型的应用的就是ReentrantLock 中的Condition(实现线程的等待与通知)

使用AQS获取与释放锁的原理

关于模板方法设计模式的再描述

  • 在自定义同步组件(即AQS的子类)中,调用AQS的模板方法作为入口方法,而这些模板入口方法会调用子类实现的方法实现自定义逻辑与顶层逻辑的组合

    • 举例来说就是acquire方法与tryAcquire方法,自定义组件中调用AQS的模板方法acquire,此方法会直接调用AQS实现类中的tryAcquire方法

    模板方法模式是一种行为设计模式,它定义一个操作中的算法的骨架,而将一些步骤延迟到子类中。 模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤的实现方式

    模板方法模式是基于”继承“的,主要是为了在不改变模板结构的前提下在子类中重新定义模板中的内容以实现复用代码。举个很简单的例子假如我们要去一个地方的步骤是:购票buyTicket()->安检securityCheck()->乘坐某某工具回家ride()->到达目的地arrive()。我们可能乘坐不同的交通工具回家比如飞机或者火车,所以除了ride()方法,其他方法的实现几乎相同。我们可以定义一个包含了这些方法的抽象类,然后用户根据自己的需要继承该抽象类然后实现ride()方法。

独占模式

独占模式获取锁

  • 获取资源的入口方法:acquire

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    // 入口方法
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 如果最终自旋轮到当前线程了,可能线程被外部中断,就只能自行执行中断程序
    // 或者是一开始就拿到了资源,也去首先检查有没有被外部中断
    selfInterrupt();
    }


    // 将当前线程插入到等待队列当中
    private Node addWaiter(Node mode) {
    // 将当前线程包装为Node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    // 使用CAS尝试,如果成功就直接返回
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    // 如果等待队列为空(队列还没有进行初始化),首次CAS尝试失败后进入到自旋尝试插入
    enq(node);
    // 直到自旋插入成功后,才会返回
    return node;
    }



    // 自旋插入节点,如果队列为空还没有初始化,就执行初始化
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }



    // 设定等待队列中的线程状态
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    // 自旋
    for (;;) {
    // 获取队列中的前一个节点
    // 注意不用判断前一个节点是不是null,predecessor方法已经进行了判断
    // 如果前向节点为空,直接抛出异常跳到finally代码块中
    final Node p = node.predecessor();
    // 如果节点的前一个节点是Head,就可以重新尝试获取资源了 tryAcquire
    // 需要注意的是,这里仅仅是尝试获取,并不是一定获取到,其原因就在于非公平锁导致了其他线程可能截胡
    if (p == head && tryAcquire(arg)) {
    // 当前线程排到了队列的头部,可以尝试重新获取资源,并且资源获取成功了
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    // 如果还没轮到自己去重新尝试获取资源就可以休息了,进入到waiting状态,直到前一个线程执行获得锁并释放锁执行unpark方法,唤醒自己
    // parkAndCheckInterrupt 内部使用到了LockSupport.park(this)
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    // 前继节点为空,此节点不再尝试获取锁
    if (failed)
    cancelAcquire(node);
    }
    }
    • arg参数表示要获取的资源的个数,在独占模式下只能为1

      • 对于独占式来说,资源(锁)同时只能由一个线程把持,所以初始化值必须是1,但是对于可重入锁来说,再次尝试获取锁时可以累加这个值
    • 对于不用的Node节点,比如已经成功获取锁的节点,或者直接进入CANCEL状态的节点,不用的属性直接设置为null,以帮助垃圾回收

    • 等待队列中的Node线程的状态的维护是通过acquireQueued方法实现的(很重要的但是也比较难以理解的一个函数)

      • AQS 实现的同步器(无论是独占还是共享)中,处于等待队列中的线程通过LockSupport.park方法进入waiting状态,并等待unpark唤醒(unpark方法将有当前线程排队的前一个线程作为头部线程获取锁再释放锁后执行unpark,唤醒本线程)

        LockSupport类是Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数

        • park(boolean isAbsolute, long time):阻塞当前线程
        • unpark(Thread jthread):使给定的线程停止阻塞
      • 当线程首次执行此方法,如果线程没有排到队列的头部,会执行park方法,进入阻塞状态

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
        }
        public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        // 当前线程执行到此处时,就已经进入了挂起状态,需要等待unpark唤醒
        UNSAFE.park(false, 0L);
        // unpark唤醒后,恢复blocker
        setBlocker(t, null);
        }
        // 被阻塞后,根本不会执行到此方法,唤醒后才会执行此方法,判断线程是否被中断了,因为申请的是不可中断的资源,所谓不可中断意思就是线程被外部通知中断后,不会报异常,而是会自行执行中断方法,即返回true,否则一切正常返回false
        public static boolean interrupted() {
        return currentThread().isInterrupted(true);
        }

        // 唤醒后重新进入到for循环中,如果当前线程还不是头部线程,则可能继续进入阻塞,如果当前是头部线程,但是因为非公平状态下,被其他线程截胡,那么也可能重新进入阻塞,或者当前线程称为头部线程,但是被外部通知了中断,所以返回true,在acquire方法中执行中断
      • 当前是头部线程,但是因为非公平状态下,被其他线程截胡,那么也可能重新进入阻塞

    • 除了acquire之外还有类似的获取资源的方法

      • tryAcquireNanos:带超时时间的锁申请比如ReentrantLocktryLock方法

      • acquireInterruptibly:申请可中断的资源(独占模式)

      • acquireShared:申请共享模式的资源(共享模式)

      • acquireSharedInterruptibly:申请可中断的资源(共享模式)

        可中断的意思是,在线程中断时可能会抛出InterruptedException

        对于可中断资源的申请,用来实现锁的申请锁过程可中断这种特性,而不是像传统的锁一样,申请不到就一直挂着,当然对于传统的一直挂起的改进还有超时设置

    • acquire获取资源的流程图

      image-20210712171034161

独占模式释放锁

  • 释放资源的入口方法 release

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    // 独占式的资源释放
    public final boolean release(int arg) {
    // tryRelease由子类实现
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }




    // 唤醒队列中后继的一个阻塞线程
    private void unparkSuccessor(Node node) {
    /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    //如果状态是负数,尝试把它设置为0
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    // 得到头结点的后继结点head.next
    Node s = node.next;
    // 如果这个后继结点为空或者状态大于0
    // 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
    if (s == null || s.waitStatus > 0) {
    s = null;
    // 等待队列中所有还有用的结点,都向前移动,找到最近的一个还没有被取消的节点
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    // 唤醒节点
    LockSupport.unpark(s.thread);
    }
    • release入口方法执行子类实现的tryRelease方法来执行锁的释放
    • 主要是unparkSuccessor方法的执行,从头结点开始找后继节点中最近的一个未被取消的节点执行唤醒,调用LockSupport。unpark方法,线程被唤醒后,就对应上前边的acquireQueued方法中的内容了
      • 为什么是从头结点开始找,而不是从当前节点开始找呢,个人理解是,当前节点可能是非公平条件下截胡的,所以得从头开始找

共享模式

共享模式获取锁

  • 入口方法acquireShared
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final void acquireShared(int arg) {
// 若返回值小于0表示已经没有配额了,若为大于等于0说明还有配额,并已经分配了配额,直接返回
if (tryAcquireShared(arg) < 0)
// 当前已经没配额,准备入队列了
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
// 加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 又可以获得配额了
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 判断挂起的时候有没有被通知中断
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 与独占模式下一样
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • 注意此时arg参数就可以是比1大的数了,因为共享锁可以被多个线程占用
  • 基本上与独占模式类似,因此不细分析,最大的不同在于setHeadAndPropagate函数的执行,在独占模式下,如果当前线程获取锁了,直接设置为头节点就完事了,但是共享模式下,如果配额大于0,就会持续的向后继节点传递唤醒(唤醒的方式是执行doReleaseShared方法,进行锁的释放)以获取配额,直到配额重新为0

共享模式释放锁

  • 入口方法releaseShared
1
// 共享式的资源释放// 一般参数为1public final boolean releaseShared(int arg) {  			// 允许向后传递信号,进行唤醒        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }private void doReleaseShared() {  /*         * Ensure that a release propagates, even if there are other         * in-progress acquires/releases.  This proceeds in the usual         * way of trying to unparkSuccessor of head if it needs         * signal. But if it does not, status is set to PROPAGATE to         * ensure that upon release, propagation continues.         * Additionally, we must loop in case a new node is added         * while we are doing this. Also, unlike other uses of         * unparkSuccessor, we need to know if CAS to reset status         * fails, if so rechecking.         */  for (;;) {    Node h = head;    if (h != null && h != tail) {      int ws = h.waitStatus;      if (ws == Node.SIGNAL) {        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))          continue;            // loop to recheck cases        unparkSuccessor(h);      }      else if (ws == 0 &&               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))        continue;                // loop on failed CAS    }    if (h == head)                   // loop if head changed      break;  }}// 唤醒队列中后继的一个阻塞线程private void unparkSuccessor(Node node) {        /*         * If status is negative (i.e., possibly needing signal) try         * to clear in anticipation of signalling.  It is OK if this         * fails or if status is changed by waiting thread.         */  			//如果状态是负数,尝试把它设置为0        int ws = node.waitStatus;        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        /*         * Thread to unpark is held in successor, which is normally         * just the next node.  But if cancelled or apparently null,         * traverse backwards from tail to find the actual         * non-cancelled successor.         */  			// 得到头结点的后继结点head.next        Node s = node.next;  			// 如果这个后继结点为空或者状态大于0    		// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消        if (s == null || s.waitStatus > 0) {            s = null;          // 等待队列中所有还有用的结点,都向前移动,找到最近的一个还没有被取消的节点            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        if (s != null)            // 唤醒节点            LockSupport.unpark(s.thread);    }
  • 需要注意的是在共享模式下,releaseShared返回false未必意味着解锁失败,而只是不允许向队列的后续节点传递信号以唤醒节点获取配额罢了,实际上在CountDownLatch中返回值并没有影响
  • doReleaseShared不断的尝试唤醒后继的节点,直到确实有节点成功获取配额,并使自己更新为头结点

总结

  • 对于waitStatus属性的作用还是没有理清楚,后续有时间再说…..

使用AQS构建的常用组件

ReentrantLock

  • 特征:

    • 可重入锁,其可重入的特性的实现就是通过使用锁状态标志字段state作为计数器
    • 使用AQS的独占模式
    • 支持公平锁与非公平锁(默认是非公平锁,非公平锁的性能更好,吞吐量更大,响应也更快,但是容易出现饥饿线程的问题
  • ReentrantLock在其他源码中的使用

    • JDK1.8之前的ConcurrentHashMap中的Segment是继承ReentrantLock实现锁的
    • CopyOnWriteArrayList使用ReentrantLock进行加锁
    • CyclicBarrier使用ReentrantLock(与Condition)进行加锁
    • ArrayBlockingQueue、LinkedBlockingQueue使用ReentrantLock进行加锁

通过ReentrantLock的源码学习公平锁与非公平锁的实现

  • 源码分析:

    • 通过构造器传参来决定使用哪种锁

      1
      // NonfairSync与FairSync都是内部类public ReentrantLock() {  sync = new NonfairSync();}public ReentrantLock(boolean fair) {  sync = fair ? new FairSync() : new NonfairSync();}
    • 公平锁的lock

      1
      static final class FairSync extends Sync {        private static final long serialVersionUID = -3000897897090466540L;        final void lock() {            // 参数中的1是新的资源状态值,默认未加锁的资源状态值为0            acquire(1);        }        /**         * Fair version of tryAcquire.  Don't grant access unless         * recursive call or no waiters or is first.         */  			// 独占方式。尝试获取资源,成功则返回true,失败则返回false        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 获取资源状态            int c = getState();            // 资源未被占用            if (c == 0) {                // 和非公平锁相比,这里多了一个判断:是否有线程在等待                // 若没有等待的,则设置加锁: 设置状态为1并设置当前线程为ownerThread                // 若有等待的则当前线程设置为休眠----即直接返回false                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            // 若资源被占用,但是是当前线程的重入,state++\            // 若不是当前线程的重入,则直接返回false            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    }// AQS/* * 尝试获取锁,若获取失败则添加到等待队列中,线程进入休眠 */public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }// Sync AQS的实现abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        /**         * Performs {@link Lock#lock}. The main reason for subclassing         * is to allow fast path for nonfair version.         */        abstract void lock();        /**         * Performs non-fair tryLock.  tryAcquire is implemented in         * subclasses, but both need nonfair try for trylock method.         */        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }				// 独占方式。尝试释放资源,成功则返回true,失败则返回false        protected final boolean tryRelease(int releases) {            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }				// 该线程是否正在独占资源。只有用到condition才需要去实现它        protected final boolean isHeldExclusively() {            // While we must in general read state before owner,            // we don't need to do so to check if current thread is owner            return getExclusiveOwnerThread() == Thread.currentThread();        }        final ConditionObject newCondition() {            return new ConditionObject();        }        // Methods relayed from outer class        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();        }        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;        }        final boolean isLocked() {            return getState() != 0;        }        /**         * Reconstitutes the instance from a stream (that is, deserializes it).         */        private void readObject(java.io.ObjectInputStream s)            throws java.io.IOException, ClassNotFoundException {            s.defaultReadObject();            setState(0); // reset to unlocked state        }    }
    • 非公平锁的lock

      1
      static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        /**         * Performs lock.  Try immediate barge, backing up to normal         * acquire on failure.         */        final void lock() {            // 不管是否有人排队,直接CAS设置,设置成功说明线程没人用,随机顺利拿下使用权            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else                // 设置失败则尝试走类似公平锁的道路                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }// 非公平锁final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                // 同样不管是否有排队的,这是与公平锁的唯一区别                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            // 对于重入的支持            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            // 非公平锁获取失败后也得排队等            return false;        }
  • 公平锁与非公平锁在具体实现时的不同之处

    • 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了,公平锁调用lock函数则是直接调用acquire方法
    • 非公平锁在 CAS 失败后,和公平锁一样都会进入到acquire方法,然后调用tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
    • 如果非公平锁两次 CAS 都不成功(没错,非公平锁会有两次CAS尝试),那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
    • 相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

通过ReentrantLock与线程池的Worker对比学习可重入锁与不可重入锁的实现

  • Worker的实现源码

    1
    protected boolean tryAcquire(int unused) {  if (compareAndSetState(0, 1)) {    setExclusiveOwnerThread(Thread.currentThread());    return true;  }  return false;}
    • 实际上Worker的实现并没有特殊之处,还是得关注ReentrantLock是怎么实现可重入的
    • Worker也是非公平的
  • ReentrantLock(以公平锁为例)

    1
    protected final boolean tryAcquire(int acquires) {  final Thread current = Thread.currentThread();  int c = getState();  // 资源未被占用  if (c == 0) {    if (!hasQueuedPredecessors() &&        compareAndSetState(0, acquires)) {      setExclusiveOwnerThread(current);      return true;    }  }  // 若资源被占用,但是是当前线程的重入,state++\  // 若不是当前线程的重入,则直接返回false  else if (current == getExclusiveOwnerThread()) {    int nextc = c + acquires;    if (nextc < 0)      throw new Error("Maximum lock count exceeded");    setState(nextc);    return true;  }  return false;}// 对应的锁释放时也有关于重入的逻辑protected final boolean tryRelease(int releases) {  int c = getState() - releases;  if (Thread.currentThread() != getExclusiveOwnerThread())    throw new IllegalMonitorStateException();  boolean free = false;  if (c == 0) {    free = true;    setExclusiveOwnerThread(null);  }  setState(c);  return free;}
    • 关键在于在判断资源确实已经被占用后,还要再判断当前占用锁的线程是否就是当前线程,如果是,在对state执行累加以实现重入计数,并返回true
    • 对应的解锁,直接对state执行递减,并判断state是否为0,若为0则是完全释放了锁,返回true,并设置当前独占线程为null,否则还没有完全释放,直接返回false,当前独占线程仍是当前线程,state仍为非0,此时其他线程依然无法获得锁

synchronized与ReentrantLock的区别

  • 谈谈 synchronized 和 ReentrantLock 的区别

    1. 相同点,两者都是可重入锁

      “可重入锁” 指的是自己可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果不可锁重入的话,就会造成死锁。同一个线程每次获取锁,锁的计数器都自增 1,所以要等到锁的计数器下降为 0 时才能释放锁

      以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的

    2. Lock同步锁是基于Java实现的(JDK层面),而synchronized是基于底层操作系统的Mutex Lock(互斥锁)实现的(JVM层面的实现),每次获取和释放锁都会带来用户态和内核态的切换,从而增加系统的性能开销,但是在JDK 1.6后,Java对synchronized同步锁做了充分的优化,甚至在某些场景下,它的性能已经超越了Lock同步锁

      • ReentrantLock是基于AQS实现的
    3. 相比于synchronized,ReentrantLock增加了三个高级功能

      • 等待可中断,以及可设置超时 : ReentrantLock提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。

      • 可实现公平锁 : ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。ReentrantLock默认情况是非公平的,可以通过 ReentrantLock类的ReentrantLock(boolean fair)构造方法来制定是否是公平的。

      • 可实现选择性通知(锁可以绑定多个条件)本质上应该就是线程同步的一种实现方式阻塞+通知): synchronized关键字与wait()notify()/notifyAll()(都是Object的方法,也就是对象锁要调用的方法)方法相结合可以实现等待/通知机制。ReentrantLock类当然也可以实现,但是需要借助于Condition接口与newCondition()方法。

        主要就是多路通知的功能,详见ConditionDemo

        Condition是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个Lock对象中可以创建多个Condition实例(即对象监视器)线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。 在使用notify()/notifyAll()方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock类结合Condition实例可以实现“选择性通知” ,这个功能非常重要,而且是 Condition 接口默认提供的。而synchronized关键字就相当于整个 Lock 对象中只有一个Condition实例,所有的线程都注册在它一个身上。如果执行notifyAll()方法的话就会通知所有处于等待状态的线程这样会造成很大的效率问题,而Condition实例的signalAll()方法 只会唤醒注册在该Condition实例中的所有等待线程

        关于Condition机制的使用可以参考ThreadPoolExecutor类中的awaitTermination方法

    4. 实际上使用Lock接口的各种锁还有一点比较麻烦的就是,必须先获取锁,然后将同步代码放到try-finally代码块中,并且必须在finally中释放锁,而synchronized的异常条件下的锁的释放是自动维护的(从字节码可以看到)

Condition的源码分析

  • Condition本身是个接口,其只有两个实现类

    • AQS类的内部类ConditionObject最常用
    • AbstractQueuedLongSynchronizer的内部类ConditionObject
    • 以上两个实现类内部都维护着AQS内部两队列之一的条件队列
  • 下边从Condition的主要方法进行源码分析

    • awaitawait方法会完全释放锁资源,但是此所谓释放仅仅是更改锁状态而已…你懂的

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      // 从方法声明看,显然此方法的阻塞可以通过signall方法或者外部中断来唤醒
      public final void await() throws InterruptedException {
      if (Thread.interrupted())
      throw new InterruptedException();
      // 向条件队列添加节点
      Node node = addConditionWaiter();
      // 完全释放锁资源
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      // 如果当前节点不是等待队列中的节点,或者说如果当前节点是条件队列中的节点的话,进入循环
      while (!isOnSyncQueue(node)) {
      // 阻塞当前线程,等待Condition.singall调用unpark方法来唤醒
      LockSupport.park(this);
      if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
      }
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
      if (node.nextWaiter != null) // clean up if cancelled
      unlinkCancelledWaiters();
      if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);
      }


      // 向条件队列添加节点
      private Node addConditionWaiter() {
      Node t = lastWaiter;
      // If lastWaiter is cancelled, clean out.
      if (t != null && t.waitStatus != Node.CONDITION) {
      unlinkCancelledWaiters();
      t = lastWaiter;
      }
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      if (t == null)
      firstWaiter = node;
      else
      t.nextWaiter = node;
      lastWaiter = node;
      return node;
      }


      final int fullyRelease(Node node) {
      boolean failed = true;
      try {
      int savedState = getState();
      if (release(savedState)) {
      failed = false;
      return savedState;
      } else {
      // 应该能保证可以释放资源,因为当前线程是锁的占有者
      // 否则会抛出异常
      throw new IllegalMonitorStateException();
      }
      } finally {
      if (failed)
      node.waitStatus = Node.CANCELLED;
      }
      }

      // 判断当前节点是否是等待队列的节点
      final boolean isOnSyncQueue(Node node) {
      // 如果当前节点的状态是CONDITION或者node的前向节点为空,则表示其肯定位于条件队列,这是因为等待队列的头结点一直保持非空
      if (node.waitStatus == Node.CONDITION || node.prev == null)
      return false;
      // 如果有后继节点的话,那么一定位于等待队列而不是条件队列,因为条件节点中的后继节点是用nextWaiter维护的
      if (node.next != null) // If has successor, it must be on queue
      return true;
      /*
      * node.prev can be non-null, but not yet on queue because
      * the CAS to place it on queue can fail. So we have to
      * traverse from tail to make sure it actually made it. It
      * will always be near the tail in calls to this method, and
      * unless the CAS failed (which is unlikely), it will be
      * there, so we hardly ever traverse much.
      */
      // 从等待队列的尾部开始找是否存在当前node,如果找到了返回true,找不到返回false
      return findNodeFromTail(node);
      }
    • signal方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      public final void signal() {
      if (!isHeldExclusively())
      throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      if (first != null)
      doSignal(first);
      }

      private void doSignal(Node first) {
      do {
      // 从等待队列的头部开始,找到合适的唤醒节点
      if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
      first.nextWaiter = null;
      // 如果返回false,说明该节点已经被取消,则判断队列中的下一个节点
      } while (!transferForSignal(first) &&
      (first = firstWaiter) != null);
      }

      final boolean transferForSignal(Node node) {
      /*
      * If cannot change waitStatus, the node has been cancelled.
      */
      if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
      return false;

      /*
      * Splice onto queue and try to set waitStatus of predecessor to
      * indicate that thread is (probably) waiting. If cancelled or
      * attempt to set waitStatus fails, wake up to resync (in which
      * case the waitStatus can be transiently and harmlessly wrong).
      */
      Node p = enq(node);
      int ws = p.waitStatus;
      if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      // 调用unpark方法进行唤醒
      LockSupport.unpark(node.thread);
      return true;
      }
      • 显然执行此方法的线程应当是持有锁的线程,所以需要使用isHeldExclusively做判定
      • 选取条件队列的头节点,也就是等待时间最长的那个节点进行唤醒
    • 关于Condition机制的使用可以参考ThreadPoolExecutor类中的awaitTermination方法

信号量Semaphore

  • 信号量是基于AQS设计的共享类型的同步器

    • 共享类型的同步器,往往用来执行多线程的配合而独占式的同步器相比会比较粗暴,就是先到先得
      • 独占式的同步器(比如Synchronized与ReentrantLock)如果也需要小范围的配合,则需要使用到等待通知机制,也就是带条件的锁,可以使用wait与notify等方法或者Conditional等
    • 其实JDK中提供的很多多线程通信工具类都是基于信号量模型的
    • Semaphore 经常用于限制同时获取某种资源的线程数量或者说是并发执行任务的线程的数量
  • Semaphore信号量实例的使用(核心的两个方法就是acquire与release)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    /**
    * 信号量实例的使用
    */
    public class SemaphoreDemo {

    // 请求并发访问资源的线程数量
    private final static int threadCount = 550;

    public static void main(String[] args) {

    // 创建一个线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(300);
    /*
    * 创建信号量实例
    * 1. 参数的含义是许可证的数量
    */
    Semaphore semaphore = new Semaphore(20);

    for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;

    threadPool.execute(() -> {
    try {
    /*
    获取一个许可,同时可访问资源的线程数量为20/1 = 20(参数默认为1,也就是一次性获得一个许可证)
    参数也可以设置为多个比如5,那么一个线程一次性获得5个许可证,那么可以运行的线程的数量为20/5 = 4
    1. acquire方法阻塞,直到获得许可证
    */
    semaphore.acquire();
    test(threadNum);
    /*
    释放一个许可,或者说增加一个许可证
    需要注意的是acquire多少就应该release多少
    */
    semaphore.release();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }

    threadPool.shutdown();
    System.out.println("finish");
    }

    public static void test(int threadNum) throws InterruptedException {

    Thread.sleep(1000);
    System.out.println("threadnum: " + threadNum);
    Thread.sleep(1000);

    }

    }
    • 实际上并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制同时获取某种资源的线程数量
    • 除了 acquire方法之外,另一个比较常用的与之对应的方法是tryAcquire方法,该方法如果获取不到许可就立即返回 false
      • 也可以一次获取或者释放多个许可证,但是一般都使用一个,另外注意获取几个最后就释放几个,否则容易死锁
  • 查看Semaphore源码会发现其也有公平模式与非公平模式,与ReentrantLock一样都是在内部实现公平与非公平模式的两个AQS同步器

    • 公平模式: 调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO;

      1
      /*  1. 返回剩余许可证的数量  2. 如果前边有排队的,立即返回-1,当前线程加入队列进入休眠状态  3. 如果没有排队的,并且确实还有剩余的许可证,则返回剩余的许可证数量,不再堵塞 */protected int tryAcquireShared(int acquires) {  for (;;) {    if (hasQueuedPredecessors())      return -1;    int available = getState();    int remaining = available - acquires;    if (remaining < 0 ||        compareAndSetState(available, remaining))      return remaining;  }}
      • 信号量同步器中的state就是许可证的数量
    • 非公平模式: 抢占式的。

      1
      final int nonfairTryAcquireShared(int acquires) {            for (;;) {                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }
  • acquire的具体逻辑

    1
    public void acquire(int permits) throws InterruptedException {  if (permits < 0) throw new IllegalArgumentException();  sync.acquireSharedInterruptibly(permits);}public final void acquireSharedInterruptibly(int arg)  throws InterruptedException {  if (Thread.interrupted())    throw new InterruptedException();    // 如果在公平锁中发现队列中有排队的,会返回-1  // 如果在非公平锁中成功设置state属性,但是发现不足以分配足够的令牌时  if (tryAcquireShared(arg) < 0)    doAcquireSharedInterruptibly(arg);}// AQS中的doAcquireSharedInterruptibly方法就是放入阻塞队列并自旋的方法private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {  			// 将当前线程放入阻塞队列中,并设置模式为共享模式        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {          // 与独占模式的acquire类似            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }              	// 挂起                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }
    • SemaphoreCountDownLatch 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits(acquire方法的形参名)。当执行任务的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,当先前执行任务的线程继续执行 release 方法,release 方法使得 state 的变量增加,那么自旋的线程便会进一步唤醒并判断是否可以拿到锁,如果资源还是不够就接着Park阻塞。 如此,每次只有最多不超过 permits 数量的线程能成功获取有限的资源,便限制了执行任务线程的数量

CountDownLatch

  • 共享性的同步器

  • CountDownLatch具有倒计时器的功能,形象的例子:一个裁判(主线程)的任务在终点(主线程的阻塞位置)为每一个运动员(线程)打点计时,运动员全部跑到终点后(线程执行完毕),裁判就不必一直待在终点处了,可以继续做别的事情了

  • CountDownLatch的执行原理

    • CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程await() 方法之后的语句得到执行
    • CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state 会 CAS(Compare and Swap)减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作
  • 使用CountDownLatch的两个场景

    • 一等多
      • 主线程await阻塞,其余线程执行countDown,直到state减少到0,主线程方形,适用场景:启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行
    • 多等一
      • 主线程state设置为1,其余线程await,主线程执行countDown后,其余线程同时放行,适用场景:实现执行任务的最大并行性,强调的是多个线程在某一时刻同时开始执行
    • 多等多(未尝不可,但是没有测试过)
  • CountDownLatch使用示例

    1
    public class CountDownLatchDemo {    final static Logger logger = LogManager.getLogger(CountDownLatchDemo.class.getName());    private final static int THREAD_COUNT = 500;    public static void main(String[] args) throws InterruptedException {        ExecutorService es = Executors.newFixedThreadPool(300);        CountDownLatch count = new CountDownLatch(THREAD_COUNT);        for (int i = 0; i < THREAD_COUNT; i++) {            final int thread_num = i;            es.execute(() -> {                try {                    // logger.info("count: [{}]", count.getCount());                    test(thread_num);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    count.countDown();                }            });        }        count.await();        es.shutdown();        logger.info("finish");    }    public static void test(int thread) throws InterruptedException {        Thread.sleep(1000l);        logger.info("thread [{}]", thread);        Thread.sleep(1000l);    }}
    • 使用countDownLatch的注意事项:

      • 务必保证state能够减少到0,否则会进入到死锁状态,比如上述代码中,把countDown放到finally代码块中执行
    • CountDownLatch的主要使用方法:

      • CountDown

        1
        public void countDown() {  sync.releaseShared(1);}public final boolean releaseShared(int arg) {  if (tryReleaseShared(arg)) {    doReleaseShared();    return true;  }  return false;}protected boolean tryReleaseShared(int releases) {  // Decrement count; signal when transition to zero  for (;;) {    int c = getState();    // 1    if (c == 0)      return false;    // 3    int nextc = c-1;    if (compareAndSetState(c, nextc))      // 2      return nextc == 0;  }}
        • 分为以下几个状态:
          1. 未执行操作前,state已经为0,此时不执行任何操作
          2. 执行操作后state变为0,执行doReleaseShared 唤醒执行await等待的线程
          3. 正常的state-1操作
      • await

        1
        public void await() throws InterruptedException {  sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)  throws InterruptedException {  if (Thread.interrupted())    throw new InterruptedException();  if (tryAcquireShared(arg) < 0)    doAcquireSharedInterruptibly(arg);}protected int tryAcquireShared(int acquires) {  return (getState() == 0) ? 1 : -1;}public boolean await(long timeout, TimeUnit unit)  throws InterruptedException {  return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)  throws InterruptedException {  if (Thread.interrupted())    throw new InterruptedException();  return tryAcquireShared(arg) >= 0 ||    doAcquireSharedNanos(arg, nanosTimeout);}    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)            throws InterruptedException {        if (nanosTimeout <= 0L)            return false;        final long deadline = System.nanoTime() + nanosTimeout;        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return true;                    }                }                nanosTimeout = deadline - System.nanoTime();                if (nanosTimeout <= 0L)                    return false;                if (shouldParkAfterFailedAcquire(p, node) &&                    nanosTimeout > spinForTimeoutThreshold)                    LockSupport.parkNanos(this, nanosTimeout);                if (Thread.interrupted())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }
        • 如果初始化的时候传入的state参数就是0,则不会有阻塞效果
        • 一直阻塞,除非除非线程被中断或超出了指定的等待时间
        • await有一个重载方法,可以实现带超时时间的阻塞等待,如果在超时时间内,state达到0,则返回true,如果超出超时时间就直接返回false
    • CountDownLatch的不足:

      • 其实例是一次性的,只能对state进行一次性的设置,倒计时结束后,实例就完全没用了
      • 循环栅栏就弥补了此缺陷,从循环二字中也可以体会到
  • 使用CountDownLatch的案例

    并发的读取多个没有顺序依赖的文件,并对读取结果整合后返回

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public class CountDownLatchExample1 {
    // 处理文件的数量
    private static final int threadCount = 6;

    public static void main(String[] args) throws InterruptedException {
    // 创建一个具有固定线程数量的线程池对象(推荐使用构造方法创建)
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
    final int threadnum = i;
    threadPool.execute(() -> {
    try {
    //处理文件的业务操作
    ......
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    //表示一个文件已经被完成
    countDownLatch.countDown();
    }

    });
    }
    countDownLatch.await();
    threadPool.shutdown();
    System.out.println("finish");
    }

    }

    可以使用CompletableFuture进行优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //文件夹位置
    List<String> filePaths = Arrays.asList(...)
    // 异步处理所有文件
    List<CompletableFuture<String>> fileFutures = filePaths.stream()
    .map(filePath -> doSomeThing(filePath))
    .collect(Collectors.toList());
    // 将他们合并起来
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    fileFutures.toArray(new CompletableFuture[fileFutures.size()])
    );

CyclicBarrier

  • 它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。**await() 方法每被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时(即特定数量的线程都阻塞到一个公共屏障点时),阻塞解除,所有在此 CyclicBarrier上面阻塞的线程开始运行**。

    • 这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,所以称它为循环的 barrier
  • CyclicBarrier是基于ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 实现的,而CountDownLatch 的实现是基于 AQS 的

  • 源码分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    // 内部维持的状态变量
    private int count;

    private final ReentrantLock lock = new ReentrantLock();

    private final Condition trip = lock.newCondition();

    // 两个构造方法

    // parties是指定要拦住的线程的数量
    // barrierAction是设置数量的线程都触发屏障后,执行的触发任务,没有就设置为null
    public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
    }


    public CyclicBarrier(int parties) {
    this(parties, null);
    }

    public int await() throws InterruptedException, BrokenBarrierException {
    try {
    return dowait(false, 0L);
    } catch (TimeoutException toe) {
    throw new Error(toe); // cannot happen
    }
    }

    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {
    final ReentrantLock lock = this.lock;
    // 使用ReentrantLock上锁
    lock.lock();
    try {
    final Generation g = generation;
    // 默认是false
    if (g.broken)
    throw new BrokenBarrierException();

    if (Thread.interrupted()) {
    breakBarrier();
    throw new InterruptedException();
    }
    // 状态变量-1
    int index = --count;
    if (index == 0) { // tripped
    boolean ranAction = false;
    try {
    final Runnable command = barrierCommand;
    if (command != null)
    command.run();
    ranAction = true;
    // 将count 重置为 parties 属性的初始化值
    // 唤醒之前等待的线程
    // 下一波执行开始
    nextGeneration();
    return 0;
    } finally {
    if (!ranAction)
    breakBarrier();
    }
    }

    // loop until tripped, broken, interrupted, or timed out
    // 在死循环中阻塞等待
    for (;;) {
    try {
    if (!timed)
    trip.await();
    else if (nanos > 0L)
    nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
    if (g == generation && ! g.broken) {
    breakBarrier();
    throw ie;
    } else {
    // We're about to finish waiting even if we had not
    // been interrupted, so this interrupt is deemed to
    // "belong" to subsequent execution.
    Thread.currentThread().interrupt();
    }
    }

    if (g.broken)
    throw new BrokenBarrierException();

    if (g != generation)
    return index;

    if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
    }
    }
    } finally {
    lock.unlock();
    }
    }

    public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {
    return dowait(true, unit.toNanos(timeout));
    }

    private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
    }
  • 使用案例

    1
    /** * 循环栅栏使用demo */public class CyclicBarrierDemo {    final static Logger logger = LogManager.getLogger(CyclicBarrierDemo.class.getName());    private static final int THREAD_COUNT = 100;    // 指定要拦住的线程的数量    private final static CyclicBarrier cb = new CyclicBarrier(5, () -> {        logger.info("拦截线程达到,触发执行");    });    public static void main(String[] args) throws InterruptedException {        ExecutorService es = Executors.newFixedThreadPool(10);        for (int i = 0; i < THREAD_COUNT; i++) {            Thread.sleep(1000l);            final int thread_num = i;            es.execute(() -> {                try {                    test(thread_num);                } catch (BrokenBarrierException e) {                    e.printStackTrace();                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }        es.shutdown();    }    public static void test (int num) throws BrokenBarrierException, InterruptedException {        logger.info("thread [{}] is ready", num);        cb.await();        logger.info("thread [{}] is finish", num);    }}
    • CyclicBarrier 和 CountDownLatch 的区别

      • 对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
      • CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
      • CyclicBarrier可复用,CountDownLatch不可复用
    • CyclicBarrier和CountdownLatch的使用场景是很类似的,cyclicBarrier也可以实现最大并行,也可以实现子任务优先执行,全部子任务执行完毕后再执行任务,比如以下的经典案例

      我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水

线程池中的worker

  • Worker继承AQS的一个巨大应用就是判断线程的状态,使线程池更好的控制线程的生命周期
    • 使用AQS独占模式,并且是不可重入的(同时也是非公平的)
  • 详见Java线程池总结

ReentrantReadWriteLock

  • 使用了AQS的混合模式,对于传统的锁来说,读写锁细化了锁的粒度,所以能够提升锁的性能,这一点与jdk1.6的Synchronized的优化,ConcunrrentHashMap在JDK1.8的优化类似

  • 适用于读操作远大于写操作的场景,读操作下是共享型的锁(读锁与写锁是排斥的),写操作下是独占型的锁

  • 特征

    • 只允许一个线程写入(其他线程既不能写入也不能读取)
    • 没有写入时,多个线程允许同时读(提高性能)
  • Demo—很垃圾的Demo,自己并没有想出来怎么用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    package com.diego.Thread;

    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /**
    * @descrip
    * @author: 李佳
    * @create: 2020-09-23-09:39
    **/
    public class ReadWriteLockDemo {

    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private Integer count = 0;

    public static void main(String[] args) throws InterruptedException {

    ExecutorService es = Executors.newFixedThreadPool(10);
    ReadWriteLockDemo instance = new ReadWriteLockDemo();

    for (int i =0; i < 100; i ++) {
    final int time = i;
    es.execute(() -> {
    instance.getCount(time);
    });
    }

    for (int i =0; i < 100; i ++) {
    final int time = i;
    es.execute(() -> {
    instance.inc(time);
    });
    }



    Thread.sleep(20000);
    System.out.println(instance.count);
    es.shutdown();
    }
    public void inc(int time) {

    wlock.lock(); //加写锁
    try {
    System.out.println("第 " + time + "次写");
    count++;
    } finally {
    wlock.unlock(); // 释放写锁
    }

    }

    public int getCount(int time) {

    rlock.lock(); // 加读锁
    try {
    System.out.println("第 " + time + "次读");
    return this.count;
    } finally {
    rlock.unlock(); // 释放读锁
    }

    }

    }

  • 粗略的源码分析

    • 同样支持实现公平与非公平锁
    • 内部实现了独占与共享两种类型的同步器的模板方法
    • 读锁尝试获取时,如果有写锁占用则获取失败,写锁尝试获取时,如果读锁的占用线程数不为0或者有其他写锁占用,则获取失败
    • ReentrantReadWriteLock 基于 AQS 实现,它的自定义同步器(继承 AQS)需要在同步状态 state 上维护多个读线程和一个写线程,该状态的设计成为实现读写锁的关键。ReentrantReadWriteLock 很好的利用了高低位来实现一个整型控制两种状态的功能,读写锁将变量切分成了两个部分,高 16 位表示读,低 16 位表示写

StampedLock

  • ReentrantReadWriteLock实现的是ReadWriteLock这个接口,另外一个实现了此接口的就是StampedLock 但是StampedLock并没有使用AQS,但是其实现方式与AQS类似,不知道为什么其不适用AQS提供的框架

    • StampedLock进一步的进行性能优化,在使用ReentrantReadWriteLock,多线程读的时候,不允许其他线程进行写操作,换句话说读锁实际上是一个悲观锁,因此为了进一步的提升效率,引入StampedLock,区别就在于读的过程中也允许其他线程写入,读锁与写锁不再互斥,读锁进化成了乐观锁

      • 一般来说乐观锁的并发效率比悲观锁要高,因为悲观锁是严禁读的过程中有写入的,而乐观锁允许读的过程中有写入,只需要额外加一个机制,检测是否有新的写入,再决定是否从新读一次,不会再阻塞写线程
      • StampedLock是不可重入锁
      • StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写—–还没用过
      • StamptedLock是如何在允许写线程竞争读锁的条件下,维持数据的可见性呢,使用Unsafe提供的内存屏障工具,具体的参考Unsafe类介绍这篇文章
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      // Demo

      package com.diego.Thread;

      import java.util.concurrent.locks.Lock;
      import java.util.concurrent.locks.StampedLock;

      /**
      * @descrip
      * @author: 李佳
      * @create: 2020-09-23-20:58
      **/
      // 需要注意的是在此类型锁中,获得所与释放说都需要一个long类型的stamp(版本号),而不是直接使用实例
      public class StampedLockDemo {
      private final StampedLock stampedLock = new StampedLock();

      // 表示位置的x与y
      private double x;
      private double y;

      // 写入数据
      public void move (double delta_x, double delta_y) {

      // 获取写锁
      // stamp是锁的版本号
      long stamp = stampedLock.writeLock();
      try{
      x += delta_x;
      y += delta_y;
      }finally {
      // 释放写锁
      stampedLock.unlockWrite(stamp);
      }
      }

      // 读数据
      public double distanceFromOrigin () {

      // 首先优先尝试使用乐观读锁
      long stamp = stampedLock.tryOptimisticRead();

      // 使用乐观读锁后,乐观的假定读的过程中不会有写,所以也不用加锁(乐观锁就是无锁),直接进行读取

      double current_x = x;

      double current_y = y;

      // 尽管乐观,但是还是要考虑现实,所以需要检查是否读的过程中被写线程修改了
      // 有点类似于CAS的判断
      if (!stampedLock.validate(stamp)) {
      // 已经被修改了,转去使用悲观读锁
      long readLock = stampedLock.readLock();
      try {
      current_x = x;

      current_y = y;
      }finally {
      stampedLock.unlockRead(readLock);
      }
      }
      // 乐观取得成功,真的没人写,或者使用悲观读锁重新读取了一次
      return Math.sqrt(current_x * current_x + current_y * current_y);
      }
      }

      • 对于乐观锁来说,开发者认为数据发送时发生并发冲突的概率不大,所以读操作前不上锁,到了写操作时才会进行判断,数据在此期间是否被其他线程修改。如果发生修改,那就返回写入失败;如果没有被修改,那就执行修改操作,返回修改成功
        • 对于CAS来说,通过比较是否维持在当前值来判断是否有其他线程进行了更新,如果确有更新则直接返回false,更新失败
        • 对于StampedLock来说就是验证stamp,如果确有更新就加悲观读锁
      • StampedLock与CopyOnWriteArrayList类似,都将读锁设计为了乐观锁,不同之处在于后者完全没有后顾之忧,而前者需要判断是否要转悲观锁