一、简介

分布式同步是Zookeeper的功能之一,但Zookeeper的分布式同步(分布式锁)功能并非以直接的接口呈现的。开发者需要额外地基于Zookeeper的节点(node)的分布式同步特性来实现分布式锁的接口。

本文将提供基于基于ZooKeeper的分布式锁的C++接口及实现,实现中用到了部分C++11特性。另外,本实现使用的是经过封装的ZooKeeper接口,本文只列出这些ZooKeeper封装接口,而不提供ZooKeeper封装接口的实现代码。读者可以依理使用ZooKeeper原生接口,或自己封装ZooKeeper接口。

二、基于ZooKeeper分布式锁的实现流程

1、在ZooKeeper指定节点下创建临时顺序节点node_n;

2、获取指定节点下所有子节点(children node);

3、对子节点按节点自增序号从小到大排序;

4、判断第1步创建的节点是否为排序子节点的第一个子节点,如是,则获取锁,如不是,则监听比第1步创建的节点小的那个节点的删除事件;

5、若监听事件生效,则回到第2步重新进行判断,直到获取到锁。

三、已封装的ZooKeeper接口声明
int32_t Init(const std::string &hosts, const std::string &root_path = "/",
const clientid_t *client_id = NULL);//ZooKeeper始化接口 int32_t
Connect(std::shared_ptr<WatcherFunType> global_watcher_fun, int32_t
recv_timeout_ms, uint32_t conn_timeout_ms = 30000);//连接ZooKeeper服务 int32_t
Exists(const std::string &path, Stat *stat, std::shared_ptr<WatcherFunType>
watcher_fun);//判断ZooKeeper节点是否存在,并设置监控函数及就是监控 int32_t Create(const std::string
&path, const std::string &value, std::string *p_real_path = NULL, const
ACL_vector *acl = &ZOO_OPEN_ACL_UNSAFE, int flags = 0, bool
ephemeral_exist_skip = false);//创建节点 int32_t Delete(const std::string &path,
int version);//删除节点
以上是封装的ZooKeeper接口,本文将不列出封装的实现,读者可以自己尝试封装,或直接使用ZooKeeper原生接口。

四、分布式锁实现的接口

分布式锁接口包括:

1、InitLock,指定ZooKeeper服务地址(IP及端口字符串,之间以“:”隔开)及节点名,相同节点名的ZooKeeper客户端间互斥;

2、Lock,请求互斥锁,如果其他ZooKeeper客户端已获取同一节点名的互斥锁,则线程等待互斥锁,直到获取到互斥锁,或超时,这个接口有两个版本,一个可以设置等待超时时间,另一个不设置等待超时时间,只到等到互斥锁才会返回;

3、UnLock,释放互斥锁。

