1. 观察一下锁的基本功能

同步锁的本质 - 排队

同步的方式:独享锁-单个队列窗口,共享锁-多个队列窗口

抢锁的方式:插队抢(不公平锁)、先来后到抢锁(公平锁)

没抢到锁的处理方式:快速尝试多次(CAS自旋锁)、阻塞等待

唤酲阻塞线程的方式(叫号器):全部通知、通知下一个

2. 先手撸一个简单的独占锁

// 自己实现(独享锁) - 常用的
public class MyLock implements Lock {
     // 如何判断一个锁的状态,或者说 
    volatile  AtomicReference<Thread> owner = new AtomicReference<>();
    // 保存正在等待的线程
    volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();

    @Override
    public boolean tryLock() {
        return owner.compareAndSet(null, Thread.currentThread());
    }

    @Override
    public void lock() {
        boolean addQ = true;//防止重复加入集合,这里用了最简单的方式,演示,多少有点问题,领会精神就行
        while (!tryLock()) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前线程
                LockSupport.park();//伪唤醒,就是非unpark唤醒的,所以上面用while
                // 后续,等待其他线程释放锁,收到通知之后继续循环
            }
        }
        waiters.remove(Thread.currentThread());
    }

    @Override
    public void unlock() {
        // cas 修改 owner 拥有者
        if (owner.compareAndSet(Thread.currentThread(), null)) {
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread waiter = iterator.next();
                LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
            }

        }
    }
    ...

嗯,写的很不错,查看一下java源码看看是不是差不多

public class ReentrantLock {
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }

    public void lock() {
        sync.lock();
    }

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

嗯?源码怎么这么简洁,sync是什么鬼?

Sync extends AbstractQueuedSynchronizer

这就是AQS

3. AQS抽象队列同步器的概念

提供了对资源占用、释放,线程的等待、唤醒等等接口的定义和具体实现

可以用在各种需要控制资源争用的场景中。(ReentrantLock/ CountDownLatch/ Semphore)

image-20200831160051565

acquire、 acquireShared:定义了资源争用的逻辑,如果没拿到,则等待。

tryAcquire、 tryAcquireShared:实际执行占用资源的操作,内容由使用者具体去实现。

release、 releaseShared:定义释放资源的逻辑,释放之后,通知后续节点进行争抢。

tryRelease、 tryReleaseShared:实际执行资源释放的操作,具体内容由AQS使用者去实现。

4. 我们也自己整一个AQS

