图片 3

如何操作Redis和zookeeper实现分布式锁

5.使用zookeeper的创建节点node

// redis缓存key中的数据

通过一个while(true),在当前线程上进行阻塞等待,并通过一个计数器进行自减操作,防止永久等待。 

定时任务增加注解@Lockable:

@Around(“@annotation(com.records.aop.Lockable)”)

package com.ns.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {
    /**
     * redis的key
     * @return
     */
    String value();
    /**
     * 持锁时间,单位毫秒,默认一分钟
     */
    long keepMills() default 60000;
    /**
     * 当获取失败时候动作
     */
    LockFailAction action() default LockFailAction.GIVEUP;

    public enum LockFailAction{
        /**
         * 放弃
         */
        GIVEUP,
        /**
         * 继续
         */
        CONTINUE;
    }
    /**
     * 睡眠时间,设置GIVEUP忽略此项
     * @return
     */
    long sleepMills() default 1000;
}

6.使用zookeeper的创建临时序列节点

//确认此注解是用在方法上

   这是使用最多的实现方式:setnx的目的同上,用来实现尝试获取锁以及判断是否获取到锁的原子性,del删除key来释放锁,与上面不同的是,使用redis自带的expire命令来防止死锁(可能出现某个客户端获得了锁,但是crash了,永不释放导致死锁)。这算是一种比较简单但粗暴的实现方式:因为,不管实际的情况如何,当你设置expire之后,它一定会在那个时间点删除key。如何当时某个客户端已获得了锁,正在执行临界区内的代码,但执行时间超过了expire的时间,将会导致另一个正在竞争该锁的客户端也获得了该锁,这个问题下面还会谈到。

4.使用redis的getset()来实现分布式锁

// 过期时间(秒),默认为一分钟

   劣势:竞态条件(race condition),死锁。

1.分布式锁

2.使用redis的getset()

图片 1

定义注解类:

if (!result) {

图片 2

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

@Retention(RetentionPolicy.RUNTIME)

多节点的部署中,对锁的控制,参考:

分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务。

MethodSignature methodSignature = (MethodSignature) signature;

 

 @Lockable(key = "DistributedLock:dealExpireRecords") public void dealExpireRecords() { }

if (!(signature instanceof MethodSignature)) {

 
 这种实现方式得益于getset命令的原子性,从而有效得避免了竞态条件。并且,通过将比对锁的过期时间作为获取锁逻辑的一部分,从而避免了死锁。

如何操作Redis和zookeeper实现分布式锁

public Object distributeLock(ProceedingJoinPoint pjp) {

定义注解,标志切入点:

定义一个aop切面LockAspect,使用@Around处理所有注解为@Lockable的方法,通过连接点确认此注解是用在方法上,通过方法获取注解信息,使用setIfAbsent来判断是否获取分布式锁,如果没有获取分布式锁,直接返回;如果获取到分布式锁,通过expire设置过期时间,并调用指定方法。

log.error(“Lockable is method annotation!”);

 

2.分布式锁的实现方式

@Aspect

使用zookeeper创建节点node,如果创建节点成功,表示获取了此分布式锁;如果创建节点失败,表示此分布式锁已经被其他程序占用(多个程序同时创建一个节点node,只有一个能够创建成功)

}

LOCK_TIMEOUT = 3
lock = 0
lock_timeout = 0
lock_key = 'lock.foo'

# 获取锁
while lock != 1:
    now = int(time.time())
    lock_timeout = now + LOCK_TIMEOUT + 1
    lock = redis_client.setnx(lock_key, lock_timeout)
    if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):
        break
    else:
        time.sleep(0.001)

# 已获得锁
do_job()

# 释放锁
now = int(time.time())
if now < lock_timeout:
    redis_client.delete(lock_key)
@Component@Slf4j@Aspectpublic class LockAspect { @Autowired private RedisTemplate redisTemplate; @Around("@annotation(com.records.aop.Lockable)") public Object distributeLock(ProceedingJoinPoint pjp) { Object resultObject = null; //确认此注解是用在方法上 Signature signature = pjp.getSignature(); if (!(signature instanceof MethodSignature)) { log.error("Lockable is method annotation!"); return resultObject; } MethodSignature methodSignature = (MethodSignature) signature; Method targetMethod = methodSignature.getMethod(); //获取注解信息 Lockable lockable = targetMethod.getAnnotation(Lockable.class); String key = lockable.key(); String value = lockable.value(); long expire = lockable.expire(); // 分布式锁,如果没有此key,设置此值并返回true;如果有此key,则返回false boolean result = redisTemplate.boundValueOps(key).setIfAbsent(value); if (!result) { //其他程序已经获取分布式锁 return resultObject; } //设置过期时间,默认一分钟 redisTemplate.boundValueOps(key).expire(expire, TimeUnit.SECONDS); try { resultObject = pjp.proceed(); //调用对应方法执行 } catch (Throwable throwable) { throwable.printStackTrace(); } return resultObject; }}

分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务。

以上有些代码只符合我现在的项目场景,根据实际需要进行调整

@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)public @interface Lockable{ // redis缓存key String key(); // redis缓存key中的数据 String value() default ""; // 过期时间(秒),默认为一分钟 long expire() default 60;}

public class LockAspect {

图片 3

setnx(key,value) 如果key不存在,设置为当前key的值为value;如果key存在,直接返回。expire()来设置超时时间

private RedisTemplate redisTemplate;

二、setnx / del / getset

在分布式场景下,有很多种情况都需要实现最终一致性。在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储,或者做全局XA事务,也可以借助消息中间件。通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性。

resultObject = pjp.proceed(); //调用对应方法执行

图片 4

使用redis的setnx()和expire() 使用redis的getset()
使用zookeeper的创建节点node 使用zookeeper的创建临时序列节点

String value() default “”;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class RedisLock implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class);
    public static final String REDIS_LOCK = "RedisLock:";


    private static final long DEFAULT_WAIT_LOCK_TIME_OUT = 60;//60s 有慢sql,超时时间设置长一点
    private static final long DEFAULT_EXPIRE = 80;//80s 有慢sql,超时时间设置长一点
    private String key;
    private RedisTemplate redisTemplate;

    public RedisLock(RedisTemplate redisTemplate,String key) {
        this.redisTemplate = redisTemplate;
        this.key = key;
    }

    /**
     * 等待锁的时间,单位为s
     *
     * @param key
     * @param timeout s
     * @param seconds
     */
    public boolean lock(String key, long timeout, TimeUnit seconds) {
        String lockKey = generateLockKey(key);
        long nanoWaitForLock = seconds.toNanos(timeout);
        long start = System.nanoTime();

        try {
            while ((System.nanoTime() - start) < nanoWaitForLock) {
                if (redisTemplate.getConnectionFactory().getConnection().setNX(lockKey.getBytes(), new byte[0])) {
                    redisTemplate.expire(lockKey, DEFAULT_EXPIRE, TimeUnit.SECONDS);//暂设置为80s过期,防止异常中断锁未释放
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("add RedisLock[{}].{}", key, Thread.currentThread());
                    }
                    return true;
                }
                TimeUnit.MILLISECONDS.sleep(1000 + new Random().nextInt(100));//加随机时间防止活锁
            }
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
            unlock();
        }
        return false;
    }

    public void unlock() {
        try {
            String lockKey = generateLockKey(key);
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            connection.del(lockKey.getBytes());
            connection.del(key.getBytes());
            connection.close();
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
        }
    }

    private String generateLockKey(String key) {
        return String.format(REDIS_LOCK + "%s", key);
    }

    public boolean lock() {
        return lock(key, DEFAULT_WAIT_LOCK_TIME_OUT, TimeUnit.SECONDS);
    }

    @Override
    public void close(){
        try {
            String lockKey = generateLockKey(key);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("release RedisLock[" + lockKey + "].");
            }
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            connection.del(lockKey.getBytes());
            connection.close();
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
        }
    }
}