接口代码如下:
#ifndef _ZOOKEEPER_LOCK_H_ #define _ZOOKEEPER_LOCK_H_ #include
"cppzookeeper.h" enum en_LockRet { LOCK_RET_SUCC = 0, LOCK_RET_TIMEOUT = 1,
LOCK_RET_ERR = 2 }; class ZookeeperLock { public: ZookeeperLock();
~ZookeeperLock(); bool InitLock(const std::string& host, const std::string&
lock_name); en_LockRet Lock(unsigned int millisec); en_LockRet Lock(void); void
UnLock(void); private: int TryLock(void); int WaitForLock(const unsigned int
millisec); bool CreateNode(void); bool DeleteNode(void); bool
ZookeeperWatcher(zookeeper::ZookeeperManager &zk_mamager, int type, int state,
const char *path); unsigned int GetTickCount(void); std::string m_strLockName;
zookeeper::ZookeeperManager m_zkManager; std::string m_selfNode; std::string
m_preNode; std::mutex m_mtx; std::condition_variable m_cv; //bool m_bnotify; };
#endif
五、分布式锁实现

实现及上面接口头文件的cpp文件部分:
#include "zookeeperlock.h" #include <algorithm> #include <time.h> #include
<chrono> #include <mutex> #include <condition_variable> #define ZOOKEEPERLOCK
"/zookeeperlock" #define LOCKCHILDNODE "lock_" ZookeeperLock::ZookeeperLock() {
//m_bnotify = false; } ZookeeperLock::~ZookeeperLock() { } bool
ZookeeperLock::InitLock(const std::string& host, const std::string& lock_name)
{ if (host.empty() || lock_name.empty()) return false; if ('/' == lock_name[0])
return false; int32_t ret = m_zkManager.Init(host, ZOOKEEPERLOCK); if (ZOK !=
ret) { return false; } zookeeper::WatcherFunType fnc_zk_watcher =
std::bind(&ZookeeperLock::ZookeeperWatcher, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); ret =
m_zkManager.Connect(std::make_shared<zookeeper::WatcherFunType>(fnc_zk_watcher),
2000, 3000); if (ZOK != ret) { return false; } if (ZOK !=
m_zkManager.Exists(ZOOKEEPERLOCK)) { ret = m_zkManager.Create(ZOOKEEPERLOCK,
""); if (ZOK != ret) { return false; } } if (ZOK !=
m_zkManager.Exists(lock_name)) { ret = m_zkManager.Create(lock_name, ""); if
(ZOK != ret) { return false; } } m_strLockName = lock_name; return true; }
en_LockRet ZookeeperLock::Lock(unsigned int millisec) { if
(m_strLockName.empty()) return LOCK_RET_ERR; if (!CreateNode()) { return
LOCK_RET_ERR; } unsigned int et = GetTickCount() + millisec; unsigned int ct =
0; bool must_do = false; do { must_do = false; int ret = TryLock(); if (0 ==
ret) { return LOCK_RET_SUCC; } else if (1 == ret) { int ret1 = WaitForLock(ct
>0 ? (et > ct ? (et - ct) : 1) : millisec); if (0 == ret1) { must_do = true; }
else if (1 == ret1) { DeleteNode(); return LOCK_RET_TIMEOUT; } else {
DeleteNode(); return LOCK_RET_ERR; } } else { DeleteNode(); return
LOCK_RET_ERR; } ct = GetTickCount(); } while (ct < et || must_do);
DeleteNode(); return LOCK_RET_TIMEOUT; } en_LockRet ZookeeperLock::Lock(void) {
en_LockRet ret = LOCK_RET_SUCC; while (LOCK_RET_TIMEOUT == (ret = Lock(2000)))
{ ; } return ret; } void ZookeeperLock::UnLock(void) { bool ret = DeleteNode();
return; } int ZookeeperLock::TryLock(void) { std::string parent_path =
std::string(ZOOKEEPERLOCK) + std::string("/") + m_strLockName;
zookeeper::ScopedStringVector children; std::vector<std::string> vchildren; int
ret = m_zkManager.GetChildren(m_strLockName, children); if (ZOK != ret) {
return 2; } for (int32_t i = 0; i < children.GetSize(); ++i) {
vchildren.push_back(std::string(children.GetData(i))); }
std::sort(vchildren.begin(), vchildren.end()); if (0 == vchildren.size()) {
return 2; } std::string firstnode = parent_path + "/" + vchildren[0]; ret =
m_selfNode.compare(firstnode); if (0 == ret) { return 0; } std::string
self_path_child; std::size_t pos = m_selfNode.rfind('/'); if (std::string::npos
!= pos) { self_path_child = m_selfNode.substr(pos + 1); } if
(self_path_child.empty()) { return 2; }
std::vector<std::string>::const_iterator itor = std::find(vchildren.begin(),
vchildren.end(), self_path_child); if (vchildren.end() == itor ||
vchildren.begin() == itor) { return 2; } m_preNode = *(itor - 1); return 1; }
int ZookeeperLock::WaitForLock(const unsigned int millisec) { std::string
wait_path = m_strLockName + "/" + m_preNode; Stat stat;
zookeeper::WatcherFunType fnc_zk_watcher =
std::bind(&ZookeeperLock::ZookeeperWatcher, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); int32_t
ret = m_zkManager.Exists(wait_path, &stat,
std::make_shared<zookeeper::WatcherFunType>(fnc_zk_watcher)); if (ZOK != ret &&
ZNONODE != ret) { return 2; } if (ZNONODE == ret) { return 0; }
std::unique_lock<std::mutex> lck(m_mtx); //m_bnotify = false;
m_cv.wait_for(lck, std::chrono::milliseconds(millisec)); if
(std::cv_status::timeout == m_cv.wait_for(lck,
std::chrono::milliseconds(millisec))) { return 1; } else { return 0; } /* if
(m_bnotify) { return 0; } else { return 1; }*/ } bool
ZookeeperLock::CreateNode(void) { std::string child_node = m_strLockName + "/"
+ LOCKCHILDNODE; std::string self_path; self_path.resize(128); int32_t ret =
m_zkManager.Create(child_node, "", &self_path, &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL | ZOO_SEQUENCE); if (ZOK != ret) { return false; } m_selfNode =
self_path.c_str(); return true; } bool ZookeeperLock::DeleteNode(void) {
int32_t ret = m_zkManager.Delete(m_selfNode, -1); m_selfNode = ""; m_preNode =
""; if (ZOK != ret) { return false; } return true; } bool
ZookeeperLock::ZookeeperWatcher(zookeeper::ZookeeperManager &zk_mamager, int
type, int state, const char *path) { if (ZOO_DELETED_EVENT == type) {
std::string strpath = path; std::string wait_path = std::string(ZOOKEEPERLOCK)
+ std::string("/") + m_strLockName + std::string("/") + m_preNode; if
(wait_path == strpath) { std::unique_lock<std::mutex> lck(m_mtx); //m_bnotify =
true; m_cv.notify_one(); } } return false; } unsigned int
ZookeeperLock::GetTickCount(void) { struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts); return (ts.tv_sec * 1000 + ts.tv_nsec /
1000000); }

技术
©2019-2020 Toolsou All rights reserved,
华为在线编程练习(试题加答案)C++标准库Jsp+Ajax+Servlet+Mysql实现增删改查(一)免费下载文档:给你介绍几个实用的免费下载网址微软翻译、Office现开始支持因纽特语Unity 场景异步加载(加载界面的实现)随机森林篇 R语言实现Java基础知识之笔记总结分享(超详细)PYTHON入门期末复习汇总vue组件页面高度根据屏幕大小自适应