目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的。本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈。
问题的定义与决策
为了构建一个快速、实时的搜索引擎,我们必须做出某些设计决策。我们使用 MySQL 作为主数据库存储,因此有以下选择:
-
直接在 MySQL 数据库中查询用户在搜索框中输入的每个关键词,就像 %#{word1}%#{word2}%... 这样。
-
使用一个高效的搜索数据库,如 Elasticsearch。
因此,我们必须决定一种高效、可靠的方式,将数据实时地从 MySQL 迁移到 Elasticsearch 中。接下来需要做出如下的决定:
-
使用 Worker 定期查询 MySQL 数据库,并将所有变化的数据发送到 Elasticsearch。 -
在应用程序中使用 Elasticsearch 客户端,将数据同时写入到 MySQL 和 Elasticsearch 中。 -
使用基于事件的流引擎,将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上,经过处理后将其转发到 Elasticsearch。
选项1并不是实时的,所以可以直接排除,而且即使我们缩短轮询间隔,也会造成全表扫描给数据库造成查询压力。除了不是实时的之外,选项 1 无法支持对数据的删除操作,如果对数据进行了删除,那么我们需要额外的表记录之前存在过的数据,这样才能保证用户不会搜索到已经删除了的脏数据。对于其他两种选择,不同的应用场景做出的决定可能会有所不同。
在我们的场景中,如果选择选项 2,那么我们可以预见一些问题:如果 Elasticsearch 建立网络连接并确认更新时速度很慢,那么这可能会降低我们应用程序的速度;或者在写入 Elasticsearch 时发生了未知异常,我们该如何对这一操作进行重试来保证数据完整性;不可否认开发团队中不是所有开发人员都能了解所有的功能,如果有开发人员在开发新的与产品有关的业务逻辑时没有引入 Elasticsearch 客户端,那么我们将在 Elasticsearch 中更新这次数据的更改,无法保证 MySQL 与 Elasticsearch 间的数据一致性。
接下来我们该考虑如何将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上。我们可以在数据库变更后,在应用程序中使用消息管道的客户端同步地将事件发送到消息管道,但是这并没有解决上面提到的使用 Elasticsearch 客户端带来的问题,只不过是将风险从 Elasticsearch 转移到了消息管道。最终我们决定通过采集 MySQL Binlog,将 MySQL Binlog 作为事件发送到消息管道中的方式来实现基于事件的流引擎。关于 binlog 的内容可以点击链接:https://dev.mysql.com/doc/refman/5.7/en/binary-log.html,在这里不再赘述。
服务简介
为了对外提供统一的搜索接口,我们首先需要定义用于搜索的数据结构。对于大部分的搜索系统而言,对用户展示的搜索结果通常包括为标题
和内容
,这部分内容我们称之可搜索内容(Searchable Content)。在多租户系统中我们还需要在搜索结果中标示出该搜索结果属于哪个租户,或用来过滤当前租户下可搜索的内容,我们还需要额外的信息来帮助用户筛选自己想要搜索的产品类别,我们将这部分通用的但不用来进行搜索的内容称为元数据(Metadata)。最后,在我们展示搜索结果时可能希望根据不同类型的产品提供不同的展示效果,我们需要在搜索结果中返回这些个性化展示所需要的原始内容(Raw Content)。到此为止我们可以定义出了存储到 Elasticsearch 中的通用数据结构:
{
"searchable": {
"title": "string",
"content": "string"
},
"metadata": {
"tenant_id": "long",
"type": "long",
"created_at": "date",
"created_by": "string",
"updated_at": "date",
"updated_by": "string"
},
"raw": {}
}
基础设施
Apache Kafka: Apache Kafka 是开源的分布式事件流平台。我们使用 Apache kafka 作为数据库事件(插入、修改和删除)的持久化存储。
mysql-binlog-connector-java: 我们使用 mysql-binlog-connector-java 从 MySQL Binlog 中获取数据库事件,并将它发送到 Apache Kafka 中。我们将单独启动一个服务来完成这个过程。
在接收端我们也将单独启动一个服务来消费 Kafka 中的事件,并对数据进行处理然后发送到 Elasticsearch 中。
Q:为什么不使用Elasticsearch connector之类的连接器对数据进行处理并发送到Elasticsearch中?
A:在我们的系统中是不允许将大文本存入到MySQL中的,所以我们使用了额外的对象存储服务来存放我们的产品文档,所以我们无法直接使用连接器将数据发送到Elasticsearch中。
Q:为什么不在发送到Kafka前就将数据进行处理?
A:这样会有大量的数据被持久化到Kafka中,占用Kafka的磁盘空间,而这部分数据实际上也被存储到了Elasticsearch。
Q:为什么要用单独的服务来采集binlog,而不是使用Filebeat之类的agent?
A:当然可以直接在MySQL数据库中安装agent来直接采集binlog并发送到Kafka中。但是在部分情况下开发者使用的是云服务商或其他基础设施部门提供的MySQL服务器,这种情况下我们无法直接进入服务器安装agent,所以使用更加通用的、无侵入性的C/S结构来消费MySQL的binlog。
配置技术栈
我们使用 docker 和 docker-compose 来配置和部署服务。为了简单起见,MySQL 直接使用了 root 作为用户名和密码,Kafka 和 Elasticsearch 使用的是单节点集群,且没有设置任何鉴权方式,仅供开发环境使用,请勿直接用于生产环境。
version: "3"
services:
mysql:
image: mysql:5.7
container_name: mysql
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: app
ports:
3306:3306
volumes:
mysql:/var/lib/mysql
zookeeper:
image: bitnami/zookeeper:3.6.2
container_name: zookeeper
ports:
2181:2181
volumes:
zookeeper:/bitnami
environment:
ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:2.7.0
container_name: kafka
ports:
9092:9092
volumes:
kafka:/bitnami
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
zookeeper
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
container_name: elasticsearch
environment:
discovery.type=single-node
volumes:
elasticsearch:/usr/share/elasticsearch/data
ports:
9200:9200
volumes:
mysql:
driver: local
zookeeper:
driver: local
kafka:
driver: local
elasticsearch:
driver: local
# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
"mappings": {
"properties": {
"searchable": {
"type": "nested",
"properties": {
"title": {
"type": "text"
},
"content": {
"type": "text"
}
}
},
"metadata": {
"type": "nested",
"properties": {
"tenant_id": {
"type": "long"
},
"type": {
"type": "integer"
},
"created_at": {
"type": "date"
},
"created_by": {
"type": "keyword"
},
"updated_at": {
"type": "date"
},
"updated_by": {
"type": "keyword"
}
}
},
"raw": {
"type": "nested"
}
}
}
}'
核心代码实现(SpringBoot + Kotlin)
Binlog 采集端:
override fun run() {
client.serverId = properties.serverId
val eventDeserializer = EventDeserializer()
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
)
client.setEventDeserializer(eventDeserializer)
client.registerEventListener {
val header = it.getHeader<EventHeader>()
val data = it.getData<EventData>()
if (header.eventType == EventType.TABLE_MAP) {
tableRepository.updateTable(Table.of(data as TableMapEventData))
} else if (EventType.isRowMutation(header.eventType)) {
val events = when {
EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
else -> emptyList()
}
logger.info("Mutation events: {}", events)
for (event in events) {
kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
}
}
}
client.connect()
}
Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
"id": 1,
"title": "Foo",
"content": "Bar"
}
事件处理器
class KafkaBinlogTopicListener(
val binlogEventHandler: BinlogEventHandler
) {
companion object {
private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
}
private val objectMapper = jacksonObjectMapper()
fun process(message: String) {
val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
logger.info("Consume binlog event: {}", binlogEvent)
binlogEventHandler.handle(binlogEvent)
}
}
binlogEventHandler
去进行处理。实际上
BinlogEventHandler
是个自定义的函数式接口,我们自定义事件处理器实现该接口后通过 Spring Bean 的方式注入到
KafkaBinlogTopicListener
中。
class ElasticsearchIndexerBinlogEventHandler(
val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
override fun handle(binlogEvent: BinlogEvent) {
val payload = binlogEvent.payload as Map<*, *>
val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
// Should delete from Elasticsearch
if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
val deleteRequest = DeleteRequest()
deleteRequest
.index("search")
.id(documentId)
restHighLevelClient.delete(deleteRequest, DEFAULT)
} else {
// Not ever WRITE or UPDATE, just reindex
val indexRequest = IndexRequest()
indexRequest
.index("search")
.id(documentId)
.source(
mapOf<String, Any>(
"searchable" to mapOf(
"title" to payload["title"],
"content" to payload["content"]
),
"metadata" to mapOf(
"tenantId" to payload["tenantId"],
"type" to payload["type"],
"createdAt" to payload["createdAt"],
"createdBy" to payload["createdBy"],
"updatedAt" to payload["updatedAt"],
"updatedBy" to payload["updatedBy"]
)
)
)
restHighLevelClient.index(indexRequest, DEFAULT)
}
}
}
总结
其实 Binlog 的处理部分有很多开源的处理引擎,本文使用手动处理的方式也是为其他使用非 MySQL 数据源的同学类似的解决方案。大家可以按需所取,因地制宜,为自己的网站设计属于自己的实时站内搜索引擎!
-end-
本文分享自微信公众号 - 京东云开发者(JDT_Developers)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。