此方法使redisTemplate.boundValueOps(key).getAndSet(value)的方法,如果返回空,表示获取了分布式锁;如果返回不为空,表示分布式锁已经被其他程序占用

}

package com.redis.aop;
import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.ns.annotation.RedisLock;
import com.ns.annotation.RedisLock.LockFailAction;
import com.ns.redis.dao.base.BaseRedisDao;
@Aspect
public class RedisLockAspect extends BaseRedisDao<String, Long>{
  private static final Logger log = LoggerFactory.getLogger(RedisLockAspect.class);
  //execution(* com.ns..*(*,..)) and @within(com.ns.annotation.RedisLock)

  @Pointcut("execution(* com.ns..*(..)) && @annotation(com.ns.annotation.RedisLock)")
  private void lockPoint(){}
  @Around("lockPoint()")
  public Object arround(ProceedingJoinPoint pjp) throws Throwable{
    MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
    Method method = methodSignature.getMethod();
    RedisLock lockInfo = method.getAnnotation(RedisLock.class);
    boolean lock = false;
    Object obj = null;
    while(!lock){
      long timestamp = System.currentTimeMillis()+lockInfo.keepMills();
      lock = setNX(lockInfo.value(), timestamp);
      //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争)
      long now = System.currentTimeMillis();
      if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){
        //得到锁,执行方法,释放锁
        log.info("得到锁...");
        obj = pjp.proceed();
        //不加这一行,对于只能执行一次的定时任务,时间差上不能保证另一个一定正好放弃
        if(lockInfo.action().equals(LockFailAction.CONTINUE)){
          delete(lockInfo.value());
        }
      }else{
        if(lockInfo.action().equals(LockFailAction.CONTINUE)){
          log.info("稍后重新请求锁...");
          Thread.currentThread().sleep(lockInfo.sleepMills());
        }else{
          log.info("放弃锁...");
          break;
        }
      }
    }
    return obj;
  }
  public boolean setNX(String key,Long value){
    return valueOperations.setIfAbsent(key, value);
  }
  public long getLock(String key){
    return valueOperations.get(key);
  }
  public Long getSet(String key,Long value){
    return valueOperations.getAndSet(key, value);
  }
  public void releaseLock(String key){
    delete(key);
  }
}

3.使用redis的setnx()和expire()来实现分布式锁

redisTemplate.boundValueOps(key).expire(expire, TimeUnit.SECONDS);

  
用索引号为0的第一个比特位来表示锁定状态,其中:0表示未获得锁,1表示已获得锁。

使用zookeeper创建临时序列节点来实现分布式锁,适用于顺序执行的程序,大体思路就是创建临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成之后此序列节点消失,通过watch来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理,依次类推……

return resultObject;

  
redis使用缓存作为分布式锁,性能非常强劲,在一些不错的硬件上,redis可以每秒执行10w次,内网延迟不超过1ms,足够满足绝大部分应用的锁定需求。

expire()来设置超时时间

 

1.使用redis的setnx()和expire()

在高并发的使用场景下,如何让redis里的数据尽量保持一致,可以采用分布式锁。以分布式锁的方式来保证对临界资源的互斥读写。

//获取注解信息