博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Zookeeper系列四:Zookeeper实现分布式锁、Zookeeper实现配置中心
阅读量:6545 次
发布时间:2019-06-24

本文共 23232 字,大约阅读时间需要 77 分钟。

一、Zookeeper实现分布式锁

分布式锁主要用于在分布式环境中保证数据的一致性。

包括跨进程、跨机器、跨网络导致共享资源不一致的问题。

1. 分布式锁的实现思路

说明:

这种实现会有一个缺点,即当有很多进程在等待锁的时候,在释放锁的时候会有很多进程就过来争夺锁,这种现象称为 “惊群效应”

2. 分布式锁优化后的实现思路

 

3. Zookeeper分布式锁的代码实现

准备工作:

1)安装Zookeeper,具体参考我前面的我文章

2)新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖

com.101tec
zkclient
0.10

3.1 Zookeeper分布式锁的核心代码实现

实现逻辑参考“2. 分布式锁优化后的实现思路”中的流程图

package com.study.demo.lock;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.serialize.SerializableSerializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/*** * @Description: Zookeeper分布式锁的核心代码实现* @author leeSmall* @date 2018年9月4日**/public class DistributedLock implements Lock {    private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);    private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";    private static final String LOCK_PATH = "/LOCK";    private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());    private CountDownLatch cdl;    private String beforePath;// 当前请求的节点前一个节点    private String currentPath;// 当前请求的节点    // 判断有没有LOCK目录,没有则创建    public DistributedLock() {        if (!this.client.exists(LOCK_PATH)) {            this.client.createPersistent(LOCK_PATH);        }    }    public void lock() {        //尝试去获取分布式锁失败        if (!tryLock()) {            //对次小节点进行监听            waitForLock();            lock();        }         else {            logger.info(Thread.currentThread().getName() + " 获得分布式锁!");        }    }        public boolean tryLock() {        // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath        if (currentPath == null || currentPath.length() <= 0) {            // 创建一个临时顺序节点            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");            System.out.println("---------------------------->" + currentPath);        }        // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400        List
childrens = this.client.getChildren(LOCK_PATH); //由小到大排序所有子节点 Collections.sort(childrens); //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁 if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) { return true; } //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath else { int wz = Collections.binarySearch(childrens, currentPath.substring(6)); beforePath = LOCK_PATH + '/' + childrens.get(wz - 1); } return false; } //等待锁,对次小节点进行监听 private void waitForLock() { IZkDataListener listener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------"); if (cdl != null) { cdl.countDown(); } } public void handleDataChange(String dataPath, Object data) throws Exception { } }; // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher this.client.subscribeDataChanges(beforePath, listener); if (this.client.exists(beforePath)) { cdl = new CountDownLatch(1); try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } this.client.unsubscribeDataChanges(beforePath, listener); } //完成业务逻辑以后释放锁 public void unlock() { // 删除当前临时节点 client.delete(currentPath); } // ========================================== public void lockInterruptibly() throws InterruptedException { } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public Condition newCondition() { return null; }}

 3.2 在业务里面使用分布式锁

package com.study.demo.lock;import java.util.concurrent.CountDownLatch;import java.util.concurrent.locks.Lock;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/*** * @Description: 在业务里面使用分布式锁* @author leeSmall* @date 2018年9月4日**/public class OrderServiceImpl implements Runnable {    private static OrderCodeGenerator ong = new OrderCodeGenerator();    private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);    // 同时并发的线程数    private static final int NUM = 10;    // 按照线程数初始化倒计数器,倒计数器    private static CountDownLatch cdl = new CountDownLatch(NUM);    private Lock lock = new DistributedLock();    // 创建订单接口    public void createOrder() {        String orderCode = null;        //准备获取锁        lock.lock();        try {            // 获取订单编号            orderCode = ong.getOrderCode();        } catch (Exception e) {            // TODO: handle exception        } finally {            //完成业务逻辑以后释放锁            lock.unlock();        }        // ……业务代码        logger.info("insert into DB使用id:=======================>" + orderCode);    }        public void run() {        try {            // 等待其他线程初始化            cdl.await();        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        // 创建订单        createOrder();    }    public static void main(String[] args) {        for (int i = 1; i <= NUM; i++) {            // 按照线程数迭代实例化线程            new Thread(new OrderServiceImpl()).start();            // 创建一个线程,倒计数器减1            cdl.countDown();        }    }}

工具类:

