新闻中心
你的位置:kaiyun在线登录网址 > 新闻中心 >散布式:这里的散布式指的是散布式系统kaiyun体育,触及到好多时刻和表面,包括CAP 表面、散布式存储、散布式事务、散布式锁...
散布式系统是由一组通过收罗进行通讯、为了完成共同的任务而连络使命的计较机节点构成的系统。
散布式系统的出现是为了用低价的、普通的机器完成单个计较机无法完成的计较、存储任务。其策画是诳骗更多的机器,处理更多的数据。
锁:对对,等于你想的阿谁,Javer 学的第一个锁应该等于 synchronizedJava 低级口试问题,来拼写下 赛克瑞纳挨日的
从锁的使用场景有来看下边这 3 种锁:
线程锁:synchronized 是用在方法或代码块中的,咱们把它叫『线程锁』,线程锁的好意思满其实是靠线程之间分享内存好意思满的,说白了等于内存中的一个整型数,有酣畅、上锁这类景况,比如 synchronized 是在对象头中的 Mark Word 有个锁景况标志,Lock 的好意思满类大部分都有个叫 volatile int state 的分享变量来作念景况标志。
程度锁:为了律例兼并操作系统中多个程度拜访某个分享资源,因为程度具有寂然性,各个程度无法拜访其他程度的资源,因此无法通过 synchronized 等线程锁好意思满程度锁。比如说,咱们的兼并个 linux 劳动器,部署了好几个 Java 神态,有可能同期拜访或操作劳动器上的换取数据,这就需要程度锁,一般不错用『文献锁』来达到程度互斥。
散布式锁:随着用户越来越多,咱们上了好多劳动器,正本有个定时给客户发邮件的任务,若是不加以律例的话,到点后每台机器跑一次任务,客户就会收到 N 条邮件,这就需要通过散布式锁来互斥了。
书面解释:散布式锁是律例散布式系统或不同系统之间共同拜访分享资源的一种锁好意思满,若是不同的系统或兼并个系统的不同主机之间分享了某个资源时,往往需要互斥来督察相互侵犯来保证一致性。
知谈了什么是散布式锁,接下来就到了时刻选型要津
二、散布式锁要怎样搞要好意思满一个散布式锁,咱们一般取舍集群机器都不错操作的外部系统,然后各个机器都去这个外部系统恳求锁。
这个外部系融合般需要称心如下条目智商胜任:
互斥:在职意时刻,只可有一个客户端能执有锁。 督察死锁:即使有一个客户端在执有锁的时间崩溃而莫得主动解锁,也能保证后续其他客户端能加锁。是以锁一般要有一个过时时候。 独占性:解铃还须系铃东谈主,加锁息争锁必须是兼并个客户端,一把锁只可有一把钥匙,客户端我方的锁不可被别东谈主给解开,固然也不可去开别东谈主的锁。 容错:外部系统不可太“脆弱”,要保证外部系统的正常运行,客户端才不错加锁息争锁。我以为不错这样类比:
好多商贩要租用某个仓库,兼并时刻,只可给一个商贩租用,且只可有一把钥匙,还得有固定的“租期”,到期后要回收的,固然最紧迫的是仓库门不可坏了,要不锁都锁不住。这不等于散布式锁吗?
感触我方确切个爱时刻爱生存的圭臬猿~~
其实锁,本色上等于用来进行防重操作的(数据一致性),像查询这种幂等操作,就不需要费这劲
奏凯上论断:
散布式锁一般有三种好意思满方法:1. 数据库乐不雅锁;2. 基于 Redis 的散布式锁;3. 基于 ZooKeeper 的散布式锁。
但为了追求更好的性能,咱们宽泛会取舍使用 Redis 或 Zookeeper 来作念。
想必也有可爱问为什么的同学,那数据库客不雅锁怎样就性能不好了?
使用数据库乐不雅锁,包括主键防重,版块号律例。然而这两种方法各故意弊。
使用主键突破的计谋进行防重,在并发量格外高的情况下对数据库性能会有影响,尤其是应用数据表和主键突破表在一个库的时候,弘扬愈加显然。还有等于在 MySQL 数据库中聘用主键突破防重,在大并发情况下有可能会形成锁表步地,相比好的倡导是在圭臬中出产主键进行防重。
使用版块号计谋
这个计谋源于 MySQL 的 MVCC 机制,使用这个计谋其实本人莫得什么问题,唯独的问题等于对数据表侵入较大,咱们要为每个表设想一个版块号字段,然后写一条判断 SQL 每次进行判断。
第三趴,编码
三、基于 Redis 的散布式锁其实 Redis 官网照旧给出了好意思满:https://redis.io/topics/distlock,说多样竹素和博客用了多样技能去用 Redis 好意思满散布式锁,建议用 Redlock 好意思满,这样更方法、更安全。咱们循序渐进来看
咱们默许指定环球用的是 Redis 2.6.12 及更高的版块,就不再去讲 setnx、expire 这种了,奏凯 set 敕令加锁
set kaiyun体育key value[expiration EX seconds|PX milliseconds] [NX|XX]
eg:
SET resource_name my_random_value NX PX 30000
SET 敕令的活动不错通过一系列参数来修改
EX second :诞生键的过时时候为 second 秒。SET key value EX second 遵循等同于 SETEX key second value 。 PX millisecond :诞生键的过时时候为 millisecond 毫秒。SET key value PX millisecond 遵循等同于 PSETEX key millisecond value 。 NX :只在键不存在时,才对键进行诞生操作。SET key value NX 遵循等同于 SETNX key value 。 XX :只在键照旧存在时,才对键进行诞生操作。这条提示的敬爱:当 key——resource_name 不存在时创建这样的key,设值为 my_random_value,并诞生过时时候 30000 毫秒。
别看这干了两件事,因为 Redis 是单线程的,这一条提示不会被打断,是以是原子性的操作。
Redis 好意思满散布式锁的主要要津:
指定一个 key 行为锁象征,存入 Redis 中,指定一个 唯独的标志 行为 value。 当 key 不存在时智商诞生值,确保兼并时候唯有一个客户端程度得到锁,称心 互斥性 特质。 诞生一个过时时候,督察因系统特别导致没能删除这个 key,称心 防死锁 特质。 当处理完业务之后需要断根这个 key 来开释锁,断根 key 时需要校验 value 值,需要称心 解铃还须系铃东谈主 。诞生一个立地值的敬爱是在解锁时候判断 key 的值和咱们存储的立地数是不是相通,相通的话,才是我方的锁,奏凯 del 解锁就行。
固然这个两个操作要保证原子性,是以 Redis 给出了一段 lua 剧本(Redis 劳动器会单线程原子性履行 lua 剧本,保证 lua 剧本在处理的过程中不会被纵情其它请求打断。):
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end问题:
咱们先抛出两个问题想考:
获取锁时,过时时候要诞生若干适合呢?
预估一个适合的时候,其实没那么容易,比如操作资源的时候最慢可能要 10 s,而咱们只诞生了 5 s 就过时,那就存在锁提前过时的风险。这个问题先记下,咱们先看下 Javaer 要怎样在代码顶用 Redis 锁。
容错性如何保证呢?
Redis 挂了怎样办,你可能会说上主从、上集群,但也会出现这样的顶点情况,当咱们上锁后,主节点就挂了,这个时候还没来的急同步到从节点,主从切换后锁如故丢了
带着这两个问题,咱们接着看
Redisson 好意思满代码redisson 是 Redis 官方的散布式锁组件。GitHub 地址:https://github.com/redisson/redisson
Redisson 是一个在 Redis 的基础上好意思满的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的散布式的 Java 常用对象,还好意思满了可重入锁(Reentrant Lock)、公谈锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了很多散布式劳动。Redisson 提供了使用 Redis 的最毛糙和最毛糙的方法。Redisson 的倡导是促进使用者对 Redis 的柔和分离(Separation of Concern),从而让使用者粗略将元气心灵更聚首地放在处理业务逻辑上。
redisson 当今照旧很浩大了,github 的 wiki 也很详备,散布式锁的先容奏凯戳 Distributed locks and synchronizers
Redisson 营救单点模式、主从模式、哨兵模式、集群模式,仅仅建树的不同,咱们以单点模式来看下怎样使用,代码很毛糙,都照旧为咱们封装好了,奏凯拿来用就好,详备的demo,我放在了 github: starfish-learn-redisson 上,这里就不一步步来了
RLock lock = redisson.getLock("myLock");
RLock 提供了多样锁方法,咱们来解读下这个接口方法,
注:代码为 3.16.2 版块,不错看到剿袭自 JDK 的 Lock 接口,和 Reddsion 的异步锁接口 RLockAsync(这个咱们先不磋商)
RLock
public interface RLock extends Lock, RLockAsync { /** * 获取锁的名字 */ String getName(); /** * 这个叫结尾锁操作,暗意该锁不错被中断 假如A和B同期调这个方法,A获取锁,B为获取锁,那么B线程不错通过 * Thread.currentThread().interrupt(); 方法确实中断该线程 */ void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException; /** * 这个应该是最常用的,尝试获取锁 * waitTimeout 尝试获取锁的最大恭候时候,卓绝这个值,则认为获取锁失败 * leaseTime 锁的执随机候,卓绝这个时候锁会自动失效(值应诞生为大于业务处理的时候,确保在锁有用期内业务能处理完) */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /** * 锁的有用期诞生为 leaseTime,过时后自动失效 * 若是 leaseTime 诞生为 -1, 暗意不主动过时 */ void lock(long leaseTime, TimeUnit unit); /** * Unlocks the lock independently of its state */ boolean forceUnlock(); /** * 检讨是否被另一个线程锁住 */ boolean isLocked(); /** * 检讨现时列线程是否执有该锁 */ boolean isHeldByCurrentThread(); /** * 这个就明证实,检讨指定线程是否执有锁 */ boolean isHeldByThread(long threadId); /** * 复返现时列程执有锁的次数 */ int getHoldCount(); /** * 复返锁的剩余时候 * @return time in milliseconds * -2 if the lock does not exist. * -1 if the lock exists but has no associated expire. */ long remainTimeToLive(); }
Demo
等于这样毛糙,Redisson 照旧作念好了封装,使用起来 so easy,若是使用主从、哨兵、集群这种也仅仅建树不同。
道理
看源码小 tips,最佳是 fork 到我方的仓库,然后拉到腹地,边看边扫视,然后提交到我方的仓库,也方便之后再看,不想这样难题的,也不错奏凯看我的 Jstarfish/redisson
先看下 RLock 的类干系
随着源码,不错发现 RedissonLock 是 RLock 的奏凯好意思满,亦然咱们加锁、解锁操作的中枢类
加锁
主要的加锁方法就下边这两个,区别也很毛糙,一个有恭候时候,一个莫得,是以咱们挑个复杂的看(源码包含了另一个的绝大部分)
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; void lock(long leaseTime, TimeUnit unit);
RedissonLock.tryLock
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 获取等锁的最万古候 long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); //取适现时列程id(判断是否可重入锁的枢纽) long threadId = Thread.currentThread().getId(); // 【中枢点1】尝试获取锁,若复返值为null,则暗意已获取到锁,复返的ttl等于key的剩孑遗活时候 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } // 还不错容忍的恭候时长 = 获取锁能容忍的最大恭候时长 - 履行完上述操作经过的时候 time -= System.currentTimeMillis() - current; if (time <= 0) { //等不到了,奏凯复返失败 acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); /** * 【中枢点2】 * 订阅解锁音信 redisson_lock__channel:{$KEY},并通过await方法遏制恭候锁开释,管制了无效的锁恳求奢华资源的问题: * 基于信息量,当锁被其它资源占用时,现时列程通过 Redis 的 channel 订阅锁的开释事件,一朝锁开释会发音信告知待恭候的线程进行竞争 * 当 this.await复返false,评释恭候时候照旧超出获取锁最大恭候时候,取消订阅并复返获取锁失败 * 当 this.await复返true,参加轮回尝试获取锁 */ RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //await 方法里面是用CountDownLatch来好意思满遏制,获取subscribe异步履行的终结(应用了Netty 的 Future) if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } // ttl 不为空,暗意照旧有这样的key了,只可遏制恭候 try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 来个死轮回,陆续尝试着获取锁 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } currentTime = System.currentTimeMillis(); /** * 【中枢点3】字据锁TTL,调治遏制恭候时长; * 1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让现时列程遏制一段时候,幸免在while轮回中往往请求获锁; * 当其他线程开释了占用的锁,会播送解锁音信,监听器选择解锁音信,并开释信号量,最终会叫醒遏制在这里的线程 * 2、该Semaphore的release方法,会在订阅解锁音信的监听器音信处理方法org.redisson.pubsub.LockPubSub#onMessage调用; */ //调用信号量的方法来遏制线程,时长为锁恭候时候和租期时候中较小的阿谁 if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { // 获取到锁或者抛出中断特别,退订redisson_lock__channel:{$KEY},不再柔息争锁事件 unsubscribe(subscribeFuture, threadId); } }
接着看扫视中提到的 3 个中枢点
中枢点1-尝试加锁:RedissonLock.tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // leaseTime != -1 评释没过时 if (leaseTime != -1) { // 实质是异步履行加锁Lua剧本 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 不然,照旧过时了,传参变为新的时候(续期后) ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 续期 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
异步履行加锁 Lua 剧本:RedissonLock.tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // 1.若是缓存中的key不存在,则履行 hincrby 敕令(hincrby key UUID+threadId 1), 设值重入次数1 // 然后通过 pexpire 敕令诞生锁的过时时候(即锁的租约时候) // 复返空值 nil ,暗意获取锁告捷 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 若是key照旧存在,而且value也匹配,暗意是现时列程执有的锁,则履行 hincrby 敕令,重入次数加1,而且诞生失效时候 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //若是key照旧存在,然而value不匹配,评释锁照旧被其他线程执有,通过 pttl 敕令获取锁的剩孑遗活时候并复返,至此获取锁失败 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }KEYS[1] 等于 Collections.singletonList(getName()),暗意散布式锁的key; ARGV[1] 等于internalLockLeaseTime,即锁的租约时候(执有锁的有用时候),默许30s; ARGV[2] 等于getLockName(threadId),是获取锁时set的唯独值 value,即UUID+threadId
看门狗续期:RedissonBaseLock.scheduleExpirationRenewal
// 基于线程ID定时革新和续期 protected void scheduleExpirationRenewal(long threadId) { // 新建一个ExpirationEntry纪录线程重入计数 ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { // 现时进行确现时列程重入加锁 oldEntry.addThreadId(threadId); } else { // 现时进行确现时列程初次加锁 entry.addThreadId(threadId); // 初次新建ExpirationEntry需要触发续期方法,纪录续期的任务句柄 renewExpiration(); } } // 处理续期 private void renewExpiration() { // 字据entryName获取ExpirationEntry实例,若是为空,评释在cancelExpirationRenewal()方法照旧被移除,一般是解锁的时候触发 ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 新建一个定时任务,这个等于看门狗的好意思满,io.netty.util.Timeout是Netty结合时候轮使用的定时任求实例 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { // 这里是相通外面的阿谁逻辑, ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } // 获取ExpirationEntry中首个线程ID,若是为空评释调用过cancelExpirationRenewal()方法清空执有的线程重入计数,一般是锁照旧开释的场景 Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 向Redis异步发送续期的敕令 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { // 抛出特别,续期失败,只打印日记和奏凯圮绝任务 if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 复返true解说续期告捷,则递归调用续期方法(再行革新我方),续期失败评释对应的锁照旧不存在,奏凯复返,不再递归 if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); }// 这里的履行频率为leaseTime编削为ms单元下的三分之一,由于leaseTime运转值为-1的情况下才会参加续期逻辑,那么这里的履行频率为lockWatchdogTimeout的三分之一 }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // ExpirationEntry实例执有革新任求实例 ee.setTimeout(task); }
中枢点2-订阅解锁音信:RedissonLock.subscribe
protected final LockPubSub pubSub; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); //在构造器中运升沉pubSub,随着这几个get方法会发现他们都是在构造器中运升沉的,在PublishSubscribeService中会有 // private final AsyncSemaphore[] locks = new AsyncSemaphore[50]; 这样一段代码,运升沉了一组信号量 this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } protected RFuture<RedissonLockEntry> subscribe(long threadId) { return pubSub.subscribe(getEntryName(), getChannelName()); } // 在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry实例中存放着RPromise<RedissonLockEntry>终结,一个信号量神志的锁和订阅方法重入计数器 public RFuture<E> subscribe(String entryName, String channelName) { AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); RPromise<E> newPromise = new RedissonPromise<>(); semaphore.acquire(() -> { if (!newPromise.setUncancellable()) { semaphore.release(); return; } E entry = entries.get(entryName); if (entry != null) { entry.acquire(); semaphore.release(); entry.getPromise().onComplete(new TransferListener<E>(newPromise)); return; } E value = createEntry(newPromise); value.acquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.acquire(); semaphore.release(); oldValue.getPromise().onComplete(new TransferListener<E>(newPromise)); return; } RedisPubSubListener<Object> listener = createListener(channelName, value); service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); }); return newPromise; }
中枢点 3 相比毛糙,就不说了
解锁
RedissonLock.unlock()
@Override public void unlock() { try { // 获取现时调用解锁操作的线程ID get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { // IllegalMonitorStateException一般是A线程加锁,B线程解锁,里面判断线程景况不一致抛出的 if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } }
RedissonBaseLock.unlockAsync
@Override public RFuture<Void> unlockAsync(long threadId) { // 构建一个终结RedissonPromise RPromise<Void> result = new RedissonPromise<>(); // 复返的RFuture若是执有的终结为true,评释解锁告捷,复返NULL评释线程ID特别,加锁息争锁的客户端线程不是兼并个线程 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 取消看门狗的续期任务 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; }
RedissonLock.unlockInnerAsync
// 确实的里面解锁的方法,履行解锁的Lua剧本 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //若是散布式锁存在,然而value不匹配,暗意锁照旧被其他线程占用,无权开释锁,那么奏凯复返空值(解铃还须系铃东谈主) "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + //若是value匹配,则等于现时列程占有散布式锁,那么将重入次数减1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //重入次数减1后的值若是大于0,暗意散布式锁有重入过,那么只可更新失效时候,还不可删除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + //重入次数减1后的值若是为0,这时就不错删除这个KEY,并发布解锁音信,复返1 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", //这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
我只列出了一小部分代码,更多的内容如故得我方来源
从源码中,咱们不错看到 Redisson 帮咱们管制了抛出的第一个问题:失效时候诞生多万古候为好?
Redisson 提供了看门狗,每得到一个锁时,只诞生一个很短的超通常候,同期起一个线程在每次将近到超通常候时去刷新锁的超通常候。在开释锁的同期终结这个线程。
然而莫得管制节点挂掉,丢失锁的问题,接着来~
四、RedLock咱们上边先容的散布式锁,在某些顶点情况下仍然是有颓势的
客户端万古候内遏制导致锁失效
客户端 1 得到了锁,因为收罗问题或者 GC 等原因导致万古候遏制,然后业务圭臬还没履行完锁就过时了,这时候客户端 2 也能正常拿到锁,可能会导致线程安全的问题。
Redis 劳动器时钟漂移
若是 Redis 劳动器的机器时候发生了上前越过,就会导致这个 key 过早超时失效,比如说客户端 1 拿到锁后,key 还莫得到过时时候,然而 Redis 劳动器的时候比客户端快了 2 分钟,导致 key 提前就失效了,这时候,若是客户端 1 还莫得开释锁的话,就可能导致多个客户端同期执有兼并把锁的问题。
单点实例安全问题
若是 Redis 是单机模式的,若是挂了的话,那所有的客户端都获取不到锁了,假定你是主从模式,但 Redis 的主从同步是异步进行的,若是 Redis 主宕机了,这个时候从机并莫得同步到这一把锁,那么机器 B 再次恳求的时候就会再次恳求到这把锁,这亦然问题
为了管制这些个问题 Redis 作家建议了 RedLock 红锁的算法,在 Redission 中也对 RedLock 进行了好意思满。
Redis 官网对 redLock 算法的先容简短如下:The Redlock algorithm
在散布式版块的算法里咱们假定咱们有 N 个 Redis master 节点,这些节点都是王人备寂然的,咱们不必任何复制或者其他隐含的散布式连络机制。之前咱们照旧描述了在 Redis 单实例下怎样安全地获取和开释锁。咱们确保将在每(N) 个实例上使用此方法获取和开释锁。在咱们的例子里面咱们诞生 N=5,这是一个相比合理的诞生,是以咱们需要在 5 台机器或者捏造机上头运行这些实例,这样保证他们不会同期都宕掉。为了取到锁,客户端应该履行以下操作:
获取现时 Unix 时候,以毫秒为单元。
轮番尝试从 5 个实例,使用换取的 key 和具有唯独性的 value(举例UUID)获取锁。当向 Redis 请求获取锁时,客户端应该诞生一个尝试从某个 Reids 实例获取锁的最大恭候时候(卓绝这个时候,则立马商榷下一个实例),这个超通常候应该小于锁的失效时候。举例你的锁自动失效时候为 10 秒,则超通常候应该在 5-50 毫秒之间。这样不错幸免劳动器端 Redis 照旧挂掉的情况下,客户端还在死死地恭候反应终结。若是劳动器端莫得在端正时候内反应,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁。
客户端使用现时时候减去开端获取锁时候(要津1纪录的时候)就得到获取锁蹧跶的时候。当且仅当从大无数(N/2+1,这里是3个节点)的 Redis 节点都取到锁,而且使用的总耗时小于锁失效时候时,锁才算获取告捷。
若是取到了锁,key 的确实有用时候 = 有用时候(获取锁时诞生的 key 的自动超通常候) - 获取锁的总耗时(商榷各个 Redis 实例的总耗时之和)(要津 3 计较的终结)。
若是因为某些原因,最终获取锁失败(即莫得在至少 “N/2+1 ”个 Redis 实例取到锁或者“获取锁的总耗时”卓绝了“有用时候”),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例压根就莫得加锁告捷,这样不错督察某些节点获取到锁然而客户端莫得得到反应而导致接下来的一段时候不可被再行获取锁)。
回来下等于:
客户端在多个 Redis 实例上恳求加锁,必须保证大无数节点加锁告捷
管制容错性问题,部分实例特别,剩下的还能加锁告捷
大无数节点加锁的总耗时,要小于锁诞生的过时时候
多实例操作,可能存在收罗延伸、丢包、超时等问题,是以就算是大无数节点加锁告捷,若是加锁的鸠集耗时卓绝了锁的过时时候,那有些节点上的锁可能也照旧失效了,如故莫得道理的
开释锁,要向一齐节点发起开释锁请求
若是部分节点加锁告捷,但临了由于特别导致大部分节点没加锁告捷,就要开释掉所有的,各节点要保执一致
对于 RedLock,两位散布式大佬,Antirez 和 Martin 还进行过一场争论,感兴味的也不错望望
Config config1 = new Config(); config1.useSingleServer().setAddress("127.0.0.1:6379"); RedissonClient redissonClient1 = Redisson.create(config1); Config config2 = new Config(); config2.useSingleServer().setAddress("127.0.0.1:5378"); RedissonClient redissonClient2 = Redisson.create(config2); Config config3 = new Config(); config3.useSingleServer().setAddress("127.0.0.1:5379"); RedissonClient redissonClient3 = Redisson.create(config3); /** * 获取多个 RLock 对象 */ RLock lock1 = redissonClient1.getLock(lockKey); RLock lock2 = redissonClient2.getLock(lockKey); RLock lock3 = redissonClient3.getLock(lockKey); /** * 字据多个 RLock 对象构建 RedissonRedLock (最中枢的判袂就在这里) */ RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); try { /** * 4.尝试获取锁 * waitTimeout 尝试获取锁的最大恭候时候,卓绝这个值,则认为获取锁失败 * leaseTime 锁的执随机候,卓绝这个时候锁会自动失效(值应诞生为大于业务处理的时候,确保在锁有用期内业务能处理完) */ boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { //告捷得到锁,在这里处理业务 } } catch (Exception e) { throw new RuntimeException("aquire lock fail"); }finally{ //无论如何, 临了都要解锁 redLock.unlock(); }
最中枢的变化等于需要构建多个 RLock ,然后字据多个 RLock 构建成一个 RedissonRedLock,因为 redLock 算法是建设在多个相互寂然的 Redis 环境之上的(为了鉴识不错叫为 Redission node),Redission node 节点既不错是单机模式(single),也不错是主从模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。这就意味着,不可跟以往这样只搭建 1个 cluster、或 1个 sentinel 集群,或是1套主从架构就了事了,需要为 RedissonRedLock 特别搭建多几套寂然的 Redission 节点。
RedissonMultiLock.tryLock
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // try { // return tryLockAsync(waitTime, leaseTime, unit).get(); // } catch (ExecutionException e) { // throw new IllegalStateException(e); // } long newLeaseTime = -1; if (leaseTime != -1) { if (waitTime == -1) { newLeaseTime = unit.toMillis(leaseTime); } else { newLeaseTime = unit.toMillis(waitTime)*2; } } long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = calcLockWaitTime(remainTime); //允许加锁失败节点个数甩掉(N-(N/2+1)) int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<>(locks.size()); // 遍历所有节点通过EVAL敕令履行lua加锁 for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; try { // 对节点尝试加锁 if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { // 若是抛出这类特别,为了督察加锁告捷,然而反应失败,需要解锁所有节点 unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { lockAcquired = false; } if (lockAcquired) { acquiredLocks.add(lock); } else { /* * 计较照旧恳求锁失败的节点是否照旧到达 允许加锁失败节点个数甩掉 (N-(N/2+1)) * 若是照旧到达, 就认定最终恳求锁失败,则莫得必要陆续从后头的节点恳求了 * 因为 Redlock 算法条目至少N/2+1 个节点都加锁告捷,才算最终的锁恳求告捷 */ if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } } //计较 面前从各个节点获取锁照旧蹧跶的总时候,若是照旧等于最大恭候时候,则认定最终恳求锁失败,复返false if (remainTime != -1) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { acquiredLocks.stream() .map(l -> (RedissonLock) l) .map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS)) .forEach(f -> f.syncUninterruptibly()); } return true; }
参考与感谢
《Redis —— Distributed locks with Redis》
《Redisson —— Distributed locks and synchronizers》
慢谈 Redis 好意思满散布式锁 以及 Redisson 源码解析
解析Redisson中散布式锁的好意思满
- 2024/09/19kaiyun体育这背后是乳成品行业正受需求颓落、奶源富裕双重夹攻-kaiyun在线登录网址
- 2024/09/19kaiyun体育该基金将进行收益分派-kaiyun在线登录网址
- 2024/09/19kaiyun官方网站报37.155好意思元-kaiyun在线登录网址
- 2024/09/19kaiyun官方网站和当下的品牌在会员营销流于体式作念名义著作不同样-kaiyun在线登录网址
- 2024/09/19kaiyun官方网站“潜在买家在评估股权时-kaiyun在线登录网址