文档章节

elasticsearch之jdbc同步

王念博客
 王念博客
发布于 2016/04/14 13:43
字数 1526
阅读 4105
收藏 7

     由于es官网叫停river类的导入插件,因此原始的elasticsearch-jdbc-river变更为elasticsearch-jdbc,成为一个独立的导入工具。官方提到的同类型工具还有logstash,个人觉得logstash在做数据库同步的时候并不是很好用,有太多坑要填。

插件的github地址 https://github.com/jprante/elasticsearch-jdbc/

必须按照es的相应的版本安选择jdbc的版本

http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/<version>/elasticsearch-jdbc-<version>-dist.zip

下载时将<version>替换成相应的版本即可。

  1. 解压下载的压缩包

  2. 修改 bin下面的相应的命令文件,比如 windows有一个mysql-simple-example.bat

  3. 启动即可

jdbc本身有个坑是如果mysql中某个字段的值本身就是一个json格式的话就会报错,例如:

org.elasticsearch.index.mapper.MapperParsingException: failed to parse [page]

因此需要使用mysql的拼接函数来解决。

   CONCAT('(',HomePage,')') as `page`

报错信息:

java.net.MalformedURLException: unknown protocol: c


目前使用mysql导入数据到elasticsearch的方法常用方法如下:
elasticsearch-jdbc(早期的elasticsearch-river-jdbc)

https://github.com/jprante/elasticsearch-jdbc (推荐)


elasticsearch-river-mysql 

 https://github.com/scharron/elasticsearch-river-mysql

go-mysql-elasticsearch

https://github.com/siddontang/go-mysql-elasticsearch

logstash导入插件 

https://github.com/elastic/logstash


常用配置说明:

  "schedule" : "0 0/60 0-23 ? * *", 

  type: "jdbc", 

  jdbc: {

    url: "jdbc:mysql://127.0.0.1:3306/test", 

    user: "root", 

    password: "root", 

    sql : [

            {

                "statement" : "select id as _id  ... from ... where a = ?, b = ?, c = ?",

                "parameter" : [ "value for a", "value for b", "value for c" ]

            }

    ],

    locale: "zh_CN", 

    index: "创建的索引名", 

    type: "创建的type名", 

    index_settings: {

      index: {

        number_of_shards: "3"

      }

    }, 

    type_mapping: {

      "type名": {

        dynamic: true, 

        properties: {

          "field名": {

            type: "string", 

            analyzer: "ik", 

            indexAnalyzer: "ik", 

            searchAnalyzer: "ik"

          }, 

          "field名": {

            type: "string", 

            analyzer: "ik", 

            indexAnalyzer: "ik",

            searchAnalyzer: "ik"

          }, 

          "field名": {

            type: "string", 

            analyzer: "ik", 

            indexAnalyzer: "ik",

            searchAnalyzer: "ik"

          }, 

          "field名": {

            type: "string", 

            analyzer: "ik", 

            indexAnalyzer: "ik",

            searchAnalyzer: "ik"

          },   

          "LOCATIONS": {

            type: "geo_point"

          }

        }

      }

    }

  }

}



常用介绍:

获取一个表,select * from table可以使用查询。 查询从数据库选择数据的简单的变体。 他们转储表成Elasticsearch逐行。 如果没有_id列名,IDs将自动生成。

id as _id 这样的话可以增量同步,_id是es的默认id命名

"interval":"1800", 这里是同步数据的频率 1800s,半小时,可以按需要设成 1s或其它

"schedule" : "0 0/60 0-23 ? * *",   同步数据任务  60分钟一次

"flush_interval" : "5s",    刷新间隔为5S

sql.parameter——绑定SQL语句参数(按顺序)。 可以使用一些特殊的值具有以下含义:

    $now——当前时间戳

    $state——国家之一:BEFORE_FETCH,取回,AFTER_FETCH,无所事事,例外

    $metrics.counter——一个计数器

    $lastrowcount——从最后一条语句的行数

    $lastexceptiondate- SQL时间戳的例外

    $lastexception——完整的堆栈跟踪的例外

    $metrics.lastexecutionstart——最后一次执行SQL时间戳的时候开始

    $metrics.lastexecutionend- SQL时间戳的时候最后一次执行结束

    $metrics.totalrows——总获取的行数

    $metrics.totalbytes——获取的字节总数

    $metrics.failed——失败的SQL执行的总数

    $metrics.succeeded


删除river:


curl -XDELETE  localhost:9200/_river/_meta

http:// localhost:9200/_river/_meta   delete请求


参数名的介绍:


locale默认语言环境(用于解析数值,浮点的性格。 推荐的值是“en_US”)

timezone——JDBC的时区setTimestamp()调用绑定参数时的时间戳值

rounding——舍入模式解析数值。 可能的值“天花板”,“下”,“地板”,“halfdown”、“halfeven”,“halfup”、“不必要的”,“上”

scale——解析数值的精度

autocommit- - - - - -true如果每个语句应该被自动执行。 默认是false

fetchsize——fetchsize大型结果集,大多数司机使用这个控制行缓冲的数量而遍历结果集

max_rows——声明限制获取的行数,其余的行被忽略

max_retries——重试的次数(重新)连接到一个数据库

max_retries_wait——时间价值的时间应重试之间等。 

resultset_type- JDBC结果集类型,可以TYPE_FORWARD_ONLY TYPE_SCROLL_SENSITIVE TYPE_SCROLL_INSENSITIVE。 默认是TYPE_FORWARD_ONLY

