文档章节

Spark 获取文本内最大的前3个数字实例

别寒
 别寒
发布于 2017/07/24 17:19
字数 161
阅读 77
收藏 0
package cn.hhb.spark.core;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * Created by dell on 2017/7/13.
 */
public class Top3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AccumulatorVariable")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("C://number.log");
        JavaPairRDD<Integer, String> pairs = lines.mapToPair(new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String s) throws Exception {
                return new Tuple2<Integer, String>(Integer.valueOf(s), s);
            }
        });

        JavaPairRDD<Integer, String> sortPairs = pairs.sortByKey(false);
        JavaRDD<Integer> sortNumbers = sortPairs.map(new Function<Tuple2<Integer, String>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, String> v1) throws Exception {
                return v1._1;
            }
        });

        sortNumbers.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer v1) throws Exception {
                System.out.println(v1);
            }
        });

        List<Integer> sortedNumberList = sortNumbers.take(3);
        for (Integer number : sortedNumberList){
            System.out.println(number);
        }

        sc.close();
    }
}

© 著作权归作者所有

别寒
粉丝 30
博文 273
码字总数 155300
作品 0
永州
程序员
私信 提问
[Spark]Spark Streaming 指南一 Example

1. 概述 Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等许多源中提取,并且...

sjf0115
2017/03/01
0
0
【Spark】Spark本地运行模式及Standalone运行模式环境搭建

版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/article/details/86584947 软件版本: JDK:1.7.0_67 Scala:2.10.4 Hadoop:2.5.0-cdh5.3.6 ...

魏晓蕾
01/24
0
0
[Spark]Spark Streaming 指南四 输入DStreams和Receivers

1. 输入DStream与Receiver 输入DStreams表示从源中获取输入数据流的DStreams。在指南一示例中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入DStream(除 file strea...

sjf0115
2017/03/02
0
0
用Spark解决一些经典MapReduce问题

摘要 Spark是一个Apache项目,它被标榜为“快如闪电的集群计算”。它拥有一个繁荣的开源社区,并且是目前最活跃的Apache项目。Spark提供了一个更快、更通用的数据处理平台。和Hadoop相比,S...

力谱宿云
2016/12/01
507
0
大数据入门与实战-Spark上手

1 Spark简介 1.1 引言 行业正在广泛使用Hadoop来分析他们的数据集。原因是Hadoop框架基于简单的编程模型(MapReduce),它使计算解决方案具有可扩展性,灵活性,容错性和成本效益。在这里,主...

致Great
03/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Cloud 笔记之Spring cloud config client

观察者模式它的数据的变化是被动的。 观察者模式在java中的实现: package com.hxq.springcloud.springcloudconfigclient;import org.springframework.context.ApplicationListener;i...

xiaoxiao_go
昨天
6
0
CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部