文档章节

Akka实战:分散、聚合模式

羊八井
 羊八井
发布于 2015/11/26 00:16
字数 964
阅读 3315
收藏 83
点赞 7
评论 13

分散与聚合:简单说就是一个任务需要拆分成多个小任务,每个小任务执行完后再把结果聚合在一起返回。

代码 http://git.oschina.net/yangbajing/akka-action

分散与聚合图

实例背景

本实例来自一个真实的线上产品,现将其需求简化如下:

  1. 传入一个关键词:key,根据key从网上抓取相关新闻
  2. 可选传入一个超时参数:duration,设置任务到期时必须反回数据(返回实际已抓取数据)
  3. 若超时到返回实际已爬取数据,则任务应继续运行直到所以数据抓取完成,并存库

设计

根据需求,一个简化的分散、聚合模式可以使用两个actor来实现。

  • NewsTask:接收请求,并设置超时时间
  • SearchPageTask:执行实际的新闻抓取操作(本实例将使用TimeUnit模拟抓取耗时)

实现

NewsTask

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/NewsTask.scala

  override def metricPreStart(): Unit = {
    context.system.scheduler.scheduleOnce(doneDuration, self, TaskDelay)
  }

  override def metricReceive: Receive = {
    case StartFetchNews =>
      _receipt = sender()
      (0 until NewsTask.TASK_SIZE).foreach { i =>
        context.actorOf(SearchPageTask.props(self), "scatter-" + i) ! SearchPage(key)
      }

    case GetNewsItem(newsItem) =>
      _newses ::= newsItem
      if (_newses.size == NewsTask.TASK_SIZE) {
        logger.debug(s"分散任务,${NewsTask.TASK_SIZE}个已全部完成")

        if (_receipt != null) {
          _receipt ! NewsResult(key, _newses)
          _receipt = null
        }
        self ! PoisonPill
      }

    case TaskDelay =>
      if (_receipt != null) {
        _receipt ! NewsResult(key, _newses)
        _receipt = null
      }
  }

metricPreStart方法中设置定时方法,调用时间为从代码运行开始到doneDuration时间为止。定时被触发时将向当前Actor发送一个TaskDelay消息。

metricReceive方法中,分别对StartFetchNewsGetNewsItemTaskDelay三个消息进行操作。

在收到StartFetchNews消息时,actor首先保存发送者actor的引用(结果将返回到此actor)。再根据TASK_SIZE生成相应子任务

GetNewsItem消息的处理中,每收到一个消息就将其添加到_newses列表中。并判断当_newses个数等于TASK_SIZE时(所有子任务已完成)将结果发送给_receipt

self ! PoisonPill,这句代码停止actor自身。它将把“毒药”发送到NewsTask Actor的接收邮箱队列中。

TaskDelay消息被触发时,将直接返回已完成的新闻_newses。返回数据后并不终止当前还未运行完任务。

SearchPageTask

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/SearchPageTask.scala

  override def metricReceive: Receive = {
    case SearchPage(key) =>
      // XXX 模拟抓取新闻时间
      TimeUtils.sleep(Random.nextInt(20).seconds)

      val item = NewsItem(
        "http://newssite/news/" + self.path.name,
        "测试新闻" + self.path.name,
        self.path.name,
        TimeUtils.now().toString,
        "内容简介", "新闻正文")

      taskRef ! GetNewsItem(item)
      context.stop(self)
  }

SearchPageTask的代码逻辑就比较易懂了,这里使用sleep来模拟实际抓取新闻时的耗时。生成结果后返回数据给`taskRef`,并终止自己。

执行测试

./sbt
akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest

总结

这是一个简单的Akka实例,实现了任务分发与结果聚合。提供了一种在指定时间内返回部份有效数据,同时任务继续执行的方式。这种分散、聚合的模式在实际生产中很常用,比如对多种数据源的整合,或某些需要长时间运行同时对返回数据完整性无强制要求的情况等。

