文档章节

kafka+spark streaming集成

杰仪
 杰仪
发布于 2017/03/21 10:05
字数 392
阅读 49
收藏 0

主要的:pom

        <kafka.version>0.10.2.0</kafka.version>
        <slf4j.version>1.7.10</slf4j.version>
        <spark.version>2.1.0</spark.version>        
...
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark streaming kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

 

主要功能:打印出现ERROR和WARN的log

import com.boyoi.kafka.KafkaParams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;

/**
 * 基础测试
 */
public class Kafka2Error {

    public static void main(String[] avgs) throws URISyntaxException, IOException, InterruptedException {

        Collection<String> topics = Arrays.asList("test");
        JavaSparkContext sc = new JavaSparkContext("local","error");

        JavaStreamingContext jssc = new JavaStreamingContext(sc, Duration.apply(1000));
        final JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, KafkaParams.getSer9Params())
        );

        JavaDStream<String> map = directStream.map(a -> a.value());

        JavaDStream<String> filter = map.filter(s -> {
            if (
                    (
                            s.charAt(24) == '[' &&
                                    s.charAt(25) == ' ' &&
                                    s.charAt(26) == 'W' &&
                                    s.charAt(27) == 'A' &&
                                    s.charAt(28) == 'R' &&
                                    s.charAt(29) == 'N' &&
                                    s.charAt(30) == ']'
                    )
                            ||
                            (
                                    s.charAt(24) == '[' &&
                                            s.charAt(25) == 'E' &&
                                            s.charAt(26) == 'R' &&
                                            s.charAt(27) == 'R' &&
                                            s.charAt(28) == 'O' &&
                                            s.charAt(29) == 'R' &&
                                            s.charAt(30) == ']'
                            )
                    ) {
                return true;
            }
            return false;
        });

        filter.foreachRDD(each -> each.foreach(each2-> System.out.println(each2)));

        jssc.start();
        jssc.awaitTermination();
    }

}

 

kafka默认参数如下。value.deserializer使用了自定义的反序列化类。因源使用的GBK字符集,而kafka默认使用的UTF-8来反序列化。

import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka 默认参数
 */
public class KafkaParams {
    private static Map<String, Object> kafkaParams = new HashMap<String, Object>();
    static {
        kafkaParams.put("bootstrap.servers", "192.168.1.9:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializerGBK.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
    }

    public static Map<String, Object> getSer9Params(){
        return kafkaParams;
    }


}

自定义的反序列化类代码。

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.UnsupportedEncodingException;

/**
 * 自定义字符反序列。默认GBK
 */
public class StringDeserializerGBK extends StringDeserializer {

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            return data == null?null:new String(data, "GBK");
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding gbk");
        }
    }
}

 

© 著作权归作者所有

上一篇: storm集群安装
下一篇: kafka+log4j配置
杰仪
粉丝 1
博文 52
码字总数 10231
作品 0
成都
私信 提问
Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像SparkStreaming、SparkSQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁...

openthings
2016/03/11
482
0
Spark 学习资源收集

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

openthings
2016/05/29
201
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
5K
0
Structured streaming+kafka集成样例

关于structured streaming, spark社区已经有很多文章介绍,个人认为其中最大的特点是将流视作没有边界的大表,从而能够使用sql来操作这张表,其中包括使用sql join(截止Spark2.1.1,目前只支...

biggeng
2017/07/05
0
0
利用Spark Streaming来实现实时的数据管道服务

现在需要搜集用户的行为记录,之前我们打算采用AWS提供的服务,大致架构是这样的: 建立一个rest来收集来自服务器或者是终端的(从手机端,网页)的数据,之后将这些数据放到 Kinesis Streamin...

AC-carrot
2016/02/26
115
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Cloud 笔记之Spring cloud config client

观察者模式它的数据的变化是被动的。 观察者模式在java中的实现: package com.hxq.springcloud.springcloudconfigclient;import org.springframework.context.ApplicationListener;i...

xiaoxiao_go
今天
4
0
CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
今天
4
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
今天
7
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
今天
7
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部