文档章节

Spark Streaming(2):Join

Joe_Wu
 Joe_Wu
发布于 2017/08/08 16:48
字数 306
阅读 15
收藏 0
点赞 0
评论 0
package com.pyrrha.examples;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class JoinTransformation {
	
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		join(sc);
		
		sc.close();
	}
	/**
	 * @return:
	    ListA:
		[(name,Jos), (name,Marry), (name1,Tom)]
	    ListB
		[(name,Jos), (name,Black), (name2,David)]
		
		-----------------join-------------------
		[(name,(Jos,Jos)), (name,(Jos,Black)), (name,(Marry,Jos)), (name,(Marry,Black))]
		
		-----------------leftOuterJoin-------------------
		[(name,(Jos,Optional[Jos])), (name,(Jos,Optional[Black])), (name,(Marry,Optional[Jos])), (name,(Marry,Optional[Black])), (name1,(Tom,Optional.empty))]
		
		-----------------rightOuterJoin-------------------
		[(name,(Optional[Jos],Jos)), (name,(Optional[Jos],Black)), (name,(Optional[Marry],Jos)), (name,(Optional[Marry],Black)), (name2,(Optional.empty,David))]
		
		-----------------fullOuterJoin-------------------
		[(name,(Optional[Jos],Optional[Jos])), (name,(Optional[Jos],Optional[Black])), (name,(Optional[Marry],Optional[Jos])), (name,(Optional[Marry],Optional[Black])), (name2,(Optional.empty,Optional[David])), (name1,(Optional[Tom],Optional.empty))]
	 * @param sc
	 */
	public static void join(JavaSparkContext sc) {
		List<Tuple2<String, String>> listA = new ArrayList<Tuple2<String, String>>();
		listA.add(new Tuple2<String, String>("name","Jos"));
		listA.add(new Tuple2<String, String>("name","Marry"));
		listA.add(new Tuple2<String, String>("name1","Tom"));
		List<Tuple2<String, String>> listB = new ArrayList<Tuple2<String, String>>();
		listB.add(new Tuple2<String, String>("name","Jos"));
		listB.add(new Tuple2<String, String>("name","Black"));
		listB.add(new Tuple2<String, String>("name2","David"));
		
		JavaPairRDD<String, String> RDDA = sc.parallelizePairs(listA);
		JavaPairRDD<String, String> RDDB = sc.parallelizePairs(listB);
		System.out.println("-----------------data-------------------");
		System.out.println("ListA:");
		System.out.println(RDDA.collect());
		System.out.println("\n");
		System.out.println("ListB");
		System.out.println(RDDB.collect());
		System.out.println("\n");
		
		
		JavaPairRDD<String, Tuple2<String, String>> RDDC = RDDA.join(RDDB);
		System.out.println("-----------------join-------------------");
		System.out.println(RDDC.collect());
		System.out.println("\n");
		
		JavaPairRDD<String, Tuple2<String, Optional<String>>> RDDD = RDDA.leftOuterJoin(RDDB);
		System.out.println("-----------------leftOuterJoin-------------------");
		System.out.println(RDDD.collect());
		System.out.println("\n");
		
		JavaPairRDD<String, Tuple2<Optional<String>, String>> RDDE = RDDA.rightOuterJoin(RDDB);
		System.out.println("-----------------rightOuterJoin-------------------");
		System.out.println(RDDE.collect());
		System.out.println("\n");
		
		JavaPairRDD<String, Tuple2<Optional<String>, Optional<String>>> RDDF = RDDA.fullOuterJoin(RDDB);
		System.out.println("-----------------fullOuterJoin-------------------");
		System.out.println(RDDF.collect());
		System.out.println("\n");
		
	}
	
	
	

}

 

© 著作权归作者所有

共有 人打赏支持
Joe_Wu
粉丝 1
博文 9
码字总数 6567
作品 0
徐汇
程序员
你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836 ⋅ 04/12 ⋅ 0

Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer ⋅ 05/24 ⋅ 0

Spark Streaming入门

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。 什么...

腾讯云加社区 ⋅ 05/16 ⋅ 0

spark和hive storm mapreduce的比较

Spark Streaming与Storm都可以用于进行实时流计算。但是他们两者的区别是非常大的。其中区别之一 就是,Spank Streaming和Stom的计算模型完全不一样,Spark Streaming是基于RDD的,因此需要将...

