ReentrantReadWriteLock 读写锁分析总结
针对这种场景,JAVA的并发包提供了读写锁 ReentrantReadWriteLock,它内部,维护了一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁,描述如下:线程进入读锁的前提条件:
- 没有其他线程的写锁
- 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
- 没有其他线程的读锁
- 没有其他线程的写锁
- 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
- 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
- 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁。
看了上面的描述大家可能有点晕,我就举一个之前开发订单的例子,辅助大家理解。 我们的订单有一个主单和子单的概念:主单编码为 orderCode
, 子单编码为 subOrderCode
对应关系是 1:N。 我在退款的时候,需要支持子单,主单退款。 子单退款,的维度是 subOrderCode
主单退款,的维度是 orderCode
可能出现并发的情况,我们可以对 orderCode
- 如果是主单退款的情况,是不是子单退款就是互斥的
- 如果是子单退款的情况,其实就可以并行的,但是子单是
如何同时存储读写锁,可以通过 state 的值进行存储,高 16 位表示读锁,低 16 位表示写锁。 比如: 0000 0000 0000 0000 (1<<16) 0000 0000 0000 0000 高 16 位不为0: 有读锁 c >>>16 低 16 位不为0: 有写锁 5
ReadWriteLock 接口
我们可以看到 ReentranReadWriteLock 有两把锁,一把读锁,一把写锁。
public class ReentrantReadWriteLockCacheTest { static Map<String, Object> map = new HashMap<String, Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); // 获取一个key对应的value public static final Object get(String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } // 设置key对应的value,并返回旧的value public static final Object put(String key, Object value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } // 清空所有的内容 public static final void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } } }
上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的 读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这 使得并发访问该方法时不会被阻塞。写操作put(String key,Object value)方法和clear()方法, 在更新 HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被 阻塞,而 只有写锁被释放之后,其他读写操作才能继续。Cache使用读写锁提升读操作的并发 性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式。
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock readLock = rwl.readLock(); private final Lock writeLock = rwl.writeLock(); private volatile boolean update = false; public void processData() { readLock.lock(); if (!update) { // 必须先释放读锁 readLock.unlock(); // 锁降级从写锁获取到开始 writeLock.lock(); try { if (!update) { // TODO 准备数据的流程(略) update = true; } readLock.lock(); } finally { writeLock.unlock(); } // 锁降级完成,写锁降级为读锁 } try { //TODO 使用数据的流程(略) } finally { readLock.unlock(); } }
- 读锁不支持条件变量
- 重入时不升级不支持:持有读锁的情况下,去获取写锁,会导致永久等待
- 重入时支持降级:持有写锁的情况下可以去获取读锁
四、ReentranReadWriteLock 结构
方法 tryAcquire
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); // 获取写锁状态 int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 重入 setState(c + acquires); return true; } // 获取写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 设置写锁 owner setExclusiveOwnerThread(current); return true; }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 首次获取读锁 if (r == 0) { firstReader = current; // 第一个线程重入 firstReaderHoldCount = 1; } else if (firstReader == current) { // 重入 firstReaderHoldCount++; } else { // 后续线程,通过 ThreadLocal 获取重入次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
到此这篇关于ReentrantReadWriteLock 读写锁分析总结的文章就介绍到这了,更多相关ReentrantReadWriteLock 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
