文档章节

[奇思异想]使用Zookeeper管理数据库连接串

o
 osc_w9s1w4o0
发布于 2019/04/08 08:10
字数 1032
阅读 16
收藏 0

精选30+云产品,助力企业轻松上云!>>>

背景

  有一套特定规格的应用(程序+数据库),当有业务需求时,就需要多部署应用,并且所有的应用都使用一个共同的后台来管理。应用新增后,如何通知后台更新连接串成了一个关键的问题。于是就产生了使用ZooKeeper管理数据库连接串的奇思异想。具体方案如下:

  

 

 

  1. 运维负责搭建数据库,并执行初始化脚本,然后把对应的数据库配置刷入ZooKeeper;

  2. 运维完成App(1...N)的部署,App(1...N)从ZooKeeper读取对应的数据库配置;

  3. 后台监听ZooKeeper,更新数据库配置到后台应用内存。

 

环境准备

  1. 安装Zookeeper

  docker pull zookeeper:3.4.13

  

  

  docker run --name zookeeper -d -p 2181:2181 zookeeper:3.4.13

  

 

  2. 安装Mysql

  docker pull mysql:5.7

  

 

  docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 -d mysql:5.7
  docker run --name mysql2 -e MYSQL_ROOT_PASSWORD=root -p 3307:3306 -d mysql:5.7
  docker run --name mysql3 -e MYSQL_ROOT_PASSWORD=root -p 3308:3306 -d mysql:5.7

  

 

  3. 初始化数据库

CREATE DATABASE test;
USE test;
CREATE TABLE `table` (
  `id` int(11) NOT NULL,
  `name` varchar(50) NOT NULL,
  PRIMARY KEY (`id`)
);

   

 

  分别在各个数据库插入测试数据

  mysql:

USE test;
INSERT INTO `table` (id, name) VALUES (1, 'A1');
INSERT INTO `table` (id, name) VALUES (2, 'B1');
INSERT INTO `table` (id, name) VALUES (3, 'C1');

 

  mysql2:

USE test;
INSERT INTO `table` (id, name) VALUES (1, 'A2');
INSERT INTO `table` (id, name) VALUES (2, 'B2');
INSERT INTO `table` (id, name) VALUES (3, 'C2');

 

  mysql3:

USE test;
INSERT INTO `table` (id, name) VALUES (1, 'A3');
INSERT INTO `table` (id, name) VALUES (2, 'B3');
INSERT INTO `table` (id, name) VALUES (3, 'C3');

 

  4. 基于数据库生成POCO

  Install-Package MySql.Data.EntityFrameworkCore -Version 8.0.13

  Scaffold-DbContext "server=127.0.0.1;port=3306;user=root;password=123456;database=test" MySql.Data.EntityFrameworkCore -OutputDir DataAccess -f

  

  

 

  5. 引用ZooKeeper相关组件

  Install-Package ZooKeeperNetEx -Version 3.4.12.1

  

核心代码

  1. ZookeeperOption:从appsettings中读取ZooKeeper相关配置

public class ZookeeperOption
    {
        public ZookeeperOption(IConfiguration config)
        {
            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            var section = config.GetSection("zookeeper");
            section.Bind(this);
        }

        public string ConnectionString { get; set; }

        public int Timeout { get; set; }
    }

 

  2. ZookeeperServiceCollectionExtensions:注册ZooKeeper服务

public static class ZookeeperServiceCollectionExtensions
    {
        public static IServiceCollection AddZookeeper(this IServiceCollection services, IConfiguration config)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            services.AddOptions();

            var option = new ZookeeperOption(config);

            var zookeeper = new org.apache.zookeeper.ZooKeeper(option.ConnectionString, option.Timeout * 1000, new DefaultWatcher());

            services.Add(ServiceDescriptor.Singleton(zookeeper));
            return services;
        }
    }

    public class DefaultWatcher : Watcher
    {
        public override Task process(WatchedEvent @event)
        {
            return Task.CompletedTask;
        }
    }

 

  3. ZookeeperHandler:ZooKeeper初始化及目录变化处理类,并把数据库连接信息写入程序内存