necther ⋅ 04/28 ⋅ 0

Spark Streaming 框架 - StreamingPro

概述 Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。 Spark Streaming 是 Spark API 核心的扩展,它支持...

匿名 ⋅ 04/29 ⋅ 0

Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf ⋅ 05/12 ⋅ 0

教你如何成为Spark大数据高手

Spark目前被越来越多的企业使用,和Hadoop一样,Spark也是以作业的形式向集群提交任务,那么如何成为Spark大数据高手?下面就来个深度教程。 分享之前我还是要推荐下我自己创建的大数据学习交...

风火数据 ⋅ 05/20 ⋅ 0

Spark及Spark Streaming核心原理及实践

  【IT168 技术】Spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,...

中国大数据 ⋅ 05/31 ⋅ 0

Spark Streaming 反压(Back Pressure)机制介绍

文章目录 1 背景 2 反压机制 3 Spark Streaming 反压机制的使用 背景 在默认情况下,Spark Streaming 通过 receivers (或者是 Direct 方式) 以生产者生产数据的速率接收数据。当 batch proc...

Spark ⋅ 05/28 ⋅ 0

Spark Streaming 中使用kafka低级api+zookeeper 保存 offset 并重用 以及 相关代码整合

在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细...

cql252283126 ⋅ 04/13 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

收集自网络的wordpress 分页导航的代码教程(全网最全版)

wordpress 分页导航是用来切换文章的一个功能,添加了 wordpress 分页导航后,用户即可自由到达指定的页面数浏览分类文章,而这样的一个很简单功能却有很多朋友在用插件:WP-PageNavi,插件的...

Rhymo-Wu ⋅ 37分钟前 ⋅ 0

微服务 WildFly Swarm 入门

Hello World 就像前面章节中的其他框架一样,我们希望添加一些基本的 Hello-world 功能,然后在其上逐步添加更多的功能。让我们从在我们的项目中创建一个 HolaResources 开始。您可以使用您的...

woshixin ⋅ 44分钟前 ⋅ 0

Maven的安装和Eclipse的配置

1. 下载Maven 下载地址 2. 解压压缩包,放到自己习惯的硬盘中 此处我将其放到了 D:\Tools 目录下。 3. 配置环境变量 右键此电脑 -> 属性 -> 高级系统设置 -> 环境变量。 在系统变量中新建,变...

影狼 ⋅ 52分钟前 ⋅ 0

python pip使用国内镜像的方法

国内源 清华:https://pypi.tuna.tsinghua.edu.cn/simple 阿里云:http://mirrors.aliyun.com/pypi/simple/ 中国科技大学 https://pypi.mirrors.ustc.edu.cn/simple/ 华中理工大学:http://......

良言 ⋅ 52分钟前 ⋅ 0

对于url变化的spa应该如何使用微信jssdk

使用vue单页面碰上微信jssdk config验证失败的坑。第一次成功 之后切换页面全部失败,找到了解决方法,第一次验证成功后保存验证信息 切换页面时验证信息直接拿来用,加一个wx.error() 失败时...

孙冠峰 ⋅ 57分钟前 ⋅ 0

Spring Cloud Gateway 一般集成

SCF发布,带来很多新东西,不过少了点教程,打开方式又和以前的不一样,比如这个SCG,压根就没有入门指导,所以这里写一个,以备后用。 一、集成 pom.xml <dependency> <groupI...

kut ⋅ 今天 ⋅ 0

建造模式

《JAVA与模式》之建造模式

Cobbage ⋅ 今天 ⋅ 0

WePY框架开发的小程序如何在微信web开发者工具中运行起来

一、首先需要安装node.js,安装步骤如下: 首先下载安装包 https://nodejs.org/en/download/ 点击下载相应的zip版本 然后将文件夹解压到任意目录 比如我这里解压到了:C:\Program Files\node...

Helios51 ⋅ 今天 ⋅ 0

使用EnumSet 代替位域(32)

1、位域(Bit field):使用or 运算将几个常量合并到一个集合中 位操作,可以有效地执行 AND 、OR 这样的位操作 但是 位域比int 常量枚举缺点更多 2、java.util 包里面的EnumSet 类是有效的替...

职业搬砖20年 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部