文档章节

logstash tcp multihost output(多目标主机输出,保证TCP输出链路的稳定性)

 大东家
发布于 2015/01/13 10:28
字数 633
阅读 95
收藏 0
ELK

在清洗日志时,有一个应用场景,就是TCP输出时,需要在一个主机挂了的情况下,自已切换到下一个可用入口,而原tcp output仅支持单个目标主机设定。故本人在原tcp的基础上,开发出tcp_multihost输出插件,来满足此场景。

插件在一开始的时候会随机选择一个链路,而在链路出错连续超过3(默认)次后会尝试数组中下一个主机

github: http://github.com/xiaohelong2005

Logstash版本:1.4.2

文件位置:

# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "thread"
#@auhor:xiaohelong 
#@date:2014-10-24
#@email:xiaohelong2005@gmail.com
# Write events over a TCP socket.
#Auto select the host from iplist to tolerate the link
# Each event json is separated by a newline.
#
# Can  connect to a server,
class LogStash::Outputs::Tcp_Multihost < LogStash::Outputs::Base
  config_name "tcp_multihost"
  milestone  0
  default :codec, "json"
  # the address to connect to.
  #hosts config example
  config :hosts, :validate => :array, :required => true,:default=>{}
    config :reconnect_times, :validate => :number,:default=>3
  # When connect failed,retry interval in sec.
  config :reconnect_interval, :validate => :number, :default => 10
 #last available host
 @hosts_size=0
 #last available host ip
@host="0.0.0.0"
#last available port
@port=9200
#retry action count,if retry_count<=0,we need to update the host and port , also include retry_count itself
@retry_count=0
#get the desgined index data
@initloc=0

  public
  def register
    require "stud/try"
  #here we use the recorded host,if recorded host is not available, we need update it
  @retry_count=@reconnect_times
   @hosts_size=@hosts.length
    @logger.info("length:#@hosts_size; hosts:#@hosts")
   @initloc=Random.rand(@hosts_size)#generate 0-(hosts_size-1) int
    @logger.info("initloc:#@initloc")
    icount=0;
      @hosts.each do |hosthash|               
            @logger.info("hosthash info",hosthash)
       end#do
        @host=@hosts[@initloc].keys[0]        
       @port=@hosts[@initloc][@host]
       
      client_socket = nil
      @codec.on_event do |payload|
        begin
            @retry_count=@reconnect_times#here we need to init retry mark
          client_socket = connect unless client_socket
          r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)          
          # don't expect any reads, but a readable socket might
          # mean the remote end closed, so read it and throw it away.
          # we'll get an EOFError if it happens.
          client_socket.sysread(16384) if r.any?
          # Now send the payload
          client_socket.syswrite(payload) if w.any?
           @logger.info("tcp output info:", :host => @host, :port => @port,
                       :exception => e, :backtrace => e.backtrace)
        rescue => e
          @logger.warn("tcp output exception", :host => @host, :port => @port,
                       :exception => e, :backtrace => e.backtrace)
          client_socket.close rescue nil
          client_socket = nil
          @retry_count-=1
            @logger.info("retry_count:#@retry_count")
          if    @retry_count<=0
                                        @initloc+=1
                              @initloc=@initloc%@hosts_size #update  init location
                                           @host=@hosts[@initloc].keys[0]        
                                            @port=@hosts[@initloc][@host]
                   @retry_count=@reconnect_times #update retry_count
                      @logger.info("retry_count <=0,initloc:#@initloc,retry_count=#@retry_count:", :host => @host, :port => @port, :exception => e, :backtrace => e.backtrace)
          end
          sleep @reconnect_interval
          retry
        end
      end
  end # def register

  private
  def connect
    Stud::try do
      return TCPSocket.new(@host,@port)
    end
  end # def connect
  public
  def receive(event)
    return unless output?(event)
    @codec.encode(event)
  end # def receive
end # class LogStash::Outputs::Tcp_multihost


配置说明(我放在LOGSTASH_HOME/config):

output{

tcp_multihost{

   hosts=>[
            {"127.0.0.1"=>"9202"},
       {"localhost"=>"9201"},
   {"127.0.0.1"=>"9203"},
            {"127.0.0.1"=>"9204"}
         ] #主机列表
    workers =>16 #线程,默认1

reconnect_times=>3 # 默认3次, 尝试多少次数后切换
reconnect_interval=>3 #默认10秒,失败重连间隔
}
}

调用执行:

 "LOGSTASH_HOME/bin/logstash" agent --debug -f "LOGSTASH_HOME/config/shipper.config"  --pluginpath "LOGSTASH_HOME"

NC接收端可以尝试:

nc -lkv 9201 之类的

本文转载自:http://blog.csdn.net/xiaohelong2005/article/details/40510443

粉丝 22
博文 255
码字总数 111936
作品 0
长沙
项目经理
私信 提问
logstash收集log4j日志

使用logstash收集log4j日志信息 log4j日志文件配置 重要参数详解 mode logstash工作模式,可选"server"或者"client",默认是"server",server就是把logstash看做是日志的服务器,接收log4j主机......

冷川
2016/08/01
494
0
利用 Nmap 实现快速的网络发现与管理

快速并准确掌握网络中主机、网络设备及运行的网络服务信息是管理大型网络的基础,传统基于预定义端口的扫描或者基于 SLP 协议的发现机制,很少考虑到实际的网络环境,网络发现的效率和可侦测...

红薯
2011/03/08
1K
3
日志收集工具 logpipe 更新至 0.16.1 版本

0.16.1 2018-01-16 calvin * 修正了插件logpipe-input-file的转档时重复读取问题 0.16.0 2018-01-15 calvin * 优化了插件logpipe-output-ek性能,性能提高了一倍 * 插件logpipe-input-file新...

calvinwilliams
2018/02/27
937
3
ELK实战之通过TCP收集日志

一、TCP收集日志使用场景 tcp模块的使用场景如下: 有一台服务器A只需要收集一个日志,那么我们就可以不需要在这服务器上安装logstash,我们通过在其他logstash上启用tcp模块,监听某个端口,...

IT_外卖小哥
2018/01/02
0
0
ELK实时日志分析平台环境部署

原本打算构建和实践基于ELK的实时日志分析平台的,偶然发现此文,甚是详细和实用,便转载作以记录。 另,近期在使用公司RDS日志监控架构时了解到,使用的日志收集工具为filebeat,百度发现f...

wqhlmark64
2017/11/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
161
8
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
2
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部