文档章节

ETL采集器(负载均衡)

w
 wangshuaiJava
发布于 2015/10/12 22:49
字数 1070
阅读 121
收藏 0

     ETL集群采集器基于ETL采集器(单机版)的一个升级,主要分为五大组件:分别为JOB任务组件、采集器组件、KAFKA消息组件、ETL清洗组件、存储组件,每台节点每秒清洗3W-5W条日志。

  1. JOB任务组件

  • JOB任务组件简要介绍

    1. 技术简要说明:基于JOB管理器、zookeeper、hadoop RPC 开发

    2. JOB任务组件分三部分组成:MasterJob、SlaveJob、zookeeper

      1. MasterJob主要责任是生产任务,建立RPC服务

      2. SlaveJob主要责任是消费执行任务,通过RPC获取任务

      3. zookeeper主要责任监控MasterJob、SlaveJob 快速切换 Master,并广播任务

    3.  负载均衡消费任务

    4.  支持任务的启用、停用

    5.  支持MySql、Oracle、DB2等多种数据库管理任务

    6.  灵活便利的管理quartz任务

    7. 支持任务参数的传递 

    8. 具体实现参考单机版ETL http://my.oschina.net/u/2470985/blog/509714

  1. 支持任务参数的传递

  • JOB任务组件架构设计

  • zookeeper 监控Master 、Slave ,选举Master,选举RPC服务端

 papublic void init() throws Exception {
  client = CuratorFrameworkFactory.newClient(zookQuorum, // 服务器列表
    createTimeout, // 会话超时时间,单位毫秒
    connTimeout, // 连接创建超时时间,单位毫秒
    new ExponentialBackoffRetry(time, timeoutCount) // 重试策略
    );
  nodeFactory = NodeFactory.getNodeFactory();
  // 启动zk
  client.start();
  // 监控分发job任务监控
  jobMonitor();
  // 添加master、slave监控
  addMasterMonitor();
  // 初始化节点
  initCreateNode(client, nodePath, nodePathName, jobTaskPath,
    jobTaskPathName, nodeFactory);
 }
 /**
  * 
  * @Title: initCreateNode
  * @Description: 初始化节点
  * @param client
  * @param nodePath
  * @param nodePathName
  * @param jobTaskPath
  * @param jobTaskPathName
  * @param nodeFactory
  * @throws Exception
  * @return: void
  */
 public void initCreateNode(CuratorFramework client, String nodePath,
   String nodePathName, String jobTaskPath, String jobTaskPathName,
   NodeFactory nodeFactory) throws Exception {
  // 创建node节点
  client.create().creatingParentsIfNeeded()
    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    .forPath(nodePath + nodePathName);
  if (client.checkExists().forPath(jobTaskPath + jobTaskPathName) == null) {
   client.create()
     .creatingParentsIfNeeded()
     .forPath(jobTaskPath + jobTaskPathName,
       nodeFactory.getIp().getBytes());
  }
 }
 /**
  * 
  * @Title: jobMonitor
  * @Description: 监控JOB
  * @throws Exception
  * @return: void
  */
 public void jobMonitor() throws Exception {
  zookeeperjobTask = new PathChildrenCache(client, jobTaskPath, true);
  zookeeperjobTask.getListenable().addListener(
    new PathChildrenCacheListener() {
     public void childEvent(CuratorFramework client,
       PathChildrenCacheEvent enEvent) throws Exception {
      nodeFactory.getZookManage().masterMonitor(client,
        jobTaskPath, rpcPort, nodeFactory);
     }
    });
  zookeeperjobTask.start(StartMode.BUILD_INITIAL_CACHE);
 }
 /**
  * 
  * @Title: addmasterMonitor
  * @Description: 监控Master
  * @throws Exception
  * @return: void
  */
 public void addMasterMonitor() throws Exception {
  zookeeperNodeEvent = new PathChildrenCache(client, nodePath, true);
  zookeeperNodeEvent.getListenable().addListener(
    new PathChildrenCacheListener() {
     public void childEvent(CuratorFramework client,
       PathChildrenCacheEvent enEvent) throws Exception {
      nodeFactory.getZookManage().masterMonitor(client,
        nodePath, rpcPort, nodeFactory);
     }
    });
  zookeeperNodeEvent.start(StartMode.BUILD_INITIAL_CACHE);
 }
