文档章节

ETL采集器(负载均衡)

w
 wangshuaiJava
发布于 2015/10/12 22:49
字数 1070
阅读 118
收藏 0

     ETL集群采集器基于ETL采集器(单机版)的一个升级,主要分为五大组件:分别为JOB任务组件、采集器组件、KAFKA消息组件、ETL清洗组件、存储组件,每台节点每秒清洗3W-5W条日志。

  1. JOB任务组件

  • JOB任务组件简要介绍

    1. 技术简要说明:基于JOB管理器、zookeeper、hadoop RPC 开发

    2. JOB任务组件分三部分组成:MasterJob、SlaveJob、zookeeper

      1. MasterJob主要责任是生产任务,建立RPC服务

      2. SlaveJob主要责任是消费执行任务,通过RPC获取任务

      3. zookeeper主要责任监控MasterJob、SlaveJob 快速切换 Master,并广播任务

    3.  负载均衡消费任务

    4.  支持任务的启用、停用

    5.  支持MySql、Oracle、DB2等多种数据库管理任务

    6.  灵活便利的管理quartz任务

    7. 支持任务参数的传递 

    8. 具体实现参考单机版ETL http://my.oschina.net/u/2470985/blog/509714

  1. 支持任务参数的传递

  • JOB任务组件架构设计

  • zookeeper 监控Master 、Slave ,选举Master,选举RPC服务端

 papublic void init() throws Exception {
  client = CuratorFrameworkFactory.newClient(zookQuorum, // 服务器列表
    createTimeout, // 会话超时时间,单位毫秒
    connTimeout, // 连接创建超时时间,单位毫秒
    new ExponentialBackoffRetry(time, timeoutCount) // 重试策略
    );
  nodeFactory = NodeFactory.getNodeFactory();
  // 启动zk
  client.start();
  // 监控分发job任务监控
  jobMonitor();
  // 添加master、slave监控
  addMasterMonitor();
  // 初始化节点
  initCreateNode(client, nodePath, nodePathName, jobTaskPath,
    jobTaskPathName, nodeFactory);
 }
 /**
  * 
  * @Title: initCreateNode
  * @Description: 初始化节点
  * @param client
  * @param nodePath
  * @param nodePathName
  * @param jobTaskPath
  * @param jobTaskPathName
  * @param nodeFactory
  * @throws Exception
  * @return: void
  */
 public void initCreateNode(CuratorFramework client, String nodePath,
   String nodePathName, String jobTaskPath, String jobTaskPathName,
   NodeFactory nodeFactory) throws Exception {
  // 创建node节点
  client.create().creatingParentsIfNeeded()
    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    .forPath(nodePath + nodePathName);
  if (client.checkExists().forPath(jobTaskPath + jobTaskPathName) == null) {
   client.create()
     .creatingParentsIfNeeded()
     .forPath(jobTaskPath + jobTaskPathName,
       nodeFactory.getIp().getBytes());
  }
 }
 /**
  * 
  * @Title: jobMonitor
  * @Description: 监控JOB
  * @throws Exception
  * @return: void
  */
 public void jobMonitor() throws Exception {
  zookeeperjobTask = new PathChildrenCache(client, jobTaskPath, true);
  zookeeperjobTask.getListenable().addListener(
    new PathChildrenCacheListener() {
     public void childEvent(CuratorFramework client,
       PathChildrenCacheEvent enEvent) throws Exception {
      nodeFactory.getZookManage().masterMonitor(client,
        jobTaskPath, rpcPort, nodeFactory);
     }
    });
  zookeeperjobTask.start(StartMode.BUILD_INITIAL_CACHE);
 }
 /**
  * 
  * @Title: addmasterMonitor
  * @Description: 监控Master
  * @throws Exception
  * @return: void
  */
 public void addMasterMonitor() throws Exception {
  zookeeperNodeEvent = new PathChildrenCache(client, nodePath, true);
  zookeeperNodeEvent.getListenable().addListener(
    new PathChildrenCacheListener() {
     public void childEvent(CuratorFramework client,
       PathChildrenCacheEvent enEvent) throws Exception {
      nodeFactory.getZookManage().masterMonitor(client,
        nodePath, rpcPort, nodeFactory);
     }
    });
  zookeeperNodeEvent.start(StartMode.BUILD_INITIAL_CACHE);
 }
