Java 并发编程知识梳理以及常见处理模式 features and patterns
内容整理自《Java并发编程实战》《java-concurrency-patterns》 相关工具类《vjtools》
* count+=1 操作分析
* 指令 1:需要把变量 count 从内存加载到 CPU的寄存器;
* 指令 2:在寄存器中执行 +1 操作;
* 指令 3:将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。
public class Singleton { private static Singleton instance; private Singleton(){} public static Singleton getInstance(){ //一重判断 if (instance == null) { synchronized(Singleton.class) { //二重判断防止多线程同时竞争锁的情况多次创建 if (instance == null) instance = new Singleton(); } } return instance; } }
JVM 中的同步是基于进入和退出管程(Monitor)对象实现的。每个对象实例都会有一个 Monitor,Monitor 可以和对象一起创建、销毁。Monitor 是由 ObjectMonitor 实现,而 ObjectMonitor 实现,而 ObjectMonitor 是由C++ 的 ObjectMonitor.hpp 文件实现
当多个线程同时访问时,多个线程会被先放在EntryList集合,处于block状态的线程,都会被加入到该列表。接下来当线程获取到对象的Monitor时,Monitor依靠底层操作系统的Mutex Lock来实现互斥,线程申请Mutex成功则持有,其它线程无法获取到Mutex
ObjectMonitor() { _header = NULL; _count = 0; // 记录个数 _waiters = 0, _recursions = 0; _object = NULL; _owner = NULL; _WaitSet = NULL; // 处于 wait 状态的线程,会被加入到 _WaitSet _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; FreeNext = NULL ; _EntryList = NULL ; // 处于等待锁 block 状态的线程,会被加入到该列表 _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; }
wait():如果线程调用 wait() 方法,就会释放当前持有的 Mutex,并且该线程会进入 WaitSet 集合中,等待下一次被唤醒。如果当前线程顺利执行完方法,也将释放 Mutex。
锁升级优化
jdk1.6引入Java对象头(MarkWord、指向类的指针以及数组长度三部分组成)
对象实例分为对象头、实例数据和对齐填充
64位JVM中MarkWord的存储结构
偏向锁
轻量级锁
重量级锁
class X { // 修饰非静态方法 锁对象为当前类的实例对象 this synchronized void get() { } // 修饰静态方法 锁对象为当前类的Class对象 Class X synchronized static void set() { } // 修饰代码块 Object obj = new Object(); void put() { synchronized(obj) { } } }
final int x; // 错误的构造函数 public FinalFieldExample() { x = 3; y = 4; // 此处就是讲 this 逸出, global.obj = this; }
程序前面对某个变量的修改一定是对后续操作可见的。
2. **volatile 变量规则**
对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。
传递性规则
A Happens-Before B,且 B Happens-Before C,那么A Happens-Before C。
![]()
管程(synchronized)中锁的规则
对一个锁的解锁 Happens-Before 于后续对这个锁加锁
synchronized (this) { // 此处自动加锁 // x 是共享变量, 初始值 =10 if (this.x < 12) { this.x = 12; } } // 此处自动解锁
线程 start() 规则
主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
Thread B = new Thread(()->{ // 主线程调用 B.start() 之前 // 所有对共享变量的修改,此处皆可见 // 此例中,var==77 }); // 此处主线程A对共享变量 var 修改 var = 77; // 主线程启动子线程 B.start();
线程 join() 规则
主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。
Thread B = new Thread(()->{ // 此处对共享变量 var 修改 var = 66; }); // 例如此处对共享变量修改, // 则这个修改结果对线程 B 可见 // 主线程启动子线程 B.start(); B.join() // 子线程所有对共享变量的修改 // 在主线程调用 B.join() 之后皆可见 // 此例中,var==66
初始状态
指的是线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
可运行状态
指的是线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
运行状态
当有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,被分配到 CPU 的线程的状态就转换成了运行状态。
休眠状态
运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
终止状态
线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
class SafeCalc { long value = 0L; long get() { synchronized (new Object()) { return value; } } void addOne() { synchronized (new Object()) { value += 1; } } }
class Account { // 账户余额
private Integer balance; // 账户密码 private String password; // 取款 void withdraw(Integer amt) { synchronized(balance) { if (this.balance > amt){ this.balance -= amt; } } } // 更改密码 void updatePassword(String pw){ synchronized(password) { this.password = pw; } } }
void addIfNotExist(Vector v, Object o){ if(!v.contains(o)) { v.add(o); } }
synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
// 支持中断的 API void lockInterruptibly() throws InterruptedException; // 支持超时的 API boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 支持非阻塞获取锁的 API boolean tryLock();
class X { private final Lock rtl =new ReentrantLock(); int value; public int get() { // 获取锁 rtl.lock(); ② try { return value; } finally { // 保证锁能释放 rtl.unlock(); } } public void addOne() { // 获取锁 rtl.lock(); try { value = 1 + get(); ① } finally { // 保证锁能释放 rtl.unlock(); } } }
// 无参构造函数:默认非公平锁 public ReentrantLock() { sync = new NonfairSync(); } // 根据公平策略参数创建锁 public ReentrantLock(boolean fair){ sync = fair ? new FairSync() : new NonfairSync(); }
class Semaphore{ // 计数器 int count; // 等待队列 Queue queue; // 初始化操作 Semaphore(int c){ this.count=c; } // void down(){ this.count--; if(this.count<0){ // 将当前线程插入等待队列 // 阻塞当前线程 } } void up(){ this.count++; if(this.count<=0) { // 移除等待队列中的某个线程 T // 唤醒线程 T } } }
如果一个写线程正在执行写操作,此时禁止读线程读共享变量
List list = Collections.synchronizedList(new ArrayList()); synchronized (list) { Iterator i = list.iterator(); while (i.hasNext()) foo(i.next()); }
CopyOnWriteArrayList写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁)
内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。
若遍历 array 的同时,新增元素,CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。读写是可以并行的,遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。
总结:
ConcurrentHashMap、ConcurrentSkipListMap(SkipList跳表) 区别在于Key是否有序
CopyOnWriteArraySet、ConcurrentSkipListSet
单端阻塞队列
双端阻塞队列
单端非阻塞队列
双端非阻塞队列
getAndIncrement() // 原子化 i++ getAndDecrement() // 原子化的 i-- incrementAndGet() // 原子化的 ++i decrementAndGet() // 原子化的 --i // 当前值 +=delta,返回 += 前的值 getAndAdd(delta) // 当前值 +=delta,返回 += 后的值 addAndGet(delta) //CAS 操作,返回是否成功 compareAndSet(expect, update) // 以下四个方法 // 新值可以通过传入 func 函数来计算 getAndUpdate(func) updateAndGet(func) getAndAccumulate(x,func) accumulateAndGet(x,func)
ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ThreadPoolExecutor提供了三个submit方法
Future<?> submit(Runnable task);// 提交 Runnable 任务
这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。
Future submit(Callable task);// 提交 Callable 任务
Callable只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
Future submit(Runnable task, T result);// 提交 Runnable 任务及结果引用
假设这个方法返回的 Future 对象是 f,f.get()=的返回值就是传给 submit() 方法的参数 result。
Future接口提供的方法
// 取消任务 boolean cancel(boolean mayInterruptIfRunning); // 判断任务是否已取消 boolean isCancelled(); // 判断任务是否已结束 boolean isDone(); // 获得任务执行结果 get(); // 获得任务执行结果,支持超时 get(long timeout, TimeUnit unit);
FutureTask(Callable callable); FutureTask(Runnable runnable, V result);