I wrote an article about the red lock implementation in Redis a long, long time ago, but in production the distributed lock component used in my projects has always been Redisson, a Java-based Redis client framework (Redis Java Client) with features of an In-Memory Data Grid. As the official introduction shows, it extends the basic Redis data types with a variety of advanced data structures.

The RLock implementation analyzed in this article is only one small module; the other advanced features can be adopted on demand. The sections below cover the basic principles, a source code analysis, and a similar implementation based on Jedis. The Redisson source code analyzed here is the main branch of the Redisson project around 2021-01, corresponding to version 3.14.1.

Fundamentals

The basics of the red lock algorithm are actually “out in the open” in the documentation on the Redis website (https://redis.io/topics/distlock).

Roughly speaking: distributed locks are a very useful primitive in many environments where different processes must operate on shared resources in a mutually exclusive manner. The page attempts to provide a more canonical algorithm for implementing distributed locks with Redis: an algorithm called Redlock, which implements a DLM (which presumably stands for Distributed Lock Manager) and which the authors believe is safer than the ordinary single-instance approach.

The algorithm has three core properties (minimum guarantees):

  • Safety property: Mutual exclusion. At any given moment, only one client can hold a lock.
  • Liveness property A: Deadlock free. Eventually it is always possible to acquire a lock, even if the client that locked a resource crashes or gets partitioned.
  • Liveness property B: Fault tolerance. As long as the majority of Redis nodes are up, clients are able to acquire and release locks.

The documentation also points out that the current failover implementation of the algorithm still has an obvious race condition (described, I believe, under the Redis master-slave architecture):

  • Client A acquires the lock in the Redis master node (assuming the locked resource is X)
  • Redis master node crashes before it can synchronize the KEY to the Redis slave node
  • Redis slave node is promoted to master node due to failure
  • At this point, Client B also succeeds in acquiring the lock for resource X (the problem: the lock on resource X is still held by Client A, so there is now a concurrency problem)

The implementation of the algorithm is simple, and the locking command under a single Redis instance is as follows.

SET $resource_name $random_value NX PX $ttl

NX and PX here are enhancement parameters of the SET command, which has provided these optional compound options since Redis 2.6.12:

  • EX: sets the expiration time in seconds
  • PX: sets the expiration time in milliseconds
  • NX: short for "if not exists"; the K-V is set only if KEY does not exist (on success SET returns OK, otherwise nil)
  • XX: short for "if exists"; the K-V is set only if KEY already exists (on success SET returns OK, otherwise nil)

The unlock command under a single Redis instance is as follows.

# KEYS[1] = $resource_name
# ARGV[1] = $random_value
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
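To make the two commands concrete, here is a minimal single-instance sketch using Jedis (the host, port and TTL are assumptions for illustration; the Jedis-based reimplementation at the end of this article goes much further):

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

import java.util.Collections;
import java.util.UUID;

public class SingleInstanceLockSketch {

    // the unlock script from above: delete the KEY only if it still holds our random value
    private static final String UNLOCK_LUA =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            String resourceName = "resource:x";
            String randomValue = UUID.randomUUID().toString();
            // SET $resource_name $random_value NX PX $ttl
            String reply = jedis.set(resourceName, randomValue, SetParams.setParams().nx().px(30_000L));
            if ("OK".equals(reply)) {
                try {
                    // ... operate on the locked resource ...
                } finally {
                    jedis.eval(UNLOCK_LUA, Collections.singletonList(resourceName),
                            Collections.singletonList(randomValue));
                }
            }
        }
    }
}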

Using RLock in Redisson

To use RLock, you first need to instantiate Redisson. It has been adapted to Redis’ Sentinel, Cluster, ordinary master-slave and standalone deployment modes; since I only have a standalone Redis installed locally, the demonstration here uses the standalone configuration. Instantiating RedissonClient:

static RedissonClient REDISSON;

@BeforeClass
public static void beforeClass() throws Exception {
    Config config = new Config();
    // Standalone
    config.useSingleServer()
            .setTimeout(10000)
            .setAddress("redis://127.0.0.1:6379");
    REDISSON = Redisson.create(config);
//        // Master-Slave
//        config.useMasterSlaveServers()
//                .setMasterAddress("master node address")
//                .setSlaveAddresses(Sets.newHashSet("slave node addresses"));
//        REDISSON = Redisson.create(config);
//        // Sentinel
//        config.useSentinelServers()
//                .setMasterName("master name")
//                .addSentinelAddress(new String[]{"sentinel addresses"});
//        REDISSON = Redisson.create(config);
//        // Cluster
//        config.useClusterServers()
//                .addNodeAddress(new String[]{"cluster node addresses"});
//        REDISSON = Redisson.create(config);
}

Locking and unlocking.

@Test
public void testLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    RLock lock = REDISSON.getLock(resourceName);
    Thread threadA = new Thread(() -> {
        lock.lock();
        try {
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("Thread %s released the lock on resource %s", Thread.currentThread().getName(), resourceName));
        }
    }, "threadA");
    Thread threadB = new Thread(() -> {
        lock.lock();
        try {
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("Thread %s released the lock on resource %s", Thread.currentThread().getName(), resourceName));
        }
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName) {
    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("Thread %s acquired the lock on resource %s", threadName, resourceName));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ignore) {
    }
}

// output of one execution
Thread threadB acquired the lock on resource resource:x
Thread threadB released the lock on resource resource:x
Thread threadA acquired the lock on resource resource:x
Thread threadA released the lock on resource resource:x

More often than not, we choose the APIs that take a maximum wait time and a maximum lock holding time.

@Test
public void testTryLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    int waitTime = 500;
    int leaseTime = 1000;
    Thread threadA = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadA");
    Thread threadB = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName, int waitTime, int leaseTime) {
    RLock lock = REDISSON.getLock(resourceName);
    try {
        String threadName = Thread.currentThread().getName();
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                System.out.println(String.format("Thread %s acquired the lock on resource %s", threadName, resourceName));
                Thread.sleep(800);
            } finally {
                lock.unlock();
                System.out.println(String.format("Thread %s released the lock on resource %s", Thread.currentThread().getName(), resourceName));
            }
        } else {
            System.out.println(String.format("Thread %s failed to acquire the lock on resource %s, wait time: %d ms", threadName, resourceName, waitTime));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
// output of one execution
Thread threadA acquired the lock on resource resource:x
Thread threadB failed to acquire the lock on resource resource:x, wait time: 500 ms
Thread threadA released the lock on resource resource:x

To make this easier to use, you can take a cue from the programmatic transactions in spring-tx and write a light wrapper like this:

@RequiredArgsConstructor
private static class RedissonLockProvider {

    private final RedissonClient redissonClient;

    public <T> T executeInLock(String resourceName, LockAction lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        lock.lock();
        try {
            lockAction.onAcquire(resourceName);
            return lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }

    public <T> T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                return lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
        return null;
    }

    public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
    }

    public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        lock.lock();
        try {
            lockAction.onAcquire(resourceName);
            lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }
}

@FunctionalInterface
interface LockAction {

    default void onAcquire(String resourceName) {

    }

    <T> T doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}

@FunctionalInterface
interface LockActionWithoutResult {

    default void onAcquire(String resourceName) {

    }

    void doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}

Using RedissonLockProvider (for information only).

@Test
public void testRedissonLockProvider() throws Exception {
    RedissonLockProvider provider = new RedissonLockProvider(REDISSON);
    String resourceName = "resource:x";
    Thread threadA = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("Thread %s acquired the lock on resource %s", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("Thread %s released the lock on resource %s", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadA");
    Thread threadB = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("Thread %s acquired the lock on resource %s", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("Thread %s released the lock on resource %s", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}
// output of one execution
Thread threadA acquired the lock on resource resource:x
Thread threadA released the lock on resource resource:x
Thread threadB acquired the lock on resource resource:x
Thread threadB released the lock on resource resource:x

The principle of RLock implementation in Redisson

The implementation of RLock in Redisson basically follows the Redis red lock algorithm, but improves on the original; it includes the following features:

  • Mutual Exclusion
  • Deadlock-free
  • Reentrancy: similar to ReentrantLock, the same thread can repeatedly acquire the lock on the same resource (generally implemented with a counter); lock reentrancy generally helps improve resource utilization
  • Renewal: a relatively avant-garde design. If a client locks resource X "permanently", the KEY's TTL is not simply set to -1; instead a daemon thread extends the KEY's expiration time at a fixed interval. As long as the daemon thread is not killed, this avoids a crashed client leaving a lock that can never be released and occupies the resource indefinitely
  • LockPubSub, relying on org.redisson.pubsub, is used to subscribe to and publish lock release events
  • The red lock algorithm is not followed to the letter: the chosen data type is a HASH, and Lua scripts are used to make multi-command operations atomic

Renewal, i.e. extension of the KEY's expiration time, is implemented in Redisson by what can be understood as a renewal guard thread; under the hood it relies on Netty's time wheel HashedWheelTimer and its task handle io.netty.util.Timeout. This is commonly known as the watchdog and is analyzed in detail below.
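Since the time wheel is central to the watchdog, here is a minimal, self-contained sketch of the Netty API it builds on (the delay and message are assumptions for illustration):

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;

import java.util.concurrent.TimeUnit;

public class TimeWheelSketch {

    public static void main(String[] args) throws InterruptedException {
        Timer timer = new HashedWheelTimer();
        // schedule a one-shot task, much like the watchdog schedules a renewal
        // and then reschedules itself after each successful renewal
        timer.newTimeout(timeout -> System.out.println("renew the lock's TTL here"),
                10, TimeUnit.SECONDS);
        Thread.sleep(11_000);
        timer.stop(); // release the time wheel's worker thread
    }
}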

First look at the class diagram of RLock.

There is a point of confusion here: the documentation comment of RedissonRedLock (a subclass of RedissonMultiLock) says it is the RedLock locking algorithm implementation for multiple locks. Intuitively, though, RedissonLock is the core of the locking system, and its implementation follows the red lock algorithm.

RedissonLock is the direct implementation of RLock and the core class of the distributed lock implementation. As the source code shows, Redisson#getLock() directly instantiates RedissonLock:

public class Redisson implements RedissonClient {
    
    // ... other code omitted

    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }

    // ... other code omitted
}

The class inheritance diagram of RedissonLock is as follows.

Several points are worth noting here:

  • RedissonLock implements all the methods of the java.util.concurrent.locks.Lock interface except newCondition(), which means it can be used seamlessly through the Lock interface, a boon for users who are used to the Lock API (see the short sketch after this list)
  • Almost all of RedissonLock's synchronous APIs are implemented on top of its asynchronous APIs, i.e. the RLock implementation depends on the RLockAsync implementation; the underlying dependency is Netty's io.netty.util.concurrent.Promise (see RedissonPromise). Developers who have used Future in JUC will find this familiar: it is similar to Future#get()
  • The several parent types on the right of the diagram are, briefly:
    • RObjectAsync: the base interface for all Redisson objects, providing asynchronous methods for memory measurement, object copying, moving, etc.
    • RObject: synchronous version of RObjectAsync
    • RExpirableAsync: provides asynchronous methods related to object TTL
    • RExpirable: synchronous version of RExpirableAsync
    • RedissonObject: directly implements the methods in the RObject interface
    • RedissonExpirable: mainly implements the methods in the RExpirable interface
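As a small illustration of the first point, an RLock can be assigned to the JDK Lock interface and used like any other Lock (a minimal sketch, assuming the REDISSON client from the earlier setup):

import java.util.concurrent.locks.Lock;

Lock lock = REDISSON.getLock("resource:x");
lock.lock();
try {
    // critical section, mutually exclusive across processes
} finally {
    lock.unlock();
}

The one method to avoid is newCondition(), which, as noted above, RedissonLock does not implement.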

Moving on, let's first look at the constructor and core properties of RedissonLock:

// Holds entryName -> ExpirationEntry, used to look up the reentrancy counter and renewal task of the current entryName
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

// Maximum internal lock holding time, sourced from Config#lockWatchdogTimeout, controls the renewal period
protected long internalLockLeaseTime;

// ID, a unique identifier, which is a UUID
final String id;

// entry name: uuid + : + KEY, see the constructor below
final String entryName;

// Subscribe/publish support for lock release events
protected final LockPubSub pubSub;

// Asynchronous command executor instance
final CommandAsyncExecutor commandExecutor;

/**
 * CommandAsyncExecutor is the asynchronous command executor; its methods are relatively low-level and oriented to the communication framework, including asynchronous writes, asynchronous reads and synchronous result retrieval
 * The name parameter is what was passed to getLock(), i.e. the KEY that is ultimately synchronized to Redis
 */
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    // The ID here is the externally initialized UUID instance, via toString()
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // entryName = uuid value + : + the externally supplied name (KEY), e.g. 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
    this.entryName = id + ":" + name;
    // Initialize the LockPubSub instance used to subscribe to and publish lock release events
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

// RedissonLock's inner class ExpirationEntry, which holds the thread reentrancy counters and the renewal Timeout task
public static class ExpirationEntry {
    
    // thread ID -> number of reentrant acquisitions by the thread
    private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
    private volatile Timeout timeout;
    
    public ExpirationEntry() {
        super();
    }
    
    // This method records the thread's reentrancy count
    public void addThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            counter = 1;
        } else {
            counter++;
        }
        threadIds.put(threadId, counter);
    }

    public boolean hasNoThreads() {
        return threadIds.isEmpty();
    }

    public Long getFirstThreadId() {
        if (threadIds.isEmpty()) {
            return null;
        }
        return threadIds.keySet().iterator().next();
    }

    public void removeThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            return;
        }
        counter--;
        if (counter == 0) {
            threadIds.remove(threadId);
        } else {
            threadIds.put(threadId, counter);
        }
    }
    
    public void setTimeout(Timeout timeout) {
        this.timeout = timeout;
    }
    public Timeout getTimeout() {
        return timeout;
    }
}

Here you need to pay attention to the lockWatchdogTimeout parameter in the Config.
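If the default does not fit, the value can be tuned when building the Config (a minimal sketch; 30000 ms is the default mentioned in the comments above):

Config config = new Config();
// the watchdog refreshes the KEY's TTL to this value on every renewal
config.setLockWatchdogTimeout(30000L);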

For later use, here is a list of the name-related methods in RedissonLock; these names are analyzed below as the KEYs of K-V structures:

  • id: generated by the UUID instance created when the configuration is instantiated. From the source code, each Redisson connection manager has a unique UUID; ConnectionManager initialization calls UUID id = UUID.randomUUID(). I think it can be interpreted as the unique identity of a Redisson instance; after all, an application should generally use only one Redisson connection configuration
  • getEntryName(): returns the UUID + : + $KEY, e.g. 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
  • getName(): returns $KEY, e.g. resource:x
  • getChannelName(): returns redisson_lock__channel:{$KEY}, e.g. redisson_lock__channel:{resource:x}
  • getLockName(long threadId): returns the UUID + : + $threadId, e.g. 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1

Moving on to the locking methods, the core entry points are mainly:

  • private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException: lock method system
  • public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException: tryLock method system

Let’s first look at the lock() method family, which specifies only the maximum holding time of the lock:

/**
 * Acquire the lock, specifying no wait time, only the lock's maximum holding time
 * Interruption support is configured through the interruptibly parameter
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // Try to acquire the lock; a null ttl means acquisition succeeded, a non-null ttl is the remaining TTL of the already existing KEY
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // Subscribe to redisson_lock__channel:{$KEY}; the essential purpose is to let the client sense the unlock event through Redis pub/sub
    // This method registers an entryName -> RedissonLockEntry mapping in LockPubSub; a RedissonLockEntry holds an RPromise<RedissonLockEntry> result, a semaphore-style latch and a subscription reentrancy counter
    // The getEntry() or RPromise<RedissonLockEntry>#getNow() calls in the loop below read from this mapping
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    // Execute the subscription synchronously, waiting for the channel subscription response; interruption support is distinguished here
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    // Reaching the loop below means the returned ttl was non-null, i.e. the KEY already exists in Redis and another client holds the lock; this client thread must block, waiting to acquire the lock
    try {
        while (true) {
            // Try to acquire the lock inside the infinite loop; this method is analyzed below
            ttl = tryAcquire(leaseTime, unit, threadId);
            // A null ttl means the lock was acquired: break out. This loop either throws an interruption exception or breaks after a successful acquisition; there is no other exit
            if (ttl == null) {
                break;
            }

            // This ttl is the remaining lifetime of the existing lock KEY; block on a semaphore with zero permits. The branches below are all much alike, some supporting a timeout, some supporting interruption
            // Some block permanently until the lock release event makes LockPubSub's onMessage() callback call getLatch().release() to unblock
            // A little trick can be learned here: Semaphore(0), a semaphore with zero permits. The first thread to call acquire() blocks until another thread calls release() on the same semaphore, similar in effect to a CountDownLatch(1)
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // Once the lock is acquired or an interruption exception is thrown, unsubscribe from redisson_lock__channel:{$KEY}; the unlock event is no longer of interest
        unsubscribe(future, threadId);
    }
}

// An async-to-sync method, similar to FutureTask#get(); the key is the tryAcquireAsync() method it calls
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

/**
 * Acquire the lock asynchronously, given the lock's maximum holding time and the thread ID
 */
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // The maximum holding time is not -1, i.e. an explicit holding time, not the hold-forever scenario
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // Reaching here means leaseTime == -1, the branch where the KEY gets no fixed expiration and the watchdog mechanism must be started. Try the internal asynchronous acquisition; note that lockWatchdogTimeout is read from the configuration and passed in, not the internal internalLockLeaseTime property; the default is still 30000 milliseconds
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // Return immediately on an execution exception
        if (e != null) {
            return;
        }

        // The lock was acquired successfully: start the watchdog for this thread ID, renewing through a timed task on the time wheel
        if (ttlRemaining == null) {
            // Schedule the periodic renewal
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

/**
 * Convert the lock's maximum holding time and invoke the locking Lua script with the parameters
 * getName() is the supplied KEY, e.g. resource:x; getLockName() is the lock name in the form UUID + : + threadId, e.g. 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
 * internalLockLeaseTime keeps the original value when leaseTime != -1; when leaseTime == -1 it takes lockWatchdogTimeout
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // Convert the time to milliseconds; note that internalLockLeaseTime is a class property and is reassigned here
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // Execute the Lua script against the Redis server underneath
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

First, note the property internalLockLeaseTime, which is reassigned inside tryLockInnerAsync(); when leaseTime == -1L it is assigned lockWatchdogTimeout. This detail matters, because it determines the scheduling frequency of the renewal method (the watchdog) later. Also note that when leaseTime != -1L no renewal happens, i.e. the watchdog mechanism is not activated.
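As a side note, the Semaphore(0) trick mentioned in the comments of lock() is easy to demonstrate in isolation (a minimal sketch, independent of Redisson):

import java.util.concurrent.Semaphore;

public class ZeroPermitSketch {

    public static void main(String[] args) throws InterruptedException {
        Semaphore latch = new Semaphore(0); // zero permits: the first acquire() blocks
        new Thread(() -> {
            try {
                Thread.sleep(500); // simulate the unlock event arriving later
            } catch (InterruptedException ignore) {
            }
            latch.release(); // comparable to LockPubSub's onMessage() callback
        }).start();
        latch.acquire(); // blocks until release(), like a CountDownLatch(1)
        System.out.println("unblocked");
    }
}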

Next, we need to carefully analyze the Lua script executed in tryLockInnerAsync(), which I have extracted and annotated with comments:

-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- The first block handles the case where the locked resource KEY does not exist, meaning the resource is not locked and the first acquisition succeeds
if (redis.call('exists', KEYS[1]) == 0) then
    -- Set the call count, which can be understood as the number of times the KEY's expiration has been extended
    redis.call('hset', KEYS[1], ARGV[2], 1);
    -- Set the KEY's expiration time
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- The second block checks whether the HASH field exists; if it does, the same thread is re-entering, so extend the KEY's TTL and increment the field's value by 1, recording the number of TTL extensions
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- Increment the call count, i.e. the number of times the KEY's expiration has been extended
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- Extend the KEY's expiration time
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- The third block is the fallback: reaching here means the current thread failed to acquire the lock, which is held by another (process's) thread; return the TTL of the occupied KEY, used to determine the maximum time to sleep
return redis.call('pttl', KEYS[1]);

Here is a diagram demonstrating the logic of the three pieces of code that appear in this Lua script.
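To complement the diagram, a hypothetical trace (UUID and numbers assumed for illustration): the first call by client thread 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 finds no KEY resource:x, so the first block runs hset and pexpire and returns nil, meaning acquired; a reentrant call by the same thread matches the hexists check in the second block, so the field value is bumped to 2 and the TTL refreshed, again returning nil; a call from any other client or thread falls through to the third block and receives the pttl of resource:x, say 27000 ms, which the caller uses as the upper bound of its blocking wait.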

The remaining method, scheduleExpirationRenewal(threadId), has not been analyzed yet; inside it is the watchdog's periodic renewal logic:

// Schedule and renew periodically, keyed by thread ID
private void scheduleExpirationRenewal(long threadId) {
    // Create a new ExpirationEntry if needed to record the thread reentrancy count, and keep the renewal Timeout task in its property
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // The current thread is re-entering the lock
        oldEntry.addThreadId(threadId);
    } else {
        // The current thread is locking for the first time
        entry.addThreadId(threadId);
        // A freshly created ExpirationEntry must trigger the renewal method and record the renewal task handle
        renewExpiration();
    }
}

// Handle renewal
private void renewExpiration() {
    // Fetch the ExpirationEntry by entryName; if it is null it was already removed by cancelExpirationRenewal(), usually triggered on unlock
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // Create a new timed task; this is the watchdog implementation. io.netty.util.Timeout is Netty's timed task handle used with the time wheel
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // This repeats the lookup from the outer method
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            // Get the first thread ID in the ExpirationEntry; if it is null, cancelExpirationRenewal() has cleared the held reentrancy counts, usually because the lock has been released
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // Asynchronously send the renewal command to Redis
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                // An exception means renewal failed; just log it and stop the task
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                // true means renewal succeeded, so call the renewal method recursively (reschedule itself); on failure the lock no longer exists, so return without recursing
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, 
    // The scheduling period is one third of leaseTime converted to ms; since the renewal logic is only entered when leaseTime is initially -1, the period is one third of lockWatchdogTimeout
    internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
    
    // The ExpirationEntry instance holds the scheduled task instance
    ee.setTimeout(task);
}

// Call Redis and execute the Lua script that renews asynchronously
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        // As analyzed earlier, when leaseTime is -1, internalLockLeaseTime holds lockWatchdogTimeout
        internalLockLeaseTime, getLockName(threadId));  
}

Based on the source code, the renewal mechanism is determined by the value of leaseTime passed in (see the short sketch after this list):

  • When leaseTime == -1 (usually the lock() and lockInterruptibly() calls), the renewal task's scheduling period is lockWatchdogTimeout / 3, and each renewal refreshes the lock's maximum holding time (the KEY's expiration) to lockWatchdogTimeout
  • When leaseTime != -1 (usually calls such as lock(long leaseTime, TimeUnit unit) and lockInterruptibly(long leaseTime, TimeUnit unit), where leaseTime is explicitly not -1), the lock's expiration time is set directly to the input value converted to milliseconds, and no renewal mechanism is started
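In other words, whether the watchdog runs depends purely on which overload is called (a minimal sketch, assuming the REDISSON client from earlier):

RLock lock = REDISSON.getLock("resource:x");

// leaseTime == -1 internally: the watchdog renews the KEY every lockWatchdogTimeout / 3
lock.lock();

// leaseTime != -1: the KEY simply expires after 10 seconds, and no watchdog is started
lock.lock(10, TimeUnit.SECONDS);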

The extracted renewal Lua script is as follows.

-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

So far, the locking and renewal logic without the waitTime parameter has been covered. The tryLock(long waitTime, long leaseTime, TimeUnit unit) implementation with the waitTime parameter ultimately calls the same underlying methods as the lock(long leaseTime, TimeUnit unit, boolean interruptibly) implementation that takes only leaseTime. The biggest difference is that, around each attempt to acquire the lock, it measures the elapsed time with System.currentTimeMillis() and compares the accumulated difference against waitTime, deciding whether to keep blocking or to return an acquisition failure on timeout. The blocking-wait handling is purely client-side logic and is not expanded in detail here, because the source implementation is not very elegant (too many long currentTime = System.currentTimeMillis() segments); a simplified, self-contained sketch of the bookkeeping pattern follows (attemptOnce() is a hypothetical stand-in for the real tryAcquire() call, not Redisson's actual code):
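import java.util.concurrent.TimeUnit;

public class WaitTimeBookkeeping {

    // returns null when the lock is acquired, otherwise the remaining TTL of the holder's KEY
    static Long attemptOnce() {
        return 100L; // stub: pretend someone else holds the lock for another 100 ms
    }

    static boolean tryLockPattern(long waitTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        while (true) {
            long current = System.currentTimeMillis();
            Long ttl = attemptOnce();
            if (ttl == null) {
                return true; // acquired
            }
            // deduct what this attempt cost from the remaining budget
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                return false; // budget spent: fail with a timeout
            }
            // in Redisson this wait is the unlock-notification latch, bounded by min(ttl, time)
            Thread.sleep(Math.min(ttl, time));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(tryLockPattern(500, TimeUnit.MILLISECONDS)); // false after ~500 ms
    }
}

Now, spend some effort analyzing the unlocking implementation, including the ordinary unlock() and forceUnlockAsync().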

// Unlocking in the ordinary case
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        // IllegalMonitorStateException is generally thrown when thread A locked but thread B unlocks, and the internal thread-state check finds the mismatch
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync() {
    // Get the ID of the thread currently invoking the unlock operation
    long threadId = Thread.currentThread().getId();
    return unlockAsync(threadId);
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // Build a RedissonPromise for the result
    RPromise<Void> result = new RedissonPromise<Void>();
    // If the returned RFuture holds true, unlocking succeeded; NULL means the thread ID is anomalous, i.e. the locking and unlocking client threads are not the same
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // An internal exception: unlocking failed, and the watchdog's renewal task must be cancelled
        if (e != null) {
            cancelExpirationRenewal(threadId);
            result.tryFailure(e);
            return;
        }
        // The thread ID is anomalous: the locking and unlocking client threads are not the same, so throw IllegalMonitorStateException
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        // Reaching here means a normal unlock: cancel the watchdog's renewal task
        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });
    return result;
}

// The real internal unlocking method, which executes the unlocking Lua script
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

// Cancel the renewal task
void cancelExpirationRenewal(Long threadId) {
    // The ExpirationEntry may already have been removed, usually because the same thread ID invoked the unlock method multiple times (concurrent unlocking)
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    // When the supplied thread ID is not NULL, remove it from the ExpirationEntry; if the thread's reentrancy count is non-zero it is decremented first, and the thread ID is removed only once the count reaches 0
    if (threadId != null) {
        task.removeThreadId(threadId);
    }
    // threadId == null covers the forced-unlock scenario: a forced unlock deletes the lock's KEY directly, ignoring the supplied thread ID (which is passed in as NULL)
    // The other condition, task.hasNoThreads(), means no thread holds the lock any more; for a single thread it also confirms the reentrancy counter reached 0 after the removal above, so the task instance held by the ExpirationEntry can be fetched and cancelled
    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        // Remove this lock's ExpirationEntry from the entryName -> ExpirationEntry mapping
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

// Forced unlock
@Override
public boolean forceUnlock() {
    return get(forceUnlockAsync());
}

@Override
public RFuture<Boolean> forceUnlockAsync() {
    // The thread ID is passed in as NULL; cancel the renewal task of the current entryName
    cancelExpirationRenewal(null);
    // Execute the Lua script that force-deletes the lock's KEY and publishes the unlock message
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('del', KEYS[1]) == 1) then "
            + "redis.call('publish', KEYS[2], ARGV[1]); "
            + "return 1 "
            + "else "
            + "return 0 "
            + "end",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}

The Lua scripts for normal unlocking and forced unlocking are listed and analyzed below.

-- Lua script of the unlockInnerAsync method
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> channel subscribed to for the lock --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> the constant 0
-- ARGV[2] == internalLockLeaseTime --> 30000 or the specific maximum lock holding time
-- ARGV[3] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- The first IF branch: if the field of the lock's hash does not exist, the current thread ID never acquired this lock; return NULL to signal that unlocking failed
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
-- Reaching here, decrement the thread reentrancy count by 1 via hincrby and capture the counter value
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- A counter greater than 0 means the thread re-entered the lock; in that case renew the lock's KEY based on internalLockLeaseTime
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- A counter of 0 or less means the lock can be released: delete the lock's KEY and publish the message 0 (a constant) to redisson_lock__channel:{$KEY}
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
-- The final return nil; is flagged as an unreachable statement in IDEA; presumably a slip by the developers, since every if-else branch above already returns
return nil;

-------------------------------------------------- a not-so-fancy divider -------------------------------------------------

-- Lua script of the forceUnlockAsync method
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> channel subscribed to for the lock --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> the constant 0
-- Force-delete the lock's KEY; if the deletion succeeds, publish the message 0 (a constant) to redisson_lock__channel:{$KEY}
if (redis.call('del', KEYS[1]) == 1) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1
else
    return 0
end

Other auxiliary methods are relatively simple, so here is a brief run-through of some of them (a short usage sketch follows the list):

  • isLocked(): based on getName(), calls Redis’ EXISTS $KEY command to determine whether the lock is held
  • isHeldByThread(long threadId) and isHeldByCurrentThread(): based on getName() and getLockName(threadId), call Redis’ HEXISTS $KEY $LOCK_NAME command to determine whether the corresponding field-value exists in the HASH; if it does, the lock is held by the thread with that thread ID
  • getHoldCount(): based on getName() and getLockName(threadId), calls Redis’ HGET $KEY $LOCK_NAME command to get the hold count of the lock (called holds in the comment, which is effectively the number of times the same thread has renewed the lock’s KEY)
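A minimal usage sketch of these helpers (assuming the REDISSON client from earlier):

RLock lock = REDISSON.getLock("resource:x");
lock.lock();
try {
    System.out.println(lock.isLocked());              // true: the KEY exists
    System.out.println(lock.isHeldByCurrentThread()); // true: the HASH field for this thread exists
    System.out.println(lock.getHoldCount());          // 1: acquired once, no reentrancy
} finally {
    lock.unlock();
}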

The subscription and publishing parts use a large number of Netty components in the source code, so they are not expanded in detail here; the logic of that part is simply folded into the flowcharts below. Finally, some more detailed diagrams analyze the Redisson locking and unlocking processes.

  • The locking process without the waitTime parameter.

  • The locking process with the waitTime parameter (the process on the right side of the diagram stays essentially the same; the main difference is that the process on the left calculates the elapsed time interval at each step).

  • Unlocking process

Assuming two threads X and Y from different processes compete for the lock on resource RESOURCE, a possible flow is as follows.

Finally, an overview of the HASH data type used to implement the red lock algorithm in Redisson (a concrete illustration follows the list):

  • The KEY represents the resource or lock; creation, existence checks, lifetime extension and deletion always operate on the KEY
  • The FIELD represents the lock name from getLockName(), which actually consists of the UUID of the Redisson connection manager instance joined with the client thread ID; strictly speaking it uniquely identifies the client thread that acquired the lock
  • The VALUE represents how many times the client thread holds the lock, which from the source code is in effect the number of times the KEY has been renewed
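Concretely, after the thread from the earlier examples acquires the lock and re-enters it once, the stored structure would look roughly like this (UUID assumed for illustration):

resource:x (HASH, with a TTL)
    559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 -> 2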

Jedis-based implementation of Redisson-like distributed locking capabilities

The previous sections have analyzed the principles of Redisson's distributed lock in some detail; here a similar implementation is built with Jedis and multi-threading techniques. For simplicity, only a lock() method without time parameters (similar to the Redisson scenario where leaseTime == -1) and an unlock() method are implemented. First define the interface RedLock.

public interface RedLock {

    void lock(String resource) throws InterruptedException;

    void unlock(String resource);
}

For the sake of simplicity, I have written all the implementation logic in the implementation class RedisRedLock.

@RequiredArgsConstructor
public class RedisRedLock implements RedLock {

    private final JedisPool jedisPool;
    private final String uuid;

    private static final String WATCH_DOG_TIMEOUT_STRING = "30000";
    private static final long WATCH_DOG_TASK_DURATION = 10000L;
    private static final String CHANNEL_PREFIX = "__red__lock:";
    private static final String UNLOCK_STATUS_STRING = "0";

    private static final String LOCK_LUA = "if (redis.call('exists', KEYS[1]) == 0) then\n" +
            "    redis.call('hset', KEYS[1], ARGV[2], 1);\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
            "    return nil;\n" +
            "end;\n" +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\n" +
            "    redis.call('hincrby', KEYS[1], ARGV[2], 1);\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
            "    return nil;\n" +
            "end;\n" +
            "return redis.call('pttl', KEYS[1]);";

    private static final String UNLOCK_LUA = "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\n" +
            "    return nil;\n" +
            "end;\n" +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\n" +
            "if (counter > 0) then\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[2]);\n" +
            "    return 0;\n" +
            "else\n" +
            "    redis.call('del', KEYS[1]);\n" +
            "    redis.call('publish', KEYS[2], ARGV[1]);\n" +
            "    return 1;\n" +
            "end;";

    private static final String RENEW_LUA = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return 0;";

    private static final ExecutorService SUB_PUB_POOL = Executors.newCachedThreadPool();
    private static final ScheduledExecutorService WATCH_DOG_POOL = new ScheduledThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors() * 2
    );

    private static class ThreadEntry {

        private final ConcurrentMap<Long, Integer> threadCounter = Maps.newConcurrentMap();

        private volatile WatchDogTask watchDogTask;

        public synchronized void addThreadId(long threadId) {
            Integer counter = threadCounter.get(threadId);
            if (counter == null) {
                counter = 1;
            } else {
                counter++;
            }
            threadCounter.put(threadId, counter);
        }

        public synchronized boolean hasNoThreads() {
            return threadCounter.isEmpty();
        }

        public synchronized Long getFirstThreadId() {
            if (threadCounter.isEmpty()) {
                return null;
            }
            return threadCounter.keySet().iterator().next();
        }

        public synchronized void removeThreadId(long threadId) {
            Integer counter = threadCounter.get(threadId);
            if (counter == null) {
                return;
            }
            counter--;
            if (counter == 0) {
                threadCounter.remove(threadId);
            } else {
                threadCounter.put(threadId, counter);
            }
        }

        public void setWatchDogTask(WatchDogTask watchDogTask) {
            this.watchDogTask = watchDogTask;
        }

        public WatchDogTask getWatchDogTask() {
            return watchDogTask;
        }
    }

    @Getter
    private static class SubPubEntry {

        private final String key;
        private final Semaphore latch;
        private final SubscribeListener subscribeListener;

        public SubPubEntry(String key) {
            this.key = key;
            this.latch = new Semaphore(0);
            this.subscribeListener = new SubscribeListener(key, latch);
        }
    }

    private static final ConcurrentMap<String, ThreadEntry> THREAD_ENTRY_MAP = Maps.newConcurrentMap();

    @Override
    public void lock(String resource) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        String lockName = uuid + ":" + threadId;
        String entryName = uuid + ":" + resource;
        // acquire the lock
        Long ttl = acquire(resource, lockName, threadId, entryName);
        // acquired successfully, return immediately
        if (Objects.isNull(ttl)) {
            return;
        }
        // subscribe to the unlock channel
        SubPubEntry subPubEntry = subscribeAsync(resource);
        try {
            for (; ; ) {
                ttl = acquire(resource, lockName, threadId, entryName);
                // acquired successfully, return immediately
                if (Objects.isNull(ttl)) {
                    return;
                }
                if (ttl > 0L) {
                    subPubEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            }
        } finally {
            unsubscribeSync(subPubEntry);
        }
    }

    private Long acquire(String key, String lockName, long threadId, String entryName) {
        Object result = execute0(jedis -> jedis.eval(LOCK_LUA, Lists.newArrayList(key),
                Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
        if (Objects.nonNull(result)) {
            return Long.parseLong(String.valueOf(result));
        }
        // start the watchdog
        ThreadEntry entry = new ThreadEntry();
        ThreadEntry oldEntry = THREAD_ENTRY_MAP.putIfAbsent(entryName, entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            Runnable renewAction = () -> executeWithoutResult(jedis -> jedis.eval(RENEW_LUA, Lists.newArrayList(key),
                    Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
            WatchDogTask watchDogTask = new WatchDogTask(new AtomicReference<>(renewAction));
            entry.setWatchDogTask(watchDogTask);
            WATCH_DOG_POOL.scheduleWithFixedDelay(watchDogTask, 0, WATCH_DOG_TASK_DURATION, TimeUnit.MILLISECONDS);
        }
        return null;
    }

    private SubPubEntry subscribeAsync(String key) {
        SubPubEntry subPubEntry = new SubPubEntry(key);
        SUB_PUB_POOL.submit(() -> {
            SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
            executeWithoutResult(jedis -> jedis.subscribe(subscribeListener, subscribeListener.getChannelName()));
            return null;
        });
        return subPubEntry;
    }

    private void unsubscribeSync(SubPubEntry subPubEntry) {
        SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
        subscribeListener.unsubscribe(subscribeListener.getChannelName());
    }

    @Override
    public void unlock(String resource) {
        long threadId = Thread.currentThread().getId();
        String entryName = uuid + ":" + resource;
        String lockName = uuid + ":" + threadId;
        String channelName = CHANNEL_PREFIX + resource;
        Object result = execute0(jedis -> jedis.eval(UNLOCK_LUA, Lists.newArrayList(resource, channelName),
                Lists.newArrayList(UNLOCK_STATUS_STRING, WATCH_DOG_TIMEOUT_STRING, lockName)));
        ThreadEntry threadEntry = THREAD_ENTRY_MAP.get(entryName);
        if (Objects.nonNull(threadEntry)) {
            threadEntry.removeThreadId(threadId);
            if (threadEntry.hasNoThreads() && Objects.nonNull(threadEntry.getWatchDogTask())) {
                threadEntry.getWatchDogTask().cancel();
            }
        }
        if (Objects.isNull(result)) {
            throw new IllegalMonitorStateException();
        }
    }

    private static class SubscribeListener extends JedisPubSub {

        @Getter
        private final String key;
        @Getter
        private final String channelName;
        @Getter
        private final Semaphore latch;

        public SubscribeListener(String key, Semaphore latch) {
            this.key = key;
            this.channelName = CHANNEL_PREFIX + key;
            this.latch = latch;
        }

        @Override
        public void onMessage(String channel, String message) {
            if (Objects.equals(channelName, channel) && Objects.equals(UNLOCK_STATUS_STRING, message)) {
                latch.release();
            }
        }
    }

    @RequiredArgsConstructor
    private static class WatchDogTask implements Runnable {

        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicReference<Runnable> actionReference;

        @Override
        public void run() {
            if (running.get() && Objects.nonNull(actionReference.get())) {
                actionReference.get().run();
            } else {
                throw new WatchDogTaskStopException("watch dog cancel");
            }
        }

        public void cancel() {
            actionReference.set(null);
            running.set(false);
        }
    }

    private <T> T execute0(Function<Jedis, T> function) {
        try (Jedis jedis = jedisPool.getResource()) {
            return function.apply(jedis);
        }
    }

    interface Action {

        void apply(Jedis jedis);
    }

    private void executeWithoutResult(Action action) {
        try (Jedis jedis = jedisPool.getResource()) {
            action.apply(jedis);
        }
    }

    private static class WatchDogTaskStopException extends RuntimeException {

        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    public static void main(String[] args) throws Exception {
        String resourceName = "resource:x";
        RedLock redLock = new RedisRedLock(new JedisPool(new GenericObjectPoolConfig()), UUID.randomUUID().toString());
        Thread threadA = new Thread(() -> {
            try {
                redLock.lock(resourceName);
                process(resourceName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                redLock.unlock(resourceName);
                System.out.println(String.format("Thread %s released the lock on resource %s", Thread.currentThread().getName(), resourceName));
            }
        }, "threadA");
        Thread threadB = new Thread(() -> {
            try {
                redLock.lock(resourceName);
                process(resourceName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                redLock.unlock(resourceName);
                System.out.println(String.format("Thread %s released the lock on resource %s", Thread.currentThread().getName(), resourceName));
            }
        }, "threadB");
        threadA.start();
        threadB.start();
        Thread.sleep(Long.MAX_VALUE);
    }

    private static void process(String resourceName) {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("Thread %s acquired the lock on resource %s", threadName, resourceName));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignore) {
        }
    }
}

The above implementation was written in a short time and without detailed debugging, so there may be mistakes. The output of one run is as follows.

Thread threadB acquired the lock on resource resource:x
Thread threadB released the lock on resource resource:x
Thread threadA acquired the lock on resource resource:x
Thread threadA released the lock on resource resource:x

Summary

The red lock implementation in Redisson applies the following core technologies:

  • Rational application of Redis’ basic data type HASH
  • Redis subscription publishing
  • Atomicity of Lua scripts
  • Promise implementation in Netty
  • Netty's time wheel HashedWheelTimer and the corresponding timed task handle io.netty.util.Timeout
  • Semaphore for timed, permanent or interruptible blocking and wake-up, in place of the less flexible blocking of CountDownLatch

These core technologies are combined sensibly to achieve an efficient and fault-tolerant distributed locking scheme. As things stand, however, Redisson has not solved the failover defect in the red lock algorithm, which I believe may be an underlying defect of Redis-based distributed locking: once applied to a Redis cluster (common master-slave, sentinel or Cluster), there is a chance that the aforementioned node role switch lets multiple different clients acquire the lock on the same resource. There is no solution for now.


Reference https://www.throwx.cn/2021/01/14/learn-about-redisson/