文档章节

解决 MySQL 与 Elasticsearch 数据不对称问题

netkiller-
 netkiller-
发布于 2017/08/22 15:23
字数 923
阅读 2928
收藏 111
点赞 4
评论 5

本文节选自《Netkiller Database 手札》

作者:netkiller 网站:http://www.netkiller.cn

23.10.7. 解决MySQL与Elasticsearch 数据不对称问题

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次
    statement => "select * from article where mtime > :sql_last_value"
    use_column_value => true
    tracking_column => "mtime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-mtime.last"
  }

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (
  `id` int(11) NOT NULL,
  `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
	-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
	IF NEW.status = 'N' THEN
		insert into elasticsearch_trash(id) values(OLD.id);
	END IF;
	-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
    IF NEW.status = 'Y' THEN
		delete from elasticsearch_trash where id = OLD.id;
	END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
	-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
	insert into elasticsearch_trash(id) values(OLD.id);
END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
	@Id
	private int id;

	@Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
	private Date ctime;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public Date getCtime() {
		return ctime;
	}

	public void setCtime(Date ctime) {
		this.ctime = ctime;
	}

}

仓库 

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{


}

定时任务 

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
	private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

	@Autowired
	private TransportClient client;

	@Autowired
	private ElasticsearchTrashRepository alasticsearchTrashRepository;

	public ScheduledTasks() {
	}

	@Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
	public void cleanTrash() {
		for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
			DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
			RestStatus status = response.status();
			logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
			if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
				alasticsearchTrashRepository.delete(elasticsearchTrash);
			}
		}
	}
}

Spring boot 启动主程序。 

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}

 

© 著作权归作者所有

共有 人打赏支持
netkiller-

netkiller-

粉丝 671
博文 240
码字总数 322337
作品 10
深圳
部门经理
加载中

评论(5)

雷兽
怎么好像有人。。。。。。啥也没看懂就评论了啊 哈哈哈哈
netkiller-
netkiller-

引用来自“风中静草”的评论

没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?

引用来自“jarchan”的评论

应该是数据插入到MySQL还是应用程序自己负责,使用logstash的jdbc-input-plugin能够定期将插入的数据喂给ES。只不过jdbc-input-plugin不能够处理数据更新和删除的情况,所以写了这篇文章。
:+1:
jarchan
jarchan

引用来自“风中静草”的评论

没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?
应该是数据插入到MySQL还是应用程序自己负责,使用logstash的jdbc-input-plugin能够定期将插入的数据喂给ES。只不过jdbc-input-plugin不能够处理数据更新和删除的情况,所以写了这篇文章。
久永
久永

引用来自“风中静草”的评论

没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?
+++
千面书生鬼见愁
没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
04/15
0
0
Elasticsearch中文分词研究

一、ES分析器简介 ES是一个实时搜索与数据分析引擎,为了完成搜索功能,必须对原始数据进行分析、拆解,以建立索引,从而实现搜索功能; ES对数据分析、拆解过程如下: 首先,将一块文本分成...

zhaipengfei1231
04/18
0
0
Elasticsearch学习(1)—— 简介

【简介】 Elasticsearch ( ES ) 是一个基于 Lucene 的实时分布式开源的全文搜索和分析引擎。它不但稳定、可靠、快速,而且也具有良好的水平扩展能力,是专门为分布式环境设计的。 Elasticsea...

叶枫啦啦
07/07
0
0
SpringBoot整合Elastic-Job,实现动态创建定时任务,任务持久化

SpringBoot使用Elastic-Job-lite,实现动态创建定时任务,任务持久化 Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。 ...

oppo5630
04/16
0
0
基于ELK实时日志分析的最佳实践

在2018云栖大会深圳峰会大数据分析与可视化专场上,由阿里巴巴搜索引擎事业部开放搜索团队的吴迪带来了“基于ELK实时日志分析的最佳实践”的主题分享。介绍了传统的日志分析、ELK的概念和ELK...

smile小太阳
05/06
0
0
elasticsearch搜索引擎相关资料(更新中)

最近需要用到elasticsearch搜索引擎,所以搜集了很多相关资料,先放在这里(未详细整理) 一、步骤总结:(linux环境下) 1. 安装 (1)下载elasticsearch安装包:http://www.elasticsearch....

核桃人
03/08
0
0
Spark中hive的使用(hive操作es示例)

配置hive-site.xml <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value> <description>JDBC connect ......

守望者之父
06/15
0
0
logstash、elasticsearch、kibana搭建日志平台

1、下载logstash a、官方下载地址:https://www.elastic.co/downloads/logstash b、解压:tar -xzvf logstash-5.1.1.tar.gz c、设置用户测试的配置文件:vim logstatsh_test.conf,编辑内容如...

binhu
05/23
0
0
ELK环境搭建(ElasticSearch+Logstash+kibana)

ELK是指Elasticsearch + Logstash + Kibaba三个组件的组合。本文讲解一个基于日志文件的ELK平台的搭建过程,有关ELK的原理以及更多其他信息,会在接下来的文章中继续研究。   在这个系统中...

阿恒灬
2017/10/31
0
0
ElasticSearch使用

安装之前,请参考https://github.com/richardwilly98/elasticsearch-river-mongodb根据你的MongoDB版本号决定需要的elasticsearch版本号和插件号。 1)安装ES 下载ElasticSearch_版本号.tar....

强子哥哥
2014/04/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

利用 acme.sh 获取网站证书并配置https访问

acme.sh 实现了 acme 协议, 可以从 letsencrypt 生成免费的证书.(https://github.com/Neilpang/acme.sh/wiki/%E8%AF%B4%E6%98%8E) 主要步骤: 安装 acme.sh 生成证书 copy 证书到 nginx/ap...

haoyuehong
8分钟前
0
0
微擎框架内如何根据media_id获取到微信图片的路径

微擎的框架内,图片选择后,获取的是那个字符串是media_id,相当于你这张图片在微信的图片服务器里面的id 要求是:获取https://mmbiz.qpic.cn/mmbiz_jpg/…… 微信图片的路径 而微信并没有根据m...

老bia同学
12分钟前
0
0
Spring boot中日期的json格式化

Model 在model层中,类的日期属性上面添加如下注解: @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd hh:mm:ss") 参考 Jackson Date格式化教程...

亚林瓜子
14分钟前
0
0
Eclipse:Failed to load the JNI shared library

1.问题背景: 由于我之前使用jdk1.9学习,当使用Luke的时候发现jdk版本过高,需要向下配置jdk,就向朋友拷了一个安装包。重新配置路径后,便开始报错。 2.问题描述: Failed to load the JNI...

tinder_boy
16分钟前
0
0
少儿学习编程课程是否真的适合七八岁的低龄儿童[图]

少儿学习编程课程是否真的适合七八岁的低龄儿童[图]: 天下熙熙皆为利来,天下攘攘皆为利往。 这几年来,乐高教育机构在国内如同雨后春笋般出现,当然关闭/转手的也很多。从教师角度来看,部...

原创小博客
22分钟前
0
0
ES12-词项查询

1.词项查询介绍 全文查询将在执行之前分析查询字符串,但词项级别查询将按照存储在倒排索引中的词项进行精确操作。这些查询通常用于数字,日期和枚举等结构化数据,而不是全文本字段。 或者,...

贾峰uk
30分钟前
0
0
http状态码与ajax的状态值

ajax状态值 1.1 200 & OK:状态请求成功

litCabbage
33分钟前
0
0
iOS动画效果合集、飞吧企鹅游戏、换肤方案、画板、文字效果等源码

iOS精选源码 动画知识运用及常见动画效果收集 3D卡片拖拽卡片叠加卡片 iFIERO - FLYING PENGUIN 飞吧企鹅SpriteKit游戏(源码) Swift封装的空数据提醒界面EmptyView 沙盒文件浏览与分享调试控...

sunnyaigd
36分钟前
0
0
AngularJS配置.run()块中设置路由事件的监听器以及过滤未经授权的请求

AngularJS中的run方法初始化全局数据,只对全局作用域起作用,如$rootScope.多个控制器之间可以共享数据,如下代码所示: <script type="text/javascript"> var m1 = angular....

孟飞阳
36分钟前
0
0
Java语言学习(十):输入/输出

Java中,I/O操作代表着输入、输出,Java所有的I/O机制都是基于数据流进行输入输出。java.io类包提供了很多的输入输出处理功能方法,大家可以参考下JDK文档中关于I/O的一些处理方法:JDK在线中...

海岸线的曙光
46分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部