Akka实战:分散、聚合模式
Akka实战:分散、聚合模式
羊八井 发表于2年前
Akka实战:分散、聚合模式
  • 发表于 2年前
  • 阅读 3224
  • 收藏 83
  • 点赞 7
  • 评论 13

新睿云服务器60天免费使用,快来体验!>>>   

摘要: **分散、聚合**:简单说就是一个任务需要拆分成多个小任务,每个小任务执行完后再把结果聚合在一起返回。 本实例来自一个真实的线上产品,现将其需求简化如下: 1. 传入一个关键词:`key`,根据`key`从网上抓取相关新闻 2. 可选传入一个超时参数:`duration`,设置任务到期时必须反回数据(返回实际已抓取数据) 3. 若超时到返回实际已爬取数据,则任务应继续运行直到所以数据抓取完成,并存库

分散与聚合:简单说就是一个任务需要拆分成多个小任务,每个小任务执行完后再把结果聚合在一起返回。

代码 http://git.oschina.net/yangbajing/akka-action

分散与聚合图

实例背景

本实例来自一个真实的线上产品,现将其需求简化如下:

  1. 传入一个关键词:key,根据key从网上抓取相关新闻
  2. 可选传入一个超时参数:duration,设置任务到期时必须反回数据(返回实际已抓取数据)
  3. 若超时到返回实际已爬取数据,则任务应继续运行直到所以数据抓取完成,并存库

设计

根据需求,一个简化的分散、聚合模式可以使用两个actor来实现。

  • NewsTask:接收请求,并设置超时时间
  • SearchPageTask:执行实际的新闻抓取操作(本实例将使用TimeUnit模拟抓取耗时)

实现

NewsTask

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/NewsTask.scala

  override def metricPreStart(): Unit = {
    context.system.scheduler.scheduleOnce(doneDuration, self, TaskDelay)
  }

  override def metricReceive: Receive = {
    case StartFetchNews =>
      _receipt = sender()
      (0 until NewsTask.TASK_SIZE).foreach { i =>
        context.actorOf(SearchPageTask.props(self), "scatter-" + i) ! SearchPage(key)
      }

    case GetNewsItem(newsItem) =>
      _newses ::= newsItem
      if (_newses.size == NewsTask.TASK_SIZE) {
        logger.debug(s"分散任务,${NewsTask.TASK_SIZE}个已全部完成")

        if (_receipt != null) {
          _receipt ! NewsResult(key, _newses)
          _receipt = null
        }
        self ! PoisonPill
      }

    case TaskDelay =>
      if (_receipt != null) {
        _receipt ! NewsResult(key, _newses)
        _receipt = null
      }
  }

metricPreStart方法中设置定时方法,调用时间为从代码运行开始到doneDuration时间为止。定时被触发时将向当前Actor发送一个TaskDelay消息。

metricReceive方法中,分别对StartFetchNewsGetNewsItemTaskDelay三个消息进行操作。

在收到StartFetchNews消息时,actor首先保存发送者actor的引用(结果将返回到此actor)。再根据TASK_SIZE生成相应子任务

GetNewsItem消息的处理中,每收到一个消息就将其添加到_newses列表中。并判断当_newses个数等于TASK_SIZE时(所有子任务已完成)将结果发送给_receipt

self ! PoisonPill,这句代码停止actor自身。它将把“毒药”发送到NewsTask Actor的接收邮箱队列中。

TaskDelay消息被触发时,将直接返回已完成的新闻_newses。返回数据后并不终止当前还未运行完任务。

SearchPageTask

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/SearchPageTask.scala

  override def metricReceive: Receive = {
    case SearchPage(key) =>
      // XXX 模拟抓取新闻时间
      TimeUtils.sleep(Random.nextInt(20).seconds)

      val item = NewsItem(
        "http://newssite/news/" + self.path.name,
        "测试新闻" + self.path.name,
        self.path.name,
        TimeUtils.now().toString,
        "内容简介", "新闻正文")

      taskRef ! GetNewsItem(item)
      context.stop(self)
  }

SearchPageTask的代码逻辑就比较易懂了,这里使用sleep来模拟实际抓取新闻时的耗时。生成结果后返回数据给`taskRef`,并终止自己。

执行测试

./sbt
akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest

总结

这是一个简单的Akka实例,实现了任务分发与结果聚合。提供了一种在指定时间内返回部份有效数据,同时任务继续执行的方式。这种分散、聚合的模式在实际生产中很常用,比如对多种数据源的整合,或某些需要长时间运行同时对返回数据完整性无强制要求的情况等。

MetricActor演示了怎么自定义Actor,并为其提供一些侦测点的方式。以后有时间会写篇详文介绍。

标签: scala akka actor
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
羊八井
粉丝 91
博文 33
码字总数 38606
评论 (13)
红薯
代码扔到 git.oschina.net 吧 :)
孙青彪
刚玩过[13]
羊八井

引用来自“红薯”的评论

代码扔到 git.oschina.net 吧 :)
OK
erpadmin

引用来自“红薯”的评论

代码扔到 git.oschina.net 吧 :)

不行,有github为啥不用
羊八井

引用来自“Erp管理员_付强”的评论

引用来自“红薯”的评论

代码扔到 git.oschina.net 吧 :)

不行,有github为啥不用
两边都放
Fancy2015
Scala写多线程还是挺爽
jacksu
欢迎大家关注: [scala工具库](https://github.com/jacksu/utils4s) ,里面包含各种库的测试用例和使用说明文档
欢迎一起学习
jacksu
心尖偏左
[79]
壹峰

引用来自“Erp管理员_付强”的评论

引用来自“红薯”的评论

代码扔到 git.oschina.net 吧 :)

不行,有github为啥不用
Narcissu5
两年前学了akka,现在还没有用上
天天天
只能单机玩
羊八井

引用来自“天天天”的评论

只能单机玩
用akka-actor-remote可以扩展多机,akka也带了cluster
×
羊八井
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: