锁是实现安全访问共享资源的一种同步机制。在单机环境下,Java原生锁可以很好地支持多线程竞争资源时的安全性,分布式环境就行不通了,因为不同的JVM无法共享锁资源。

分布式环境需要使用分布式锁,分布式锁使用独立于JVM的外部组件,对锁资源进行统一管理。常用的分布式锁组件有:MySQL、Zookeeper、Redis。本文主要介绍Redis分布式锁。

一、Redis实现分布式锁的原理

(一)使用SETNX与DEL实现锁的获取与释放

  1. 使用SETNX获取锁

Redis中获取锁可以使用SETNX命令,SETNX是SET命令的一种变体:执行SETNX时,只有当key不存在时才能成功,反之则失败。这一特性保证了多次访问的互斥性:只有第一次访问才会成功,后续所有访问都会失败。SETNX的用法如下所示:

SETNX lockKey thread-1

这里的lockKey代表了锁的名字,thread-1代表了锁的标识(一般为线程id)。thread-1线程首次访问Redis获取锁时,因为Redis中不存在lockKey这个key的记录,因此,SETNX成功。随后,由于SETNX的特性,thread-2线程再次获取锁时会失败(除非thread-1线程手动释放或者超时自动释放),从而保证多个线程只能拿到一把锁。

SETNX不支持直接设置超时时间,可以使用SET的EX属性设置超时时间,同时,为了支持互斥特性,SET必须加上NX属性。如下所示:

SET lockKey thread-1 NX EX 100

在不考虑原子性的情况下,上述命令等价于:SETNX lockKey thread-1EXPIRE lockKey 100的组合。

  1. 使用DEL释放锁

Redis中释放锁有两种方式:手动释放与超时自动释放。超时自动释放由SET的EX属性支持,手动释放则是直接删除对应的key,如下所示:

DEL lockKey

上述操作将释放lockKey这个锁。需要注意的是,获取锁和释放锁是一对操作,执行这一操作的主体应当是同一线程,在删除之前需要判断对应锁的value是否为当前线程,只有线程标识一致才能执行删除操作(即A线程不能删除B线程的锁)。具体说明详见后续的代码实现

(二)实现一个简单的Redis分布式锁

下面基于Spring实现一个简单的Redis分布式锁。

下述代码省略了Spring配置Redis的过程,具体内容详见Spring官网

  1. 定义并实现Lock接口

定义Lock接口:

public interface Lock {
    boolean tryLock(long expireTimeout);
    void unlock();
}

实现Lock接口:


public class RedisLock implements Lock {
    private String key;
    private StringRedisTemplate redisTemplate;

    public RedisLock(String key, StringRedisTemplate redisTemplate) {
        this.key = key;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean tryLock(long expireTimeout) {
        // 当前线程标识,用于标识当前锁属于哪一个线程,后面unlock释放锁会用到
        String threadFlag = getCurrentThreadFlag();
        // 使用SETNX获取锁
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, threadFlag, 10, TimeUnit.MINUTES);
        return Boolean.TRUE.equals(result);
    }

    @Override
    public void unlock() {
        // 当前线程标识,只有当前线程才能删除对应的锁(避免锁被其它线程误删)
        String threadFlagInRedis = redisTemplate.opsForValue().get(key);
        if (!Objects.equals(getCurrentThreadFlag(),threadFlagInRedis)) {
            // 锁已经由其它线程持有,代表当前锁已经释放(可能是因为超时释放),直接返回即可
            return;
        }
        // 如果是当前线程持有的锁,删除对应的记录以释放锁
        // 前面的查询操作和此处的删除操作不是原子性的,可能存在高并发安全问题,
        // 可以使用lua脚本实现原子性(实现方式可以参见Redisson源码,后续会简单介绍)
        Boolean result = redisTemplate.delete(key);
    }

    private String getCurrentThreadFlag() {
        return  String.valueOf(Thread.currentThread().getId());
    }

}

