redisson RLock分布式锁源码解析(基于V1.0.0源码)

内容纲要

目录

介绍

锁获取

锁信息 LockValue

Lock()

tryLock()

tryLock(long time, TimeUnit unit)

unlock()

subscribe()

问题

介绍

  • 这里的代码是Redisson v1.0.0代码,因为这里的代码还比较简单更容易阅读和理解。
    RLock接口继承了JUC的Lock接口,所以把RLock当做JUC的Lock一样使用就可以。

    public interface RLock extends Lock, RObject

RLock的实现是RedissonLock。

锁获取

  1. 检查locksMap中是否已经有了锁
  2. 没有则进行创建,并将锁放入locksMap中(并发处理:lock = oldLock,因为可能有其他线程先创建了锁)
  3. 启动锁消息订阅

    /**
     * Returns distributed lock instance by name.
     *
     * @param name of the distributed lock
     * @return distributed lock
     */
    public RLock getLock(String name) {
        RedissonLock lock = locksMap.get(name);
        if (lock == null) {
            lock = new RedissonLock(connectionManager, name, id);
            RedissonLock oldLock = locksMap.putIfAbsent(name, lock);
            if (oldLock != null) {
                lock = oldLock;
            }
        }
    
        lock.subscribe();
        return lock;
    }

LockValue

LockValue是RedissonLock的内部类,主要是定义了锁key对应的Value值,三个成员变量为:

private UUID id;
// 使用锁的线程ID
private Long threadId;
// Counter是为了可重入添加,同一线程加锁计数加1,释放锁减1。
private int counter;

lock()

获取锁,如果锁被其他线程占用就死等直到收到锁释放的消息会再次尝试。
lock的实现是调用tryLock(),如果tryLock返回false就等待释放锁的消息,收到释放锁的消息后会再次调用tryLock(),重复以上过程直到获取到锁。

while (!tryLock()) {
    // waiting for message
    msg.acquire();
}

tryLock()

这个是尝试获取一下锁,成功返回true失败返回false.

  1. 获取锁之前先创建锁Key对应的Value,Id为Redisson中定义的UUID,线程ID,先将counter+1
  2. 从Redis连接池中获取一个连接
  3. 向Redis发送set nx命令获取锁
  4. 如果锁已经有线程占用锁,则获取锁的Value,检查是否为自己(lock.equals(currentLock)主要对比id和threadId是否相同)
  5. 如果是自己计数器+1否则返回false,加锁不成功

LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter();

RedisConnection<Object, Object> connection = connectionManager.connection();
try {
//
Boolean res = connection.setnx(getKeyName(), currentLock);
if (!res) {
LockValue lock = (LockValue) connection.get(getKeyName());
if (lock != null && lock.equals(currentLock)) {
lock.incCounter();
connection.set(getKeyName(), lock);
return true;
}
}
return res;
} finally {
connectionManager.release(connection);
}


# tryLock(long time, TimeUnit unit) throws InterruptedException
具有等待时间的tryLock实现,tryLock()这个无参数的函数获取不到就立即返回,而这个是获取不到等待到你指定的时间才会返回。
它的实现基本上和lock()差不多,只是加入了从计时功能:先调用tryLock()获取一下锁,获取不到就等待锁释放消息,当收到锁释放消息后会检查一下从调用到现在经过了多长时间,如果经过的时间已经超过(大于等于)了设置的时间就返回false,如果还没有超过就继续等。
```java
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    time = unit.toMillis(time);
    while (!tryLock()) {
        if (time <= 0) {
            return false;
        }
        long current = System.currentTimeMillis();
        // waiting for message
        msg.tryAcquire(time, TimeUnit.MILLISECONDS);
        long elapsed = System.currentTimeMillis() - current;
        time -= elapsed;
    }
    return true;
}

unlock

  1. 从Redis连接池中获取一个连接
  2. 根据锁key获取对应的Value
  3. 首先查看当前持有锁的线程是不是自己,如果不是自己肯定不行,抛出一个异常
  4. 如果是自己那么检查一下自己是不是重入了:每调用一次unlock计数器就减1,
  5. 当计数器为0时就真正的去连接redis删除锁key然后发布释放消息

    public void unlock() {
    LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
    
    RedisConnection<Object, Object> connection = connectionManager.connection();
    try {
        LockValue lock = (LockValue) connection.get(getKeyName());
        if (lock != null && lock.equals(currentLock)) {
            if (lock.getCounter() > 1) {
                lock.decCounter();
                connection.set(getKeyName(), lock);
            } else {
                unlock(connection);
            }
        } else {
            throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
    } finally {
        connectionManager.release(connection);
    }
    }

subscribe()

订阅的是释放锁消息。

  1. 一把锁只执行一次订阅
  2. 注册Redis订阅
  3. 收到释放锁的消息就发送一个信号让等待锁的线程们有机会争抢一下锁

    public void subscribe() {
    if (subscribeOnce.compareAndSet(false, true)) {
        msg.acquireUninterruptibly();
    
        RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
    
            @Override
            public void subscribed(String channel, long count) {
                if (getChannelName().equals(channel)) {
                    subscribeLatch.countDown();
                }
            }
    
            @Override
            public void message(String channel, Integer message) {
                if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
                    msg.release();
                }
            }
    
        };
    
        pubSubEntry = connectionManager.subscribe(listener, getChannelName());
    }
    
    try {
        subscribeLatch.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    }

问题

  1. JVM挂掉了如何解锁?
    JVM挂掉后,该JVM线程持有的锁则无法正确释放。

发表评论

您的电子邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部