文档章节

Ignite集成Spark之IgniteDataFrames

李玉珏
 李玉珏
发布于 2018/10/08 15:38
字数 1305
阅读 408
收藏 1

本系列共两篇文章,主要探讨如何将Ignite和Spark进行集成。

下面简要地回顾一下在第一篇文章中所谈到的内容。

Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。

Spark是一个流式数据和计算引擎,通常从HDFS或者其他存储中获取数据,一直以来,他都倾向于OLAP型业务,并且聚焦于MapReduce类型负载。

因此,这两种技术是可以互补的。

将Ignite与Spark整合

整合这两种技术会为Spark用户带来若干明显的好处:

  • 通过避免大量的数据移动,获得真正可扩展的内存级性能;
  • 提高RDD、DataFrame和SQL的性能;
  • 在Spark作业之间更方便地共享状态和数据。

下图中显示了如何整合这两种技术,并且标注了显著的优势:

第一篇文章中,主要聚焦于IgniteRDD,而本文会聚焦于IgniteDataFrames。

IgniteDataframes

Spark的DataFrame API为描述数据引入了模式的概念,Spark通过表格的形式进行模式的管理和数据的组织。

DataFrame是一个组织为命名列形式的分布式数据集,从概念上讲,DataFrame等同于关系数据库中的表,并允许Spark使用Catalyst查询优化器来生成高效的查询执行计划。而RDD只是跨集群节点分区化的元素集合。

Ignite扩展了DataFrames,简化了开发,改进了将Ignite作为Spark的内存存储时的数据访问时间,好处包括:

  • 通过Ignite读写DataFrames时,可以在Spark作业之间共享数据和状态;
  • 通过优化Spark的查询执行计划加快SparkSQL查询,这些主要是通过IgniteSQL引擎的高级索引以及避免了Ignite和Spark之间的网络数据移动实现的。

IgniteDataframes示例

下面通过一些代码以及搭建几个小程序的方式,了解Ignite DataFrames如何使用,如果想实际运行这些代码,可以从GitHub上下载。

一共会写两个Java的小应用,然后在IDE中运行,还会在这些Java应用中执行一些SQL。

一个Java应用会从JSON文件中读取一些数据,然后创建一个存储于Ignite的DataFrame,这个JSON文件Ignite的发行版中已经提供,另一个Java应用会从Ignite的DataFrame中读取数据然后使用SQL进行查询。

下面是写应用的代码:

public class DFWriter {

    private static final String CONFIG = "config/example-ignite.xml";

    public static void main(String args[]) {
        Ignite ignite = Ignition.start(CONFIG);

        SparkSession spark = SparkSession
                .builder()
                .appName("DFWriter")
                .master("local")
                .config("spark.executor.instances", "2")
                .getOrCreate();

        Logger.getRootLogger().setLevel(Level.OFF);
        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        Dataset<Row> peopleDF = spark.read().json(
                resolveIgnitePath("resources/people.json").getAbsolutePath());

        System.out.println("JSON file contents:");

        peopleDF.show();

        System.out.println("Writing DataFrame to Ignite.");

        peopleDF.write()
                .format(IgniteDataFrameSettings.FORMAT_IGNITE())
                .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG)
                .option(IgniteDataFrameSettings.OPTION_TABLE(), "people")
                .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
                .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")
                .save();

        System.out.println("Done!");

        Ignition.stop(false);
    }
}

DFWriter中,首先创建了SparkSession,它包含了应用名,之后会使用spark.read().json()读取JSON文件并且输出文件内容,下一步是将数据写入Ignite存储。下面是DFReader的代码:

public class DFReader {

    private static final String CONFIG = "config/example-ignite.xml";

    public static void main(String args[]) {
        Ignite ignite = Ignition.start(CONFIG);

        SparkSession spark = SparkSession
                .builder()
                .appName("DFReader")
                .master("local")
                .config("spark.executor.instances", "2")
                .getOrCreate();

        Logger.getRootLogger().setLevel(Level.OFF);
        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        System.out.println("Reading data from Ignite table.");

        Dataset<Row> peopleDF = spark.read()
                .format(IgniteDataFrameSettings.FORMAT_IGNITE())
                .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG)
                .option(IgniteDataFrameSettings.OPTION_TABLE(), "people")
                .load();

        peopleDF.createOrReplaceTempView("people");

        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");
        sqlDF.show();

        System.out.println("Done!");

        Ignition.stop(false);
    }
}

DFReader中,初始化和配置与DFWriter相同,这个应用会执行一些过滤,需求是查找所有的id > 0 以及 < 6的人,然后输出结果。

在IDE中,通过下面的代码可以启动一个Ignite节点:

public class ExampleNodeStartup {
    public static void main(String[] args) throws IgniteException {
        Ignition.start("config/example-ignite.xml");
    }
}

