1.关于锁:
在单进程的系统中,当多个线程可以同时改变某个变量时,就需要对变量或代码块做同步。多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。
分布式最大的不同在于不是多线程而是多进程,而多个进程之间可能都不在同一台物理机上,所以这时候加锁,就需要有一个公共内存,比如Redis。
2.使用redis的setNX命令实现分布式锁
a.原理
Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。redis的SETNX命令可以方便的实现分布式锁。
b.Redis基本命令解析
1)setNX(SET if Not eXists)
语法:
SETNX key value 设置成功,返回 1 ,设置失败,返回 0 。
2)getSET
语法:
GETSET key value 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。 key 不存在时,返回 nil 。
3.Redis锁:
基本锁:
原理:利用Redis的setnx如果不存在某个key则设置值,设置成功(返回1)则表示取得锁成功。如果释放锁的话,执行del操作。
缺点:如果该线程加锁后挂了(没有释放锁),则锁永远不会释放。
改进型:
改进:setnx后设置expire,保证即使获取锁的进程不主动释放锁,过一段时间后也能自动释放。
缺点:setnx与expire不是一个原子操作,可能执行完setnx该进程就挂了,则该值一直不会过期,则锁也永远不会释放了。
官方做法:利用Lua脚本,将setnx与expire变成一个原子操作,该方法可以保证原子性,但是操作起来比较麻烦。可供参考的实现库.:Redisson
再改进:
改进:在expire基础上,设置value为该锁的过期时间,多个线程都可以去判断该锁有没有过期,如果过期,则可以抢占锁。
缺点:还是锁过期的问题。如果两个线程a和b,a先获取到了锁,并设置了过期时间,但是a可能处理过程中被挂起了(直到锁过期了),这时候线程b检测到锁过期了,所以设置了新的过期时间,然后持有了锁。但是这时a线程处理完了,执行del操作时,会把b设置的值del。所以在每个线程释放锁时,还需要判断该值是不是自己设置的值。
再再改进:
改进:既要能自动过期,还要能被其他线程检测是否过期,还要在释放锁时,判断该值是否是自己设置的。所以在expire基础上,设置value为该锁的过期时间,并且要保证value唯一。
缺点:expire时间与代码执行时间的问题,还可能会存在问题,后续再观察下。
4.Redlock
Redlock是Redis的作者antirez给出的集群模式的Redis分布式锁,节点完全互相独立,不存在主从复制或者其他集群协调机制。这个目前还在研究,感兴趣的同学可以去看一下。
关于Redis分布式锁的安全性问题,在antirez给出Redlock算法后还和分布式系统专家Martin Kleppmann发生过一场争论,Martin认为Redlock则是个过重的实现,不管是为了正确性还是效率都不适用,感兴趣的同学可以了解一下。
Martin的文章:https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
中文分析:http://mp.weixin.qq.com/s/1bPLk_VZhZ0QYNZS8LkviA
5.代码
public class RedisLock { private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class); private RedisClient redisClient; /** * Lock key path. */ private String lockKey; private String randomValue; public RedisLock(RedisClient redisClient, String lockKey) { this.redisClient = redisClient; this.lockKey = lockKey; } /** * 获取Redis锁 * * @return */ public boolean tryLock() { //尝试重新获取锁时间:10s int timeoutMsecs = RedisConstants.WAIT_LOCK_TIME; while (timeoutMsecs >= 0) { //失效时间 用来手动判断是否已失效 long expires = System.currentTimeMillis() + RedisConstants.EXPIRE_LOCK_TIME_MILLIS + 1; //redis的value:随机字符串(时间戳+三位随机数字字符串) 但是仍然不严谨,最好是客户端id+时间戳 String redisValueStr = String.valueOf(expires) + OpenApiCoreUtils.generateThreeDigitNumberStr(); LOGGER.info("Redis加锁,lockKey:" + lockKey + ",value:" + redisValueStr); if (redisClient.setnx(lockKey, redisValueStr) == 1) { //有效期三秒 redisClient.expire(lockKey, RedisConstants.EXPIRE_LOCK_TIME); randomValue = redisValueStr; LOGGER.info("加锁成功!"); return true; } String currentValueStr = redisClient.get(lockKey); LOGGER.info("Redis加锁setNX失败时,检查当前key:" + lockKey + ",当前值:" + currentValueStr); if (currentValueStr != null) { long oldExpireTime = Long.parseLong(currentValueStr) / 1000; LOGGER.info("Redis解析到过期时间:" + oldExpireTime); if (oldExpireTime < System.currentTimeMillis()) { //锁超时 String oldValueStr = redisClient.getSet(lockKey, redisValueStr); LOGGER.info("解析到旧key过期,尝试获取锁!"); //getSet是原子操作,但当多个线程同时访问时,可能其他线程又用getSet方法重置了oldValueStr的值 //所以通过判断当前线程get到的值和该线程写操作之前的值是否一致,来判断中间有没有其他线程进行过写操作 if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { redisClient.expire(lockKey, RedisConstants.EXPIRE_LOCK_TIME); LOGGER.info("获取锁成功:key:" + lockKey + ",value:" + redisValueStr); return true; } LOGGER.info("竞争锁失败,等待重试!"); } } Random random = new Random(); //睡眠时间:80-120随机数,防止饥饿进程 int sleepTime = random.nextInt(40) + 80; timeoutMsecs -= sleepTime; try { Thread.sleep(sleepTime); } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); } } return false; } /** * Acquired lock release. */ public void unlock() { //从redis取值 String redisValue = redisClient.get(lockKey); if (redisValue != null && redisValue.equals(randomValue)) { redisClient.del(lockKey); } } }
6.效果
参考资料:
http://redis.cn/topics/distlock.html