文档章节

Airflow在Kubernetes上的操作器

openthings
 openthings
发布于 2018/07/07 15:49
字数 2236
阅读 586
收藏 4

Airflow在Kubernetes (第一部分): 一种不同类型的Operator

作者: Daniel Imberman (Bloomberg LP)

介绍

作为Bloomberg’s 持续提交来开发Kubernetes ecosystem,我们高兴地宣布 Kubernetes Airflow Operator的诞生。作为一种 Apache Airflow运行机制,一个流行的工作流程整合框架,可以原生地使用Kubernetes API来任意调用Kubernetes Pods。

什么是Airflow?

Apache Airflow 是 DevOps 的 “Configuration As Code.”方法论的实现之一。Airflow允许用户多个步骤的流水线,使用简单的Python object DAG (Directed Acyclic Graph)来实现。你可以定义dependencies,通过程序来构建复杂的workflows,然后监控调度执行的任务,具有易于查看的UI。

列表方式呈现的工作流程:

Airflow DAGs 

图形方式呈现的工作流程:

Airflow UI

为什么将 Airflow 运行在 Kubernetes之上?

因为从一开始,Airflow的最大优势就是其灵活性。Airflow 提供了非常广泛的服务的整合,包括Spark 和 HBase, 以及其它的不同的云服务提供者。Airflow 也通过器插件框架提供了非常好的扩展能力。但是,其限制在于Airflow users 被限制在其worker执行的框架和客户端。一个组织可能有不同的Airflow workflows,从数据科学分析流程到应用开发。这些应用场景给依赖管理带来问题,因为团队可能需要在不同的流程中使用不同的支持库。

为了聚焦于该问题,我们优化了Kubernetes允许用户执行任意的 Kubernetes pods 和 configurations。Airflow 的用户现在可以获得全面能力,来使用运行环境、资源以及安全设置,从而将 Airflow 转变为一个可以运行任何工作负载的workflow orchestrator。

Kubernetes 的 Operator

在开始下一步之前,我们限制在Airflow中的 Operator 是一个任务的定义。 当用户创建一个 DAG,他将使用一个 operator,如 “SparkSubmitOperator” 或者 “PythonOperator” 来分别提交/监控一个 Spark job 或者 Python函数。 Airflow 带有框架内置的operators,如 Apache Spark, BigQuery, Hive, 和 EMR。并且提供了Plugins entrypoint,允许DevOps 工程师开发自己的连接器。

Airflow 用户一直在寻找使开发和 ETL 流水线管理更简单的方法。任何解耦流水线步骤,增加可监控性,都能减少将来的中断和救火问题。下面是Airflow Kubernetes Operator提供的好处:

  • 对开发过程增加的灵活性:Airflow’s plugin API 提供显著的特性来帮助需要在DAGs上测试新功能的工程师。当开发者想要创建一个新的 operator,他们不得不开发一个完整的新 plugin.。现在,任何任务都可以运行在 Docker 容器之中,使用一致的operator来访问,没有额外的 Airflow 代码需要维护。

  • 配置和依赖关系的灵活性:对于运行在静态的Airflow workers的operators,依赖管理变得相当的困难。如果开发者想要运行一个要求依赖 SciPy 的任务,而另外一个要求 NumPy, 开发者就不得不维护二者的依赖库,使其能够适应所有的Airflow workers,或者将其分离到外部的机器。自定义的Docker镜像允许用户 确保运行环境、配置和依赖是完全等价的。

  • 使用 kubernetes secrets添加安全性:处理敏感数据是任何DevOps 工程都需要面对的职责所在。Airflow 用户希望隔离API keys, database passwords, 以及 login credentials在一个严格的“仅需知道” 的环境中。通过Kubernetes operator,用户可以使用 Kubernetes Vault 技术来存储所有敏感数据。这意味着Airflow workers永远不会存取这些信息,只需要简单地请求 pods,仅提供需要的 secrets 数据。

架构

Airflow Architecture

Kubernetes Operator 使用 Kubernetes Python Client 来提交请求,然后由 APIServer (1)处理。然后Kubernetes使用你定义的参数来启动你的 pod(2). 容器镜像将会在需要时被载入,并赋予需要的环境变量, secrets 和 dependencies, enacting a single command。一旦 job启动,operator 只需要监控logs (3)。用户可以收集本地调度器的logs或者分布式的 logging service(已经部署在 Kubernetes 集群之中)。

