缘起

每门编程语言基本都离不开并发问题,Java亦如此。谈到Java的并发就离不开Doug lea老爷子贡献的juc包,而AQS又是juc里面的佼佼者
因此今天就一起来聊聊AQS

概念

AQS是什么,这里借用官方的话
Provides a framework for implementing blocking locks and related synchronizers that rely on first-in-first-out wait queues

AQS的全程是AbstractQueuedSynchronizer,在这里咱们进行咬文嚼字一下。
Abstract:这是AQS采用模板设计模式的基础,AQS中将定义了大部分同步的流程,仅将加解锁的操作留给子类根据需求进行自定义(这也就是为什么使用AQS可以快速开发锁或者同步器的主要原因)
Queued:这里指的是CLH队列;当共享资源被占用时,就需要一套线程阻塞等待以及被唤醒时锁分配机制,AQS通过CLH队列来实现
CLH是一个虚拟的双向链表实现的队列,获取不到锁资源时会被AQS封装成Node节点并加入队列。每个线程执行结束都会唤醒其下一个节点
Synchronizer:同步控制,AQS的同步控制实现有两种。一种是volatile+CAS的乐观锁设计,另一种是LockSupport+CAS的悲观锁设计

切入点

  1. 模板方法设计
  2. 可重入设计
  3. CLH队列设计
  4. 公平/非公平锁设计

模板方法设计

我们来看看AQS的设计流程

从图中可看到,AQS设计并且实现了一个同步器/锁的完整流程;
但是将tryAcquire/tryAcquireShared和tryRelease/tryReleaseShared这些经常改动的操作设置为抽象方法,留给子类自行拓展

acquire方法剖析
这个方法采用门面设计模式,将CAS获取锁,封装线程以及添加CLH队列的操作封装成一个方法并对外提供
我们常用的ReentrantLock.lock方法实际上就是调用此方法

public final void acquire(int arg) {
    // 尝试通过CAS获取锁,若获取成功则执行同步代码块
    // 获取失败则通过addWaiter将当前线程封装成AQS的Node节点并加入CLH队列,同时中断当前线程
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter方法剖析
Node是AQS的内部类,Node是组成CLH队列的节点
申请公平/非公平锁失败都会被加入CLH队列

private Node addWaiter(Node mode) {
    // 1. 将当前线程跟Node关联起来,方便AQS根据队列顺序唤醒获取锁
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 2. 追加新建节点到双向链表尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 3. 初始化链表
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued方法剖析

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 1. 获得当前Node的前驱节点,也就是当前任务的前一个任务
            final Node p = node.predecessor();
            // 2. 如果前一个任务是head则尝试通过CAS来获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //3. 通过UNSAGE.park来阻塞当前线程来等待许可
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

小结
通过这小节我们可以看到AQS是如何通过模板方法设计模式大大简化了同步器/锁开发的

可重入设计

ReentrantLock是可重入锁的典型设计,在这里就基于它进行分析

ReentrantLock的锁是组合设计,通过内部类Sync、NonfairSync和FairSync来实现
这里看看非公平锁NonfairSync的实现

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //1. 如果当前锁空闲,则获取锁并设置锁的Owner为当前线程
    if (c == 0) {      
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 2. 如果当前锁已经被获取,则判断锁的Owner是否是当前线程
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}


protected final boolean tryRelease(int releases) {
    // 1. 进行锁释放时对state进行减法,由于加锁和释放锁的操作都是配对出现的。那么重入2次时状态为3,那么释放锁的时候也会释放3次直到状态state为0时才释放锁
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        // 2. 释放锁的时候先将锁的Owner置空
        setExclusiveOwnerThread(null);
    }
    // 3. 释放锁
    setState(c);
    return free;
}

锁Owner实现
AQS的锁Owner是通过继承父类AbstractOwnableSynchronizer来实现的
AbstractOwnableSynchronizer只有一个成员属性exclusiveOwnerThread,用来标识独占锁场景下是哪个线程持有锁,方便可重入判断

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
  
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

小结
通过分析我们可以看到,AQS的可重入设计是通过父类AbstractOwnableSynchronizer+volatile修饰的state作为计数器来实现的

CLH队列设计

当共享资源被占用时,就需要一套线程阻塞等待以及被唤醒时锁分配机制,AQS通过CLH队列来实现
CLH队列保证了锁可以高效进行分配,避免了无意义的唤醒阻塞未获得锁线程

CLH的入口在于AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,它是一个虚拟的双向链表,addWaiter会将请求锁的线程封装成Node节点,Node节点中通过存储前后序节点来维护双向队列

源码剖析

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 1. 获取前序节点
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL)
          return true;
      // 将当前节点所有直接前驱节点从CLH队列中移除
      if (ws > 0) {
          do {
              node.prev = pred = pred.prev;
          } while (pred.waitStatus > 0);
          pred.next = node;
      } else {
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
}

private final boolean parkAndCheckInterrupt() {
    //请求锁的线程加入CLH队列中,通过调用LockSupport、UNSAFE来进行线程的阻塞,直至CLH链表中上一个节点释放锁后才会主动唤醒
    LockSupport.park(this);
    return Thread.interrupted();
}

LockSupport源码实现

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    // 这是做什么用的?
    setBlocker(t, blocker);
    // 最终调用UNSAFE来执行
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

// 在UNSAFE中显示这是个本地方法,是由C++来实现的
public native void park(boolean var1, long var2);

// C++通过给当前线程添加一个互斥量0,每次该线程获得CPU时间片都会判断该互斥量,为0则将当前线程切换为阻塞状态
// 直到其他线程通过unpark来将此互斥量修改为1,该方法才会继续往下执行

小结
CLH队列给AQS维护了一个高效率的锁分配/释放的基础

公平/非公平锁设计

JDK中公平锁和非公平锁的具体实现分别是FairSync和NonfairSync
FairSync的获取锁流程

NonfairSync的获取锁流程

小结
通过分析可看到公平锁和非公平锁的获取锁/释放锁逻辑几乎一致,这都要归功于AQS的模板方法设计模式
两者不同的地方在于获取公平锁的请求会直接加入到CLH队列中等待锁,获取非公平锁的请求都会尝试直接获取锁,获取失败再加入CLH队列进行等待
非公平锁的性能相对而言更好,一般也是首选,但是会存在锁饥饿的现象;公平锁可以有效解决锁饥饿的问题,但性能相对而言会差一些

总结

  1. 在AQS中可以看到不少优秀的设计,这都要归功于Doug Lea老爷子;除了AQS,在juc里还有很多优秀的设计,如并发性能最好的字典ConcurrentHashMap、无锁高性能队列ConcurrentLinkedQueue等等
    在品完源码后,你会发现其设计思想丝毫不逊色于各个大数据组件
  2. “曾经想征服全世界,到最后回头才发现,这世界点点滴滴全部都是你” ——致JDK
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/lin-/p/14719614.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!