@Override
 public void masterMonitor(CuratorFramework client, String nodePath,
   int port, NodeFactory nodeFactory) throws Exception {
  // 状态
  Map<String, String> nodeState = nodeFactory.getNodeState();
  // 获取当前IP
  String ip = nodeFactory.getIp();
  // 获取masterNode
  String masterNode = nodeFactory.getMasterNode();
  // 设置job节点
  List<String> nodeList = client.getChildren().forPath(nodePath);
  Collections.sort(nodeList);
  String master = (new String(client.getData().forPath(
    nodePath + "/" + nodeList.get(0))));
  // 选本机为master
  if (ip.equals(master)) {
   // 判断本机是否新的master null则为新的master
   if (nodeFactory.getRpcServiceNode() == null) {
    nodeFactory.setMasterNode(master);
    RpcServiceNode rpcServiceNode = new RpcServiceNode();
    rpcServiceNode.setPort(port);
    rpcServiceNode.setHost(ip);
    rpcServiceNode.start();
    // 设置rpc对象
    nodeFactory.setRpcCommand(getRpcCommandProxy(master, port));
    // 设置RPC客户端
    nodeFactory.setRpcServiceNode(rpcServiceNode);
    // 开启策略
    BeanFactory.getBean().getLoadingService().init();
    // 设置服务
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_MASTER,
      PROCESS_START);
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE,
      PROCESS_START);
   }
  } else {
   // 假如本机不是当前master则关闭RPC服务
   if (nodeFactory.getRpcServiceNode() != null) {
    nodeFactory.getRpcServiceNode().stop();
    nodeFactory.setRpcServiceNode(null);
    //关闭JOB
    BeanFactory.getBean().getLoadingService().stop();
    nodeState.remove(ip + PROCESS_CUT + StartNode.HOST_MASTER);
   }
   // 假如当前master发生改变则切换
   if ((!masterNode.equals(master))) {
    nodeFactory.setRpcCommand(getRpcCommandProxy(master, port));
    nodeFactory.setMasterNode(master);
    nodeState.put(ip + PROCESS_CUT + StartNode.HOST_SLAVE,
      PROCESS_START);
   }
  }
 }
 @Override
 public RpcCommand getRpcCommandProxy(String ip, int port) throws Exception {
  return RPC.getProxy(RpcCommand.class, RpcCommand.versionID,
    new InetSocketAddress(ip, port), new Configuration());
 }
  • 采集器组件

  • 采集组件简要介绍

    1. 采集层支持DB并发采集、FTP并发采集、syslog接收、本地文件采集

    2. 支持FTP、DB 异常补采

    3. 采集层支持JOB任务阀值配置,DB连接池设置、Ftp连接设置、syslog 批量生产文件等

    4. 提供采集层开发者模式,标准API接口

    5. 数据库表管理采集任务

    6. 将采集的数据负载均衡到KAFKA中间件中

  • kafka组件

  • kafka组件简要介绍

    1. 接收采集器消息存放分区中

    2. kafka负载均衡消息,ETL负载均衡消费分区中消息

    3. kafka支持订阅、消费消息、ETL实时分析消息

  • etl组件

  • etl组件简要介绍

    1. 清洗层支持数据追加、数据汇总、数据补全、过滤、映射、转换、拆分、解析 

    2. 清洗层支持清洗任务阀值配置

    3. 清洗层清洗开发者模式 ,标准API接口

    4. 清洗层支持库表管理清洗流程

    5. 接收清洗完成的数据,自定义存储,库、表、hive 等

    6. 存储层支持自定义多库存储、自定义表存储

    7. 提供存储层开发者模式,标准API接口

    8. 存储异常保存文件,监控异常文件重新存储。

    9. 支持实时分析,支持开发etl函数库

ETL集群采集器设计

 

ETL采集清洗应用(审计系统架构)

 

 

 

© 著作权归作者所有

共有 人打赏支持
w
粉丝 1
博文 9
码字总数 11465
作品 0
武汉
日志服务(原SLS)2018-2 月功能发布

[新地域] 亚太东南5(雅加达)访问入口 [新功能] 新增窗口/逻辑运算/二进制处理/Lamdba等分析函数 文档 分析功能新增窗口、逻辑运算、二进制处理、Lamdba等分析函数,提升日志处理分析能力。...

简志
03/01
0
0
案例|服务化架构系统监控难题解决方案

原文网址链接:http://url.cn/kVjUVO 众所周知,系统监控一直是拥有复杂IT架构的企业所面临的一个重要问题,而这也并不是每家企业都能够轻松解决的技术挑战。OPPO作为一家国际智能终端设备及...

数通畅联
2015/11/02
0
0
SQL Server数据同步的研究(单向/双向)

思路: 1、做中间件(简单:定时采集;复杂:分布式,订阅中心的形式,如微信的中间件:https://github.com/tencent-wechat/phxsql) 2、采用触发器的形式,有数据触发是进行多服务器的来回数...

easonjim
2016/11/29
0
0
Nginx 配置实践

nginx 一般用作请求转发,用作服务器集群的负载均衡 典型的高并发集群是 nginx+tomcat(多个) nginx可以高效处理对静态文件的请求,tomcat 负责动态请求 配置范例: #user nobody;worker_pr...

HZCoder
2016/03/29
49
0
爬虫面试遇到外行面试官

爬虫岗位很少,我总共也就面过五六家,其中某金融互联网公司技术最好,虽然他们的爬虫人员也是后来转的 问题一:如果连接断了怎么办? 什么连接呢,猜是tcp 连接吧,tcp连接不是时时联通的,...

HZCoder
2016/01/18
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

7 个致命的 Linux 命令

导读 如果你是一个 Linux 新手,在好奇心的驱使下,可能会去尝试从各个渠道获得的命令。以下是 7 个致命的 Linux 命令,轻则使你的数据造成丢失,重则使你的系统造成瘫痪,所以,你应当竭力避...

问题终结者
59分钟前
0
0
设计模式:工厂方法模式(工厂模式)

工厂方法模式才是真正的工厂模式,前面讲到的静态工厂模式实际上不能说是一种真正意义上的设计模式,只是一种变成习惯。 工厂方法的类图: 这里面涉及到四个种类: 1、抽象产品: Product 2、...

京一
今天
0
0
区块链和数据库,技术到底有何区别?

关于数据库和区块链,总会有很多的困惑。区块链其实是一种数据库,因为他是数字账本,并且在区块的数据结构上存储信息。数据库中存储信息的结构被称为表格。但是,区块链是数据库,数据库可不...

HiBlock
今天
0
0
react native 开发碰到的问题

react-navigation v2 问题 问题: static navigationOptions = ({navigation, navigationOptions}) => ({ headerTitle: ( <Text style={{color:"#fff"}}>我的</Text> ), headerRight: ( <View......

罗培海
今天
0
0
Mac Docker安装流程

久仰Docker大名已久,于是今天趁着有空,尝试了一下Docker 先是从docker的官网上下载下来mac版本的docker安装包,安装很简易,就直接拖图标就好了。 https://www.docker.com/products/docker...

writeademo
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部