@Override
 public void masterMonitor(CuratorFramework client, String nodePath,
   int port, NodeFactory nodeFactory) throws Exception {
  // 状态
  Map<String, String> nodeState = nodeFactory.getNodeState();
  // 获取当前IP
  String ip = nodeFactory.getIp();
  // 获取masterNode
  String masterNode = nodeFactory.getMasterNode();
  // 设置job节点
  List<String> nodeList = client.getChildren().forPath(nodePath);
  Collections.sort(nodeList);
  String master = (new String(client.getData().forPath(
    nodePath + "/" + nodeList.get(0))));
  // 选本机为master
  if (ip.equals(master)) {
   // 判断本机是否新的master null则为新的master
   if (nodeFactory.getRpcServiceNode() == null) {
    nodeFactory.setMasterNode(master);
    RpcServiceNode rpcServiceNode = new RpcServiceNode();
    rpcServiceNode.setPort(port);
    rpcServiceNode.setHost(ip);
    rpcServiceNode.start();
    // 设置rpc对象
    nodeFactory.setRpcCommand(getRpcCommandProxy(master, port));
    // 设置RPC客户端
    nodeFactory.setRpcServiceNode(rpcServiceNode);
    // 开启策略
    BeanFactory.getBean().getLoadingService().init();
    // 设置服务
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_MASTER,
      PROCESS_START);
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE,
      PROCESS_START);
   }
  } else {
   // 假如本机不是当前master则关闭RPC服务
   if (nodeFactory.getRpcServiceNode() != null) {
    nodeFactory.getRpcServiceNode().stop();
    nodeFactory.setRpcServiceNode(null);
    //关闭JOB
    BeanFactory.getBean().getLoadingService().stop();
    nodeState.remove(ip + PROCESS_CUT + StartNode.HOST_MASTER);
   }
   // 假如当前master发生改变则切换
   if ((!masterNode.equals(master))) {
    nodeFactory.setRpcCommand(getRpcCommandProxy(master, port));
    nodeFactory.setMasterNode(master);
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE,
      PROCESS_START);
   }
  }
 }
 @Override
 public RpcCommand getRpcCommandProxy(String ip, int port) throws Exception {
  return RPC.getProxy(RpcCommand.class, RpcCommand.versionID,
    new InetSocketAddress(ip, port), new Configuration());
 }
  • 采集器组件

  • 采集组件简要介绍

    1. 采集层支持DB并发采集、FTP并发采集、syslog接收、本地文件采集

    2. 支持FTP、DB 异常补采

    3. 采集层支持JOB任务阀值配置,DB连接池设置、Ftp连接设置、syslog 批量生产文件等

    4. 提供采集层开发者模式,标准API接口

    5. 数据库表管理采集任务

    6. 将采集的数据负载均衡到KAFKA中间件中

  • kafka组件

  • kafka组件简要介绍

    1. 接收采集器消息存放分区中

    2. kafka负载均衡消息,ETL负载均衡消费分区中消息

    3. kafka支持订阅、消费消息、ETL实时分析消息

  • etl组件

  • etl组件简要介绍

    1. 清洗层支持数据追加、数据汇总、数据补全、过滤、映射、转换、拆分、解析 

    2. 清洗层支持清洗任务阀值配置

    3. 清洗层清洗开发者模式 ,标准API接口

    4. 清洗层支持库表管理清洗流程

    5. 接收清洗完成的数据,自定义存储,库、表、hive 等

    6. 存储层支持自定义多库存储、自定义表存储

    7. 提供存储层开发者模式,标准API接口

    8. 存储异常保存文件,监控异常文件重新存储。

    9. 支持实时分析,支持开发etl函数库

ETL集群采集器设计

 

ETL采集清洗应用(审计系统架构)

 

 

 

© 著作权归作者所有

共有 人打赏支持
w
粉丝 1
博文 9
码字总数 11465
作品 0
武汉
日志服务(原SLS)2018-2 月功能发布

[新地域] 亚太东南5(雅加达)访问入口 [新功能] 新增窗口/逻辑运算/二进制处理/Lamdba等分析函数 文档 分析功能新增窗口、逻辑运算、二进制处理、Lamdba等分析函数,提升日志处理分析能力。...

简志
03/01
0
0
案例|服务化架构系统监控难题解决方案

原文网址链接:http://url.cn/kVjUVO 众所周知,系统监控一直是拥有复杂IT架构的企业所面临的一个重要问题,而这也并不是每家企业都能够轻松解决的技术挑战。OPPO作为一家国际智能终端设备及...

数通畅联
2015/11/02
0
0
SQL Server数据同步的研究(单向/双向)

思路: 1、做中间件(简单:定时采集;复杂:分布式,订阅中心的形式,如微信的中间件:https://github.com/tencent-wechat/phxsql) 2、采用触发器的形式,有数据触发是进行多服务器的来回数...

easonjim
2016/11/29
0
0
Nginx 配置实践

nginx 一般用作请求转发,用作服务器集群的负载均衡 典型的高并发集群是 nginx+tomcat(多个) nginx可以高效处理对静态文件的请求,tomcat 负责动态请求 配置范例: #user nobody;worker_pr...

HZCoder
2016/03/29
49
0
爬虫面试遇到外行面试官

爬虫岗位很少,我总共也就面过五六家,其中某金融互联网公司技术最好,虽然他们的爬虫人员也是后来转的 问题一:如果连接断了怎么办? 什么连接呢,猜是tcp 连接吧,tcp连接不是时时联通的,...

HZCoder
2016/01/18
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

原型模式

1、原型模式-定义 用原型实例指定创建对象的种类,并且通过拷贝这些原型创建新的对象 克隆(浅度克隆->拷贝值类型或者引用,深度克隆->创建新的对象,开辟新的内存) 例如客户端知道抽象Pro...

阿元
今天
55
0
awk命令扩展使用操作

awk 中使用外部shell变量 示例1 [root@centos01 t1022]# A=888[root@centos01 t1022]# echo "" | awk -v GET_A=$A '{print GET_A}'888[root@centos01 t1022]# echo "aaaaaaaaaaaaa" | aw......

野雪球
今天
47
0
深入解析MySQL视图VIEW

Q:什么是视图?视图是干什么用的? A:视图(view)是一种虚拟存在的表,是一个逻辑表,本身并不包含数据。作为一个select语句保存在数据字典中的。   通过视图,可以展现基表的部分数据;...

IT--小哥
今天
52
0
虚拟机学习之二:垃圾收集器和内存分配策略

1.对象是否可回收 1.1引用计数算法 引用计数算法:给对象中添加一个引用计数器,每当有一个地方引用它时,计数器值就加1;当引用失效时,计数器值就减1;任何时候计数器值为0的对象就是不可能...

贾峰uk
今天
54
0
smart-doc功能使用介绍

smart-doc从8月份底开始开源发布到目前为止已经迭代了几个版本。在这里非常感谢那些敢于用smart-doc去做尝试并积极提出建议的社区用户。因此决定在本博客中重要说明下smart-doc的功能,包括使...

上官胡闹
昨天
50
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部