
Flume-ng: Configuring Round-Robin Load Balancing Across Channels

AlexPeng
Published 2017/04/06 20:41

Before making any changes, let's walk through the core class a source uses when handing events off to its channels: org.apache.flume.channel.ChannelProcessor. Its processEventBatch method, from the Flume source, is:

public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");

    events = interceptorChain.intercept(events);

    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch: optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }

    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          reqChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch ) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }

In this method, the selector field decides which channel selector is used. It is set in the configuration via:

<agent-name>.sources.<source-name>.selector.type
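For example, with the agent and source named `agent` and `src1` as in the full configuration further below, wiring in a custom selector class looks like this:

```properties
agent.sources.src1.selector.type = com.flume.source.LbChannelSelector
```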

selector.getRequiredChannels returns the channels that must receive the event, while selector.getOptionalChannels applies the optional rules to pick additional channels. To load-balance the events a source sends, we can plug in our own selector rule class at this point. The new LbChannelSelector class is a rewrite of MultiplexingChannelSelector (whose optional-rule channel selection is Flume's default handling), adding a polling property to the selector that names the channels to balance across. In the configuration, add:

<agent-name>.sources.<source-name>.selector.polling = ch1 ch2 ch3

where ch1, ch2, and ch3 are the names of channels that are already configured.

The getRequiredChannels method is then modified to support the polling property, so that it returns, in round-robin order, the channel the source should send each event to. The complete load-balancing selector:


import java.util.*;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.AbstractChannelSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Channel selector that load-balances events across channels round-robin.
 *
 * @author pengming
 * @date 2016-11-22 16:38:49
 */
public class LbChannelSelector extends AbstractChannelSelector {

    /**
     * The constant CONFIG_MULTIPLEX_HEADER_NAME.
     */
    public static final String CONFIG_MULTIPLEX_HEADER_NAME = "header";
    /**
     * The constant DEFAULT_MULTIPLEX_HEADER.
     */
    public static final String DEFAULT_MULTIPLEX_HEADER = "flume.selector.header";
    /**
     * The constant CONFIG_PREFIX_MAPPING.
     */
    public static final String CONFIG_PREFIX_MAPPING = "mapping.";

    /**
     * The constant CONFIG_DEFAULT_CHANNEL.
     */
    public static final String CONFIG_DEFAULT_CHANNEL = "default";

    public static final String CONFIG_PREFIX_POLLING = "polling";
    public static final String CONFIG_PREFIX_COPY = "copy";
    /**
     * The constant CONFIG_PREFIX_OPTIONAL.
     */
    public static final String CONFIG_PREFIX_OPTIONAL = "optional";

    @SuppressWarnings("unused")
    private static final Logger LOG = LoggerFactory.getLogger(LbChannelSelector.class);

    private static final List<Channel> EMPTY_LIST = Collections.emptyList();

    private String headerName;

    private Map<String, List<Channel>> channelMapping;
    private Map<String, List<Channel>> optionalChannels;
    private List<Channel> defaultChannels;


    /** Channels that every event is copied to. */
    private List<Channel> copyChannels;

    /** Round-robin index into loadBalanceChannels. */
    private volatile int index = 0;

    /** Channels that events are round-robin load-balanced across. */
    private List<Channel> loadBalanceChannels;


    @Override
    public List<Channel> getRequiredChannels(Event event) {
        // Find the channels mapped to the event's header value, if any.
        String headerValue = event.getHeaders().get(headerName);
        if (StringUtils.isNotEmpty(StringUtils.trim(headerValue))) {
            List<Channel> channels = channelMapping.get(headerValue);
            if (CollectionUtils.isNotEmpty(channels)) {
                return channels;
            }
        }

        // Copy mode: send the event to all copy channels.
        if (CollectionUtils.isNotEmpty(copyChannels)) {
            return copyChannels;
        }

        // Otherwise pick one channel round-robin.
        if (CollectionUtils.isNotEmpty(loadBalanceChannels)) {
            return getLbChannels();
        }

        return defaultChannels;
    }

    public synchronized List<Channel> getLbChannels() {
        List<Channel> channels = new ArrayList<>(1);
        channels.add(loadBalanceChannels.get(index));
        if (index++ >= (loadBalanceChannels.size() - 1)) {
            index = 0;
        }
        return channels;
    }


    @Override
    public List<Channel> getOptionalChannels(Event event) {
        String hdr = event.getHeaders().get(headerName);
        List<Channel> channels = optionalChannels.get(hdr);

        if (channels == null) {
            channels = EMPTY_LIST;
        }
        return channels;
    }



    @Override
    public void configure(Context context) {
        Map<String, Channel> channelNameMap = getChannelNameMap();
        // Header name used to look up the channel mapping.
        this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME, DEFAULT_MULTIPLEX_HEADER);

        // Default channels, used when no other rule matches.
        defaultChannels = getChannelListFromNames(context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);
        LOG.info("defaultChannelsSize: " + defaultChannels.size());
        if (CollectionUtils.isEmpty(defaultChannels)) {
            defaultChannels = EMPTY_LIST;
        }

        // Channels to load-balance across round-robin.
        loadBalanceChannels = getChannelListFromNames(context.getString(CONFIG_PREFIX_POLLING), channelNameMap);
        LOG.info("loadBalanceChannelsSize: " + loadBalanceChannels.size());
        if (CollectionUtils.isEmpty(loadBalanceChannels)) {
            loadBalanceChannels = EMPTY_LIST;
        }

        // Channels that every event is copied to.
        copyChannels = getChannelListFromNames(context.getString(CONFIG_PREFIX_COPY), channelNameMap);
        LOG.info("copyChannelsSize: " + copyChannels.size());
        if (CollectionUtils.isEmpty(copyChannels)) {
            copyChannels = EMPTY_LIST;
        }

        // Map each configured header value to its required channels.
        Map<String, String> mapConfig = context.getSubProperties(CONFIG_PREFIX_MAPPING);
        channelMapping = new HashMap<String, List<Channel>>();
        for (String headerValue : mapConfig.keySet()) {
            List<Channel> configuredChannels = getChannelListFromNames(mapConfig.get(headerValue), channelNameMap);

            //This should not go to default channel(s)
            //because this seems to be a bad way to configure.
            if (configuredChannels.size() == 0) {
                throw new FlumeException("No channel configured for when " + "header value is: " + headerValue);
            }

            if (channelMapping.put(headerValue, configuredChannels) != null) {
                throw new FlumeException("Selector channel configured twice");
            }
        }
        //If no mapping is configured, it is ok.
        //All events will go to the default channel(s).


        // Optional rules: channels already selected as required (or default) for the same header value are removed; the remainder are treated as optional by ChannelProcessor.
        Map<String, String> optionalChannelsMapping = context.getSubProperties(CONFIG_PREFIX_OPTIONAL + ".");

        optionalChannels = new HashMap<String, List<Channel>>();
        for (String hdr : optionalChannelsMapping.keySet()) {
            List<Channel> confChannels = getChannelListFromNames(optionalChannelsMapping.get(hdr), channelNameMap);
            if (confChannels.isEmpty()) {
                confChannels = EMPTY_LIST;
            }
            //Remove channels from optional channels, which are already
            //configured to be required channels.

            List<Channel> reqdChannels = channelMapping.get(hdr);
            //Check if there are required channels, else defaults to default channels
            if (CollectionUtils.isEmpty(reqdChannels)) {
                reqdChannels = defaultChannels;
            }
            for (Channel c : reqdChannels) {
                if (confChannels.contains(c)) {
                    confChannels.remove(c);
                }
            }
            if (optionalChannels.put(hdr, confChannels) != null) {
                throw new FlumeException("Selector channel configured twice");
            }
        }

    }

}
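To see the round-robin behavior in isolation, here is a minimal self-contained sketch (RoundRobinPicker is a hypothetical standalone class, not part of the plugin) that mirrors the index-wrapping logic of getLbChannels(), with channel names standing in for Channel objects:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinPicker {
    private final List<String> channels;
    private int index = 0;

    public RoundRobinPicker(List<String> channels) {
        this.channels = new ArrayList<>(channels);
    }

    // Returns a one-element list and advances the index, wrapping back to 0
    // at the end of the list, exactly like getLbChannels() above.
    public synchronized List<String> next() {
        List<String> picked = new ArrayList<>(1);
        picked.add(channels.get(index));
        if (index++ >= channels.size() - 1) {
            index = 0;
        }
        return picked;
    }

    public static void main(String[] args) {
        RoundRobinPicker picker = new RoundRobinPicker(Arrays.asList("ch1", "ch2", "ch3"));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sb.append(picker.next().get(0)).append(' ');
        }
        System.out.println(sb.toString().trim());
        // prints: ch1 ch2 ch3 ch1 ch2 ch3
    }
}
```

Because next() is synchronized, concurrent callers sharing the selector still advance the index one step per call, which is what keeps the distribution even under load.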

The complete Flume configuration:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = src1
agent.channels = ch1 ch2 ch3
agent.sinks = sink1 sink2 sink3

# For each one of the sources, the type is defined
agent.sources.src1.type=com.flume.source.HTTPSource
#dev
#agent.sources.src1.bind=10.10.10.79
#test
agent.sources.src1.bind=10.20.22.223
#agent.sources.src1.bind=10.20.22.225
agent.sources.src1.port=8990

# The channel can be defined as follows.
agent.sources.src1.channels = ch1 ch2 ch3
agent.sources.src1.selector.type = com.flume.source.LbChannelSelector
agent.sources.src1.selector.default = ch1
agent.sources.src1.selector.polling = ch1 ch2 ch3

agent.sources.src1.handler = com.flume.source.JsonHandler
#agent.sources.src1.batchSize = 1000
agent.sources.src1.keep-alive = 3


# Each sink's type must be defined
#agent.sinks.sink1.type = logger
#agent.sinks.sink2.type = logger


#Specify the channel the sink should use
agent.sinks.sink1.channel = ch1
agent.sinks.sink1.type = com.flume.sink.KafkaSink
agent.sinks.sink1.useFlumeEventFormat = true
agent.sinks.sink1.kafka.topic = dev_tag
agent.sinks.sink1.kafka.bootstrap.servers = wx-kafka-03:9092,wx-kafka-04:9092,wx-kafka-05:9092,wx-kafka-06:9092,wx-kafka-07:9092,wx-kafka-08:9092
agent.sinks.sink1.flumeBatchSize = 1000
agent.sinks.sink1.kafka.producer.acks = 1
agent.sinks.sink1.kafka.producer.linger.ms = 1
agent.sinks.sink1.kafka.producer.compression.type = snappy

agent.sinks.sink2.channel = ch2
agent.sinks.sink2.type = com.flume.sink.KafkaSink
agent.sinks.sink2.useFlumeEventFormat = true
agent.sinks.sink2.kafka.topic = dev_tag
agent.sinks.sink2.kafka.bootstrap.servers = wx-kafka-03:9092,wx-kafka-04:9092,wx-kafka-05:9092,wx-kafka-06:9092,wx-kafka-07:9092,wx-kafka-08:9092
agent.sinks.sink2.flumeBatchSize = 1000
agent.sinks.sink2.kafka.producer.acks = 1
agent.sinks.sink2.kafka.producer.linger.ms = 1
agent.sinks.sink2.kafka.producer.compression.type = snappy

agent.sinks.sink3.channel = ch3
agent.sinks.sink3.type = com.flume.sink.KafkaSink
agent.sinks.sink3.useFlumeEventFormat = true
agent.sinks.sink3.kafka.topic = dev_tag
agent.sinks.sink3.kafka.bootstrap.servers = wx-kafka-03:9092,wx-kafka-04:9092,wx-kafka-05:9092,wx-kafka-06:9092,wx-kafka-07:9092,wx-kafka-08:9092
agent.sinks.sink3.flumeBatchSize = 1000
agent.sinks.sink3.kafka.producer.acks = 1
agent.sinks.sink3.kafka.producer.linger.ms = 1
agent.sinks.sink3.kafka.producer.compression.type = snappy


#agent.sinkgroups = g1
#agent.sinkgroups.g1.sinks = sink1 sink2 sink3
#agent.sinkgroups.g1.processor.type = load_balance
#agent.sinkgroups.g1.processor.backoff=true
#agent.sinkgroups.g1.processor.selector=round_robin
#agent.sinkgroups.g1.processor.priority.sink1 = 9
#agent.sinkgroups.g1.processor.priority.sink2 = 7
#agent.sinkgroups.g1.processor.priority.sink3 = 8
#agent.sinkgroups.g1.processor.maxpenalty = 10000

# Each channel's type is defined.
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 100000
agent.channels.ch1.transactionCapacity = 10000

agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 100000
agent.channels.ch2.transactionCapacity = 10000

agent.channels.ch3.type = memory
agent.channels.ch3.capacity = 100000
agent.channels.ch3.transactionCapacity = 10000


Replace the Kafka bootstrap servers in the configuration with your own cluster addresses. I am on Flume 1.6 here; because the company's Kafka version is fairly old, the Kafka client jar has to be added manually. Flume 1.7 does not need it, so in that case comment out the Kafka client dependency in the pom. The full source is at https://github.com/mingpeng2live/flume-plugs. The project is a Flume plugin: a straight Maven build produces a -bin archive; copy it to the Flume root directory and unpack it there. For startup, see the summary notes in https://my.oschina.net/mingpeng/blog/861372 and adjust the Flume launch parameters in the service.sh script accordingly.

© Copyright belongs to the author
