1.第一种实现思路,多个线程去zk下创建一个临时结点,如果一个线程创建结点成功,就代表获得锁了,如果创建失败,就进入等待状态,监听临时结点,直到释放锁,就是临时节点消失,再重新执行获取锁的操作。此种方式会有一个问题产生,就是如果在并发比较大的情况下,一个临时结点的消失,会造成很多线程同时会试图创建临时结点,这种方式会影响zk的稳定性,这个效应称为羊群效应。
package com.tech.demo.zookeeper; import org.apache.zookeeper.*; import
org.apache.zookeeper.data.Stat; import java.io.IOException; import
java.util.concurrent.*; import java.util.concurrent.locks.Condition; import
java.util.concurrent.locks.Lock; /** * @author xxx_xx * @date 2018/8/13 */
public class DistedLock extends Thread implements Lock, Watcher { private
ZooKeeper zk; private String root = "/lock"; private String childPath =
"/lock/child"; private CountDownLatch latch = new CountDownLatch(1); /** *
初始化如果没有根结点,创建根结点 */ public DistedLock(String threadName) { super(threadName);
try { this.zk = new ZooKeeper("localhost:2181", 2000, this); Stat stat =
zk.exists(root, true); if (stat == null) { zk.create(root, "root".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e)
{ e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace();
} catch (KeeperException e) { e.printStackTrace(); } } @Override public void
lock() { if (tryLock()) { System.out.println(Thread.currentThread().getName() +
"得到锁"); } else { waitLock(); } } public void waitLock() { try { //注册监听
zk.exists(childPath, true); //等待 latch.await();
System.out.println(Thread.currentThread().getName() + "线程重新抢锁"); lock();
Thread.sleep(1000); } catch (KeeperException e) { e.printStackTrace(); } catch
(InterruptedException e) { e.printStackTrace(); } } @Override public void
lockInterruptibly() throws InterruptedException { } @Override public boolean
tryLock() { try { Stat stat = zk.exists(childPath, true); if (stat != null) {
return false; } else { String val = zk.create(childPath, "child1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(Thread.currentThread().getName() + val + "节点创建"); } return
true; } catch (KeeperException e) { e.printStackTrace(); } catch
(InterruptedException e) { e.printStackTrace(); } return false; } @Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false; } @Override public void unlock() { try { zk.delete(childPath,
-1); } catch (InterruptedException e) { e.printStackTrace(); } catch
(KeeperException e) { e.printStackTrace(); } } @Override public Condition
newCondition() { return null; } @Override public void process(WatchedEvent
watchedEvent) { if (watchedEvent != null) { this.latch.countDown(); } } public
static void main(String[] args) { new DistedLock("线程").start(); new
DistedLock("线程1").start(); new DistedLock("线程2").start(); new
DistedLock("线程3").start(); } @Override public void run() { this.lock(); try {
Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
this.unlock(); } }
2.第二种实现思路,每个线程在zk下创建一个临时序列结点,如果当前结点是临时序列结点中最小的一个就代表自己得到了锁,如果不是

就监听比自己小的一个结点,举例说明,如果三个线程都创建了三个临时序列结点,序号分别是0,1,2,线程1发现自己的结点是最小的,则认为自己得到了锁,返回true创建成功,线程2和线程3发现自己的结点不是最小的,于是线程2监听序列号为0的结点,线程3监听序列号为1的结点,这样每个线程只监听一个节点就行了。
package com.tech.demo.zookeeper; import org.apache.zookeeper.*; import
org.apache.zookeeper.data.Stat; import java.io.IOException; import
java.util.ArrayList; import java.util.Collections; import java.util.List;
import java.util.concurrent.CountDownLatch; import
java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; /** * @author xxx_xx * @date 2018/8/19
*/ public class TestLock implements Lock, Watcher { private ZooKeeper zk =
null; private String ROOT_LOCK = "/lo"; private String lockName; private String
WAIT_LOCK; private String CURRENT_LOCK; private CountDownLatch countDownLatch;
private int sessionTimeout = 30000; private List<Exception> exceptionList = new
ArrayList<Exception>(); /** * 配置分布式锁 * * @param config 连接的url * @param lockName
竞争资源 */ public TestLock(String config, String lockName) { this.lockName =
lockName; try { // 连接zookeeper zk = new ZooKeeper(config, sessionTimeout,
this); Stat stat = zk.exists(ROOT_LOCK, false); if (stat == null) { //
如果根节点不存在,则创建根节点 zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); } } catch (IOException e) { e.printStackTrace(); }
catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException
e) { e.printStackTrace(); } } // 节点监视器 public void process(WatchedEvent event)
{ if (this.countDownLatch != null) { this.countDownLatch.countDown(); } }
public void lock() { if (exceptionList.size() > 0) { throw new
LockException(exceptionList.get(0)); } try { if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
return; } else { // 等待锁 waitForLock(WAIT_LOCK, sessionTimeout); } } catch
(InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) {
e.printStackTrace(); } } public boolean tryLock() { try { String splitStr =
"_lock_"; if (lockName.contains(splitStr)) { throw new LockException("锁名有误"); }
// 创建临时有序节点 CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new
byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(CURRENT_LOCK + " 已经创建"); // 取所有子节点 List<String> subNodes =
zk.getChildren(ROOT_LOCK, false); // 取出所有lockName的锁 List<String> lockObjects =
new ArrayList<String>(); for (String node : subNodes) { String _node =
node.split(splitStr)[0]; if (_node.equals(lockName)) { lockObjects.add(node); }
} Collections.sort(lockObjects);
System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
// 若当前节点为最小节点,则获取锁成功 if (CURRENT_LOCK.equals(ROOT_LOCK + "/" +
lockObjects.get(0))) { return true; } // 若不是最小节点,则找到自己的前一个节点 String prevNode =
CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1); WAIT_LOCK =
lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); } catch
(InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) {
e.printStackTrace(); } return false; } public boolean tryLock(long timeout,
TimeUnit unit) { try { if (this.tryLock()) { return true; } return
waitForLock(WAIT_LOCK, timeout); } catch (Exception e) { e.printStackTrace(); }
return false; } // 等待锁 private boolean waitForLock(String prev, long waitTime)
throws KeeperException, InterruptedException { // 监听比自己小的结点 Stat stat =
zk.exists(ROOT_LOCK + "/" + prev, true); if (stat != null) {
System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/"
+ prev); this.countDownLatch = new CountDownLatch(1); //
计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); this.countDownLatch
= null; System.out.println(Thread.currentThread().getName() + " 等到了锁"); }
return true; } public void unlock() { try { System.out.println("释放锁 " +
CURRENT_LOCK); zk.delete(CURRENT_LOCK, -1); CURRENT_LOCK = null; zk.close(); }
catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException
e) { e.printStackTrace(); } } public Condition newCondition() { return null; }
public void lockInterruptibly() throws InterruptedException { this.lock(); }
public class LockException extends RuntimeException { private static final long
serialVersionUID = 1L; public LockException(String e) { super(e); } public
LockException(Exception e) { super(e); } } }
 

测试代码
package com.tech.demo.zookeeper; /** * @author xxx_xx * @date 2018/8/19 */
public class Test { static int n = 500; public static void main(String[] args)
{ Runnable runnable = new Runnable() { public void run() { TestLock lock =
null; try { lock = new TestLock("localhost:2181", "test1"); lock.lock();
System.out.println(Thread.currentThread().getName() + "正在运行"); } finally { if
(lock != null) { lock.unlock(); } } } }; for (int i = 0; i < 3; i++) { Thread t
= new Thread(runnable); t.start(); } } }
 

 

 

 

 

 

 

技术
©2019-2020 Toolsou All rights reserved,
华为认证HCIA-AI人工智能NOI2019 游记消息质量平台系列文章|全链路排查篇过拟合和欠拟合的形象解释Unity 场景异步加载(加载界面的实现)Faster RCNN系列算法原理讲解(笔记)纽约年轻人计划“重新占领华尔街”:维护散户利益用C++跟你聊聊“原型模式” (复制/拷贝构造函数)初识python之技巧总结篇中级JAVA程序员应该掌握的数据结构知识