文档章节

如何设计一个实时流计算系统

fourinone
 fourinone
发布于 2013/08/11 11:57
字数 1383
阅读 4525
收藏 111

实时流计算的场景归纳起来多半是:
业务系统根据实时的操作,不断生成事件(消息/调用),然后引起一系列的处理分析,这个过程是分散在多台计算机上并行完成的,看上去就像事件连续不断的流经多个计算节点处理,形成一个实时流计算系统。

市场上流计算产品有很多,主要是通过消息中枢结合工人模式实现,大致过程如下:
1、开发者实现好流程输入输出节点逻辑,上传job到任务生产者
2、任务生产者将任务发送到zookeeper,然后监控任务状态
3、任务消费者从zookeeper上获取任务
4、任务消费者启动多个工人进程,每个进程又启动多个线程执行任务
5、工人之间通过zeroMQ交互

我们看看如何做一个简单的流计算系统,做法跟上面有些不同:
1、首先不过多依赖zookeerper,任务的分配最好直接给到工人,并能直接监控工人完成状态,这样效率会更高。
2、工人之间直接通讯,不依赖zeroMQ转发。
3、并行管理扁平化,多进程下再分多线程意义不大,增加管理成本,实际上一台机器8个进程,每个进程再开8个线程,总体跟8-10个进程或者线程的效果差不多(数量视机器性能不同)。
4、做成一个流计算系统,而不是平台。

这里我们借助fourinone提供的api和框架去实现,第一次使用可以参考分布式计算上手demo指南,开发包下载地址 http://code.google.com/p/fourinone/

大致思路:用工头去做任务生产和分配,用工人去做任务执行,为了达到流的效果,需要在工人里面调用工头的方式,将多个工人节点串起来,形成一个计算拓扑图。

下面程序演示了连续多个消息先发到一个工人节点A处理,然后再发到两个工人节点B并行处理的流计算过程,并且获取到最后处理结果打印输出(如果不需要获取结果可以直接返回)。

StreamCtorA:工头A实现,它获取到线上工人A,然后将消息发给它处理,并轮循等待结果。工头A的main函数模拟了多个消息的连续调用。

StreamWorkerA:工人A实现,它接收到工头A的消息进行处理,然后创建一个工头B,通过工头B将结果同时发给两个工人B处理,然后将结果返回工头A。

StreamCtorB:工头B实现,它获取到线上两个工人B,调用doTaskBatch等待两个工人处理完成,然后返回结果给工人A。

StreamWorkerB:工人B实现,它接收到任务消息后模拟处理后返回结果。

运行步骤(在本地模拟):
1、启动ParkServerDemo(它的IP端口已经在配置文件指定)
java -cp fourinone.jar; ParkServerDemo

2、启动工人A
java  -cp fourinone.jar; StreamWorkerA localhost 2008

3、启动两个工人B
java  -cp fourinone.jar; StreamWorkerB localhost 2009
java  -cp fourinone.jar; StreamWorkerB localhost 2010

4、启动工头A
java  -cp fourinone.jar; StreamCtorA

多机部署说明:StreamCtorA可以单独部署一台机器,StreamWorkerA和StreamCtorB部署一台机器,两个StreamWorkerB可以部署两台机器。

总结:计算平台和计算系统的区别
如果我们只有几台机器,但是每天有人开发不同的流处理应用要在这几台机器上运行,我们需要一个计算平台来管理好job,让开发者按照规范配置好流程和运行时节点申请,打包成job上传,然后平台根据每个job配置动态分配资源依次执行每个job内容。
如果我们的几台机器只为一个流处理业务服务,比如实时营销,我们需要一个流计算系统,按照业务流程部署好计算节点即可,不需要运行多个job和动态分配资源,按照计算平台的方式做只会增加复杂性,开发者也不清楚每台机器上到底运行了什么逻辑。
如果你想实现一个计算平台,可以参考动态部署和进程管理功能(开发包内有指南)

//完整源码
// ParkServerDemo

import com.fourinone.BeanContext;
public class ParkServerDemo
{
	public static void main(String[] args)
	{
		BeanContext.startPark();
	}
}

//StreamCtorA

import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;

public class StreamCtorA extends Contractor
{
 public WareHouse giveTask(WareHouse inhouse)
 {
  WorkerLocal[] wks = getWaitingWorkers("StreamWorkerA");
  System.out.println("wks.length:"+wks.length);
  
  WareHouse result = wks[0].doTask(inhouse);
  while(true){
   if(result.getStatus()!=WareHouse.NOTREADY)
   {
    break;
   }
  }
  return result;
 }
 
 public static void main(String[] args)
 {
  StreamCtorA sc = new StreamCtorA();
  for(int i=0;i<10;i++){
    WareHouse msg = new WareHouse();
    msg.put("msg","hello"+i);
    WareHouse wh = sc.giveTask(msg);
    System.out.println(wh);
  }
  sc.exit();
 }
}

//StreamWorkerA

import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;

public class StreamWorkerA extends MigrantWorker
{
 public WareHouse doTask(WareHouse inhouse)
 {
  System.out.println(inhouse);
  //do something
  StreamCtorB sc = new StreamCtorB();
  WareHouse msg = new WareHouse();
  msg.put("msg",inhouse.getString("msg")+",from StreamWorkerA");
  WareHouse wh = sc.giveTask(msg);
  sc.exit();
  
  return wh;
 }
 
 public static void main(String[] args)
 {
  StreamWorkerA wd = new StreamWorkerA();
  wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerA");
 }
}

//StreamCtorB
import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;

