文档章节

解决 MySQL 与 Elasticsearch 数据不对称问题

netkiller-
 netkiller-
发布于 2017/08/22 15:23
字数 923
阅读 2957
收藏 111

本文节选自《Netkiller Database 手札》

作者:netkiller 网站:http://www.netkiller.cn

23.10.7. 解决MySQL与Elasticsearch 数据不对称问题

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次
    statement => "select * from article where mtime > :sql_last_value"
    use_column_value => true
    tracking_column => "mtime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-mtime.last"
  }

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (
  `id` int(11) NOT NULL,
  `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
	-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
	IF NEW.status = 'N' THEN
		insert into elasticsearch_trash(id) values(OLD.id);
	END IF;
	-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
    IF NEW.status = 'Y' THEN
		delete from elasticsearch_trash where id = OLD.id;
	END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
	-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
	insert into elasticsearch_trash(id) values(OLD.id);
END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
	@Id
	private int id;

	@Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
	private Date ctime;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public Date getCtime() {
		return ctime;
	}

	public void setCtime(Date ctime) {
		this.ctime = ctime;
	}

}

仓库 

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{


}

定时任务 

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
	private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

	@Autowired
	private TransportClient client;

	@Autowired
	private ElasticsearchTrashRepository alasticsearchTrashRepository;

	public ScheduledTasks() {
	}

	@Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
	public void cleanTrash() {
		for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
			DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
			RestStatus status = response.status();
			logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
			if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
				alasticsearchTrashRepository.delete(elasticsearchTrash);
			}
		}
	}
}

Spring boot 启动主程序。 

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}

 

© 著作权归作者所有

共有 人打赏支持
netkiller-

netkiller-

粉丝 686
博文 255
码字总数 346643
作品 10
深圳
部门经理
私信 提问
加载中

评论(5)

雷兽
怎么好像有人。。。。。。啥也没看懂就评论了啊 哈哈哈哈
netkiller-
netkiller-

引用来自“风中静草”的评论

没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?

引用来自“jarchan”的评论

应该是数据插入到MySQL还是应用程序自己负责,使用logstash的jdbc-input-plugin能够定期将插入的数据喂给ES。只不过jdbc-input-plugin不能够处理数据更新和删除的情况,所以写了这篇文章。
:+1:
jarchan
jarchan

引用来自“风中静草”的评论

没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?
应该是数据插入到MySQL还是应用程序自己负责,使用logstash的jdbc-input-plugin能够定期将插入的数据喂给ES。只不过jdbc-input-plugin不能够处理数据更新和删除的情况,所以写了这篇文章。
久永
久永

引用来自“风中静草”的评论

没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?
+++
千面书生鬼见愁
没解决过程说明!
示例也没有。
同一批数据如何既进ES又入MySQL?
Elasticsearch 基本介绍及其与 Python 的对接实现

什么是 Elasticsearch 想查数据就免不了搜索,搜索就离不开搜索引擎,百度、谷歌都是一个非常庞大复杂的搜索引擎,他们几乎索引了互联网上开放的所有网页和数据。然而对于我们自己的业务数据...

崔庆才
08/01
0
0
Ubuntu 搭建 Elasticsearch 6 集群流程

为何要搭建 Elasticsearch 集群 凡事都要讲究个为什么。在搭建集群之前,我们首先先问一句,为什么我们需要搭建集群?它有什么优势呢? 高可用性 Elasticsearch 作为一个搜索引擎,我们对它的...

崔庆才
08/04
0
0
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
09/11
0
0
I-team 博客全文检索 Elasticsearch 实战

一直觉得博客缺点东西,最近还是发现了,当博客慢慢多起来的时候想要找一篇之前写的博客很是麻烦,于是作为后端开发的楼主觉得自己动手丰衣足食,也就有了这次博客全文检索功能Elasticsearch...

haifeiWu
07/23
0
0
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Accept和Content-type的意思

Accept意思是我希望接收到的数据类型 Content-type意思是我发出去的数据类型

大灰狼wow
20分钟前
1
0
Java每天10道面试题,跟我走,offer有!(五)

41.Iterator、ListIterator 和 Enumeration的区别?   迭代器是一种设计模式, 它是一个对象, 它可以遍历并选择序列中的对象, 而开发人员不需要了解 该序列的底层结构。 迭代器通常被称为...

Java干货分享
20分钟前
2
0
meta 解决页面浏览器兼容性

使用最高级的ie内核,如果支持谷歌内核,使用谷歌内核 <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1" /> 这 样写可以达到的效果是如果安装了GCF,则使用GCF来渲染页面,如...

之渊
22分钟前
2
0
极验验证demo(django+vue)

在使用之前,曾经试过用阿里云的人机验证,不过在签名部分比较复杂,下载sdk后需要自己写很多,折腾了一下,还是放弃。而腾讯云的人机验证python版本有demo,直接填写keyhe1secret就可以使用...

xiaoge2016
23分钟前
1
0
浅谈js回调

js回调极为简洁,无需声明,直接通过参数传入方法实体,调用方法实体的时候,可以直接调用方法名或者方法名加参数即可,以下看例子 socket.initWebSocket(this, userName, userId, (isSucce...

Carbenson
28分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部