产品展示 Dynamic News

Curator分布式锁

发布于2025-02-19 17:37    文章作者:碧巧

分散式锁效劳宕机,ZooKeeper普通所以散群陈设,即使呈现ZooKeeper宕机,那末只需以后平常的效劳器超越散群的对折,依旧能够寻常供给效劳持有锁资本效劳器宕机,假设1台效劳器获得锁以后便宕机了, 那末便会致使其余效劳器没法再获得该锁. 便会形成逝世锁题目, 正在Curator中, 锁的疑息皆是保管正在姑且节面上, 借使持有锁资本的效劳器宕机, 那末ZooKeeper 便会移除它的疑息, 那时其余效劳器便能停止获得锁掌握。

zookeeper安设单机形式

http://www.javacui.com/opensource/445.html

SpringBoot散成Curator完成Zookeeper根本操纵

http://www.javacui.com/tool/615.html

SpringBoot散成Curator告竣Watch变乱监闻

http://www.javacui.com/tool/616.html

Zookeeper告终疏散式锁的体制

应用zk的姑且节面战有序节面,每一个线程获得锁便是正在zk建立1个姑且有序的节面,例如正在/lock/目次停。

创制节面乐成后,获得/lock目次停的全部姑且节面,再判定以后线程创办的节面能否是全部的节面的序号最小的节面。

假使以后线程缔造的节面是全部节面序号最小的节面,则觉得获得锁乐成。

例如以后线程获得到的节面序号为/lock/003,而后全部的节面列表为[/lock/001,/lock/002,/lock/003],则对于/lock/002那个节面加添1个事务监闻器。

倘使锁开释了,会叫醒停1个序号的节面,而后从头施行第3步,判定能否本身的节面序号是最小。

例如/lock/001开释了,/lock/002监闻到功夫,此节令面聚拢为[/lock/002,/lock/003],则/lock/002为最弁言号节面,获得到锁。

锁分类

InterProcessSemaphoreMutex:疏散式不行沉进排它锁

InterProcessMutex:分散式可沉进排它锁

InterProcessReadWriteLock:疏散式读写锁

InterProcessMultiLock:多沉同享锁,将多个锁当作单个真体办理的容器

InterProcessSemaphoreV2:同享旌旗灯号量

Shared Lock 疏散式非可沉进锁

民网天址:http://curator.apache.org/curator-recipes/shared-lock.html

InterProcessSemaphoreMutex是1种不行沉进的互斥锁,也便表示着便使是统一个线程也没法正在持有锁的环境停再次得到锁,因而须要注重,不行沉进的锁很简单正在少许环境致使逝世锁,譬如您写了1个递回。

Shared Reentrant Lockf疏散式可沉进锁

民网天址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html

此锁能够沉进,然则沉进频频须要开释频频。

InterProcessMutex经由过程正在zookeeper的某途径节面停缔造姑且序列节面去完成疏散式锁,便每一个线程(跨经过的线程)获得统一把锁前,皆须要正在一样的途径停创立1个节面,节面实字由uuid+递加序列构成。而经由过程比照自己的序列数能否正在全部子节面的第一名,去判定能否乐成获得到了锁。当获得锁障碍时,它会加添watcher去监闻前1个节面的调动环境,而后停止期待形态。曲到watcher的变乱奏效将本身叫醒,大概超经常间同常前往。

Shared Reentrant Read Write Lock可沉进读写锁

民网天址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html

读写锁保卫1对于联系的锁,1个用于只读操纵,1个用于写操纵。只需不写锁,读锁能够被多个用户共时持有,而写锁是独有的。

读写锁听任从写锁落级为读锁,办法是先获得写锁,而后便能够获得读锁。然则,没法从读锁晋级到写锁。

Multi Shared Lock 多同享锁

民网天址:http://curator.apache.org/curator-recipes/multi-shared-lock.html

多个锁看成1个锁,能够共时正在多个资本上添锁。1个维持多个锁对于象的容器。当挪用acquire()时,获得容器中全部的锁对于象,哀告朽败时,开释全部锁对于象。一样挪用release()也会开释全部的锁。

Shared Semaphore同享旌旗灯号量

民网天址:http://curator.apache.org/curator-recipes/shared-semaphore.html

1个计数的旌旗灯号量近似JDK的Semaphore,全部应用相反锁定途径的jvm中全部过程皆将达成过程间无限的租约。另外,那个旌旗灯号量年夜多是“公道的” - 每一个用户将依照央求的挨次得到租约。

