锁是实现安全访问共享资源的一种同步机制。在单机环境下,Java原生锁可以很好地支持多线程竞争资源时的安全性,分布式环境就行不通了,因为不同的JVM无法共享锁资源。
分布式环境需要使用分布式锁,分布式锁使用独立于JVM的外部组件,对锁资源进行统一管理。常用的分布式锁组件有:MySQL、Zookeeper、Redis。本文主要介绍Redis分布式锁。
一、Redis实现分布式锁的原理
(一)使用SETNX与DEL实现锁的获取与释放
- 使用SETNX获取锁
Redis中获取锁可以使用SETNX命令,SETNX是SET命令的一种变体:执行SETNX时,只有当key不存在时才能成功,反之则失败。这一特性保证了多次访问的互斥性:只有第一次访问才会成功,后续所有访问都会失败。SETNX的用法如下所示:
SETNX lockKey thread-1
这里的lockKey代表了锁的名字,thread-1代表了锁的标识(一般为线程id)。thread-1线程首次访问Redis获取锁时,因为Redis中不存在lockKey这个key的记录,因此,SETNX成功。随后,由于SETNX的特性,thread-2线程再次获取锁时会失败(除非thread-1线程手动释放或者超时自动释放),从而保证多个线程只能拿到一把锁。
SETNX不支持直接设置超时时间,可以使用SET的EX属性设置超时时间,同时,为了支持互斥特性,SET必须加上NX属性。如下所示:
SET lockKey thread-1 NX EX 100
在不考虑原子性的情况下,上述命令等价于:SETNX lockKey thread-1
和EXPIRE lockKey 100
的组合。
- 使用DEL释放锁
Redis中释放锁有两种方式:手动释放与超时自动释放。超时自动释放由SET的EX属性支持,手动释放则是直接删除对应的key,如下所示:
DEL lockKey
上述操作将释放lockKey这个锁。需要注意的是,获取锁和释放锁是一对操作,执行这一操作的主体应当是同一线程,在删除之前需要判断对应锁的value是否为当前线程,只有线程标识一致才能执行删除操作(即A线程不能删除B线程的锁)。具体说明详见后续的代码实现。
(二)实现一个简单的Redis分布式锁
下面基于Spring实现一个简单的Redis分布式锁。
下述代码省略了Spring配置Redis的过程,具体内容详见Spring官网。
- 定义并实现Lock接口
定义Lock接口:
public interface Lock {
boolean tryLock(long expireTimeout);
void unlock();
}
实现Lock接口:
public class RedisLock implements Lock {
private String key;
private StringRedisTemplate redisTemplate;
public RedisLock(String key, StringRedisTemplate redisTemplate) {
this.key = key;
this.redisTemplate = redisTemplate;
}
@Override
public boolean tryLock(long expireTimeout) {
// 当前线程标识,用于标识当前锁属于哪一个线程,后面unlock释放锁会用到
String threadFlag = getCurrentThreadFlag();
// 使用SETNX获取锁
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, threadFlag, 10, TimeUnit.MINUTES);
return Boolean.TRUE.equals(result);
}
@Override
public void unlock() {
// 当前线程标识,只有当前线程才能删除对应的锁(避免锁被其它线程误删)
String threadFlagInRedis = redisTemplate.opsForValue().get(key);
if (!Objects.equals(getCurrentThreadFlag(),threadFlagInRedis)) {
// 锁已经由其它线程持有,代表当前锁已经释放(可能是因为超时释放),直接返回即可
return;
}
// 如果是当前线程持有的锁,删除对应的记录以释放锁
// 前面的查询操作和此处的删除操作不是原子性的,可能存在高并发安全问题,
// 可以使用lua脚本实现原子性(实现方式可以参见Redisson源码,后续会简单介绍)
Boolean result = redisTemplate.delete(key);
}
private String getCurrentThreadFlag() {
return String.valueOf(Thread.currentThread().getId());
}
}
测试类:
@SpringBootTest(classes = RedisApplication.class)
@RunWith(SpringRunner.class)
public class RedisLockTest {
@Autowired
StringRedisTemplate redisTemplate;
@Test
public void test() {
RedisLock redisLock = new RedisLock("customLockKey", redisTemplate);
boolean lockSuccess = redisLock.tryLock(10);
Assert.assertEquals(true, lockSuccess);
// 不支持重入
boolean lockAgainSuccess = redisLock.tryLock(10);
Assert.assertEquals(false, lockAgainSuccess);
redisLock.unlock();
}
}
- 关于分布式锁误删与原子性的补充说明
在释放锁时,如果不加任何限制,直接删除,可能会导致一个线程误删另外一个线程的锁。因此,在上述代码中,在删除锁之前通过当前线程标识与锁中value的对比结果,判断锁的获取和释放是否为同一线程,以此避免误删问题。
同时,由于上述锁的获取与释放是两个操作,Java应用向Redis发送的请求不是原子性的,这就导致在高并发的极端情况下,也可能会存在误删问题。为了支持原子性,可以将锁的获取与锁的删除操作放到同一个lua脚本中,然后统一请求Redis。Redisson便是基于这种方式,详见后文。
二、Redisson分布式锁的简单使用
Redisson简化了Redis在分布式环境下的使用,就分布式锁而言,Redisson自带的RLock
可以完美平替上述自定义的Redis分布式锁。下面介绍一下Redisson分布式锁的简单使用。
首先,引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
随后,定义并使用RLock
:
@SpringBootTest(classes = RedisApplication.class)
@RunWith(SpringRunner.class)
public class RedissonLockTest {
@Test
public void test() {
RLock redisLock = getLock("customLockKey");
boolean lockSuccess = redisLock.tryLock();
Assert.assertEquals(true, lockSuccess);
// 支持重入
boolean lockAgainSuccess = redisLock.tryLock();
Assert.assertEquals(true, lockAgainSuccess);
redisLock.unlock();
}
private static RLock getLock(String key) {
RedissonClient redissonClient = getRedissonClient();
return redissonClient.getLock(key);
}
private static RedissonClient getRedissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
与自定义的Redis分布式锁不同,Redisson的RLock
从一开始就支持重入,即一个线程可以同时获取同一把锁多次。有关Redisson分布式锁的可重入原理详见后文。
三、Redisson分布式锁的可重入原理
Redisson使用lua脚本实现了Redis锁,其源码如下所示:
public class RedissonLock extends RedissonBaseLock {
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
}
为了更便于观察,将源码中的lua脚本提取出来,如下所示:
- 获取锁lua脚本
if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
- 释放锁lua脚本
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
可以看到,与自定义的Redis分布式锁不同,Redisson锁在Redis中的记录使用Hash结构,Hash结构中除了存放线程标识外,还存放了一个重入量(这一点和ReentrantLock很像)。 上述加解锁过程中的重入量是Redisson支持可重入性的关键:
- 当同一线程尝试获取锁时,重入量加1。
- 当同一线程释放锁时,重入量减1。
- 重入量的加减都会刷新锁的过期时间。
- 当重入量为0时,代表锁已释放,就可以删除了。
至此,有关Redisson分布式锁的基本使用方法和可重入性的原理已经介绍完了,有关Redisson分布式锁以及Redisson其它更多内容,可以继续查看官方wiki及源码深入了解。
参考文档