public interface IZookeeperHandler
    {
        Task InitAsync();

        Task RefreshAsync();
    }

    public class ZookeeperHandler: IZookeeperHandler
    {
        private readonly org.apache.zookeeper.ZooKeeper _zooKeeper;
        private readonly IMemoryCache _cache;

        public ZookeeperHandler(org.apache.zookeeper.ZooKeeper zooKeeper, IMemoryCache cache)
        {
            _zooKeeper = zooKeeper;
            _cache = cache;
        }

        public async Task InitAsync()
        {
            await RefreshAsync();
        }

        public async Task RefreshAsync()
        {
            var connDic = new Dictionary<string, string>();

            var isExisted = await _zooKeeper.existsAsync("/connections");
            if (isExisted == null)
            {
                await _zooKeeper.createAsync("/connections", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

            var connResult = await _zooKeeper.getChildrenAsync("/connections", new ConnectionWatcher(this));

            foreach (var conn in connResult.Children)
            {
                var connData = await _zooKeeper.getDataAsync($"/connections/{conn}/value");
                var connStr = Encoding.UTF8.GetString(connData.Data);
                connDic[conn] = connStr;
            }

            _cache.Set("connections", connDic);
        }
    }

 

  4. ConnectionWatcher:监听者,内容变化时调用ZookeeperHandler的RefreshAsync()方法,其中,变化只通知一次,因此需要再次建立监听

public class ConnectionWatcher : Watcher
    {
        private readonly IZookeeperHandler _zookeeperService;

        public ConnectionWatcher(IZookeeperHandler zookeeperService)
        {
            _zookeeperService = zookeeperService;
        }

        public override async Task process(WatchedEvent @event)
        {
            var type = @event.get_Type();

            if (type != Event.EventType.None)
            {
                await _zookeeperService.RefreshAsync();
            }
        }
    }

 

  5. ZookeeperApplicationBuilderExtensions:初始化

public static class ZookeeperApplicationBuilderExtensions
    {
        public static IApplicationBuilder UseZookeeper(this IApplicationBuilder app)
        {
            var service = app.ApplicationServices.GetRequiredService<IZookeeperHandler>();
            service.InitAsync().Wait();
            return app;
        }
    }

 

  6. ContextProvider:根据Id从内存中读取对应的数据库连接串,并提供DbContext实例

public interface IContextProvider
    {
        TestContext GetContext(string id);
    }

    public class ContextProvider : IContextProvider
    {
        private readonly IMemoryCache _cache;

        public ContextProvider(IMemoryCache cache)
        {
            _cache = cache;
        }

        public TestContext GetContext(string id)
        {
            var dic = _cache.Get<Dictionary<string, string>>("connections");
            var connectionStr = dic[id];

            var optionsBuilder = new DbContextOptionsBuilder<TestContext>();
            optionsBuilder.UseMySQL(connectionStr);
            return new TestContext(optionsBuilder.Options);
        }
    }

 

效果演示

  1. 刚开始没有任何连接信息

  

 

  2. 添加一个连接信息

  

  

 

  3. 查询连接对应的数据

  

 

  4. 再添加两个连接信息

  

  

  

  

  

 

  5. 查看ZooKeeper的信息

  docker run -it --rm --link zookeeper:zookeeper zookeeper:3.4.13 zkCli.sh -server zookeeper

  ls /connections

  get /connections/1/value

  get /connections/2/value

  get /connections/3/value

  

 

补充说明

  因为把数据库连接信息写到了程序内存中,因此,如果当ZooKeeper出现了故障:

  1. 老的(正在运行)应用正在使用的数据库不会受到影响,但无法监听到数据库信息的变化;

  2. 新的应用无法启动。

  ZooKeeper恢复后:

  1. 老的(正在运行)应用会重连,重新监听到数据库信息的变化

  2. 新的应用可以成功启动。

 

源码地址

  https://github.com/ErikXu/zookeeper-connection-management

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。

暂无文章

物联网开发服务开发虚拟设备需要几步?

云栖号快速入门:【点击查看更多云产品快速入门】 不知道怎么入门?这里分分钟解决新手入门等基础问题,可快速完成产品配置操作! 物联网平台设备的正常开发流程是:设备端开发完成,设备上报...

osc_2axit9df
6分钟前
0
0
互联网互联网必看文章墙裂推荐

后端必看文章系列 大型项目架构演进过程及思考的点

code-ortaerc
7分钟前
11
0
ACL2020论文整理 - 知乎

ACL2020录取文章已经放出,链接如下: ACL2020论文集合 www.aclweb.org 为了以后更加方便地阅读论文,也本着一颗开源之心,花一个下午的时间整理了一下相关论文。鉴于本人精力有限,并且也只...

osc_5w65ebjo
8分钟前
0
0
SU(N) Hubbard 模型平均场

osc_31d5oo2i
10分钟前
13
0
Python语言及其应用PDF高清完整版百度云盘免费下载|python基础教程PDF电子书推荐

编辑推荐 本书内容易于理解,而且读起来生动有趣,是编程和Python初学者不可多得的教程。书中首先介绍了Python的基础知识,然后逐渐深入多种主题,结合教程和攻略式风格来讲解Python 3中的概...

osc_nbg2lo7i
10分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部