resultset_concurrency- JDBC结果集并发性,可以CONCUR_READ_ONLY CONCUR_UPDATABLE。 默认是CONCUR_UPDATABLE

ignore_null_values——如果NULL值构建JSON文档时应该被忽略。 默认是false

detect_geo——如果geo多边形/分在SQL列构造时应解析JSON文档。 默认是true

detect_json——如果json结构构建json文档时应该解析SQL列。 默认是true

prepare_database_metadata——如果司机元数据作为参数要准备好。 默认是false

prepare_resultset_metadata——如果结果集元数据应该准备作为参数。 默认是false

column_name_map——地图的别名应该用作替代数据库的列名称。 对于Oracle 30 char列名称限制。 默认是null

query_timeout——第二个价值多长时间允许SQL语句被执行之前被认为是输了。 默认是1800

connection_properties——地图的连接属性用于创建驱动程序连接。 默认是null

schedule——一个单一的或cron表达式列表计划执行。 语法是相当于 石英cron表达式格式语法(见下文)

threadpoolsize——计划执行的线程池的大小schedule参数。 如果设置为1,所有工作将连续执行。 默认是4。

interval——两个运行之间的延迟时间值(默认值:不设置)

elasticsearch.cluster——Elasticsearch集群名称

elasticsearch.host——一系列Elasticsearch主机(主机名或规范host:port)

elasticsearch.port——Elasticsearch主机

elasticsearch.autodiscover——如果true、JDBC进口国将尝试连接到所有集群节点。 默认是false

max_bulk_actions每个批量索引请求提交的长度(默认值:10000)

max_concurrrent_bulk_requests并发大量请求的最大数量(默认值:2 * CPU核的数量)

max_bulk_volume——一个字节大小参数允许的最大体积的大部分请求(默认值:10米)

max_request_wait——时间价值的最大等待时间响应大部分请求

flush_interval——时间价值区间段冲洗索引文档批量操作(默认值:“5 s”)

index——Elasticsearch指数用于索引

type——Elasticsearch用于索引的索引类型

index_settings-可选设置Elasticsearch指数

type_mapping-可选为Elasticsearch指数类型映射

statefile——文件的名称JDBC进口国读写状态信息

metrics.lastexecutionstart——开始的UTC日期/时间的最后一次执行一个获取

metrics.lastexecutionend——最后的UTC日期/时间的最后一次执行一个获取

metrics.counter——一个计数器度量,将每一个获取后增加

metrics.enabled——如果true启用日志记录,指标。 默认是false

metrics.interval——度量日志之间的时间间隔。 默认是30秒。

metrics.logger.plain——如果true纯文本格式的日志消息,写指标。 默认是false

metrics.logger.json——如果true、写度量JSON格式的日志消息。 默认是false


博客地址:http://my.oschina.net/wangnian

© 著作权归作者所有

共有 人打赏支持
王念博客
粉丝 144
博文 99
码字总数 63930
作品 0
浦东
程序员
加载中

评论(1)

Level1一方通行
Level1一方通行
这个插件装了就能自动同步吗,求问
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
09/11
0
0
Centos6搭建elk系统,监控IIS日志

**所需程序: 服务器端:java、elasticsearch、kikbana 客 户 端:IIS、logstash** 一、服务器端(192.168.10.46)操作: 先建立一个ELK专门的目录: [root@Cent65 ~]mkdir /elk/ 上传到elk...

D杀手D
04/24
0
0
ElasticSearch使用

安装之前,请参考https://github.com/richardwilly98/elasticsearch-river-mongodb根据你的MongoDB版本号决定需要的elasticsearch版本号和插件号。 1)安装ES 下载ElasticSearch_版本号.tar....

强子哥哥
2014/04/09
0
0
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
0
0
Centos7单机部署ELK+x-pack

ELK分布式框架作为现在大数据时代分析日志的常为大家使用。现在我们就记录下单机Centos7部署ELK的过程和遇到的问题。   系统要求:Centos7(内核3.5及以上,2核4G)   elk版本:6.4.2(较...

DearMyLove
08/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

活动推荐|互联网3.0与区块链新时代论坛(北京)

1 时间地点 **时间:**9月22日 14:00 - 18:00 地点:(北京海淀)西大街70号 3w咖啡 二层 2 活动详情 Harmony创始人Stephen及团队将介绍他们的区块链分片扩容技术。Stephen曾任Apple地图服务...

HiBlock
39分钟前
1
0
如何优雅的删除Redis的大key

关于Redis大键(Key),我们从[空间复杂性]和访问它的[时间复杂度]两个方面来定义大键。前者主要表示Redis键的占用内存大小;后者表示Redis集合数据类型(set/hash/list/sorted set)键,所含有的...

IT--小哥
49分钟前
1
0
spring cloud学习笔记

工具 eclipse 4.9 gradle 4.10.2 spring cloud Finchley.SR1 spring boot 2.0.4 build.gradle buildscript {ext {springBootVersion = '2.0.4.RELEASE'}repositories {jcenter()......

bobby2006
50分钟前
1
0
Tcl命令操作实验-----(5)

Vivado% proc myproc {arg} {puts $arg}Vivado% myproc mynamemynameVivado% if {2>1} {puts 2>1} else {puts 2<1}2>1...

whoisliang
今天
1
0
比特币钱包RPC的PHP调用方法

当我们希望在Php开发的网站中加入比特币支付功能时,需要解决的第一个 问题,就是如何在Php程序代码中调用比特币钱包的RPC API开发接口来实现 我们期望的功能,例如比特币的支付与接收。 例如...

汇智网教程
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部