文档章节

使用Logstash将MySql数据迁移到Elasticsearch中

刺猬一号
 刺猬一号
发布于 2017/03/18 17:12
字数 1744
阅读 312
收藏 0

在许多情况下,我们希望使用不是由Elasticsearch本身支持的不同数据库的输入。在本文中,我们将展示如何通过Logstash将数据从MySql数据库迁移到Elasticsearch。

JBDC插件

Logstash可用的JDBC插件确保来自任何具有JDBC接口的数据库的数据都可以作为输入存入Logstash。这个插件还支持需要调度运行logstash。它还通过使用查询使选择性数据作为输入。在这些类型的数据库中,我们有行和列的概念。每个行都被视为单个事件,每个行(事件)中的列都被视为事件中的字段。

以下框图说明了JDBC连接器插件在从JDBC支持的数据库迁移数据时的作用: 
mysql-logstash1.png#asset:1073

在图中,我们有logstash运行配置文件,触发我们设置的预定义查询,以便将我们感兴趣的数据收集到顺序数据库。一旦查询被触发到JDBC插件,它将它传递到数据库并收集数据,它将移交给Logstash。 

根据我们的要求,我们可以处理数据并以所需的形式进行处理,在处理后将处理的数据索引到Elasticsearch。我们将在后面的章节中的示例中显示详细的应用程序。

将MySql数据插入Elasticsearch

让我们继续在Logstash的帮助下将数据从顺序数据库(如MySql)迁移到Elasticsearch。我们需要相应的MySql的JDBC驱动程序。您可以 在这里下载 现在让我们"testdb"使用以下命令创建一个在MySql 中命名的数据库:

创建 testdb

数据库现在已创建,我们只是确保我们使用相同的数据库用于我们的目的:

显示 数据库 ;
使用 testdb;

使用以下模式创建"testtable"在数据库下命名的表"testdb"

创建 表 testtable(PersonID int,LastName varchar(255),FirstName varchar(255),City varchar(255),Date datetime(6));

现在在上表中插入一些测试数据: 

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,

我们创建了一个包含3名员工详细信息的表。您可以通过传递查询来显示表的详细信息以显示其所有内容:

select * from testtable

结果表将如下所示: 

mysqllogstash2.png#asset:1074

Logstash配置

现在我们已经创建了一个内容如上所示的MySql表,看看如何配置Logstash。在logstash文件夹中,我们有一个logstash.conf文件,它是要配置和运行以获取必要的结果的文件。初始配置如以下屏幕截图所示: 

mysqllogstash3.png#asset:1075

在上面的配置文件中,我们提到了许多参数,例如:JDBC连接器检查数据的数据库,JDBC插件的位置,MySql访问的用户名和密码,以及查询语句。将上述设置应用于"logstash.conf"文件后,通过键入以下命令运行Logstash:

bin / logstash -f logstash .conf

如JDBC部分中的框图中所述,logstash配置文件将查询传递给JDBC驱动程序以及用户凭据。它还获取数据并将数据提供给Logstash。Logstash将使其JSON格式化并索引到Elasticsearch数据库。查询索引"test-migrate"如下:

curl -XPOST'http :// localhost:9200 / test-migrate / _search?pretty = true'  -d  '{}'

上述查询将每行作为单独的文档列出,列为字段。一个例子: 

{
   “_index” : “测试迁移”,
   “_type” : “数据”,
   “_id” :“4004” ,
   “_score” :1,
   “_source”:{
     “PERSONID” :4004,
     “姓氏”:“男爵“,
     ”名字“:”理查“,
     ”城市“:”开普敦“,
     ”日期“:”2016-05-23T10:42:04.370Z“ ,
    “@version”:“1”,
     “@timestamp”:“2016-07-10T10:36:43.685Z”
  }}
}}

更多配置

在本节中,我们将展示各种用例场景。向上面的MySql中添加另一行数据,如下所示:

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
 VALUES('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');

另外,更新同一表格中一行的现有值,如下所示:

UPDATE测试表
- > SET FirstName = 'James' 
 - > WHERE PersonID = 4005 ;

1.重复问题

完成上述步骤后,再次运行logstash配置文件。我们期望总共4个文档包括新的行和更新的行。但是,当再次检查索引时,不是这样的。相反,我们共有7个文件。这是因为初始文档在elasticsearch数据库中保持不变,这是由于没有从表本身提供特定的id。当我们运行logstash配置时,整个内容"testtable"都会被索引一次。

我们如何解决这种重复?我们必须为每个文档提供一个唯一的ID。对于每次运行,每个文档的ID应该相同,以防止重复问题。这可以通过编辑conf文件的输出部分来实现,如下所示:

mysqllogstash4.png#asset:1076

2.变换操作

