ReentrantReadWriteLock

ReentrantReadWriteLock表示可重入读写锁,ReentrantReadWriteLock中包含了两种锁,读锁ReadLock和写锁WriteLock,可以通过这两种锁实现线程间的同步。

读写锁计数方式

    // 高16位为读锁,低16位为写锁,且state此处为无符号
    static final int SHARED_SHIFT   = 16;
    // 读锁单位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 读锁最大数量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 写锁最大数量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 本地线程计数器
    private transient ThreadLocalHoldCounter readHolds;
    // 缓存的计数器
    private transient HoldCounter cachedHoldCounter;
    // 第一个读线程
    private transient Thread firstReader = null;
    // 第一个读线程的计数
    private transient int firstReaderHoldCount;

读锁

// +1
state+SHARED_UNIT
// -1
state+SHARED_UNIT
// 获取当前
int readCount = state >>> SHARED_SHIFT;    // 读计数

读锁为1时

/// state的值示例,低位,即:HHHH这些是低16的写锁计数
state = 0000 0000 0000 0001 HHHH HHHH HHHH HHHH
// state >>> SHARED_SHIFT:无符号右移16,高位补0
state = 0000 0000 0000 0000 0000 0000 0000 0001
// 即:得到1

写锁

// +1
state+1
// -1
state-1
// 获取当前
// SHARED_SHIFT = 16
// EXCLUSIVE_MASK= ((1 << SHARED_SHIFT) - 1)
state & EXCLUSIVE_MASK

当写锁为1时

/// state的值示例,高位,即:HHHH这些无关紧要的
state = HHHH HHHH HHHH HHHH 0000 0000 0000 0001
// 1 << SHARED_SHIFT 即: 左移16,低位补0: 得到 65536
0000 0000 0000 0001 0000 0000 0000 0000
// 减 1 得掩码: 即: 65535
(1 << 16) - 1 = 0000 0000 0000 0000 1111 1111 1111 1111
// c & ((1 << SHARED_SHIFT) - 1): 即 c& 65535
c:      HHHH HHHH HHHH HHHH 0000 0000 0000 0000 0001
65535:  0000 0000 0000 0000 0000 1111 1111 1111 1111
result: 0000 0000 0000 0000 0000 0000 0000 0000 0001
// 即:得到1

当写锁为0时

/// state的值示例,高位,即:HHHH这些是高16的读锁计数
state = HHHH HHHH HHHH HHHH 0000 0000 0000 0000
// 1 << SHARED_SHIFT 即: 1 << 16 得到 65536
0000 0000 0000 0001 0000 0000 0000 0000
// 减 1 得掩码: 即: 65535
(1 << 16) - 1 = 0000 0000 0000 0000 1111 1111 1111 1111
// c & ((1 << SHARED_SHIFT) - 1): 即 c& 65535
c:      HHHH HHHH HHHH HHHH 0000 0000 0000 0000 0000
65535:  0000 0000 0000 0000 0000 1111 1111 1111 1111
result: 0000 0000 0000 0000 0000 0000 0000 0000 0000
// 即:得到1

核心函数

    //  获取读锁数量
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    // 获取写锁数量
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

获取共享锁(读锁)

tryReadLock()


/**
 * Performs tryLock for read, enabling barging in both modes.
 * This is identical in effect to tryAcquireShared except for
 * lack of calls to readerShouldBlock.
 */
@ReservedStackAccess
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        // 获取当前状态
        int c = getState();
        // 当前写锁不为0,且当前线程不为持有线程
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            // 失败
            return false;
        // 只有没有写锁,且总数量不得越界时,才能尝试上锁
        // 获取读锁数量
        int r = sharedCount(c);
        // 超出最大值 uint16,会导致无法正确存储和计算
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 尝试CAS数量+1
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            //成功
            if (r == 0) {
                // 如果之前没有读锁,设置为第一个读锁
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 如果有读锁,但就是当前线程,则+1
                firstReaderHoldCount++;
            } else {
                // 获取已缓存的锁
                HoldCounter rh = cachedHoldCounter;
                // 如果没有或者id不一致
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    // 更新当前 cachedHoldCounter为当前线程的hold对象
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    // 如果rh.count==0,则说明没有进入线程变量中,则放入
                    readHolds.set(rh);
                // 计数+1
                rh.count++;
            }
            // 成功获取读锁,返回即可
            return true;
        }
        // 继续循环
    }
}
  • tryReadLock() 不会阻塞,因为是尝试,且会返回结果
  • firstReaderHold一组对象,为第一个线程重入等提供快速处理
  • 对于后续频繁重入的同一线程,cachedHoldCounter 提供了第二道快速通路

释放共享锁(读锁)

tryReleaseShared

@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
    // 此处 `unused`永远为`1`。
    Thread current = Thread.currentThread();
    //  判断是否和第一个线程一致
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            // 置为空,表明第一个线程,即当前线程不再持有读锁
            firstReader = null;
        else
            // 计数-1
            firstReaderHoldCount--;
    } else {
        // 获取 cachedHoldCounter
        HoldCounter rh = cachedHoldCounter;
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            rh = readHolds.get();
        // 数量计算
        int count = rh.count;
        if (count <= 1) {
            // 移除线程变量
            readHolds.remove();
            if (count <= 0)
                // 未获取锁(或者重入次数不对),但释放了
                throw unmatchedUnlockException();
        }
        // 减少当前
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        // netxc是如果减一读锁后的实际值
        // CAS尝试
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            // 如果nextc==0,则表明当前没有锁了。
            return nextc == 0;
    }
}

获取独占锁(写锁)

tryWriteLock()

    final boolean tryWriteLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c != 0) {
            // 如果锁数量不为0(包含读写锁)
            int w = exclusiveCount(c);
            // 获取写锁数量
            if (w == 0 || current != getExclusiveOwnerThread())
            // 由于锁总数不为0,此时只判断写锁数量,如果没有写锁,则说明只有读锁,由于此处需要拿独占锁,所以无法获取锁
            // 而如果w!=0,说明有线程拿到了写锁,则需要判断是不是重入,如果不是重入,则也无法拿到锁-> 这是为了避免线程先拿写锁,再拿读锁,又拿写锁的情况
                return false;
            // 锁越界
            if (w == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
        }
        // 走到这里,说明要不之前没有锁,要不锁被当前线程持有,且未越界
        // 尝试CAS更新锁状态
        if (!compareAndSetState(c, c + 1))
            return false;
        // 设置当前线程为持有线程
        setExclusiveOwnerThread(current);
        return true;
    }

释放独占锁(写锁)

tryRelease

/**
    * The synchronization state.
*/
private volatile int state;
...
/*
    * Note that tryRelease and tryAcquire can be called by
    * Conditions. So it is possible that their arguments contain
    * both read and write holds that are all released during a
    * condition wait and re-established in tryAcquire.
*/
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    // 是否是当前线程持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取下一次应该更新的state数据
    int nextc = getState() - releases;
    // 是否更新后被释放
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    // 
    setState(nextc);
    return free;
}
...
protected final void setState(int newState) {
    state = newState;
}

没有使用比较并交换(CAS)是有原因的。因为在释放独占锁(写锁)的路径上,只有持有该写锁的线程才有权执行释放操作;既然只有“唯一的持有者”会修改状态,简单的算术更新加上对 volatile 字段的写入就足够保证正确性与可见性,无需用 CAS 做并发竞争保护。

results matching ""

    No results matching ""