// 抽象队列同步器 - 还是独占的
// state, owner, waiters
public class MyAqs {
    // 1、 如何判断一个资源的拥有者
    public volatile AtomicReference<Thread> owner = new AtomicReference<>();
    // 保存 正在等待的线程
    public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
    public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
        throw new UnsupportedOperationException();
    }

    public void acquire() {
        boolean addQ = true;
        while (!tryAcquire()) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    public boolean tryRelease() {
        throw new UnsupportedOperationException();
    }

    public void release() { // 定义了 释放资源之后要做的操作
        if (tryRelease()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }
}

然后在MyLock中使用AQS

// 自己实现(独享锁) - 常用的
public class MyLock implements Lock {
	// 抽象工具类AQS
    MyAqs aqs = new MyAqs(){
        @Override
        public boolean tryAcquire() {
            return owner.compareAndSet(null, Thread.currentThread());
        }

        @Override
        public boolean tryRelease() {
            // 可重入的情况下,要判断资源的占用情况(state字段保存了资源的占用次数)
            return owner.compareAndSet(Thread.currentThread(), null);
        }
    };

    @Override
    public boolean tryLock() {
        return aqs.tryAcquire();
    }

    @Override
    public void lock() {
        aqs.acquire();
    }

    @Override
    public void unlock() {
        aqs.release();
    }
    ...

这样就和ReentrantLock源码差不多了,源码大体思路就是这样,只是源码AQS考虑了更多适用的情况,添加了更多的功能

比如公平锁和非公平锁的实现,公平锁是lock()直接进去资源争抢阶段acquire();非公平锁是lock()进来先尝试直接占用资源,然后才进入acquire()争抢

而且等待队列不是用的Queue而是用的链表,其中也使用了很多CAS,有兴趣的可以完整看一遍.

补充一份资源占用流程

image-20200901144318257

5. AQS的其他应用 - 信号量、倒计数器和回环栅栏

AQS是一种很优雅的抽象,她还可以用来实现更多的操作

再说应用之前我们还要补充一下AQS中共享的资源占用的一些方法

5.1 完善一下我们的AQS

这里增加一些共享资源争用的方法,其实就是前面提到的acquireShared,tryAcquireShared,releaseShared,tryReleaseShared

release、 releaseShared:定义释放资源的逻辑,释放之后,通知后续节点进行争抢。

tryRelease、 tryReleaseShared:实际执行资源释放的操作,具体内容由AQS使用者去实现

 	// 记录资源状态
    public volatile AtomicInteger state = new AtomicInteger(0);
    public AtomicInteger getState() { return state; }
    public void setState(AtomicInteger state) { this.state = state; }

	// 共享资源占用的逻辑,返回资源的占用情况
    public int tryAcquireShared(){
        throw new UnsupportedOperationException();
    }

    public void acquireShared(){
        boolean addQ = true;
        while(tryAcquireShared() < 0) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    public boolean tryReleaseShared(){
        throw new UnsupportedOperationException();
    }

    public void releaseShared(){
        if (tryReleaseShared()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }
 	// 独占资源相关的代码
	...

5.2 Semaphore

又称"信号量",控制多个线程争抢许可。

  • acquire:获取一个许可,如果没有就等待
  • release:释放一个许可。
  • availablePermits:方法得到可用的许可数目

使用场景示例

  1. 代码并发处理限流(hystrix)

照例手撸一个MySemaphore

// 自定义的信号量实现
public class MySemaphore {
    MyAqs aqs = new MyAqs() {
        @Override
        public int tryAcquireShared() { // 信号量获取, 数量 - 1
            for(;;) {
                int count =  getState().get();
                int n = count - 1;
                if(count <= 0 || n < 0) {
                    return -1;
                }
                if(getState().compareAndSet(count, n)) {
                    return 1;
                }
            }
        }

        @Override
        public boolean tryReleaseShared() { // state + 1
            return getState().incrementAndGet() >= 0;
        }
    };

    /** 许可数量 */
    public MySemaphore(int count) {
        aqs.getState().set(count); // 设置资源的状态
    }

    public void acquire() {
        aqs.acquireShared();
    } // 获取令牌

    public void release() {
        aqs.releaseShared();
    } // 释放令牌
}

5.3 CountDownLatch

java1.5被引入的一个工具类,常被称为:倒计数器。

创建对象时,传入指定数值作为线程参与的数量;

await:方法等待计数器值变为0,在这之前,线程进入等待状态;

countdown:计数器数值减一,直到为0;

经常用于等待其他线程执行到某一节点,再继续执行当前线程代码

使用场景示例

  1. 统计线程执行的情况
  2. 压力测试中,使用 countDownLatch实现最大程度的并发处理;
  3. 多个线程之间,相互通信,比如线程异步调用完接口,结果通知;

类似田径运动,八个赛道运动员依次准备,等到所有人就绪,裁判才吹响口哨

依旧手撸一个MyCountDownLatch

// CountDownLatch 自己实现
public class  MyCountDownLatch {
    MyAqs myAqs = new MyAqs() {
        @Override
        public int tryAcquireShared() { // 如果非等于0,代表当前还有线程没准备就绪,则认为需要等待
            return this.getState().get() == 0 ? 1 : -1;
        }

        @Override
        public boolean tryReleaseShared() { // 如果非等于0,代表当前还有线程没准备就绪,则不会通知继续执行
            return this.getState().decrementAndGet() == 0;
        }
    };

    public  MyCountDownLatch(int count) {
        myAqs.setState(new AtomicInteger(count));
    }

    public void await() {
        myAqs.acquireShared();
    }

    public void countDown() {
        myAqs.releaseShared();
    }
}

5.4 CyclicBarrier

也是1.5加入的,又称为"线程栅栏","回环栅栏"。

创建对象时,指定栅栏线程数量。

await:等指定数量的线程都处于等待状态时,继续执行后续代码。

barrierAction:线程数量到了指定量之后,自动触发执行指定任务。

栅栏没有显式的使用AQS,她使用锁,每一个线程获取一把锁,且count--,当count为0时,signalAll()且重置count

使用场景示例

  1. 数据量比较大时,实现批量插入数据到数据库;
  2. 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总

类似打游戏,比如吃鸡,每凑够100人就开始一场游戏

撸秃噜皮了的MyCyclicBarrier

public class MyCyclicBarrier {
    private int count;//计数
    private int initNum;//用于重置count
    private Runnable barrierAction;//barrierAction
    private ReentrantLock lock = new ReentrantLock();
    private Condition trip = lock.newCondition();

    public MyCyclicBarrier(int initNum, Runnable barrierAction){
        this.initNum = initNum;
        this.count = initNum;
        this.barrierAction = barrierAction;
    }

    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (--count == 0) {  // tripped 数量够了
                if (barrierAction != null)
                    barrierAction.run(); // 触发执行指定的任务
                // 唤醒等待的线程继续执行。重新计数
                trip.signalAll(); // 唤醒线程
                count = initNum; // count重置
                return;
            }
            //没到数量,进入等待
            trip.await();
        } finally {
            lock.unlock();
        }
    }
}

5.5 CountDownLatch和CyclicBarrier的区别

  1. CountDownLatch只是一次计数, CyclicBarrier对象可多次触发执行;

  2. CountDownLatch的多个参与者只是参与计数,不会阻塞本身,后面的代码继续执行,只有等待计数归零的代码在阻塞;

    而CyclicBarrier的多个参与者在执行到await()方法的时候都会被栅栏拦住,直到满足数量的参与者就绪才会开门放行

    所以CountDownLatch相当于一个人在等其他人就绪后做什么,而CyclicBarrier是所有人等其他人就绪后一起开始做什么

  3. 同时,CountDownLatch需要已知总数,当总数不固定的时候没法使用