测试类:

@SpringBootTest(classes = RedisApplication.class)
@RunWith(SpringRunner.class)
public class RedisLockTest {
  @Autowired
  StringRedisTemplate redisTemplate;

  @Test
  public void test() {
    RedisLock redisLock = new RedisLock("customLockKey", redisTemplate);

    boolean lockSuccess = redisLock.tryLock(10);
    Assert.assertEquals(true, lockSuccess);

    // 不支持重入
    boolean lockAgainSuccess = redisLock.tryLock(10);
    Assert.assertEquals(false, lockAgainSuccess);

    redisLock.unlock();
  }
}
  1. 关于分布式锁误删与原子性的补充说明

在释放锁时,如果不加任何限制,直接删除,可能会导致一个线程误删另外一个线程的锁。因此,在上述代码中,在删除锁之前通过当前线程标识与锁中value的对比结果,判断锁的获取和释放是否为同一线程,以此避免误删问题。

同时,由于上述锁的获取与释放是两个操作,Java应用向Redis发送的请求不是原子性的,这就导致在高并发的极端情况下,也可能会存在误删问题。为了支持原子性,可以将锁的获取与锁的删除操作放到同一个lua脚本中,然后统一请求Redis。Redisson便是基于这种方式,详见后文

二、Redisson分布式锁的简单使用

Redisson简化了Redis在分布式环境下的使用,就分布式锁而言,Redisson自带的RLock可以完美平替上述自定义的Redis分布式锁。下面介绍一下Redisson分布式锁的简单使用。

首先,引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.20.0</version>
</dependency>

随后,定义并使用RLock

@SpringBootTest(classes = RedisApplication.class)
@RunWith(SpringRunner.class)
public class RedissonLockTest {
    @Test
    public void test() {
        RLock redisLock = getLock("customLockKey");

        boolean lockSuccess = redisLock.tryLock();
        Assert.assertEquals(true, lockSuccess);

        // 支持重入
        boolean lockAgainSuccess = redisLock.tryLock();
        Assert.assertEquals(true, lockAgainSuccess);

        redisLock.unlock();

    }

    private static RLock getLock(String key) {
        RedissonClient redissonClient = getRedissonClient();
        return redissonClient.getLock(key);
    }

    private static RedissonClient getRedissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        return Redisson.create(config);
    }
}

自定义的Redis分布式锁不同,Redisson的RLock从一开始就支持重入,即一个线程可以同时获取同一把锁多次。有关Redisson分布式锁的可重入原理详见后文

三、Redisson分布式锁的可重入原理

Redisson使用lua脚本实现了Redis锁,其源码如下所示:

public class RedissonLock extends RedissonBaseLock {

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if ((redis.call('exists', KEYS[1]) == 0) " +
                            "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
}

为了更便于观察,将源码中的lua脚本提取出来,如下所示:

  1. 获取锁lua脚本
if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
return redis.call('pttl', KEYS[1]);
  1. 释放锁lua脚本
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
    return nil;
end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    redis.call('pexpire', KEYS[1], ARGV[2]); return 0; 
else 
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1; 
end; 
return nil;

可以看到,与自定义的Redis分布式锁不同,Redisson锁在Redis中的记录使用Hash结构,Hash结构中除了存放线程标识外,还存放了一个重入量(这一点和ReentrantLock很像)。 上述加解锁过程中的重入量是Redisson支持可重入性的关键:

  • 当同一线程尝试获取锁时,重入量加1。
  • 当同一线程释放锁时,重入量减1。
  • 重入量的加减都会刷新锁的过期时间。
  • 当重入量为0时,代表锁已释放,就可以删除了。

至此,有关Redisson分布式锁的基本使用方法和可重入性的原理已经介绍完了,有关Redisson分布式锁以及Redisson其它更多内容,可以继续查看官方wiki及源码深入了解。

参考文档

  1. Redis官方网站
  2. 黑马程序员Redis入门到实战教程