文档章节

SPARK 本地模式运行

吹比龙
 吹比龙
发布于 2017/04/10 17:12
字数 474
阅读 25
收藏 0

之前搞过STORM知道本地模式非常的方便,特意查询学习SPARK本地DEBUG模式开发

Pom.xml

        <dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>

2017年4月10日最新Spark

Java示例代码

经典WorldCount

参考文档:http://blog.csdn.net/xsdxs/article/details/52203922

package com.chuibilong.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName(
                "wordCountTest");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> list = new ArrayList<String>();
        list.add("1 1 2 a b");
        list.add("a b 1 2 3");
        JavaRDD<String> RddList = sc.parallelize(list);
        // 先切分为单词,扁平化处理
        JavaRDD<String> flatMapRdd = RddList
                .flatMap(new FlatMapFunction<String, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<String> call(String str) {
                        System.out.println(str);
                        return Arrays.asList(str.split(" ")).iterator();
                    }
                });
        // 再转化为键值对
        JavaPairRDD<String, Integer> pairRdd = flatMapRdd
                .mapToPair(new PairFunction<String, String, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    public Tuple2<String, Integer> call(String word)
                        throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

        // 对每个词语进行计数
        JavaPairRDD<String, Integer> countRdd = pairRdd
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        System.out.println("结果:" + countRdd.collect());
        sc.close();
    }
}

DAY DAY UP

后面准备写个 SPARK STREAMING 的DEMO(预想是怒Spark 的github)

直接参考:http://blog.csdn.net/jacklin929/article/details/53689365

//注意本地调试,master必须为local[n],n>1,表示一个线程接收数据,n-1个线程处理数据
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
JavaSparkContext sc = new JavaSparkContext(conf);
//设置日志运行级别
sc.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
//创建一个将要连接到
JavaReceiverInputDStream<String> lines = hostname:port 的离散流
ssc.socketTextStream("master1", 9999); 
JavaPairDStream<String, Integer> counts = 
        lines.flatMap(x->Arrays.asList(x.split(" ")).iterator())
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
        .reduceByKey((x, y) -> x + y);

// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
counts.print();
// 启动计算
ssc.start();
ssc.awaitTermination();

建立服务端 
找台Linux服务器,运行netcat小工具: 
nc -lk 9999 
也就是上面代码里socketTextStream的参数.

© 著作权归作者所有

共有 人打赏支持
下一篇: 重试机制
吹比龙
粉丝 5
博文 119
码字总数 34820
作品 0
合肥
程序员
私信 提问
配置hadoop+pyspark环境

配置hadoop+pyspark环境 1、部署hadoop环境 配置hadoop伪分布式环境,所有服务都运行在同一个节点上。 1.1、安装JDK 安装jdk使用的是二进制免编译包,下载页面 下载jdk 解压文件,配置环境变...

巴利奇
10/30
0
0
Spark On Yarn Cluster 模式下的远程调试Spark源码(Attach模式)

Spark源码学习时,我们通常很想知道Spark-submit提交之后,Spark都做了什么,这就需要我们对Spark源码进行单步调试。另外,我们在spark on yarn模式下,尤其是yarn-cluster模式下,我们无法连...

stefan_xiepj
05/24
0
0
Spark的运行架构分析(一)之架构概述

本博客转载自:https://blog.csdn.net/gamer_gyt/article/details/51822765 1:Spark的运行模式 2:Spark中的一些名词解释 3:Spark的运行基本流程 4:RDD的运行基本流程 一:Spark的运行模式...

lubin2016
04/18
0
0
[Spark]Spark 应用程序部署工具spark-submit

1. 简介 Spark的bin目录中的spark-submit脚本用于启动集群上的应用程序。 可以通过统一的接口使用Spark所有支持的集群管理器,因此不必为每个集群管理器专门配置你的应用程序(It can use al...

sjf0115
2017/02/16
0
0
Spark的运行架构分析(二)之运行模式详解

在上一篇博客 spark的运行架构分析(一)中我们有谈到Spark的运行模式是多种多样的,那么在这篇博客中我们来具体谈谈Spark的运行模式 本博客转载自https://blog.csdn.net/gamer_gyt/article...

lubin2016
04/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

docker部署springboot项目

安装docker 菜鸟教程 springboot项目 maven依赖 <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001......

yimingkeji
今天
10
0
ios多个target

1.建立3个target,分别为heroone,heroone test,heroone dev;分别为正式环境,test环境,dev环境 2.注意取消掉autocreate以防止名字不对,分别以Duplicate的方式建立另外两个scheme 3.创建...

HeroHY
今天
6
0
php获取客户端IP

php获取客户端IP 首先先阅读关于IP真实性安全的文章:如何正確的取得使用者 IP? 「任何從客戶端取得的資料都是不可信任的!」 HTTP_CLIENT_IP头是有的,但未成标准,不一定服务器都实现。 ...

DrChenXX
昨天
0
0
. The valid characters are defined in RFC 7230 and RFC 问题

通过这里的回答,我们可以知道: Tomcat在 7.0.73, 8.0.39, 8.5.7 版本后,添加了对于http头的验证。 具体来说,就是添加了些规则去限制HTTP头的规范性 参考这里 具体来说: org.apache.tom...

west_coast
昨天
1
0
刷leetcode第704题-二分查找

今天双十一买的算法书到货了,路上刷到有人说的这个题,借(chao)鉴(xi)一下别人的思路,这个是C++标准库里面的经典方法,思路精巧,优雅好品味 int search(int* nums, int numsSize, in...

锟斤拷烫烫烫
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部