package com.study.demo.lock;import java.text.SimpleDateFormat;import java.util.Date;public class OrderCodeGenerator {    // 自增长序列    private static int i = 0;    // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号    public String getOrderCode() {        Date now = new Date();        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");        return sdf.format(now) + ++i;    }}

二、Zookeeper实现配置中心

 1. 首先在zookeeper里面创建一个Jdbc的节点,在下面分别创建4个子节点/Jdbc/url、/Jdbc/uname、/Jdbc/password、/Jdbc/driver

create /Jdbc ''create /Jdbc/url jdbc.mysql://192.168.152.1/dbspread create /Jdbc/uname rootcreate /Jdbc/password 123456create /Jdbc/driver com.mysql.jdbc.Driver

注意:/Jdbc/url这个节点的值是错的 

 

 

 2. 新建一个zkdemo的maven的web项目

项目结构如下:

2.1 在pom.xml文件里面引入下面依赖:

4.0.0
com.study.demo
zkdemo
war
0.0.1-SNAPSHOT
zkdemo Maven Webapp
http://maven.apache.org
4.3.8.RELEASE
junit
junit
3.8.1
test
org.apache.zookeeper
zookeeper
3.4.10
com.101tec
zkclient
0.10
org.apache.curator
curator-framework
4.0.0
org.apache.curator
curator-recipes
4.0.0
javax.servlet
javax.servlet-api
3.1.0
org.apache.tomcat
tomcat-catalina
7.0.39
org.springframework
spring-core
${spring.version}
org.springframework
spring-beans
${spring.version}
org.springframework
spring-jdbc
${spring.version}
org.springframework
spring-context
${spring.version}
org.springframework
spring-web
${spring.version}
org.springframework
spring-webmvc
${spring.version}
com.zaxxer
HikariCP
2.7.1
mysql
mysql-connector-java
5.1.41
org.slf4j
slf4j-api
1.7.25
org.slf4j
jcl-over-slf4j
1.7.25
log4j
log4j
1.2.17
com.fasterxml.jackson.core
jackson-core
2.9.1
com.fasterxml.jackson.core
jackson-databind
2.9.1
javax.servlet
jstl
1.2
zkdemo

2.2 新建一个zookeeper配置中心类,从zookeeper动态获取数据库配置

package com.study.demo.config;import java.util.List;import java.util.Properties;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.cache.TreeCache;import org.apache.curator.framework.recipes.cache.TreeCacheEvent;import org.apache.curator.framework.recipes.cache.TreeCacheListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.springframework.web.context.ContextLoader;import org.springframework.web.context.WebApplicationContext;import com.zaxxer.hikari.HikariDataSource;/*** * @Description: zookeeper配置中心类,从zookeeper动态获取数据库配置* @author leeSmall* @date 2018年9月10日**/public class ZookeeperConfigurerCentral {    //curator客户端    private CuratorFramework zkClient;        //curator事件监听    private TreeCache treeCache;    //zookeeper的ip和端口    private String zkServers;        //zookeeper上的/Jdbc路径    private String zkPath;        //超时设置    private int sessionTimeout;        //读取zookeeper上的数据库配置文件放到这里    private Properties props;    public ZookeeperConfigurerCentral(String zkServers, String zkPath, int sessionTimeout) {        this.zkServers = zkServers;        this.zkPath = zkPath;        this.sessionTimeout = sessionTimeout;        this.props = new Properties();        //初始化curator客户端        initZkClient();        //从zookeeper的Jdbc节点下获取数据库配置存入props        getConfigData();        //对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新props        addZkListener();    }    //初始化curator客户端    private void initZkClient() {        zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout)                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();        zkClient.start();    }    //从zookeeper的Jdbc节点下获取数据库配置存入props    private void getConfigData() {        try {            List
list = zkClient.getChildren().forPath(zkPath); for (String key : list) { String value = new String(zkClient.getData().forPath(zkPath + "/" + key)); if (value != null && value.length() > 0) { props.put(key, value); } } } catch (Exception e) { e.printStackTrace(); } } //对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新props private void addZkListener() { TreeCacheListener listener = new TreeCacheListener() { public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) { getConfigData(); WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext(); HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource"); System.out.println("================"+props.getProperty("url")); dataSource.setJdbcUrl(props.getProperty("url")); dataSource.setUsername(props.getProperty("uname")); dataSource.setPassword(props.getProperty("password ")); dataSource.setDriverClassName(props.getProperty("driver ")); } } }; treeCache = new TreeCache(zkClient, zkPath); try { treeCache.start(); treeCache.getListenable().addListener(listener); } catch (Exception e) { e.printStackTrace(); } } public Properties getProps() { return props; } public void setZkServers(String zkServers) { this.zkServers = zkServers; } public void setZkPath(String zkPath) { this.zkPath = zkPath; } public void setSessionTimeout(int sessionTimeout) { this.sessionTimeout = sessionTimeout; }}

2.3 新建一个加载props里面的数据库配置的类

package com.study.demo.config;import java.util.Properties;import org.springframework.beans.BeansException;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;/*** * @Description: 加载props里面的数据库配置,这个类等价于以前在xml文件里面的配置:* 
* @author leeSmall* @date 2018年9月10日**/public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer { private ZookeeperConfigurerCentral zkConfigurerCentral; @Override protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException { System.out.println(zkConfigurerCentral.getProps()); super.processProperties(beanFactoryToProcess, zkConfigurerCentral.getProps()); } public void setzkConfigurerCentral(ZookeeperConfigurerCentral zkConfigurerCentral) { this.zkConfigurerCentral = zkConfigurerCentral; }}

