文档章节

SPARK 本地模式运行

吹比龙
 吹比龙
发布于 2017/04/10 17:12
字数 474
阅读 21
收藏 0

之前搞过STORM知道本地模式非常的方便,特意查询学习SPARK本地DEBUG模式开发

Pom.xml

        <dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>

2017年4月10日最新Spark

Java示例代码

经典WorldCount

参考文档:http://blog.csdn.net/xsdxs/article/details/52203922

package com.chuibilong.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName(
                "wordCountTest");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> list = new ArrayList<String>();
        list.add("1 1 2 a b");
        list.add("a b 1 2 3");
        JavaRDD<String> RddList = sc.parallelize(list);
        // 先切分为单词,扁平化处理
        JavaRDD<String> flatMapRdd = RddList
                .flatMap(new FlatMapFunction<String, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<String> call(String str) {
                        System.out.println(str);
                        return Arrays.asList(str.split(" ")).iterator();
                    }
                });
        // 再转化为键值对
        JavaPairRDD<String, Integer> pairRdd = flatMapRdd
                .mapToPair(new PairFunction<String, String, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    public Tuple2<String, Integer> call(String word)
                        throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

        // 对每个词语进行计数
        JavaPairRDD<String, Integer> countRdd = pairRdd
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        System.out.println("结果:" + countRdd.collect());
        sc.close();
    }
}

DAY DAY UP

后面准备写个 SPARK STREAMING 的DEMO(预想是怒Spark 的github)

直接参考:http://blog.csdn.net/jacklin929/article/details/53689365

//注意本地调试,master必须为local[n],n>1,表示一个线程接收数据,n-1个线程处理数据
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
JavaSparkContext sc = new JavaSparkContext(conf);
//设置日志运行级别
sc.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
//创建一个将要连接到
JavaReceiverInputDStream<String> lines = hostname:port 的离散流
ssc.socketTextStream("master1", 9999); 
JavaPairDStream<String, Integer> counts = 
        lines.flatMap(x->Arrays.asList(x.split(" ")).iterator())
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
        .reduceByKey((x, y) -> x + y);

// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
counts.print();
// 启动计算
ssc.start();
ssc.awaitTermination();

建立服务端 
找台Linux服务器,运行netcat小工具: 
nc -lk 9999 
也就是上面代码里socketTextStream的参数.

© 著作权归作者所有

共有 人打赏支持
吹比龙
粉丝 2
博文 116
码字总数 34765
作品 0
合肥
程序员
配置hadoop+pyspark环境

配置hadoop+pyspark环境 1、部署hadoop环境 配置hadoop伪分布式环境,所有服务都运行在同一个节点上。 1.1、安装JDK 安装jdk使用的是二进制免编译包,下载页面 下载jdk 解压文件,配置环境变...

巴利奇
06/25
0
0
Spark On Yarn Cluster 模式下的远程调试Spark源码(Attach模式)

Spark源码学习时,我们通常很想知道Spark-submit提交之后,Spark都做了什么,这就需要我们对Spark源码进行单步调试。另外,我们在spark on yarn模式下,尤其是yarn-cluster模式下,我们无法连...

stefan_xiepj
05/24
0
0
Spark的运行架构分析(一)之架构概述

本博客转载自:https://blog.csdn.net/gamer_gyt/article/details/51822765 1:Spark的运行模式 2:Spark中的一些名词解释 3:Spark的运行基本流程 4:RDD的运行基本流程 一:Spark的运行模式...

lubin2016
04/18
0
0
[Spark]Spark 应用程序部署工具spark-submit

1. 简介 Spark的bin目录中的spark-submit脚本用于启动集群上的应用程序。 可以通过统一的接口使用Spark所有支持的集群管理器,因此不必为每个集群管理器专门配置你的应用程序(It can use al...

sjf0115
2017/02/16
0
0
Hadoop CDH5 Spark部署

Spark是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速,Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark ...

China_OS
2014/05/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MySQL数据库集群-PXC方案

网盘下载地址 MySQL数据库集群-PXC方案 PXC是开源的MySQL集群技术,如中国移动、阿里巴巴、腾讯、去哪网等企业均采用或者借鉴了PXC解决方案,可见该方案具有极佳的稳定性。本课程将在Linux环...

qq__2304636824
6分钟前
0
0
vue脚手架搭建项目

npm install -g vue-clivue init webpack my-projectcd my-projectnpm run dev

帝子兮
9分钟前
1
0
es6 字符串拓展方法

es6 include();返回Boolean,该字符串是否包含该字符 startWith() 返回Boolean,该字符串开头是否是该字符 endWith() 返回Boolean,该字符串结尾是否是该字符 repeat() 重复该字符串多少次,...

莫西摩西
9分钟前
0
0
Java语言实现word转PDF(10分钟解决)

前言: 经常做OA办公项目的同学一定和我一样被各种线上的office操作整疯了。基本上涉及到Java操作office的时候就会想到POI和openoffice.这两种方案都是需要找各种jar包,然后用里面繁杂的api。...

山里的红杏
11分钟前
0
0
Flask部分源码阅读

Flask主要依赖于Werkzeug和Jinja这两个库,是很简洁的Python Web框架。 Werkzeug 是一个WSGI的工具包,是Flask的核心库。 Jinja 则是一个模板渲染的库,主要负责渲染返回给客户端的html文件。...

Jian_Ming
14分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部