2017/04/26 16:15


A Swarm of Sparks


Web scale computing has never been so simple

I work at WorldSense, where we build predictors for the best links you could add to your content by creating large language models from the World Wide Web. In the open source world, no tool is better suited for that kind of mass (hyper)text analysis than Apache Spark, and I wanted to share how we set it up and run it in the cloud, so you can give it a try.

Spark is a distributed system, and like any similar system, it has a somewhat demanding configuration. There is a plethora of ways of running Spark, and in this post I will try to describe what I think is a great setup nowadays: a standalone cluster running (mostly) on bare-bones Amazon EC2 spot instances, configured using the newest Docker orchestration tools.

Intermission: today I had the pleasure of playing with the amazing Databricks Spark notebook during Spark Summit East, and I highly recommend it.


Back to work. Before we start, let us double-check what we need for our Spark setup:

  • The hardware, in the form of some machines in the cloud.
  • The software, Apache Spark, installed in each of them.
  • An abstraction layer to create a cluster from those machines.
  • Some coordination point through which all of this comes to life.


We will move backwards through this list, as it makes it easier to present the different systems involved. We allocate our machines with Docker Machine, using the very latest Docker Engine version (v1.10 is already out, so there is no need to ask for it explicitly any longer), which contains all the functionality we need. Let us start with a very small machine:

DRIVER_OPTIONS="--driver amazonec2 --amazonec2-security-group=default" 
# no longer needed: --engine-install-url
docker-machine create $DRIVER_OPTIONS --amazonec2-instance-type=t2.nano ${CLUSTER_PREFIX}ks

We will use that machine for Consul, an atomic distributed key-value store inspired by Google's Chubby. Consul will be responsible for keeping track of who is part of our cluster, among other things. Installing it is trivial, since someone on the internet has already packaged it as a Docker container for us:

docker $(docker-machine config ${CLUSTER_PREFIX}ks) \
  run -d -p "8500:8500" -h "consul" progrium/consul -server -bootstrap

This takes a few minutes to start, but you should only really need to do it once per cluster¹. Every time you bring the cluster up you can point to that same Consul instance, and keeping a t2.nano running will cost you less than five bucks a year.
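
Because Consul needs some time before it is usable, it can help to wait for it before creating the Swarm machines. The sketch below is one way to do that in bash, under a few assumptions: `wait_until` and `consul_has_leader` are hypothetical helper names, curl is assumed to be installed on the Consul instance, and the check relies on Consul's `/v1/status/leader` HTTP endpoint, which answers with an empty JSON string until a leader has been elected.

```shell
#!/usr/bin/env bash

# wait_until ATTEMPTS INTERVAL CMD [ARGS...]
# Re-runs CMD until it succeeds, sleeping INTERVAL seconds between tries.
# Returns 0 on the first success, 1 if every attempt fails.
wait_until() {
  local attempts=$1 interval=$2
  shift 2
  local i
  for ((i = 1; i <= attempts; i++)); do
    if "$@"; then
      return 0
    fi
    # No need to sleep after the last failed attempt.
    if ((i < attempts)); then
      sleep "$interval"
    fi
  done
  return 1
}

# consul_has_leader MACHINE
# Hypothetical check: asks Consul on MACHINE (via docker-machine ssh)
# for its current leader. /v1/status/leader returns "" (an empty JSON
# string) until an election has completed. Assumes curl on the instance.
consul_has_leader() {
  local leader
  leader=$(docker-machine ssh "$1" curl -s http://localhost:8500/v1/status/leader) || return 1
  [[ -n "$leader" && "$leader" != '""' ]]
}
```

For example, `wait_until 30 10 consul_has_leader "${CLUSTER_PREFIX}ks"` would poll every ten seconds for about five minutes. Querying from inside the instance with `docker-machine ssh` avoids depending on whether the security group exposes port 8500 to your own machine.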


Now we can instantiate the cluster's master machine. The core responsibility of this machine is coordinating the workers. It will be both the Spark master machine and the manager for our Docker Swarm, the system responsible for presenting the machines and containers as a cluster.

KEYSTORE_IP=$(aws ec2 describe-instances | jq -r ".Reservations[].Instances[] | select(.KeyName==\"${CLUSTER_PREFIX}ks\" and .State.Name==\"running\") | .PrivateIpAddress")
SWARM_OPTIONS="--swarm --swarm-discovery=consul://$KEYSTORE_IP:8500 --engine-opt=cluster-store=consul://$KEYSTORE_IP:8500 --engine-opt=cluster-advertise=$NET_ETH:2376"
MASTER_OPTIONS="$DRIVER_OPTIONS $SWARM_OPTIONS --swarm-master --engine-label role=master --amazonec2-instance-type=m4.large"
docker-machine create $MASTER_OPTIONS $MASTER

There are a few interesting things going on here. First, we used some shell-fu to find the IP address of our Consul machine inside the Amazon network. Then we fed that to the swarm-discovery and cluster-store options so Docker can keep track of the nodes in our cluster and the network layout of the containers running in each of them. With the configs in place, we proceeded to create an m4.large machine and labeled it as our master. We now have a fully functional one-machine cluster and can run jobs on it. Just point to the Docker Swarm manager and treat it as a regular Docker daemon:

docker $(docker-machine config --swarm $MASTER) run hello-world
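
The option strings above are easy to mistype when edited by hand. A minimal sketch that assembles them in one place, assuming bash; `swarm_options` and `master_options` are hypothetical helper names, and the flags are the same ones used in the commands above.

```shell
#!/usr/bin/env bash

# swarm_options KEYSTORE_IP IFACE
# Prints the docker-machine flags that join a machine to the Swarm:
# Consul at KEYSTORE_IP:8500 serves both Swarm discovery and the
# cluster store for overlay networks; the engine advertises IFACE:2376.
swarm_options() {
  local consul="consul://$1:8500"
  local opts=(
    --swarm
    "--swarm-discovery=$consul"
    "--engine-opt=cluster-store=$consul"
    "--engine-opt=cluster-advertise=$2:2376"
  )
  printf '%s\n' "${opts[*]}"
}

# master_options DRIVER_OPTIONS KEYSTORE_IP IFACE
# Adds the Swarm-master flag, the role=master engine label that the
# compose constraints rely on, and the instance type used in this post.
master_options() {
  printf '%s\n' "$1 $(swarm_options "$2" "$3") --swarm-master --engine-label role=master --amazonec2-instance-type=m4.large"
}
```

Used as `MASTER_OPTIONS=$(master_options "$DRIVER_OPTIONS" "$KEYSTORE_IP" "$NET_ETH")`, followed by the same `docker-machine create $MASTER_OPTIONS $MASTER` call. Leaving `$MASTER_OPTIONS` unquoted there is deliberate, so the shell splits it back into separate flags.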

To install Spark on our cluster, we will use Docker Compose, another tool from the Docker family. With Compose we can describe how to install and configure a set of containers. Starting from scratch is easy, but we will take a shortcut by using an existing image, gettyimages/spark, and only focus on the configuration part. Here is the result, which you should save in a docker-compose.yml file in the local directory.

version: "2"
services:
  master:
    container_name: master
    image: gettyimages/spark:1.6.0-hadoop-2.6
    command: /usr/spark/bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      - constraint:role==master
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
    expose:
      - "8081-8095"
  worker:
    image: gettyimages/spark:1.6.0-hadoop-2.6
    command: /usr/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    environment:
      - constraint:role!=master
    ports:
      - 8081:8081
    expose:
      - "8081-8095"
networks:
  default:
    driver: overlay

There are a lot of knobs in Spark, and they can all be controlled through that file. You can even customize the Spark distribution itself using a Dockerfile and custom base images, as we do at WorldSense to get Scala 2.11 and a lot of heavy libraries². In this example, we are doing the bare minimum, which is just opening the operational ports to the world, plus the Spark internal ports to the rest of the cluster (the expose directive).

Also note the parts of the config referring to the overlay network. The default network is where all services defined in the config file will run, which means they can communicate with each other using the container name as the target hostname. The Swarm scheduler will decide for us on which machine each container goes, respecting the constraints⁷ we have put in place. In our config file, we have one constraint that pins the master service to the master machine (which is not very powerful) and another that keeps the workers off that machine. Let us try bringing up the master:

eval $(docker-machine env --swarm $MASTER)
docker-compose up -d master
lynx http://$(aws ec2 describe-instances | jq -r ".Reservations[].Instances[] 
| select(.KeyName==\"$MASTER\" and .State.Name==\"running\") | .PublicDnsName"):8080
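
The two constraints in the compose file work on engine labels: `role==master` only matches a machine whose `role` label is `master`, and `role!=master` matches the others. The following is a toy model of that filtering in bash, meant for intuition only and not Swarm's actual scheduler code. `node_matches` and its label format are hypothetical, and the sketch assumes that a machine without a `role` label counts as not equal to `master`.

```shell
#!/usr/bin/env bash

# node_matches LABELS CONSTRAINT
# Toy model of a Swarm-style label constraint (not Swarm's code).
# LABELS is a space-separated list of key=value engine labels;
# CONSTRAINT is either "key==value" or "key!=value".
# Assumption: a node that lacks the key is treated as "not equal".
node_matches() {
  local labels=$1 constraint=$2 key value op actual=""
  if [[ $constraint == *"!="* ]]; then
    op='!='; key=${constraint%%!=*}; value=${constraint#*!=}
  else
    op='=='; key=${constraint%%==*}; value=${constraint#*==}
  fi
  # Look up the node's value for KEY, if it has one.
  local pair
  for pair in $labels; do
    if [[ ${pair%%=*} == "$key" ]]; then
      actual=${pair#*=}
    fi
  done
  if [[ $op == '==' ]]; then
    [[ $actual == "$value" ]]
  else
    [[ $actual != "$value" ]]
  fi
}
```

For instance, `node_matches "role=master" "role==master"` succeeds, while `node_matches "" "role==master"` fails, which mirrors why the master container can only land on the labeled machine.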

So far we have bootstrapped our architecture with Consul, defined our cluster with Docker Swarm and delineated our Spark installation with Docker Compose. The last remaining step is to add the bulk of the machines which will do the heavy work.

The worker machines should be more powerful, and you don't have to care too much about the stability of the individual instances. These properties make workers a perfect candidate for Amazon EC2 spot instances. They often cost less than a fourth of the price of a reserved machine, a bargain you can't get elsewhere. Let us bring a few of them up, using docker-machine³ and the very helpful GNU Parallel⁴ script.

parallel -j0 --no-run-if-empty --line-buffer docker-machine create \
$WORKER_OPTIONS < <(for n in $(seq 1 $CLUSTER_NUM_NODES); \
do echo "${CLUSTER_PREFIX}n$n"; done)
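
Allocation with docker-machine occasionally leaves a worker half-installed (see footnote 3, which describes killing such machines in a shell loop that checks whether `docker-machine env` succeeds). A minimal sketch of such a loop, assuming bash and the `${CLUSTER_PREFIX}n$n` naming scheme used above; `prune_broken` is a hypothetical helper name.

```shell
#!/usr/bin/env bash

# prune_broken PREFIX COUNT
# Checks workers PREFIXn1..PREFIXnCOUNT with `docker-machine env` and
# removes (rm -y) every machine whose check fails.
# Prints the name of each removed machine on stdout.
prune_broken() {
  local prefix=$1 count=$2 n name
  for ((n = 1; n <= count; n++)); do
    name="${prefix}n$n"
    if ! docker-machine env "$name" >/dev/null 2>&1; then
      echo "$name"
      docker-machine rm -y "$name" >/dev/null 2>&1
    fi
  done
}
```

Called as `prune_broken "$CLUSTER_PREFIX" "$CLUSTER_NUM_NODES"`, it leaves only the machines that answered the check.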

You now have over 300 cores available in your cluster, for less than a dollar an hour. Last month at WorldSense we used a similar cluster to process over 2 billion web pages from the Common Crawl repository over a few days. For now, let us bring up everything and compute the value of pi:

eval $(docker-machine env --swarm $MASTER)
docker-compose scale master=1 worker=10
docker run --net=container:master --entrypoint spark-submit gettyimages/spark:1.6.0-hadoop-2.6 --master spark://master:7077 --class org.apache.spark.examples.SparkPi /usr/spark/lib/spark-examples-1.6.0-hadoop2.6.0.jar
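
SparkPi estimates π by Monte Carlo sampling: it draws random points in the square [-1, 1] × [-1, 1], counts how many fall inside the unit circle, and multiplies that fraction by 4, spreading the sampling across the workers. For intuition, here is the same estimator in a single local process with awk; the function name `estimate_pi`, the sample count and the seed are arbitrary choices for this sketch.

```shell
#!/usr/bin/env bash

# estimate_pi SAMPLES [SEED]
# Single-process Monte Carlo estimate of pi, the idea SparkPi distributes:
# sample points uniformly in [-1,1]^2 and count those inside the unit
# circle. The circle/square area ratio is pi/4.
estimate_pi() {
  awk -v n="$1" -v seed="${2:-42}" 'BEGIN {
    srand(seed)
    inside = 0
    for (i = 0; i < n; i++) {
      x = rand() * 2 - 1
      y = rand() * 2 - 1
      if (x * x + y * y <= 1) inside++
    }
    printf "%.4f\n", 4 * inside / n
  }'
}
```

With a few hundred thousand samples the result is typically within a few hundredths of π; the cluster version simply splits those samples into slices run by different executors.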

In a more realistic scenario, one would use something like rsync to push locally developed jars to the master machine, and then use Docker volume support to expose those to the driver. That is how we do it at WorldSense⁵.
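
A minimal sketch of that workflow in bash. Several details are assumptions rather than the post's actual setup: the helper names `run` and `submit_jar`, the remote directory `/home/ubuntu/jars`, and SSH access to the master already being configured (for example with the key docker-machine generated). Running the driver with `--net=container:master` makes Swarm place it on the master's machine, so the `-v` bind mount refers to the directory rsync just filled.

```shell
#!/usr/bin/env bash

# run: execute a command, or only print it when DRY_RUN is non-empty.
run() {
  if [[ -n ${DRY_RUN:-} ]]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# submit_jar SSH_TARGET JAR_DIR MAIN_CLASS APP_JAR
# 1. rsync the local JAR_DIR to a (hypothetical) directory on the master;
#    rsync only transfers files that changed.
# 2. Run spark-submit next to the master container, with that directory
#    bind-mounted at /jars. Assumes the Swarm environment was loaded with
#    eval $(docker-machine env --swarm $MASTER).
submit_jar() {
  local target=$1 jar_dir=$2 main_class=$3 app_jar=$4
  local remote_dir=/home/ubuntu/jars
  run rsync -az "$jar_dir/" "$target:$remote_dir/"
  run docker run --rm --net=container:master -v "$remote_dir:/jars" \
    --entrypoint spark-submit gettyimages/spark:1.6.0-hadoop-2.6 \
    --master spark://master:7077 --class "$main_class" "/jars/$app_jar"
}
```

Keeping rarely changed dependency jars in the same directory as your own code means rsync usually ships only the small jar you just rebuilt. `DRY_RUN=1 submit_jar ...` prints both commands without running them.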

I think this is a powerful setup, with the great advantage that it is also easy to debug and replicate locally. I can simply change a few flags⁶ in these scripts to get virtually the same environment on my laptop. This flexibility has been helpful countless times.
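
The laptop changes from footnote 6 can live behind a single switch. A minimal sketch in bash: `set_target` is a hypothetical function name, the `local` values come from footnote 6, the `aws` values come from the commands earlier in this post, and `eth0` as the interface on EC2 is an assumption the post does not state.

```shell
#!/usr/bin/env bash

# set_target local|aws
# Sets DRIVER_OPTIONS, NET_ETH and KEYSTORE_IP for the chosen target.
set_target() {
  case "$1" in
    local)
      # Values from footnote 6 (VirtualBox on a laptop).
      DRIVER_OPTIONS="--driver virtualbox"
      NET_ETH=eth1
      KEYSTORE_IP=$(docker-machine ip keystore)
      ;;
    aws)
      DRIVER_OPTIONS="--driver amazonec2 --amazonec2-security-group=default"
      NET_ETH=eth0  # assumption: primary interface on the EC2 instances
      KEYSTORE_IP=$(aws ec2 describe-instances | jq -r ".Reservations[].Instances[] | select(.KeyName==\"${CLUSTER_PREFIX}ks\" and .State.Name==\"running\") | .PrivateIpAddress")
      ;;
    *)
      echo "usage: set_target local|aws" >&2
      return 1
      ;;
  esac
}
```

Everything after that point (SWARM_OPTIONS, MASTER_OPTIONS, the compose file) stays the same for both targets.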


Many companies offer hosted solutions for running code in Spark, and I highly recommend giving them a try. In our case, we had both budget restrictions and flexibility requirements that forced us into a custom deployment. It hasn't come without its costs, but we are certainly having some fun.

Ah, talking about costs, do not forget to bring your cluster down!



docker-machine ls | grep "^${CLUSTER_PREFIX}" | cut -d\  -f1 | xargs docker-machine rm -y

This text was cross-posted from WorldSense’s blog at


  1. The need for serialized creation of the cluster-store should improve at some point.
  2. Spark runs jobs in its workers' JVMs, and sometimes it is really hard to avoid jar hell when your code depends on one version of a library and the Spark workers already ship a different version. In some cases, the only solution is to modify the pom.xml that builds the workers' jar itself, and we have done that to fix incompatibilities with logback, dropwizard, and jackson, among others. If you find yourself in the same position, don't be afraid to try that. It works.
  3. Machine allocation with docker-machine is very simple, but not super reliable. I often have some workers that do not install correctly, and I simply kill them in a shell loop that checks whether docker-machine env succeeds.
  4. GNU Parallel requires a citation, and I have to say that I do it happily. Before the advent of docker swarm, most of the setup we used was powered by GNU Parallel alone :-).
    O. Tange (2011): GNU Parallel — The Command-Line Power Tool,
     ;login: The USENIX Magazine, February 2011:42–47.
  5. By splitting our jars in rarely-changed dependencies and our own code, most of the time running fresh code in the cluster is just a matter of uploading a couple of megabytes.
  6. On my laptop, I need the following changes: DRIVER_OPTIONS="--driver virtualbox", NET_ETH=eth1 and KEYSTORE_IP=$(docker-machine ip keystore).
  7. I have had trouble recently with constraints in more complex scenarios, although they work fine with the simple examples in this page. Unfortunately this has prevented a more aggressive migration of our infrastructure to swarm.