2.4 在/zkdemo/src/main/webapp/WEB-INF/config/applicationContext.xml配置2.2和2.3新建的两个主类

2.5 在com.study.demo.controller新建测试类

测试类1:

package com.study.demo.controller;import java.io.Serializable;public class OrderModel implements Serializable {    private static final long serialVersionUID = 1L;    private int orderId;    private int brandId;    public int getOrderId() {        return orderId;    }    public void setOrderId(int orderId) {        this.orderId = orderId;    }    public int getBrandId() {        return brandId;    }    public void setBrandId(int brandId) {        this.brandId = brandId;    }}
View Code

测试类2:

package com.study.demo.controller;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.jdbc.core.RowMapper;import org.springframework.stereotype.Repository;@Repositorypublic class OrderDao {    @Autowired    private JdbcTemplate jdbcTemplate;    public OrderModel findById() {        String sql = "select * from tbl_order where order_id = 1";        return jdbcTemplate.queryForObject(sql, new RowMapper
() { public OrderModel mapRow(ResultSet rs, int rowNum) throws SQLException { OrderModel payment = new OrderModel(); payment.setOrderId(rs.getInt("order_id")); payment.setBrandId(rs.getInt("brand_id")); return payment; } }); }}
View Code

测试类3:

package com.study.demo.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class OrderService {    @Autowired    private OrderDao dao;    public OrderModel getById() {        return dao.findById();    }}
View Code

测试类4:

package com.study.demo.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.ResponseBody;@Controllerpublic class OrderController {    @Autowired    private OrderService service;    @ResponseBody    @RequestMapping(value = "/test", method = RequestMethod.GET)    public String test() {        OrderModel p = service.getById();        return p.getBrandId() + "";    }}
View Code

2.6 其他附加配置和数据库脚本

/zkdemo/src/main/webapp/WEB-INF/config/log4j.properties

log4j.rootLogger=INFO,consolelog4j.logger.org.apache.zookeeper=DEBUGlog4j.logger.org.apache.curator=DEBUGlog4j.logger.java.lang.Exception=INFOlog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{MM-dd HH:mm:ss.SSS} [%c:%p] %m%n
View Code

/zkdemo/src/main/webapp/WEB-INF/config/spring-mvc.xml

View Code

/zkdemo/src/main/webapp/WEB-INF/web.xml

zkdemo
Zookeeper Demo Application
webAppRootKey
zkdemo.root
log4jConfigLocation
/WEB-INF/config/log4j.properties
log4jRefreshInterval
60000
contextConfigLocation
/WEB-INF/config/applicationContext.xml
spring
org.springframework.web.servlet.DispatcherServlet
contextConfigLocation
/WEB-INF/config/spring-mvc.xml
1
spring
/
org.springframework.web.util.Log4jConfigListener
org.springframework.web.context.ContextLoaderListener
characterEncodingFilter
org.springframework.web.filter.CharacterEncodingFilter
encoding
UTF-8
forceEncoding
true
characterEncodingFilter
/*
20
403
/403.jsp
404
/404.jsp
500
/500.jsp
index.htm
index.jsp
View Code

数据库脚本:

CREATE TABLE `tbl_order` (  `order_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '订单id',  `brand_id` int(11) DEFAULT NULL COMMENT '品牌id',  PRIMARY KEY (`order_id`)) ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='订单表';INSERT INTO tbl_order VALUES('1','1')

 2.7 启动项目在浏览器输入地址http://localhost:8080/zkdemo/test查看效果

 

可以看到报错了,这是因为我们之前设置了错误的url

create /Jdbc/url jdbc.mysql://192.168.152.1/dbspread

修改url为正确的 

set /Jdbc/url jdbc:mysql://192.168.152.1:3306/dbspread

 

 再次输入地址访问查看效果:

http://localhost:8080/zkdemo/test

可以看到在没有重启服务的情况下,可以正常访问获取到值了,这是因为zookeeper的数据库的配置动态刷新到服务了!

 

转载于:https://www.cnblogs.com/leeSmall/p/9614601.html

你可能感兴趣的文章
网站爬取工具
查看>>
amazeui学习笔记--css(HTML元素5)--表格Table
查看>>
JavaScript&jQuery.强制类型转换
查看>>
Workgroup下开启Win7的Admin share
查看>>
搜索营销推广学习资料
查看>>
HDOJ 1698
查看>>
linux里安装redis以及redis的安全设置
查看>>
Mysql Procudure
查看>>
作业第六次
查看>>
构建之法 第一章 概论
查看>>
Hadoop编译安装
查看>>
汇编字符串拷贝
查看>>
Lambda的前世今生
查看>>
黑马程序员-张老师基础加强3-内省
查看>>
TCP/IP模型简介和/etc/hosts文件说明
查看>>
UIButton常用属性
查看>>
主键自增归0
查看>>
杨辉三角
查看>>
mysql之 [ERROR] InnoDB: Unable to lock ./ibdata1, error: 11
查看>>
如何批量修改文件后缀的方法
查看>>