澳门金沙vipRedis实现分布式锁的方法示例

RedLock 的实现

关于安全性的讨论

关于RedLock的安全性问题, Martin Kleppmann和作者antirez进行了一些讨论:

  • Martin Kleppmann: How to do distributed
    locking
  • antirez:[Is Redlock
    safe?](http://antirez.com/news/101)

关于这场讨论的分析可以参考:

  • 基于Redis的分布式锁到底安全吗?

客户端A获取锁,后发线程挂起了。时间大于锁的过期时间。
锁过期后,客户端B获取锁。 客户端A恢复以后,处理完相关事件,向redis发起
del命令。锁被释放
客户端C获取锁。这个时候一个系统中同时两个客户端持有锁。

RedLock

基于单点的分布式锁无法解决redis故障的问题.
为了保证redis的可用性我们通常采用主从备份的方法, 即
使用一个master实例和至少一个slave实例.

当有写入请求时先写入master然后写入到所有slave,
当master实例故障时选择一个slave实例升级为master实例继续提供服务.

其中存在的问题是, 写入master和写入slave存在时间差.
若线程A成功将锁记录写入了master, 随后在同步写入slave之前,
master故障转移到slave.

因为slave(新master)中没有锁记录, 因此线程B也可以成功加锁,
因此可能出现A和B同时持有锁的错误.

为了解决redis失效可能造成的问题,
redis的作者antirez提出了RedLock实现方案:

  1. 客户端获取当前时间

  2. 客户端尝试获取N个节点的锁, 每个节点使用相同的key和value.
    请求超时时间要远小于锁超时时间, 避免在节点或者网络故障时浪费时间.

  3. 客户端计算在加锁时消耗的时间,
    只有客户端成功获得超过一半节点的锁且总时间小于锁超时间时才能成功加锁.
    客户端持有锁的时间为锁超时时间减去加锁消耗的时间.

  4. 澳门金沙vip,若获取锁失败则访问所有节点, 发起释放锁的请求.

释放锁时需要向所有Redis节点发出释放锁的请求,
原因在于可能某个Redis实例中成功写入了锁记录, 但是没有响应没有到达客户端.

为了保证所有锁记录都被正确释放, 所以需要向所有Redis实例发送释放请求.

注意了以上细节,一个单redis节点的分布式锁就达成了。

unlock

解锁过程相对简单:

@Override
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    if (opStatus) {
        cancelExpirationRenewal();
    }
}

unlockInnerAsync方法实现了具体的解锁逻辑:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " + // 资源未被加锁,可能锁已被超时释放
                "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布锁被释放的消息
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 锁的持有者不是自己,抛出异常
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 自己持有锁,因为锁是可重入的将计数器减1
            "if (counter > 0) then " + // 计数器大于0,锁未被完全释放,刷新锁过期时间
                "redis.call('pexpire', KEYS[1], ARGV[2]); " + 
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " + // 锁被完全释放,删除锁记录,发布锁被释放的消息
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

总结

基于单点Redis的分布式锁

Redis实现分布式锁的原理非常简单,
节点在访问共享资源前先查询redis中是否有该资源对应的锁记录,
若不存在锁记录则写入一条锁记录(即获取锁)随后访问共享资源.
若节点查询到redis中已经存在了资源对应的锁记录, 则放弃操作共享资源.

下面给出一个非常简单的分布式锁示例:

import redis.clients.jedis.Jedis;

import java.util.Random;
import java.util.UUID;


public class MyRedisLock {

    private Jedis jedis;

    private String lockKey;

    private String value;

    private static final Integer DEFAULT_TIMEOUT = 30;

    private static final String SUFFIX = ":lock";

    public MyRedisLock(Jedis jedis) {
        this.jedis = jedis;
    }

    public boolean acquire(String key, long time) throws InterruptedException {
        Long outdatedTime = System.currentTimeMillis() + time;
        lockKey = key + SUFFIX;
        while (true) {
            if (System.currentTimeMillis() >= outdatedTime) {
                return false;
            }
            value = UUID.randomUUID().toString(); // 1
            return "OK".equals(jedis.set(lockKey, value, "NX", DEFAULT_TIMEOUT)); // 2
        }
    }

    public boolean check() {
        return value != null && value.equals(jedis.get(lockKey)); // 3
    }

    public boolean release() {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        return 1L.equals(jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value))); // 3
    }
}

加锁后所有对共享资源的操作都应该先检查当前线程是否仍持有锁。

在分布式锁的实现中有几点需要注意:

  1. 加锁过程:
    1. 锁的过期时间应设置到redis中,保证在加锁客户端故障的情况下锁可以被自动释放
    2. 使用set key value EX seconds NX命令进行加锁,不要使用setnx和expire两个命令加锁。
      若setnx执行成功而expire失败(如执行setnx后客户端崩溃),则可能造成死锁。
    3. 锁记录的值不能使用固定值。 使用固定值可能导致严重错误:
      线程A的锁因为超时被释放, 随后线程B成功加锁。
      B写入的锁记录与A的锁记录没有区别,
      因此A在检查时会误判为自己仍持有锁。
  2. 解锁过程:
    1. 解锁操作使用lua脚本执行get和del两个操作,为了保证两个操作的原子性。若两个操作不具有原子性则可能出现错误时序:
      线程A执行get操作判断自己仍持有锁 -> 锁超时释放 ->
      线程B成功加锁 ->
      线程A删除锁记录(线程A认为删除了自己的锁记录,实际上删除了线程B的锁记录)。

