文档章节

spark streming写入外部系统

 张欢19933
发布于 2017/06/22 18:25
字数 749
阅读 37
收藏 0

SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的原始的API,它允许将数据发送到外部系统。然而,重要的是要了解如何正确有效地使用这种原始方法。一些常见的错误,以避免如下: 
写数据到外部系统,需要建立一个数据连接对象(例如TCP连接到远程的服务器),使用它将数据发送到外部存储系统。为此开发者可能会在Driver中尝试创建一个连接,然后在worker中使用它来保存记录到外部数据。代码如下:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }}

上面的代码是一个错误的演示,因为连接是在Driver中创建的,而写数据是在worker中完成的。此时连接就需要被序列化然后发送到worker中。但是我们知道,连接的信息是不能被序列化和反序列化的(不同的机器连接服务器需要使用不同的服务器端口,即便连接被序列化了也不能使用)

进而我们可以将连接移动到worker中实现,代码如下:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }}

但是此时,每处理一条数据记录,就需要连接一次外部系统,对于性能来说是个严重的问题。这也不是一个完美的实现。

Spark基于RDD进行编程,RDD的数据不能改变,如果擅长foreachPartition底层的数据可能改变,做到的方式foreachPartition操作一个数据结构,RDD里面一条条数据,但是一条条的记录是可以改变的spark也可以运行在动态数据源上。(就像数组的数据不变,但是指向的索引可以改变) 
我们可以将代码做如下的改进:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }}

这样一个partition,只需连接一次外部存储。性能上有大幅度的提高。但是不同的partition之间不能复用连接。我们可以使用连接池的方式,使得partition之间可以共享连接。代码如下:

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }}

1、需要注意的是: 
(1)、你最好使用forEachPartition函数来遍历RDD,并且在每台Work上面创建数据库的connection。 
(2)、如果你的数据库并发受限,可以通过控制数据的分区来减少并发。 
(3)、在插入MySQL的时候最好使用批量插入。 
(4),确保你写入的数据库过程能够处理失败,因为你插入数据库的过程可能会经过网络,这可能导致数据插入数据库失败。 
(5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

© 著作权归作者所有

共有 人打赏支持
上一篇: linux下挂载硬盘
下一篇: linux命令vim
粉丝 35
博文 481
码字总数 230666
作品 0
海淀
私信 提问
spark streaming on yarn每个批次接收的数据量非常少的问题

最近在调试spark on yarn的程序,通过spark streming调用kafka的低阶API实时获取kafka中的数据,spark streaming中每个批次获取到的kafka数据非常少,只有几百,但是用local模式运行正常,使用Rec...

MeYJ
2016/07/15
565
1
Apache Spark 2.4 正式发布,重要功能详细介绍

美国时间 2018年11月08日 正式发布了。一如既往,为了继续实现 Spark 更快,更轻松,更智能的目标,Spark 2.4 带来了许多新功能,如下: 添加一种支持屏障模式(barrier mode)的调度器,以便...

Spark
11/10
0
0
Spark Streaming入门

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。 什么...

腾讯云加社区
05/16
0
0
Spark中的一些基本概念,及运行流程

1 Spark中的一些基本概念 话不多说,先来一个官网地址:链接 1. Application:用户基于spark的代码,由一个Driver和多个Excutor组成。 2. Application jar:将用户基于spark的代码打包成的jar...

yu0_zhang0
04/23
0
0
【译】Apache Spark 2.4内置数据源Apache Avro

原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4 Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Ka...

开源大数据
12/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

render常用模版 结合iview

表格添加一对按钮 { title: '操作', align: 'center', render: (h, params) => { return h('div', [ h('Button', { props: { ......

Carbenson
20分钟前
1
0
一次生产 CPU 100% 排查优化实践

前言 到了年底果然都不太平,最近又收到了运维报警:表示有些服务器负载非常高,让我们定位问题。 还真是想什么来什么,前些天还故意把某些服务器的负载提高(没错,老板让我写个 BUG!),不...

crossoverJie
28分钟前
8
0
Spring Cloud Alibaba Sentinel 整合 Feign 的设计实现

作者 | Spring Cloud Alibaba 高级开发工程师洛夜 来自公众号阿里巴巴中间件投稿 前段时间 Hystrix 宣布不再维护之后(Hystrix 停止开发。。。Spring Cloud 何去何从?),Feign 作为一个跟 ...

Java技术栈
44分钟前
9
0
虚拟机加密

在超融合的基础设施和虚拟化成为常态的世界里,对加密的要求越来越高,越来越迫切,IT部门需考虑的重大安全问题和方法也浮现了出来。 物理数据中心时代,采取双保险式数据安全方法是相对简单...

linuxCool
47分钟前
2
0
MySQL 主从同步

MySQL主从介绍 MySQL主从又叫做Replication、AB复制。简单讲就是A和B两台机器做主从后,在A上写数据,另外一台B也会跟着写数据,两者数据实时同步的 MySQL主从是基于binlog的,主上须开启bin...

野雪球
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部