PigPen 介绍:Clojure 的 Map-Reduce

原创
2014/01/18 23:37
阅读数 768

这篇文章翻译自 http://techblog.netflix.com/2014/01/introducing-pigpen-map-reduce-for.html。之前翻译过关于 cascalog 的文章(Cascalog 入门(1)Cascalog 入门(2))。Cascalog 是基于 Cascading,PigPen 是基于 Apache Pig,两者是比较相似的东西。以下进入正文:

PigPen

我们今天很高兴向全世界发布了 PigPen,这是一个为 Clojure 准备的 Map-Reduce,它会最终被编译成 Apache Pig,但是你并不需要非常了解 Pig。

PigPen 是什么?

  • 一种看起来和用起来跟 clojure.core 都很像的 map-reduce 语言
  • 可以把 map-reduce 的查询当成程序来写,而不是当成脚本来写
  • 为单元测试和迭代部署提供强大的支持

注意:如果你对 Clojure 不是很熟悉,我们强烈推荐你试下这里这里 或者 这里 的教程来了解一些 基础

<!--more-->

真的又是一种 map-reduce 语言吗?

如果你会 Clojure,你就已经会 PigPen 了

PigPen 的主要目标是要把语言带出等式的行列。PigPen 的操作符设计的和 Clojure 里尽可能的相似,没有特殊的用户自定义函数(UDFs)。只需要定义函数(匿名的或者命名的),然后你就能像在 Clojure 程序里一样使用它们。

这里有个常用的 word count 的例子:

(require '[pigpen.core :as pig])

(defn word-count [lines]
  (->> lines
    (pig/mapcat #(-> % first
                   (clojure.string/lower-case)
                   (clojure.string/replace #"[^\w\s]" "")
                   (clojure.string/split #"\s+")))
    (pig/group-by identity)
    (pig/map (fn [[word occurrences]] [word (count occurrences)]))))

这段代码定义了一个函数,这个函数返回一个 PigPen 的查询表达式。这个查询接受一系列的行作为输入,返回每个单词出现的次数。你可以看到这只是一个 word count 的逻辑,并没有设计到一些外部的东西,比如数据从哪里来的,会产生哪些输出。

可以组合吗?

当然。PigPen 的查询是写成函数的组合——数据输入、输出。只需要写一次,不需要到处复制、粘贴。

现在我们利用以上定义的 word-count 函数,加上 load 和 store 命令,组成一个 PigPen 的查询:

(defn word-count-query [input output]
  (->>
    (pig/load-tsv input)
    (word-count)
    (pig/store-tsv output)))

这个函数返回查询的 PigPen 表示,他自己不会做什么,我们需要从本地执行它,或者生成一个脚本(之后会讲)。

你喜欢单元测试?我们可以做

利用 PigPen,你可以 mock 输入数据来为你的查询写单元测试。再也不需要交叉着手指想象提交到 cluster 上后会发生什么,也不需要截出部分文件来测试输入输出。

Mock 数据真的很容易,通过 pig/return 和 pig/constantly,你可以在你的脚本里注入任意的数据作为起始点。

一个常用的模式是利用 pig/take 来从实际数据源中抽样出几行,用 pig/return 把结果包一层,就得到了 mock 数据。

(use 'clojure.test)

(deftest test-word-count
  (let [data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]])]
    (is (= (pig/dump (word-count data))
           [["moon" 1]
            ["jumped" 2]
            ["dog" 1]
            ["over" 2]
            ["cow" 1]
            ["fox" 1]
            ["the" 4]]))))

pig/dump 操作符会在本地执行查询。

闭包

向你的查询传参数很麻烦,所有函数范围内的变量或者 let 的绑定在函数里都可用。

(defn reusable-fn [lower-bound data]
  (let [upper-bound (+ lower-bound 10)]
    (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))

注意 lower-bound 和 upper-bound 在生成脚本的时候就有了,在 cluster 上执行函数的时候也能使用。

那么我怎么用它呢?

只要告诉 PigPen 哪里会把一个查询写成一个 Pig 脚本:

(pig/write-script "word-count.pig"
                  (word-count-query "input.tsv" "output.tsv"))

这样你就能得到一个可以提交到 cluster 上运行的 Pig 脚本。这个脚本会用到 pigpen.jar,这是一个加入所有依赖的 uberjar,所以要保证这个 jar 也一起被提交了。还可以把你的整个 project 打包成一个 uberjar 然后提交,提交之前记得先重命名。怎么打包成 uberjar 请参照教程。

之前看到,我们可以用 pig/dump 来本地运行查询,返回 Clojure 数据:

=> (def data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]]))
                          