上文只是提供了简单示例,还有一些重要功能没有实现:

  1. 阻塞加锁:可以使用redis的发布订阅功能,获取锁失败的线程订阅锁被释放的消息再次尝试加锁
  2. 无限期锁:应写入有TTL的锁记录,设置定时任务在锁失效前刷新锁过期的时间。这种方式可以避免持有锁的线程崩溃导致的死锁
  3. 可重入锁(持有锁的线程可以再次加锁):示例中持有锁的线程无法对同一个资源再次加锁,即不可重入锁。实现可重入锁需要锁记录由(key:资源标记,
    value:持有者标记)的键值对结构变为(key:资源标记, field:持有者标记,
    value:计数器)这样的hash结构。持有锁的线程每次重入锁计数器加1,每次释放锁计数器减1,计数器为0时删除锁记录。

总结来看实现Redis分布式锁有几点需要注意:

  1. 加解锁操作应保证原子性,避免多个线程同时操作出现异常
  2. 应考虑进程崩溃、Redis崩溃、操作成功执行但未收到成功响应等异常状况,避免死锁
  3. 解锁操作必须避免 某个线程释放了不属于自己的锁 的异常
SET resource_name my_random_value NX PX 30000

Redis数据库基于内存,具有高吞吐量、便于执行原子性操作等特点非常适合开发对一致性要求不高的锁服务。

锁的释放必须使用lua脚本,保证操作的原子性。锁的释放包含了get,判断,del三个步骤。如果不能保证三个步骤的原子性,分布式锁就会有并发问题。

本文介绍了简单分布式锁、Redisson分布式锁的实现以及解决单点服务的RedLock分布式锁概念。

这样由于Master的宕机,造成了同时多人持有锁。如果你的系统可用接受短时时间内,有多人持有锁。这个简单的方案就能解决问题。

Redisson

这里我们以基于Java的Redisson为例讨论一下成熟的Redis分布式锁的实现。

redisson实现了java.util.concurrent.locks.Lock接口,可以像使用普通锁一样使用redisson:

RLock lock = redisson.getLock("key"); 
lock.lock(); 
try {
    // do sth.
} finally {
    lock.unlock(); 
}

分析一下RLock的实现类org.redisson.RedissonLock:

锁的释放:

在多线程开发中我们使用锁来避免线程争夺共享资源。在分布式系统中,程序在多个节点上运行无法使用单机锁来避免资源竞争,因此我们需要一个锁服务来避免多个节点上的进程争夺资源。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

加锁操作

@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

再看等待加锁的方法lockInterruptibly:

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

lockInterruptibly
方法会尝试获取锁,若获取失败则会订阅释放锁的消息。收到锁被释放的通知后再次尝试获取锁,直到成功或者超时。

接下来分析tryAcquire:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId)); // 调用异步获得锁的实现,使用get(future)实现同步
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 设置了超时时间
    if (leaseTime != -1) {
        // tryLockInnerAsync 加锁成功返回 null, 加锁失败在 Future 中返回锁记录剩余的有效时间
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 未设置超时时间,尝试获得无限期的锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 避免对共享资源操作完成前锁就被释放掉,定期刷新锁失效的时间
                // 默认锁失效时间的三分之一即进行刷新
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync中主要逻辑是无限期锁的实现,Redisson并非设置了永久的锁记录,而是定期刷新锁失效的时间。

这种方式避免了持有锁的进程崩溃无法释放锁导致死锁。

真正实现获取锁逻辑的是tryLockInnerAsync方法:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(
        getName(),
        LongCodec.INSTANCE, 
        command,
          "if (redis.call('exists', KEYS[1]) == 0) then " + // 资源未被加锁
              "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 写入锁记录, 锁记录是一个hash; key:共享资源名称, field:锁实例名称(Redisson客户端ID:线程ID), value: 1(value是一个计数器,记录当前线程获取该锁的次数,实现可重入锁)
              "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置锁记录过期时间
              "return nil; " +
          "end; " +
          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 若当前线程已经持有该资源的锁
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 将锁计数器加1, 
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              "return nil; " +
          "end; " +
          "return redis.call('pttl', KEYS[1]);", // 资源已被其它线程加锁,加锁失败。获取锁剩余生存时间后返回
        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

上述操作使用eval命令执行lua脚本保证了操作的原子性。

了解了Redis分布式的实现以后,其实觉得大多数的分布式系统其实原理很简单,但是为了保证分布式系统的可靠性需要注意很多的细节,琐碎异常。

Redis是一致性较低的数据库,若对锁服务的一致性要求较高建议使用zookeeper等中间件开发锁服务。

单机版分布式锁 SETNX

使用 setNX 命令,保证查询和写入两个步骤是原子的

但是RedLock就一定安全么?我还会写一篇文章来讨论这个问题。敬请大家期待。

如果某master节点故障之后,回复的时间间隔应当大于锁的有效时间。

之前我们使用的定时任务都是只部署在了单台机器上,为了解决单点的问题,为了保证一个任务,只被一台机器执行,就需要考虑锁的问题,于是就花时间研究了这个问题。到底怎样实现一个分布式锁呢?

几个细节需要注意:

 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end

锁的本质就是互斥,保证任何时候能有一个客户端持有同一个锁,如果考虑使用redis来实现一个分布式锁,最简单的方案就是在实例里面创建一个键值,释放锁的时候,将键值删除。但是一个可靠完善的分布式锁需要考虑的细节比较多,我们就来看看如何写一个正确的分布式锁。

相关文章