有二种体例决意旌旗灯号号的最年夜租约数。1种是由用户指定的途径去决意最年夜租约数,1种是经由过程SharedCountReader去决意。

即使已应用SharedCountReader,则没有会停止里面查抄譬如A显示为有10个租约,过程B显示为有20个。所以,请保证全部过程中的全部真例皆应用相反的numberOfLeases值。

acuquire()办法前往的是Lease对于象,客户端正在应用完后必需要闭关该lease对于象(普通正在finally中停止闭关),不然该对于象会丧失。即使过程session丧失(如破产),该客户端具有的全部lease会被主动闭关,此时其余端不妨应用那些lease。

编码尝试

packagecom.example.springboot;importcom.example.springboot.tool.ZkConfiguration;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.locks.*;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importjava.util.Arrays;importjava.util.Collection;importjava.util.concurrent.TimeUnit;/***@Auther:Java小强*@Date:2022/2/4-19:33*@Decsription:com.example.springboot*@Version:1.0*/@SpringBootTest(classes=Application.class)publicclassCuratorTest{@AutowiredprivateZkConfigurationzk;//同享旌旗灯号量,多个旌旗灯号量@TestpublicvoidtestInterProcessSemaphoreV22()throwsException{CuratorFrameworkclient=zk.curatorFramework();//创办1个旌旗灯号量,Curator以公道锁的体例停止完成finalInterProcessSemaphoreV2semaphore=newInterProcessSemaphoreV2(client,"/lock",3);newThread(newRunnable(){@Overridepublicvoidrun(){//获得锁对于象try{StringthreadName=Thread.currentThread().getName();//获得2个答应Collection<Lease>acquire=semaphore.acquire(2);System.out.println(threadName+"获得旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2*1000);semaphore.returnAll(acquire);System.out.println(threadName+"开释旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");}catch(Exceptione){e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){//获得锁对于象try{StringthreadName=Thread.currentThread().getName();//获得1个答应Leaselease=semaphore.acquire();System.out.println(threadName+"获得旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2*1000);semaphore.returnLease(lease);System.out.println(threadName+"开释旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");}catch(Exceptione){e.printStackTrace();}}}).start();while(true){}}//同享旌旗灯号量@TestpublicvoidtestInterProcessSemaphoreV2()throwsException{CuratorFrameworkclient=zk.curatorFramework();//创办1个旌旗灯号量,Curator以公道锁的体例停止达成finalInterProcessSemaphoreV2semaphore=newInterProcessSemaphoreV2(client,"/lock",1);newThread(newRunnable(){@Overridepublicvoidrun(){//获得锁对于象try{StringthreadName=Thread.currentThread().getName();//获得1个答应Leaselease=semaphore.acquire();System.out.println(threadName+"获得旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2*1000);semaphore.returnLease(lease);System.out.println(threadName+"开释旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");}catch(Exceptione){e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){//获得锁对于象try{StringthreadName=Thread.currentThread().getName();//获得1个答应Leaselease=semaphore.acquire();System.out.println(threadName+"获得旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2*1000);semaphore.returnLease(lease);System.out.println(threadName+"开释旌旗灯号量>>>>>>>>>>>>>>>>>>>>>");}catch(Exceptione){e.printStackTrace();}}}).start();while(true){}}//多沉同享锁@TestpublicvoidtestInterProcessMultiLock()throwsException{CuratorFrameworkclient=zk.curatorFramework();//可沉进锁finalInterProcessLockinterProcessLock1=newInterProcessMutex(client,"/lock");//不行沉进锁finalInterProcessLockinterProcessLock2=newInterProcessSemaphoreMutex(client,"/lock");//创造多沉锁对于象finalInterProcessLocklock=newInterProcessMultiLock(Arrays.asList(interProcessLock1,interProcessLock2));newThread(newRunnable(){@Overridepublicvoidrun(){try{StringthreadName=Thread.currentThread().getName();//获得参数聚合中的全部锁lock.acquire();//由于保存1个不行沉进锁,因此全部InterProcessMultiLock不行沉进System.out.println(threadName+"----->"+lock.acquire(2,TimeUnit.SECONDS));//interProcessLock1是可沉进锁,因此能够持续获得锁System.out.println(threadName+"----->"+interProcessLock1.acquire(2,TimeUnit.SECONDS));//interProcessLock2是不行沉进锁,因而获得锁朽败System.out.println(threadName+"----->"+interProcessLock2.acquire(2,TimeUnit.SECONDS));}catch(Exceptione){e.printStackTrace();}}}).start();while(true){}}//分散式读写锁@TestpublicvoidtestReadWriteLock()throwsException{CuratorFrameworkclient=zk.curatorFramework();//创制同享可沉进读写锁finalInterProcessReadWriteLocklocl1=newInterProcessReadWriteLock(client,"/lock");finalInterProcessReadWriteLocklock2=newInterProcessReadWriteLock(client,"/lock");newThread(newRunnable(){@Overridepublicvoidrun(){try{StringthreadName=Thread.currentThread().getName();locl1.writeLock().acquire();//获得锁对于象System.out.println(threadName+"获得写锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1*1000);locl1.readLock().acquire();//获得读锁,锁落级System.out.println(threadName+"获得读锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1*1000);locl1.readLock().release();System.out.println(threadName+"开释读锁<<<<<<<<<<<<<<<<<<<<<");locl1.writeLock().release();System.out.println(threadName+"开释写锁<<<<<<<<<<<<<<<<<<<<<");}catch(Exceptione){e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{StringthreadName=Thread.currentThread().getName();lock2.writeLock().acquire();//获得锁对于象System.out.println(threadName+"获得写锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1*1000);lock2.readLock().acquire();//获得读锁,锁落级System.out.println(threadName+"获得读锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1*1000);lock2.readLock().release();System.out.println(threadName+"开释读锁<<<<<<<<<<<<<<<<<<<<<");lock2.writeLock().release();System.out.println(threadName+"开释写锁<<<<<<<<<<<<<<<<<<<<<");}catch(Exceptione){e.printStackTrace();}}}).start();while(true){}}//分散式可沉进排它锁@TestpublicvoidtestInterProcessMutex()throwsException{CuratorFrameworkclient=zk.curatorFramework();//分散式可沉进排它锁finalInterProcessLocklock=newInterProcessMutex(client,"/lock");finalInterProcessLocklock2=newInterProcessMutex(client,"/lock");newThread(newRunnable(){@Overridepublicvoidrun(){try{StringthreadName=Thread.currentThread().getName();lock.acquire();//获得锁对于象System.out.println(threadName+"获得锁>>>>>>>>>>>>>>>>>>>>>");lock.acquire();//尝试锁沉进System.out.println(threadName+"获得锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1*1000);lock.release();System.out.println(threadName+"开释锁<<<<<<<<<<<<<<<<<<<<<");lock.release();System.out.println(threadName+"开释锁<<<<<<<<<<<<<<<<<<<<<");}catch(Exceptione){e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{StringthreadName=Thread.currentThread().getName();lock.acquire();//获得锁对于象System.out.println(threadName+"获得锁>>>>>>>>>>>>>>>>>>>>>");lock.acquire();//尝试锁沉进System.out.println(threadName+"获得锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1*1000);lock.release();System.out.println(threadName+"开释锁<<<<<<<<<<<<<<<<<<<<<");lock.release();System.out.println(threadName+"开释锁<<<<<<<<<<<<<<<<<<<<<");}catch(Exceptione){e.printStackTrace();}}}).start();while(true){}//依序没有必定,然则统一个线程能够屡次获得,获得频频便必需开释频频,其余线程才干获得到锁}//分散式不行沉进排它锁@TestvoidtestInterProcessSemaphoreMutex()throwsException{CuratorFrameworkclient=zk.curatorFramework();//分散式不行沉进排它锁finalInterProcessLocklock=newInterProcessSemaphoreMutex(client,"/lock");finalInterProcessLocklock2=newInterProcessSemaphoreMutex(client,"/lock");newThread(newRunnable(){@Overridepublicvoidrun(){try{StringthreadName=Thread.currentThread().getName();lock.acquire();//获得锁对于象System.out.println(threadName+"获得锁>>>>>>>>>>>>>>>>>>>>>");//尝试锁沉进Thread.sleep(2*1000);lock.release();System.out.println(threadName+"开释锁<<<<<<<<<<<<<<<<<<<<<");}catch(Exceptione){e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){//获得锁对于象try{StringthreadName=Thread.currentThread().getName();lock2.acquire();System.out.println(threadName+"获得锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2*1000);lock2.release();System.out.println(threadName+"开释锁<<<<<<<<<<<<<<<<<<<<<");}catch(Exceptione){e.printStackTrace();}}}).start();while(true){}//按次没有必定,然则必需是获得后再开释其余线程才干获得到锁}}

END

推举您浏览更多相关于“ 锁分散式Curator排它锁读写锁旌旗灯号量 ”的作品

上海莫尔丽信息科技有限公司