到此,就可以对代码进行测试了。

运行应用

首先在IDE中启动一个Ignite节点,然后运行DFWriter应用,输出如下:

JSON file contents:
+-------------------+---+------------------+
|         department| id|              name|
+-------------------+---+------------------+
|Executive Committee|  1|       Ivan Ivanov|
|Executive Committee|  2|       Petr Petrov|
|         Production|  3|          John Doe|
|         Production|  4|         Ann Smith|
|         Accounting|  5|    Sergey Smirnov|
|         Accounting|  6|Alexandra Sergeeva|
|                 IT|  7|         Adam West|
|        Head Office|  8|    Beverley Chase|
|        Head Office|  9|      Igor Rozhkov|
|                 IT| 10|Anastasia Borisova|
+-------------------+---+------------------+

Writing DataFrame to Ignite.
Done!

如果将上面的结果与JSON文件的内容进行对比,会显示两者是一致的,这也是期望的结果。

下一步会运行DFReader,输出如下:

Reading data from Ignite table.
+-------------------+--------------+---+
|         DEPARTMENT|          NAME| ID|
+-------------------+--------------+---+
|Executive Committee|   Ivan Ivanov|  1|
|Executive Committee|   Petr Petrov|  2|
|         Production|      John Doe|  3|
|         Production|     Ann Smith|  4|
|         Accounting|Sergey Smirnov|  5|
+-------------------+--------------+---+

Done!

这也是期望的输出。

总结

通过本文,会发现使用Ignite DataFrames是如何简单,这样就可以通过Ignite DataFrame进行数据的读写了。

未来,这些代码示例也会作为Ignite发行版的一部分进行发布。

关于Ignite和Spark的集成,内容就是这些了。

© 著作权归作者所有

李玉珏

李玉珏

粉丝 387
博文 79
码字总数 149758
作品 0
沈阳
架构师
私信 提问
全面对比,深度解析 Ignite 与 Spark

经常有人拿 Ignite 和 Spark 进行比较,然后搞不清两者的区别和联系。Ignite 和 Spark,如果笼统归类,都可以归于内存计算平台,然而两者功能上虽然有交集,并且 Ignite 也会对 Spark 进行支...

编辑部的故事
2018/09/13
3.6K
6
Ignite 与 Spark 都很强,那如果把它们整合起来会怎样?

在前面的文章中,我们分别介绍了 Ignite 和 Spark 这两种技术,从功能上对两者进行了全面深入的对比。经过分析,可以得出这样一个结论:两者都很强大,但是差别很大,定位不同,因此会有不同...

编辑部的故事
2018/12/27
2.3K
1
Apache Ignite 2.5.0 版本发布,紧急问题修复

特性和改进 Apache Ignite Linux packages 2.6 update [#IGNITE-8807] Windows WSL configuration has to be added to Ignite configs [#IGNITE-8804] Support reuse of already initialized......

李玉珏
2018/07/19
3
0
Apache Ignite 1.9.0 发布,内存数据组织平台

Apache Ignite 内存数组组织框架是一个高性能、集成和分布式的内存计算和事务平台,用于大规模的数据集处理。Ignite 为应用和不同的数据源之间提供一个高性能、分布式内存中数据组织管理的框...

王练
2017/03/07
1K
2
Ignite集成Spark之IgniteRDD

本系列共两篇文章,会探讨如何将Ignite和Spark进行集成。 Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。 ...

李玉珏
2018/09/13
463
1

没有更多内容

加载失败,请刷新页面

加载更多

DDD(十)--仓储

1、引言 DDD中的Repository(仓储):协调领域和数据映射层,利用类似与集合的接口来访问领域对象。——《领域驱动设计-软件核心复杂性应对之道》 仓储是DDD中产生的概念,也就是说,如果应...

MrYuZixian
28分钟前
8
0
Jenkins的多种迁移方法

说明 Jenkins有时需要进行迁移,主目录会发生改变,本文主要讲解如何更改主目录。由于jenkins安装方式的不同,主目录也不一样。 本测试环境:Centos7.6 X64。注意:在更改主目录之前,请一定...

Elson
29分钟前
8
0
好程序员web前端教程分享前端javascript练习题三

好程序员web前端教程分享前端javascript练习题三,cookie 一周内免登录 样式代码: <form action=""> 姓名:<input type="text" id="usename"/><br /> 密码:<input type="text" i="mima"/>......

好程序员官网
48分钟前
8
0
Table 信息转成pojo属性

import com.google.common.base.CaseFormat;import java.sql.*;/** * @author: liyhu * @date: 2019/11/22 */public class TableToPojo { static String url="jdbc:mys......

暗中观察
今天
10
0
Access数据库-C#操作类

//Access数据库-C# 操作类 代码using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Data.OleDb;using System.Data;namespace XXX{......

芳缘
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部