MetricActor演示了怎么自定义Actor,并为其提供一些侦测点的方式。以后有时间会写篇详文介绍。

© 著作权归作者所有

共有 人打赏支持
羊八井

羊八井

粉丝 93
博文 34
码字总数 39906
作品 0
渝北
技术主管
加载中

评论(13)

羊八井
羊八井

引用来自“天天天”的评论

只能单机玩
用akka-actor-remote可以扩展多机,akka也带了cluster
天天天
天天天
只能单机玩
Narcissu5
Narcissu5
两年前学了akka,现在还没有用上
壹峰
壹峰

引用来自“Erp管理员_付强”的评论

引用来自“红薯”的评论

代码扔到 git.oschina.net 吧 :)

不行,有github为啥不用
心尖偏左
心尖偏左
[79]
jacksu
jacksu
jacksu
jacksu
欢迎大家关注: [scala工具库](https://github.com/jacksu/utils4s) ,里面包含各种库的测试用例和使用说明文档
欢迎一起学习
Fancy2015
Fancy2015
Scala写多线程还是挺爽
羊八井
羊八井

引用来自“Erp管理员_付强”的评论

引用来自“红薯”的评论

代码扔到 git.oschina.net 吧 :)

不行,有github为啥不用
两边都放
erpadmin
erpadmin

引用来自“红薯”的评论

代码扔到 git.oschina.net 吧 :)

不行,有github为啥不用
2015总结暨2016展望

2015已过去,2016到来。展望未来也总结过去。 2015 2015年到了一家新的公司,是一家做大数据服务的创业公司(准备说是2014年底)。刚到公司时我们只有几人,到现在已经成为一家50人左右的中小...

羊八井
2016/01/07
105
2
《Akka应用模式:分布式应用程序设计实践指南》读书笔记3

分布式领域驱动设计   DDD领域驱动设计,应该就是一种系统设计的方法论,可以知道人们设计软件。其实老外就喜欢总结一些方法论用于指导实践,这一点还是很重要的。 DDD概述   DDD是一套关...

gabrywu
06/06
0
0
Scala 技术周刊 | 第 24 期

这里有最新的 Scala 社区动态、技术博文。 微信搜索 「scalacool」关注我们,及时获取最新资讯。 深度阅读 Resolve me, Implicitly 依赖注入 Refined types, what are they good for? 让类型...

ScalaCool
2017/10/23
0
0
使用DDD、事件风暴和Actor来设计反应式系统

领域驱动设计(domain-driven design,DDD)通常在微服务领域用于查找边界(限界上下文)。同样来自DDD的聚合(aggregate)对于定义持久化和一致性的范围来讲也是很重要的。 但是,并不是领域...

dotNET跨平台
04/04
0
0
分享一套Netty&ZeroMQ视频实战教程

该视频教程来源于微信群友分享,现在免费分享给大家,希望对你们有用。 视频教程目录结构如下: │ ├─1、课程介绍 │ │ 1-1 课程介绍.avi │ │ 1-2 初始设计.avi │ │ 1-课程介绍.pdf ...

架构之路
01/04
0
0
《Akka应用模式:分布式应用程序设计实践指南》读书笔记6

一致性和可扩展性   一致性是系统内比较复杂的属性,它会随着系统的变化而变化。简单来说,一致性就是数据保持一致,在分布式系统中,可以理解为多个节点中数据的值是一致的。一旦系统具有...

gabrywu
06/12
0
0
Play Framework 2.4 发布,Java Web 框架

Play Framework 2.4 发布,此版本主要改进内容如下: 依赖注入 可以直接嵌入 Play 应用 支持聚合反向路由器 Java 8 支持,要求 JDK 8 Maven/sbt 标准布局 Akka HTTP 支持 Reactive Streams 支...

oschina
2015/05/30
9.6K
46
Scala 技术周刊 | 第 21 期

