Logstash实时同步mysql数据库至elastic

原创
2017/05/09 15:49
阅读数 675

 

  1. 安装logstash-jdbc-input 插件(教程自己找)
  2. 运行 logstash -f D:\logstash-5.2.2\config\shipper.config
  3. shipper文件中的主要目标,定时刷新最新有变更的(依据update_date)数据至elastic,初始时直接插入,elastic检测到id相同时,则更新自有的数据。
  4. shipper.config 文件内容:
input {
    # Poll the MySQL table sys_user over JDBC on a schedule and emit one event
    # per row whose update_date is newer than the last value seen.
    jdbc {
        # Tag every event; the output stage uses %{type} as the document type.
        type => "sys_user"

        # JDBC connection URL and the path to the downloaded MySQL connector jar.
        jdbc_connection_string => "jdbc:mysql://192.168.133.4:3306/orderproduction_temp?useUnicode=true&characterEncoding=utf-8"
        jdbc_driver_library => "D:\logstash-5.2.2\driver\mysql-connector-java-5.1.39.jar"
        # Java class names are case-sensitive: the Connector/J 5.1 driver class
        # is com.mysql.jdbc.Driver (capital "D"); the lowercase spelling cannot be loaded.
        jdbc_driver_class => "com.mysql.jdbc.Driver"

        # Database account the statement is executed as.
        # NOTE(review): avoid committing real credentials; consider an
        # environment variable reference such as "${JDBC_PASSWORD}".
        jdbc_user => "root"
        jdbc_password => "dbserver666"

        # Cron-style schedule: run the statement once every minute.
        # For every 5 minutes use: schedule => "*/5 * * * *"
        schedule => "* * * * *"

        # File in which the last tracked value (:sql_last_value) is persisted
        # between runs.
        #last_run_metadata_path => "./logstash_last_run_display"
        last_run_metadata_path => "./logstash_jdbc_last_run"
        # Uncomment to discard the persisted state so that sql_last_value
        # starts again from 1970-01-01 (full re-import).
        #clean_run => true

        # Query to run. Example of a named-parameter query:
        #parameters => {"name" => "长沙大东家"}
        #statement => "select * from sys_user where name = :name"
        statement => "select * from sys_user where update_date > :sql_last_value"

        # Track the update_date column of the returned rows instead of the
        # time the query was last run.
        use_column_value => true
        tracking_column => "update_date"
        # The tracking column type defaults to "numeric"; update_date is
        # presumably a DATETIME/TIMESTAMP column (confirm against the schema),
        # so it must be declared as a timestamp for :sql_last_value to be
        # compared correctly.
        tracking_column_type => "timestamp"

        # Fetch the result set in pages of 1000 rows.
        jdbc_paging_enabled => true
        jdbc_page_size => 1000

        # Timezone used to interpret timestamps read from the database.
        jdbc_default_timezone => "Asia/Shanghai"
    }
}

filter {
			# Disabled alternative: derive year/month from create_date with grok.
			#grok{
			                # stop at the first matching pattern when several patterns are configured
						    #break_on_match=>true
							# extract year and month info from the create_date field
							#match => ["create_date","%{YEAR:year}-%{MONTH:month}-dd HH:mm:ss"]
							#add_field =>{
							#	"yearmonth" => "%{year}-%{month}"
							#}
			#}
			# Add a "yearmonth" field holding the first 7 characters of the string
			# form of create_date; the output stage interpolates it into the index name.
			ruby {
		          # Note 1: the ruby filter needs to be placed before any mutate filter.
			      # Note 2: if the ruby filter does not run (or runs incorrectly), install JRuby and try again.
			      # On Linux, if the hostname is not configured in the hosts file, Logstash reports 0.0.0.0;
			      # the commented-out IO.popen('hostname') line inside the code below is a workaround for that.
			          #init => "require 'time'"
		              code => "
                               # IO.popen('hostname') { |io| while (line = io.gets) do event['host']=line end }
								# get date from path
								event.set('yearmonth',event.get('create_date').to_s[0..6])
                           "
		   }
			# Disabled alternative: use create_date as the event @timestamp.
			#date {
				 # The timezone setting avoids the value being shifted by the zone offset (e.g. -8 hours for a +8 zone).
				 # When deploying, check that the hour value is correct; if it is not, try removing timezone.
			#	 timezone=>"Asia/Shanghai"
			#	 match => ["create_date","YYYY-MM-dd HH:mm:ss +0800"]
			#	 target => "@timestamp"
			#}
}

output {
   # Echo every event to the console in a readable form for debugging.
   # For compact one-line JSON instead, use: stdout { codec => json_lines }
   stdout { codec => rubydebug }

   # Write each row to Elasticsearch keyed by its id column. With the
   # "update" action and doc_as_upsert enabled, a document that does not yet
   # exist is created from the event, and an existing one is updated.
   elasticsearch {
      hosts         => "192.168.133.4:9200"

      # One index per year-month value, one document type per source table.
      index         => "orderproduction_%{yearmonth}"
      document_type => "%{type}"
      document_id   => "%{id}"

      action        => "update"
      doc_as_upsert => true
   }
}

 

展开阅读全文
打赏
1
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
1
分享
在线直播报名
返回顶部
顶部