我们遇到的其他重要需求之一将是更改字段名称和值,当我们索引到elasticsearch。我们在我们当前的例子中添加一个要求来演示这个。对于所有与“开普敦”匹配的文档,该字段"City"应替换为值"South Africa",字段值应替换为"Country"

为了实现此要求,请使用该"filter"属性在elasticsearch中操作已提取的数据。使用"mutate"里面的属性"filter"执行所需的更改。使用上面的"input""output"部分的设置,我们需要"filter"logstash.conf文件中添加以下部分:

filter {
   if [city] == “开普敦” {
    mutate {
       rename => { “city” => “country” }
        replace => [ “country”,“South Africa” ]
      }}
   }}
}}

检查所获取的数据"Cape Town"的每个事件的"City"列的值。如果找到匹配项,"City"则重命名该字段,"country"并将每个匹配的值替换为"South Africa"。  

3.计划和增量更新

如果数据在MySql数据库中不断更新,我们需要递增和定期对其进行索引,该怎么办?要定期获取数据,请"scheduler"在输入部分中添加属性。在"scheduler,"给定的值时,让每隔一段时间运行conf文件。它是高度可定制的,并使用Rufus调度程序语法

对于增量更新,请修改查询以"sql_last_value"针对字段使用。这里我们给这个字段  "Date"。我们还设置"use_column_value"为true,并将对应的列链接到"Date"使用"tracking_column"

用于情况1,2和3的完整配置文件如下:

mysqllogstash5.png#asset:1077

为了看到上面的配置工作,添加几个字段到现有的MySql表,其"Date"值比之前存在的值更新。现在运行logstash,您可以看到只有新数据已在Elasticsearch索引中建立索引。

结论

在本文中,我们讨论了用于使用logstash将数据从连续数据库迁移到Elasticsearch的JDBC插件。我们还熟悉如何处理常见问题,如重复,字段和值的突变,以及调度和增量更新。问题/意见?给我们一行下面。

© 著作权归作者所有

刺猬一号
粉丝 12
博文 373
码字总数 616361
作品 0
深圳
私信 提问
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
2018/09/11
0
0
Logstash迁移Elasticsearch数据方法解读

Elasticsearch中数据搬迁是工程师们经常会做的,有时是为了集群迁移、有时是为了数据备份、有时是为了升级等等,迁移的方式也有很多种,比如说通过elasticsearch-dump、通过snapshot、甚至是...

pcdog
2018/05/21
0
0
CentOS7.3下ELK日志分析系统集群搭建

Elasticsearch是个基于Lucene实现的开源、分布式、restful的全文本搜索引擎,此外他还是一个分布式实时文档存储,其中每个文档的每个filed均是可被索引的数据,且可被搜索,也是一个带实时分...

wujunqi1996
2018/07/14
0
0
小白都会超详细--ELK日志管理平台搭建教程

目录 一、介绍 二、安装JDK 三、安装Elasticsearch 四、安装Logstash 五、安装Kibana 六、Kibana简单使用 系统环境:CentOS Linux release 7.4.1708 (Core) 当前问题状况 开发人员不能登录线...

渣渣辉
2018/07/15
0
0
开源数据同步神器——canal

前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis、消息队列、大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将m...

IT米粉
01/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

在Javascript中Eval函数的使用

【eval()函数】 JavaScript有许多小窍门来使编程更加容易。 其中之一就是eval()函数,这个函数可以把一个字符串当作一个JavaScript表达式一样去执行它。 举个小例子: var the_unevaled_ans...

花漾年华
13分钟前
1
0
[日更-2019.5.22、23] Android 系统的分区和文件系统(二)--Android 文件系统中的文件

声明 Android系统中有很多分区,每个分区内的文件系统一般都不同的,使用ADB进入系统/目录下可发现挂载这很多的目录,不同的目录中可来自不同的分区及文件系统; 那么,就来分下这些目录里面...

小馬佩德罗
17分钟前
2
0
数组操作相关算法

/*数组的相关的算法操作:1、在数组中找最大值/最小值*/class Test11_FindMax{public static void main(String[] args){int[] array = {4,2,6,8,1};//在数组中找最大...

architect刘源源
今天
2
0
okhttp3 以上版本在安卓9.0无法请求数据的解决方案

应用官方的说明:在 Android 6.0 中,我们取消了对 Apache HTTP 客户端的支持。 从 Android 9 开始,默认情况下该内容库已从 bootclasspath 中移除且不可用于应用。且Android P 限制了明文流量...

chenhongjiang
今天
12
0
简单示例:NodeJs连接mysql数据库

开篇引用网上的说法: 简单的说 Node.js 就是运行在服务端的 JavaScript。Node.js 是一个基于Chrome JavaScript 运行时建立的一个平台。Node.js是一个事件驱动I/O服务端JavaScript环境,基于...

李朝强
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部