SPARK 本地模式运行
SPARK 本地模式运行
吹比龙 发表于7个月前
SPARK 本地模式运行
  • 发表于 7个月前
  • 阅读 9
  • 收藏 0
  • 点赞 0
  • 评论 0

之前搞过STORM知道本地模式非常的方便,特意查询学习SPARK本地DEBUG模式开发

Pom.xml

        <dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>

2017年4月10日最新Spark

Java示例代码

经典WorldCount

参考文档:http://blog.csdn.net/xsdxs/article/details/52203922

package com.chuibilong.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName(
                "wordCountTest");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> list = new ArrayList<String>();
        list.add("1 1 2 a b");
        list.add("a b 1 2 3");
        JavaRDD<String> RddList = sc.parallelize(list);
        // 先切分为单词,扁平化处理
        JavaRDD<String> flatMapRdd = RddList
                .flatMap(new FlatMapFunction<String, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<String> call(String str) {
                        System.out.println(str);
                        return Arrays.asList(str.split(" ")).iterator();
                    }
                });
        // 再转化为键值对
        JavaPairRDD<String, Integer> pairRdd = flatMapRdd
                .mapToPair(new PairFunction<String, String, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    public Tuple2<String, Integer> call(String word)
                        throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

        // 对每个词语进行计数
        JavaPairRDD<String, Integer> countRdd = pairRdd
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        System.out.println("结果:" + countRdd.collect());
        sc.close();
    }
}

DAY DAY UP

后面准备写个 SPARK STREAMING 的DEMO(预想是怒Spark 的github)

直接参考:http://blog.csdn.net/jacklin929/article/details/53689365

//注意本地调试,master必须为local[n],n>1,表示一个线程接收数据,n-1个线程处理数据
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
JavaSparkContext sc = new JavaSparkContext(conf);
//设置日志运行级别
sc.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
//创建一个将要连接到
JavaReceiverInputDStream<String> lines = hostname:port 的离散流
ssc.socketTextStream("master1", 9999); 
JavaPairDStream<String, Integer> counts = 
        lines.flatMap(x->Arrays.asList(x.split(" ")).iterator())
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
        .reduceByKey((x, y) -> x + y);

// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
counts.print();
// 启动计算
ssc.start();
ssc.awaitTermination();

建立服务端 
找台Linux服务器,运行netcat小工具: 
nc -lk 9999 
也就是上面代码里socketTextStream的参数.

标签: Spark
共有 人打赏支持
粉丝 3
博文 102
码字总数 28674
×
吹比龙
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: