文档章节

spark streming写入外部系统

 张欢19933
发布于 2017/06/22 18:25
字数 749
阅读 32
收藏 0

SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的原始的API,它允许将数据发送到外部系统。然而,重要的是要了解如何正确有效地使用这种原始方法。一些常见的错误,以避免如下: 
写数据到外部系统,需要建立一个数据连接对象(例如TCP连接到远程的服务器),使用它将数据发送到外部存储系统。为此开发者可能会在Driver中尝试创建一个连接,然后在worker中使用它来保存记录到外部数据。代码如下:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }}

上面的代码是一个错误的演示,因为连接是在Driver中创建的,而写数据是在worker中完成的。此时连接就需要被序列化然后发送到worker中。但是我们知道,连接的信息是不能被序列化和反序列化的(不同的机器连接服务器需要使用不同的服务器端口,即便连接被序列化了也不能使用)

进而我们可以将连接移动到worker中实现,代码如下:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }}

但是此时,每处理一条数据记录,就需要连接一次外部系统,对于性能来说是个严重的问题。这也不是一个完美的实现。

Spark基于RDD进行编程,RDD的数据不能改变,如果擅长foreachPartition底层的数据可能改变,做到的方式foreachPartition操作一个数据结构,RDD里面一条条数据,但是一条条的记录是可以改变的spark也可以运行在动态数据源上。(就像数组的数据不变,但是指向的索引可以改变) 
我们可以将代码做如下的改进:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }}

这样一个partition,只需连接一次外部存储。性能上有大幅度的提高。但是不同的partition之间不能复用连接。我们可以使用连接池的方式,使得partition之间可以共享连接。代码如下:

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }}

1、需要注意的是: 
(1)、你最好使用forEachPartition函数来遍历RDD,并且在每台Work上面创建数据库的connection。 
(2)、如果你的数据库并发受限,可以通过控制数据的分区来减少并发。 
(3)、在插入MySQL的时候最好使用批量插入。 
(4),确保你写入的数据库过程能够处理失败,因为你插入数据库的过程可能会经过网络,这可能导致数据插入数据库失败。 
(5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

© 著作权归作者所有

共有 人打赏支持
粉丝 32
博文 453
码字总数 224021
作品 0
海淀
Spark Streaming入门

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。 什么...

腾讯云加社区
05/16
0
0
[Kafka与Spark集成系列三] Spark编程模型

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/82082109 在Spark中,我们通过对分布式数据集的操作来表达我们的计算意图,...

朱小厮
08/26
0
0
Spark中的一些基本概念,及运行流程

1 Spark中的一些基本概念 话不多说,先来一个官网地址:链接 1. Application:用户基于spark的代码,由一个Driver和多个Excutor组成。 2. Application jar:将用户基于spark的代码打包成的jar...

yu0_zhang0
04/23
0
0
Spark的运行架构分析(一)之架构概述

本博客转载自:https://blog.csdn.net/gamer_gyt/article/details/51822765 1:Spark的运行模式 2:Spark中的一些名词解释 3:Spark的运行基本流程 4:RDD的运行基本流程 一:Spark的运行模式...

lubin2016
04/18
0
0
Spark2.x写入Elasticsearch的性能测试

一、Spark集成ElasticSearch的设计动机 ElasticSearch 毫秒级的查询响应时间还是很惊艳的。其优点有: 1. 优秀的全文检索能力 2. 高效的列式存储与查询能力 3. 数据分布式存储(Shard 分片) 相...

openfea
2017/10/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

职场老人谈:Linux学习分享

随着Linux系统慢慢渗透到生活的方方面面,对Linux的基本操作就越来越必要了。 我是一位软件测试工程师,初次接触Linux主要是因为需要配置Linux服务器。因为Linux系统是一套开源的操作系统,安...

linux-tao
33分钟前
1
0
Oracle chr() ascii()

函数简介 实用函数 chr() 和 ascii() chr() 函数将ASCII码转换为字符: ASCII码 –> 字符; ascii() 函数将字符转换为ASCII码: 字符 –> ASCII码; 在 Oracle 中 chr() 函数和 ascii() 是一对...

taadis
35分钟前
2
0
职场老人谈:Linux学习分享

随着Linux系统慢慢渗透到生活的方方面面,对Linux的基本操作就越来越必要了。 我是一位软件测试工程师,初次接触Linux主要是因为需要配置Linux服务器。因为Linux系统是一套开源的操作系统,安...

linuxprobe16
44分钟前
1
0
Confluence 6 Windows 中以服务方式自动重启的原因

针对长时间使用的 Confluence,我们推荐你配置 Confluence 自动随操作系统重启而启动。针对一些 Windows 的服务器,这意味着需要让 Confluence 以服务的方式运行。 有下面 2 种方式来以服务的...

honeymose
今天
4
0
day93-20180920-英语流利阅读-待学习

时尚之觞:外表光鲜靓丽,其实穷得要命 Lala 2018-09-20 1.今日导读 讲到时尚界,我们脑海里浮现的可能都是模特和设计师光鲜靓丽、从容潇洒的模样。可是,最近在法国出版的一本书却颠覆了我们...

飞鱼说编程
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部