Redis(3) 分布式锁
2024-02-18 19:49:29 # Backend # Redis

1. Redisson 分布式锁

image-20240218192041327

可重入:利用hash结构记录线程id和重入次数

可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制

超时续约:利用watchDog,开启一个定时任务,每隔一段时间(releaseTime / 3),重置超时时间

主从一致性问题:利用MultiLock锁,Redis不再区分主从节点,只有所有的节点都写入成功,才是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性

MultiLock原理

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
RLock lock1 = redissonClient.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
lock = redissonClient.getMultiLock(lock1, lock2, lock3);

public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
} else {
this.locks.addAll(Arrays.asList(locks));
}
}

联锁的tryLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1L;
//如果传入了释放时间
if (leaseTime != -1L) {
//再判断一下是否有等待时间
if (waitTime == -1L) {
//如果没传等待时间,不重试,则只获得一次
newLeaseTime = unit.toMillis(leaseTime);
} else {
//想要重试,耗时较久,万一释放时间小于等待时间,则会有问题,所以这里将等待时间乘以二
newLeaseTime = unit.toMillis(waitTime) * 2L;
}
}
//获取当前时间
long time = System.currentTimeMillis();
//剩余等待时间
long remainTime = -1L;
if (waitTime != -1L) {
remainTime = unit.toMillis(waitTime);
}
//锁等待时间,与剩余等待时间一样
long lockWaitTime = this.calcLockWaitTime(remainTime);
//锁失败的限制,源码返回是的0
int failedLocksLimit = this.failedLocksLimit();
//已经获取成功的锁
List<RLock> acquiredLocks = new ArrayList(this.locks.size());
//迭代器,用于遍历
ListIterator<RLock> iterator = this.locks.listIterator();

while(iterator.hasNext()) {
RLock lock = (RLock)iterator.next();

boolean lockAcquired;
try {
//没有等待时间和释放时间,调用空参的tryLock
if (waitTime == -1L && leaseTime == -1L) {
lockAcquired = lock.tryLock();
} else {
//否则调用带参的tryLock
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException var21) {
this.unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception var22) {
lockAcquired = false;
}
//判断获取锁是否成功
if (lockAcquired) {
//成功则将锁放入成功锁的集合
acquiredLocks.add(lock);
} else {
//如果获取锁失败
//判断当前锁的数量,减去成功获取锁的数量,如果为0,则所有锁都成功获取,跳出循环
if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
break;
}
//否则将拿到的锁都释放掉
if (failedLocksLimit == 0) {
this.unlockInner(acquiredLocks);
//如果等待时间为-1,则不想重试,直接返回false
if (waitTime == -1L) {
return false;
}

failedLocksLimit = this.failedLocksLimit();
//将已经拿到的锁都清空
acquiredLocks.clear();
//将迭代器往前迭代,相当于重置指针,放到第一个然后重试获取锁
while(iterator.hasPrevious()) {
iterator.previous();
}
} else {
--failedLocksLimit;
}
}
//如果剩余时间不为-1,很充足
if (remainTime != -1L) {
//计算现在剩余时间
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
//如果剩余时间为负数,则获取锁超时了
if (remainTime <= 0L) {
//将之前已经获取到的锁释放掉,并返回false
this.unlockInner(acquiredLocks);
//联锁成功的条件是:每一把锁都必须成功获取,一把锁失败,则都失败
return false;
}
}
}
//如果设置了锁的有效期
if (leaseTime != -1L) {
List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
//迭代器用于遍历已经获取成功的锁
Iterator var24 = acquiredLocks.iterator();

while(var24.hasNext()) {
RLock rLock = (RLock)var24.next();
//设置每一把锁的有效期
RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}

var24 = futures.iterator();

while(var24.hasNext()) {
RFuture<Boolean> rFuture = (RFuture)var24.next();
rFuture.syncUninterruptibly();
}
}
//但如果没设置有效期,则会触发WatchDog机制,自动帮我们设置有效期,所以大多数情况下,我们不需要自己设置有效期
return true;
}