1. 观察一下锁的基本功能
同步锁的本质 - 排队
同步的方式:
独享锁-单个队列窗口,共享锁-多个队列窗口
抢锁的方式:
插队抢(不公平锁)、先来后到抢锁(公平锁)
没抢到锁的处理方式:
快速尝试多次(CAS自旋锁)、阻塞等待
唤酲阻塞线程的方式(叫号器):
全部通知、通知下一个
2. 先手撸一个简单的独占锁
// 自己实现(独享锁) - 常用的
public class MyLock implements Lock {
// 如何判断一个锁的状态,或者说
volatile AtomicReference<Thread> owner = new AtomicReference<>();
// 保存正在等待的线程
volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
@Override
public boolean tryLock() {
return owner.compareAndSet(null, Thread.currentThread());
}
@Override
public void lock() {
boolean addQ = true;//防止重复加入集合,这里用了最简单的方式,演示,多少有点问题,领会精神就行
while (!tryLock()) {
if (addQ) {
// 没拿到锁,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起当前线程
LockSupport.park();//伪唤醒,就是非unpark唤醒的,所以上面用while
// 后续,等待其他线程释放锁,收到通知之后继续循环
}
}
waiters.remove(Thread.currentThread());
}
@Override
public void unlock() {
// cas 修改 owner 拥有者
if (owner.compareAndSet(Thread.currentThread(), null)) {
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread waiter = iterator.next();
LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
}
}
}
...
嗯,写的很不错,查看一下java源码看看是不是差不多
public class ReentrantLock {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
public void lock() {
sync.lock();
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public void unlock() {
sync.release(1);
}
嗯?源码怎么这么简洁,sync
是什么鬼?
Sync extends AbstractQueuedSynchronizer
这就是AQS
3. AQS抽象队列同步器的概念
提供了对资源占用、释放,线程的等待、唤醒等等接口的定义和具体实现
可以用在各种需要控制资源争用的场景中。(ReentrantLock/ CountDownLatch/ Semphore)
acquire、 acquireShared:
定义了资源争用的逻辑,如果没拿到,则等待。
tryAcquire、 tryAcquireShared:
实际执行占用资源的操作,内容由使用者具体去实现。
release、 releaseShared:
定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
tryRelease、 tryReleaseShared:
实际执行资源释放的操作,具体内容由AQS使用者去实现。
4. 我们也自己整一个AQS
// 抽象队列同步器 - 还是独占的
// state, owner, waiters
public class MyAqs {
// 1、 如何判断一个资源的拥有者
public volatile AtomicReference<Thread> owner = new AtomicReference<>();
// 保存 正在等待的线程
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
throw new UnsupportedOperationException();
}
public void acquire() {
boolean addQ = true;
while (!tryAcquire()) {
if (addQ) {
// 没拿到锁,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起当前的线程,不要继续往下跑了
LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
}
}
waiters.remove(Thread.currentThread()); // 把线程移除
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public void release() { // 定义了 释放资源之后要做的操作
if (tryRelease()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 唤醒
}
}
}
}
然后在MyLock中使用AQS
// 自己实现(独享锁) - 常用的
public class MyLock implements Lock {
// 抽象工具类AQS
MyAqs aqs = new MyAqs(){
@Override
public boolean tryAcquire() {
return owner.compareAndSet(null, Thread.currentThread());
}
@Override
public boolean tryRelease() {
// 可重入的情况下,要判断资源的占用情况(state字段保存了资源的占用次数)
return owner.compareAndSet(Thread.currentThread(), null);
}
};
@Override
public boolean tryLock() {
return aqs.tryAcquire();
}
@Override
public void lock() {
aqs.acquire();
}
@Override
public void unlock() {
aqs.release();
}
...
这样就和ReentrantLock源码差不多了,源码大体思路就是这样,只是源码AQS考虑了更多适用的情况,添加了更多的功能
比如公平锁和非公平锁的实现,公平锁是lock()直接进去资源争抢阶段acquire();非公平锁是lock()进来先尝试直接占用资源,然后才进入acquire()争抢
而且等待队列不是用的Queue而是用的链表,其中也使用了很多CAS,有兴趣的可以完整看一遍.
补充一份资源占用流程
5. AQS的其他应用 - 信号量、倒计数器和回环栅栏
AQS是一种很优雅的抽象,她还可以用来实现更多的操作
再说应用之前我们还要补充一下AQS中共享的资源占用的一些方法
5.1 完善一下我们的AQS
这里增加一些共享资源争用的方法,其实就是前面提到的acquireShared,tryAcquireShared,releaseShared,tryReleaseShared
release、 releaseShared:
定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
tryRelease、 tryReleaseShared:
实际执行资源释放的操作,具体内容由AQS使用者去实现
// 记录资源状态
public volatile AtomicInteger state = new AtomicInteger(0);
public AtomicInteger getState() { return state; }
public void setState(AtomicInteger state) { this.state = state; }
// 共享资源占用的逻辑,返回资源的占用情况
public int tryAcquireShared(){
throw new UnsupportedOperationException();
}
public void acquireShared(){
boolean addQ = true;
while(tryAcquireShared() < 0) {
if (addQ) {
// 没拿到锁,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起当前的线程,不要继续往下跑了
LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
}
}
waiters.remove(Thread.currentThread()); // 把线程移除
}
public boolean tryReleaseShared(){
throw new UnsupportedOperationException();
}
public void releaseShared(){
if (tryReleaseShared()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 唤醒
}
}
}
// 独占资源相关的代码
...
5.2 Semaphore
又称"信号量",控制多个线程争抢许可。
acquire:
获取一个许可,如果没有就等待release:
释放一个许可。availablePermits:
方法得到可用的许可数目
使用场景示例
- 代码并发处理限流(hystrix)
照例手撸一个MySemaphore
// 自定义的信号量实现
public class MySemaphore {
MyAqs aqs = new MyAqs() {
@Override
public int tryAcquireShared() { // 信号量获取, 数量 - 1
for(;;) {
int count = getState().get();
int n = count - 1;
if(count <= 0 || n < 0) {
return -1;
}
if(getState().compareAndSet(count, n)) {
return 1;
}
}
}
@Override
public boolean tryReleaseShared() { // state + 1
return getState().incrementAndGet() >= 0;
}
};
/** 许可数量 */
public MySemaphore(int count) {
aqs.getState().set(count); // 设置资源的状态
}
public void acquire() {
aqs.acquireShared();
} // 获取令牌
public void release() {
aqs.releaseShared();
} // 释放令牌
}
5.3 CountDownLatch
java1.5被引入的一个工具类,常被称为:倒计数器。
创建对象时,传入指定数值作为线程参与的数量;
await
:方法等待计数器值变为0,在这之前,线程进入等待状态;
countdown
:计数器数值减一,直到为0;
经常用于等待其他线程执行到某一节点,再继续执行当前线程代码
使用场景示例
- 统计线程执行的情况
- 压力测试中,使用 countDownLatch实现最大程度的并发处理;
- 多个线程之间,相互通信,比如线程异步调用完接口,结果通知;
类似田径运动,八个赛道运动员依次准备,等到所有人就绪,裁判才吹响口哨
依旧手撸一个MyCountDownLatch
// CountDownLatch 自己实现
public class MyCountDownLatch {
MyAqs myAqs = new MyAqs() {
@Override
public int tryAcquireShared() { // 如果非等于0,代表当前还有线程没准备就绪,则认为需要等待
return this.getState().get() == 0 ? 1 : -1;
}
@Override
public boolean tryReleaseShared() { // 如果非等于0,代表当前还有线程没准备就绪,则不会通知继续执行
return this.getState().decrementAndGet() == 0;
}
};
public MyCountDownLatch(int count) {
myAqs.setState(new AtomicInteger(count));
}
public void await() {
myAqs.acquireShared();
}
public void countDown() {
myAqs.releaseShared();
}
}
5.4 CyclicBarrier
也是1.5加入的,又称为"线程栅栏","回环栅栏"。
创建对象时,指定栅栏线程数量。
await
:等指定数量的线程都处于等待状态时,继续执行后续代码。
barrierAction
:线程数量到了指定量之后,自动触发执行指定任务。
栅栏没有显式的使用AQS,她使用锁,每一个线程获取一把锁,且count--,当count为0时,signalAll()且重置count
使用场景示例
- 数据量比较大时,实现批量插入数据到数据库;
- 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总
类似打游戏,比如吃鸡,每凑够100人就开始一场游戏
撸秃噜皮了的MyCyclicBarrier
public class MyCyclicBarrier {
private int count;//计数
private int initNum;//用于重置count
private Runnable barrierAction;//barrierAction
private ReentrantLock lock = new ReentrantLock();
private Condition trip = lock.newCondition();
public MyCyclicBarrier(int initNum, Runnable barrierAction){
this.initNum = initNum;
this.count = initNum;
this.barrierAction = barrierAction;
}
public void await() throws InterruptedException {
lock.lock();
try {
if (--count == 0) { // tripped 数量够了
if (barrierAction != null)
barrierAction.run(); // 触发执行指定的任务
// 唤醒等待的线程继续执行。重新计数
trip.signalAll(); // 唤醒线程
count = initNum; // count重置
return;
}
//没到数量,进入等待
trip.await();
} finally {
lock.unlock();
}
}
}
5.5 CountDownLatch和CyclicBarrier的区别
-
CountDownLatch只是一次计数, CyclicBarrier对象可多次触发执行;
-
CountDownLatch的多个参与者只是参与计数,不会阻塞本身,后面的代码继续执行,只有等待计数归零的代码在阻塞;
而CyclicBarrier的多个参与者在执行到await()方法的时候都会被栅栏拦住,直到满足数量的参与者就绪才会开门放行
所以CountDownLatch相当于一个人在等其他人就绪后做什么,而CyclicBarrier是所有人等其他人就绪后一起开始做什么
-
同时,CountDownLatch需要已知总数,当总数不固定的时候没法使用