Apache Beam Spark Streaming Example

2017/04/01 10:27

At a reader's request, this post shows how to use Beam with Spark underneath to consume a Kafka data stream.

I looked at how this is implemented in the source code. What follows is only a demo and has not been tested; please test it yourself before actually running it, and leave a comment if you run into any problems.

package com.ryan.beam.spark.stream;

// Use the plain Guava classes instead of Beam's repackaged internal copies.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

/**
 * <pre>
 * User:        Ryan
 * Date:        2017/3/6
 * Email:       liuwei412552703@163.com
 * Version:     V1.0
 * Description: Word-count demo that reads a Kafka stream with Beam on the Spark runner.
 * </pre>
 */
public class BeamKafkaWordCount {


    public static void main(String[] args) {
        SparkPipelineOptions sparkPipelineOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        sparkPipelineOptions.setSparkMaster("local[4]");
        sparkPipelineOptions.setJobName("kafkaWordCount");
        sparkPipelineOptions.setAppName("kafkaWordCount");
        // The Spark runner parses this value with StorageLevel.fromString(), so pass the level name.
        sparkPipelineOptions.setStorageLevel("MEMORY_AND_DISK");
        sparkPipelineOptions.setEnableSparkMetricSinks(true);
        sparkPipelineOptions.setBatchIntervalMillis(10L);
        sparkPipelineOptions.setMaxRecordsPerBatch(500L);


        Pipeline pipeline = Pipeline.create(sparkPipelineOptions);


        /**
         * Read the Kafka stream with Beam's KafkaIO, executed on the Spark runner.
         */
        ImmutableMap<String, Object> immutableMap = ImmutableMap.<String, Object>of("receive.buffer.bytes", 1024 * 1024);

        pipeline.apply(KafkaIO.read().withBootstrapServers("192.168.1.102:9092,192.168.1.102:9092")
                .withTopics(ImmutableList.of("topic_a", "topic_b"))
                // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>

                // rest of the settings are optional :

                // set a Coder for Key and Value (note the change to return type)
                .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>
                .withValueCoder(StringUtf8Coder.of())  // PCollection<KafkaRecord<Long, String>

                // you can further customize KafkaConsumer used to read the records by adding more
                // settings for ConsumerConfig. e.g :
                .updateConsumerProperties(immutableMap)

                // custom function for calculating record timestamp (default is processing time)
                .withTimestampFn(new SerializableFunction<KV<Long, String>, Instant>() {
                    @Override
                    public Instant apply(KV<Long, String> input) {
                        // Placeholder: use processing time. Replace this with logic that
                        // derives the event time from the record if event time is needed.
                        return Instant.now();
                    }
                })

                // custom function for watermark (default is record timestamp)
                .withWatermarkFn(new SerializableFunction<KV<Long, String>, Instant>() {
                    @Override
                    public Instant apply(KV<Long, String> input) {
                        // Placeholder: advance the watermark to the current time. In a real
                        // job this should track the timestamps assigned above.
                        return Instant.now();
                    }
                })

                // finally, if you don't need Kafka metadata, you can drop it
                // PCollection<KV<Long, String>>
                .withoutMetadata())
                // PCollection<String>
                .apply(Values.<String>create());

        // Nothing is executed until the pipeline is launched on the Spark runner.
        pipeline.run();
    }
}
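
The class is named BeamKafkaWordCount, but the demo above stops after extracting the record values and never actually counts anything. Below is a minimal, untested sketch of how the counting could be added. The helper name countWords, the 30-second fixed windows, and printing to stdout are my own illustrative assumptions, not part of the original demo.

// Additional imports needed for the sketch below.
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

    /**
     * Hypothetical helper: turns the PCollection<String> produced by
     * Values.<String>create() above into windowed per-word counts and prints them.
     */
    private static void countWords(PCollection<String> lines) {
        lines
                // Unbounded (streaming) input must be windowed before a grouping
                // transform such as Count can be applied.
                .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30))))
                // Split every Kafka record value into individual words.
                .apply(ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        for (String word : c.element().split("\\s+")) {
                            if (!word.isEmpty()) {
                                c.output(word);
                            }
                        }
                    }
                }))
                // Count how often each word occurs within the window.
                .apply(Count.<String>perElement())
                // Print the counts; a real job would write them to a sink instead.
                .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        System.out.println(c.element().getKey() + ": " + c.element().getValue());
                    }
                }));
    }

In main(), you would capture the result of .apply(Values.<String>create()) in a PCollection<String>, pass it to countWords(...), and only then call pipeline.run().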

 

Beam is a framework that aims to unify big-data processing. The project is still very young (it only recently graduated from the Apache incubator), so I do not recommend using it in production yet!

 

 