使用 Kubernetes Operator

基本例程

下面的 DAG 是一个简单的例子,用来演示 Kubernetes Operator 如何工作。该 DAG 创建了两个 pods 到 Kubernetes:一个带Python的Linux distro和一个Ubuntu distro(不带python)。 该 Python pod 将正常运行 Python 请求, 没有Python的将会报告一个失败信息。如果Operator 正常工作,该 passing-task pod 将完成而 failing-task pod 将返回 failure 到Airflow webserver。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)

显示的任务图如下:

Basic DAG Run

如何结合到 workflow?

钙离子只用了两个 images,Docker的神奇之处在于同样的DAG可以工作在任何image/command,只要你愿意。下面是一些 CI/CD 流程,在Airflow DAG中输出产品级的代码。

1: PR in github

使用 Travis 或者 Jenkins 来运行 unit 和 integration 测试,然后合并到 master 分之,触发自动化的 CI build过程。

2: CI/CD via Jenkins -> Docker Image

Generate your Docker images and bump release version within your Jenkins build.

3: Airflow launches task

最终,更新DAGs 来反映新版本的变化!

production_task = KubernetesPodOperator(namespace='default',
                          # image="my-production-job:release-1.0.1", <-- old release
                          image="my-production-job:release-1.0.2",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

启动 test deployment

因为Kubernetes Operator还没有发布,我们还没有推出一个正式的 helm chart 和 operator (都在进行之中)。下面包含了一些基本的步骤,来实验这些新的特征。

Step 1: Set your kubeconfig to point to a kubernetes cluster

Step 2: Clone the Airflow Repo:

运行 git clone https://github.com/apache/incubator-airflow.git 来复制Airflow代码仓库。

Step 3: Run

为了运行基本的开发过程, 我们采用一些脚本来驱动当前的 Kubernetes Executor (将在下一篇文章中介绍). 运行下面三个命令:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh

在进一步开始之前,先讨论一下这些命令的作用:

sed -ie “s/KubernetesExecutor/LocalExecutor/g” scripts/ci/kubernetes/kube/configmaps.yaml

该 Kubernetes Executor是另外一个Airflow 的特征,允许等价pods的动态分配。我切换到 LocalExecutor的原因是为了更简单地介绍这些特征。你可以跳过这些,如果愿意尝试一下 Kubernetes Executor, 但这将在未来的文章中讲述。

./scripts/ci/kubernetes/Docker/build.sh

该 script 将包装 Airflow master 源码,构建一个Docker container为Airflow distribution。

./scripts/ci/kubernetes/kube/deploy.sh

最后,我们将在集群中创建一个完整的Airflow分发。包括 Airflow configs, 一个postgres backend,  webserver + scheduler, 以及所有需要的服务。需要注意的一个事是 role binding 是 cluster-admin, 因此如果没有集群的权限, 你可以修改 scripts/ci/kubernetes/kube/airflow.yaml。

Step 4: Log into your webserver

现在Airflow 实例已经运行,我们看一下 UI。UI 服务于Airflow pod端口 8080 ,简单地运行:

WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080

现在Airflow UI 将服务于 http://localhost:8080。为了登录,简单地进入 airflow/airflow就可以访问 Airflow web UI。

Step 5: Upload a test document

为了修改/添加自己的 DAGs, 你可以使用 kubectl cp 上载本地文件到Airflow scheduler的DAG folder。Airflow将读取新的DAG 然后自动上载自己的系统。下面的命令自动上载本地温江到正确的目录:

kubectl cp <local file> <namespace>/<pod>:/root/airflow/dags -c scheduler

Step 6: Enjoy!

 

什么时候可以用呢?

这个特征功能还在早起开发阶段,我们希望下几个月内就能发布一些版本,可以更广泛地使用。

参与进来

该特征是促进Apache Airflow 集成进的 Kubernetes的诸多努力的一个开始。该 Kubernetes Operator 已经合并进 1.10 release branch of Airflow (executor在体验模式), 完整的 k8s 原生调度器称为 Kubernetes Executor。

如果感兴趣加入,建议先了解一下下面的信息:

  • 加入airflow-dev的邮件列表 dev@airflow.apache.org。
  • 提交问题到 Apache Airflow JIRA。
  • 加入我们的 SIG-BigData 会议,在 Wednesdays at 10am PST。
  • 通过slack联系:#sig-big-data 在 kubernetes.slack.com。

特别感谢Apache Airflow 和 Kubernetes 社区,尤其是 Grant Nicholas, Ben Goldberg, Anirudh Ramanathan, Fokko Dreisprong, 和 Bolke de Bruin。

© 著作权归作者所有

openthings
粉丝 320
博文 1129
码字总数 675031
作品 1
东城
架构师
私信 提问
AirFlow/NiFi/MLFlow/KubeFlow进展

大数据分析中,进行流程化的批处理是必不可少的。传统的大数据处理大部分是基于关系数据库系统,难以实现大规模扩展;主流的基于Hadoop/Spark体系总体性能较强,但使用复杂、扩展能力弱。大数...

openthings
06/21
257
0
基于Kubernetes的瓜子云的任务调度系统

很大的挑战。 接下来我讲详细介绍一下瓜子云的任务调度系统搭建所遇到的问题和解决方案。 需求 瓜子最早的时候,任务调度用的是Crontab,后来由于数据仓库的复杂调度需求,我们引入了Airflow...

店家小二
2018/12/14
0
0
Apache 基金会宣布 Apache Airflow 毕业成为顶级项目

Apache 软件基金会宣布,Apache Airflow 已经成功地从孵化毕业,成为基金会的一个新的顶级项目。 Apache Airflow 是一个灵活、可扩展的工作流自动化和调度系统,可编集和管理数百 PB 的数据流...

王练
01/09
3.8K
4
airflow单机版搭建记录——不使用mysql,redis

[toc] airflow单机版搭建记录 环境准备 Python(pip)——airflow由python编写 安装airflow pip install apache-airflow 环境变量配置 本人是在root用户下执行,可自行选择 export AIRFLOW_HOM...

helplove
07/03
53
0
airflow使用mysql数据库,LocalExecutor并发调度

mysql-airflow 在mysql上执行 create database airflow; —— 创建数据库 GRANT all privileges on airflow.* TO 'airflow'@'%' IDENTIFIED BY '123456'; —— 将数据库airflow的所有权限授权......

helplove
07/03
130
0

没有更多内容

加载失败,请刷新页面

加载更多

64.监控平台介绍 安装zabbix 忘记admin密码

19.1 Linux监控平台介绍 19.2 zabbix监控介绍 19.3/19.4/19.6 安装zabbix 19.5 忘记Admin密码如何做 19.1 Linux监控平台介绍: 常见开源监控软件 ~1.cacti、nagios、zabbix、smokeping、ope...

oschina130111
今天
13
0
当餐饮遇上大数据,嗯真香!

之前去开了一场会,主题是「餐饮领袖新零售峰会」。认真听完了餐饮前辈和新秀们的分享,觉得获益匪浅,把脑子里的核心纪要整理了一下,今天和大家做一个简单的分享,欢迎感兴趣的小伙伴一起交...

数澜科技
今天
7
0
DNS-over-HTTPS 的下一代是 DNS ON BLOCKCHAIN

本文作者:PETER LAI ,是 Diode 的区块链工程师。在进入软件开发领域之前,他主要是在做工商管理相关工作。Peter Lai 也是一位活跃的开源贡献者。目前,他正在与 Diode 团队一起开发基于区块...

红薯
今天
9
0
CC攻击带来的危害我们该如何防御?

随着网络的发展带给我们很多的便利,但是同时也带给我们一些网站安全问题,网络攻击就是常见的网站安全问题。其中作为站长最常见的就是CC攻击,CC攻击是网络攻击方式的一种,是一种比较常见的...

云漫网络Ruan
今天
12
0
实验分析性专业硕士提纲撰写要点

为什么您需要研究论文的提纲? 首先当您进行研究时,您需要聚集许多信息和想法,研究论文提纲可以较好地组织你的想法, 了解您研究资料的流畅度和程度。确保你写作时不会错过任何重要资料以此...

论文辅导员
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部