public class StreamCtorB extends Contractor
{
 public WareHouse giveTask(WareHouse inhouse)
 {
  WorkerLocal[] wks = getWaitingWorkers("StreamWorkerB");
  System.out.println("wks.length:"+wks.length);
  
  WareHouse[] hmarr = doTaskBatch(wks, inhouse);
  
  WareHouse result = new WareHouse();
  result.put("B1",hmarr[0]);
  result.put("B2",hmarr[1]);
  
  return result;
 }
}

//StreamWorkerB
import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;

public class StreamWorkerB extends MigrantWorker
{
 public WareHouse doTask(WareHouse inhouse)
 {
  System.out.println(inhouse);
  //do something
  inhouse.put("msg",inhouse.getString("msg")+",from StreamWorkerB");
  return inhouse;
 }
 
 public static void main(String[] args)
 {
  StreamWorkerB wd = new StreamWorkerB();
  wd.waitWorking(args[0],Integer.parseInt(args[1]),"StreamWorkerB");
 }
}

 

© 著作权归作者所有

共有 人打赏支持
fourinone

fourinone

粉丝 273
博文 43
码字总数 49961
作品 1
杭州
私信 提问
加载中

评论(6)

旁观者-郑昀
旁观者-郑昀

引用来自“fourinone”的评论

引用来自“FeiFan”的评论

這個用mq不就可以完成了嗎?不需要zookeeper吧。

上面讲的例子没有zookeeper。
单纯一个mq只能完成异步通讯,但是不容易实现“任务的调度和状态监控、集群工人管理、流效果”等。

How about this:Redis(BLPOP/BRPOP的)+Gearman?
see http://huoding.com/2013/05/19/257
FeiFan
FeiFan

引用来自“fourinone”的评论

引用来自“FeiFan”的评论

這個用mq不就可以完成了嗎?不需要zookeeper吧。

上面讲的例子没有zookeeper。
单纯一个mq只能完成异步通讯,但是不容易实现“任务的调度和状态监控、集群工人管理、流效果”等。

嗯 可能你有特定的业务环境吧. 不知道python的celery 加上一个result back-end能否满足你说的监控 管理, 流效果 :)
fourinone
fourinone

引用来自“FeiFan”的评论

這個用mq不就可以完成了嗎?不需要zookeeper吧。

上面讲的例子没有zookeeper。
单纯一个mq只能完成异步通讯,但是不容易实现“任务的调度和状态监控、集群工人管理、流效果”等。
n
neoaries
工人之間直接通訊的目的是什麼呢?舉個例子?
若水191
若水191
顶楼上的,搞这么复杂,mq的topic已经足够了,
FeiFan
FeiFan
這個用mq不就可以完成了嗎?不需要zookeeper吧。
【推荐系统介绍】NetFlix - 系统概述

阅读背景: 1 : 您需要知道什么是NetFlix? 2 : 推荐系统的基本概念? 3 : 推荐系统的实时化。 阅读目的: 精确的了解目前推荐系统的分层结构:确立各个层次之间精细划分。 系统被分为3部分...

止静
2014/09/10
0
0
实时数据平台设计:技术选型与应用场景适配模式

实时数据平台(RTDP,Real-time Data Platform)是一个重要且常见的大数据基础设施平台。在href="http://mp.weixin.qq.com/s?biz=MzI4NTA1MDEwNg==&mid=2650769309&idx=1&sn=1ddfddafc057f0......

卢山巍
2018/08/09
0
0
大数据经典学习路线(及供参考)

转:https://blog.csdn.net/yuexianchang/article/details/52468291 目录(?)[+]

junzixing1985
2018/04/15
0
0
第九届中国数据库技术大会:邀您开启技术人生下一个十年

DTCC已经悄然来到了第九届,同样是成长的9年,不断变化的大会主题,适应时代潮流技术的新议题,都是反复调整和适应的结果,这中间也有坚持:始终关注大数据、数据库的变化和更迭能给我们带来...

码云Gitee
2018/01/10
83
0
DataPipeline & Confluent Kafka Meetup上海站

一、活动介绍 Confluent作为国际数据“流”处理技术领先者,提供实时数据处理解决方案,在市场上拥有大量企业客户,帮助企业轻松访问各类数据。DataPipeline作为国内首家原生支持Kafka解决方...

DataPipeline
2018/09/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

rabbitmq安装教程

RabbitMQ有Windows与Linux版本的,这里先写Windows版本的安装。 以前安装软件总是在百度上找某某安装教程,结果能按照教程安装好的软件真的不多。想起先前以为大牛说的一句话,去官网按照官网...

em_aaron
今天
6
0
Android 贝塞尔曲线实践——波浪式运动

一、波浪效果如下 贝塞尔曲线自定义波浪效果的案例很多,同样方法也很简单,大多数和本案例一样使用二次贝塞尔曲线实现,同样还有一种是PathMeasure的方式,这里我们后续补充,先来看贝塞尔曲...

IamOkay
今天
3
0
Nmap之防火墙/IDS逃逸

选项 解释 -f 报文分段 --mtu 指定偏移大小 -D IP欺骗 -sI 原地址欺骗 --source-port 源端口欺骗 --data-length 指定发包长度 --randomize-hosts 目标主机随机排序 --spoof-mac Mac地址欺骗 ...

Frost729
今天
2
0
带你搭一个SpringBoot+SpringData JPA的环境

不知道大家对SpringBoot和Spring Data JPA了解多少,如果你已经学过Spring和Hibernate的话,那么SpringBoot和SpringData JPA可以分分钟上手的。 其实我在学完SpringBoot和SpringData JPA了之...

java菜分享
今天
7
0
Chocolatey 在Window搭建一个开发环境

在看了(利用 Chocolatey 快速在 Windows 下搭建一个开发环境)后,准备从零开始 一、准备工作 1、用管理员权限启动:powershell,执行错误请参考(PowerShell因为在此系统中禁止执行脚本的解...

近在咫尺远在天涯
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部