这里有最新的 Scala 社区动态、技术博文。 微信搜索 「scalacool」关注我们,及时获取最新资讯。 深度阅读 Play! Framework 系列(二):play 的项目结构 Play! Framework 系列 Scalameta tu...

ScalaCool
2017/09/15
0
0
Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机、平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
0
0
php socket 基础测试

什么是TCP/IP、UDP? TCP/IP(Transmission Control Protocol/Internet Protocol)即传输控制协议/网间协议,是一个工业标准的协议集,它是为广域网(WANs)设计的。 UDP(User Data Protoco...

ericSM
2016/08/25
21
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

eclipse SVN 项目重新定位

SVN 重新定位 1.方法一 首先:在Eclipse中选择Windows-> Show View->others 就会出现【SVN资源库/SVN Repositories】,选中后,点击确认; 然后:选中原有的地址,选择【重新定位/Relocate】...

qimh
5分钟前
0
0
Linux 第29课 ——Linux集群架构(下)

Linux集群架构(下) 八、DR模式搭建 8.1 准备工作 试验需求三台机器: 分发器,也叫调度器(简写为dir) 192.168.112.136 ying01 rs1 192.168.112.138 ying02 rs2 192.168.112.139 ying03 vip...

feng-01
10分钟前
0
0
轻松搭建svn版本管理工具+svnmanager管理客户端

前面的文章有写过svn版本管理工具的安装是基于svn的安装包进行安装,对于svn与apache的结合还得下svn和apache的模块进行结合过程比较繁琐,今天来介绍下通过centos的yum来安装svn能够快速安装...

javazyw
19分钟前
0
0
keepalived配置高可用集群

Linux集群概述 根据功能划分为两大类:高可用和负载均衡 高可用集群通常为两台服务器,一台工作,另外一台作为冗余,当提供服务的机器宕机,冗余将接替继续提供服务 实现高可用的开源软件有:...

TaoXu
25分钟前
0
0
mysql联表批处理操作

1 概述 mysql中的单表增删改查操作,可以说是基本中的基本. 实际工作中,常常会遇到一些基本用法难以处理的数据操作,譬如遇到主从表甚至多级关联表的情况(如一些历史问题数据的批量处理),考虑到...

社哥
28分钟前
0
0
IntelliJ IDEA 详细图解最常用的配置,适合刚刚用的新人。

刚刚使用IntelliJ IDEA 编辑器的时候,会有很多设置,会方便以后的开发,磨刀不误砍柴工。 比如:设置文件字体大小,代码自动完成提示,版本管理,本地代码历史,自动导入包,修改注释,修改...

kim_o
42分钟前
0
0
Google Java编程风格指南

目录 前言 源文件基础 源文件结构 格式 命名约定 编程实践 Javadoc 后记 前言 这份文档是Google Java编程风格规范的完整定义。当且仅当一个Java源文件符合此文档中的规则, 我们才认为它符合...

niithub
44分钟前
0
0
java.net.MalformedURLException异常说明

1.异常片段 Java代码中,在进行URL url = new URL(urllink)操作时,提示以下异常信息,该类异常主要问题出在参数urllink上面。 异常片段1 java.net.MalformedURLException at java.ne...

lqlm
45分钟前
1
0
CentOS7修改mysql5.6字符集

解决办法:CentOS7下修改MySQL数据库字符编码为UTF-8,UTF-8包含全世界所有国家所需要的字符集,是国际编码。 具体操作如下: 1.进入MySQL [root@tianqi-01 ~]# mysql -uroot -p Enter passw...

河图再现
46分钟前
0
0
DevExpress v18.1新版亮点——WPF篇(一)

用户界面套包DevExpress v18.1日前终于正式发布,本站将以连载的形式为大家介绍各版本新增内容。本文将介绍了DevExpress WPF v18.1 的新功能,快来下载试用新版本!点击下载>> Accordion Co...

Miss_Hello_World
49分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部