#'pigpen-demo/data

=> (pig/dump (word-count data))
[["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]

如果你现在就像开始,请参照 getting started & tutorials

为什么我需要 Map-Reduce?

Map-Reduce 对于处理单台机器搞不定的数据是很有用,有了 PigPen,你可以像在本地处理数据一样处理海量数据。Map-Reduce 通过把数据分散到可能成千上万的集群节点来达到这一目的,这些节点每个都会处理少量的数据,所有的处理都是并行的,这样完成一个任务就比单台机器快得多。像 join 和 group 这样的操作,需要多个节点数据集的协调,这种情况会通过公共的 join key 把数据分到同一个分区计算,join key 的同一个值会送到同一个指定的机器。一旦机器上得到了所有可能的值,就能做 join 的操作或者做其他有意思的事。

想看看 PigPen 怎么做 join 的话,就来看看 pig/cogroup 吧。cogroup 接受任意数量的数据集然后根据一个共同的 key 来分组。假设我们有这样的数据:

foo:

  {:id 1, :a "abc"}
  {:id 1, :a "def"}
  {:id 2, :a "abc"}

bar:

  [1 42]
  [2 37]
  [2 3.14]

baz:

  {:my_id "1", :c [1 2 3]]}

如果想要根据 id 分组,可以这样:

(pig/cogroup (foo by :id)
             (bar by first)
             (baz by #(-> % :my_id Long/valueOf))
             (fn [id foos bars bazs] ...))

前三个参数是要 join 的数据集,每一个都会指定一个函数来从数据源中选出 key。最后的一个参数是一个函数,用来把分组结果结合起来。在我们的例子中,这个函数会被调用两次:

[1 ({:id 1, :a "abc"}, {:id 1, :a "def"})
   ([1 42])
   ({:my_id "1", :c [1 2 3]]})]
[2 ({:id 2, :a "abc"})
   ([2 37] [2 3.14])
   ()]

这把所有 id 为 1 的值和 id 为 2 的值结合在了一起。不同的键值被独立的分配到不同的机器。默认情况下,key 可以不在数据源中出现,但是有选项可以指定必须出现。

Hadoop 提供了底层的接口做 map-reduce job,但即便如此还是有限制的,即一次只会运行一轮 map-reduce,没有数据流和复杂查询的概念。Pig 在 Hadoop 上抽象出一层,但到目前为止,它仍旧只是一门脚本语言,你还是需要用 UDF 来对数据做一些有意思的事情。PigPen 更进一步的做了抽象,把 map-reduce 做成了一门语言。

如果你刚接触 map-reduce,我们推荐你看下这里

做 PigPen 的动机

  • **代码重用。**我们希望能定义一段逻辑,然后通过穿参数把它用到不同的 job 里。
  • **代码一体化。**我们不想在脚本和不同语言写的 UDF。 之间换来换去,不想考虑不同数据类型在不同语言中的对应关系。
  • **组织好代码。**我们想把代码写在多个文件里,想怎么组织怎么组织,不要被约束在文件所属的 job 里。
  • **单元测试。**我们想让我们的抽样数据关联上我们的单元测试,我们想让我们的单元测试在不存取数据的情况下测试业务逻辑。
  • **快速迭代。**我们想能够在任何时候注入 mock data,我们想在不用等 JVM 启动的情况下测试一个查询。
  • **只给想要命名的东西命名。**大部分 map-reduce 语言对中间结果要求命名和指定数据结构,这使得用 mock data 来测试单独的 job 变得困难。我们想要在我们觉得合适的地方组织业务逻辑并命名,而不是受语言的指使。
  • 我们受够了写脚本,我们想要写程序。

注意:PigPen 不是 一个 Clojure 对 Pig 脚本的封装,很有可能产生的脚本是人看不懂的。

设计和功能

PigPen 设计的和 Clojure 尽可能保持一致。Map-Reduce 是函数式编程,那为什么不利用一门已存在的强大的函数式编程语言呢?这样不光学习曲线低,而且大多数概念也能更容易的应用到大数据上。

在 PigPen 中,查询被当做 expression tree 处理,每个操作符都被表示需要的行为信息的 map,这些 map 可以嵌套在一起组成一个复杂查询的树形表式。每个命令包含了指向祖命令的引用。在执行的时候,查询树会被转化成一个有向无环的查询图。这可以很容易的合并重复的命令,优化相关命令的顺序,并且可以利用 debug 信息调试查询。

优化

去重 当我们把查询表示成操作图的时候,去重是一件很麻烦的事。Clojure 提供了值相等的操作,即如果连个对象的内容相同,它们就相等。如果两个操作有相同的表示,那它们完全相同,所以在写查询的时候不用担心重复的命令,它们在执行之前都会被优化。

举个例子,假设我们有这样两个查询:

(let [even-squares (->>
                     (pig/load-clj "input.clj")
                     (pig/map (fn [x] (* x x)))
                     (pig/filter even?)
                     (pig/store-clj "even-squares.clj"))
      odd-squares (->>
                    (pig/load-clj "input.clj")
                    (pig/map (fn [x] (* x x)))
                    (pig/filter odd?)
                    (pig/store-clj "odd-squares.clj"))]
  (pig/script even-squares odd-squares))

在这个查询中,我们从一个文件加载数据,计算每个数的平方,然后分成偶数和奇数,操作图看起来是这样: 在此输入图片描述

这符合我们的查询,但是做了很多额外的工作。我们加载了 input.clj 两次,所有数的平方也都计算了两次。这看上去可能没有很多工作,但是当你对很多数据做这样的事情,简单的操作累加起来就很多。为了优化这个查询,我们可以找出相同的操作。看第一眼发现我们计算平方的操作可能是一个候选,但是他们有不同的父节点,因此不能把他们合并在一起。但是我们可以把加载函数合并,因为他们没有父节点,而且他们加载相同的文件。

现在我们的图看起来是这样: 在此输入图片描述

现在我们值加载一次数据,这会省一些时间,但还是要计算两次平方。因为我们现在只有一个加载的命令,我们的 map 操作现在相同,可以合并: 在此输入图片描述

这样我们就得到了一个优化过的查询,每个操作都是唯一的。因为我们每次只会合并一个命令,我们不会修改查询的逻辑。你可以很容易的生成查询,而不用担心重复的执行,PigPen 对重复的部分只会执行一次。

序列化 当我们用 Clojure 处理完数据以后,数据必须序列化成二进制字节,Pig 才能在集群的机器间传数据。这对 PigPen 是一个很昂贵但是必须的过程。幸运的是一个脚本中经常有很多连续的操作可以合成一个操作,这对于不必要的序列化和反序列化节省了很多时间。例如,任意连续的 map,filter 和 mapcat 操作都可以被重写成一个单独的 mapcat 操作。

我们通过一些例子来说明: 在此输入图片描述

在这个例子中,我们从一个序列化的值(蓝色)4开始,对它反序列化(橙色),执行我们的 map 函数,然后再把它序列化。

现在我们来试一个稍微复杂一点的(更现实的)例子。在这个例子中,我们执行一个 map,一个 mapcat 和一个 filter 函数。

如果你以前没用过 mapcat,我可以告诉你这是对一个值运行一个函数然后返回一串值的操作。那个序列会被 flatten,每个值都会传给下一步使用。在 Clojure 里,那是 map 和 concat 联合之后的结果,在 Scala 里,这叫做 flatMap,而在 C# 里叫 selectMany。

在下图中,左边的流程是我们优化之前的查询,右边的是优化之后的。和第一个例子一样,我们同样从 4 开始,计算平方,然后对这个值做减一的操作,返回本身和加一的操作。Pig 会得到这个值的集合然后做 flatten,使每个值都成为下一步的输入。注意在和 Pig 交互的时候我们要序列化和反序列化。第三步,也就是最后一步对数据进行过滤,在这个例子中我们只保留奇数值。如图所示,我们在任意两步之间都序列化和反序列化数据。

右边的图显示了优化后的结果。每个操作都返回了一个元素序列。map 操作返回一个只有单元素 16 的序列,mapcat 也一样,过滤操作返回 0 元素或单元素的序列。通过是这些命令保持一致,我们可以很容易的把他们合并到一起。我们在一套命令中flattrn 了更多的值序列,但是在步骤之间没有序列化的消耗。虽然卡起来更复杂,但是这个优化是每个步骤都执行的更快了。 在此输入图片描述

测试,本地执行,以及调试

交互式开发,测试,以及可调试性是 PigPen 的关键功能。如果你有一个一次运行好几天的 job,那你最不想看到的是跑了十一个小时后冒出来一个 bug。PigPen 有个基于 rx 的本地运行模式。这可以让我们对查询写单元测试。这样我们可以更有把握的知道运行的时候不会挂掉,并且能返回期待的值。更牛逼的是这个功能可以让我们进行交互式的开发。

通常情况下,我们刚开始会从数据源中选一些记录来做单元测试。因为 PigPen 在 REPL 中返回数据,我们不需要额外构造测试数据。这样,通过 REPL,我们可以根据需要对 mock 数据做 map,filter,join 和 reduce 的操作。每个步骤都可以验证结果是不是我们想要的。这种方法相对于写一长串脚本然后凭空想象能产生更可靠的数据。还有一个有用的地方是可以把复杂的查询写成几个较小的函数单元。Map-reduce 查询随着数据源的量级可能产生剧烈的增加或减少。当你把脚本作为一个整体测试的时候,你可能要读一大堆数据,最后产生一小撮数据。通过把查询细化成较小的单元,你可以对读 100 行,产生 2 行这样子来测试一个单元,然后测试第二个单元的时候可以用这两行作为模板来产生 100 多个数据。

调试模式对于解决异常很有用,启用后会在正常输出的同时,把脚本中每个操作的结果写到磁盘上。这对于像 Hadoop 这样的环境很有用,在这种情况下,你没法单步跟踪代码,而且每个步骤都可能花好几个小时。调试模式还可以可视化流程图。这样可以可视化的把执行计划的和实际操作的输出关联起来。

要启用调试模式,请参考 pig/write-scriptpig/generate-script 的选项,这会在指定的目录下写额外的调试输出。

启用调试模式的例子:

(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)

要启用可视化模式,可以看看 pig/showpig/dump&show

可视化的例子:

(pig/show my-pigpen-query)        ;; Shows a graph of the query
(pig/dump&show my-pigpen-query)   ;; Shows a graph and runs it locally

扩展 PigPen

PigPen 有个好用的功能是可以很容易的创建自己的操作符。例如,我们可以定义像求差集和交集这样的集合和多集合的操作符,这些只是像 co-group 这样的操作符的变体,但是如果能定义,测试它们,然后再也不去想这些逻辑怎么实现的,那就更好了。

这对更复杂的操作也是很有用的。对于集合数据我们有 sumavgminmaxsdquantiles 这些可重用的统计操作符,还有 pivot 这样的操作符可以把多维数据分组然后对每组计数。

这些操作本身都是简单的操作,但是当你把它们从你的查询中抽象出来之后,你的查询也会变的简单很多。这时候你可以花更多的时间去想怎么解决问题,而不是每次都重复写基本的统计方法。

为什么用 Pig?

我们选择 Pig 是因为我们不想把 Pig 已有的优化的逻辑重写一遍,不考虑语言层面的东西的话,Pig 在移动大数据方面做得很好。我们的策略是利用 Pig 的 DataByteArray 二进制格式来移动序列化的 Clojure 数据。在大多数情况下,Pig 不需要知道数据的底层展现形式。Byte array 可以很快的做比较,这样对于 join 和 group 操作,Pig 只需要简单的比较序列化的二进制,如果序列化的输出一致,在 Clojure 中值就相等。不过这对于数据排序不适用。二进制的排序其实没什么用,而且和原始数据的排序结果也不一样。要想排序,还得把数据转化回去,而且只能对简单类型排序。这也是 Pig 强加给 PigPen 的为数不多的一个缺陷。

我们在决定做 PigPen 之前也评估过其他语言。第一个要求就是那必须是一门编程语言,并不是一种脚本语言加上一堆 UDF。我们简单看过 Scalding,它看上去很有前途,但是我们的团队主要是用的 Clojure。 可以这么说,PigPen 对于 Clojure 就像是 Scalding 对于 Scala。Cascalog 是用 Clojure 写 map-reduce 通常会用的语言,但是从过去的经验来看,Cascalog 对于日常工作其实没什么用,你需要学一套复杂的新语法和很多概念,通过变量名对齐来做隐式 join 也不是理想的方案,如果把操作符顺序弄错了会造成很大的性能问题,Cascalog 会 flatten 数据结果(这可能很浪费),而且组合查询让人感觉很别扭。

我们也考虑过对 PigPen 用一门宿主语言。这样也能在 Hive 之上构建类似的抽象,但是对每个中间产物都定义 schema 跟 Clojure 的理念不符。而且 Hive 类似与 SQL,使得从功能性语言翻译更难。像 SQL 和 Hive 这样的关系模型语言与像 Clojure 和 Pig 这样的功能性语言之间有着巨大的差。最后,最直接的解决办法就是在 Pig 之上做一层抽象。

展开阅读全文
加载中
点击加入讨论🔥(1) 发布并加入讨论🔥
打赏
1 评论
0 收藏
0
分享
返回顶部
顶部