
How we reindexed 36 billion documents in 5 days within the same Elasticsearch cluster


Preparing to reindex the whole cluster (credits)

At Synthesio, we use Elasticsearch in various places to run complex queries that fetch up to 50 million rich documents out of tens of billions in the blink of an eye. Elasticsearch makes this fast and easily scalable, where running the same queries over multiple MySQL clusters would take minutes and crash a few servers on the way. Every day, we push Elasticsearch's boundaries further, and going deeper and deeper into its internals leads to even more love.

Last week, we decided to reindex a 136TB dataset with a brand new mapping. Updating an Elasticsearch mapping on a large index is easy until you need to change an existing field type or delete one. Such updates require a complete reindexing into a separate index created with the right mapping, so there was no easy way out for us.

The "Blackhole" cluster

We've called our biggest Elasticsearch cluster "Blackhole", because that's exactly what it is: a hot, ready-to-use datastore able to hold virtually any amount of data. The only difference with a real black hole is that we can get our data back at the speed of light.

When we designed Blackhole, we had to choose between 2 different models.

  • A few huge machines with 4 * 12 core CPUs, 512GB of memory and 36 * 800GB SSD drives, each of them running multiple instances of Elasticsearch.
  • A lot of smaller machines we could scale horizontally as the cluster grows.

We opted for the latter since it would make scaling much easier and didn't require spending too much money upfront.

Blackhole runs on 75 physical machines:

* 2 HTTP nodes, one in each data center, behind HAProxy to load balance the queries.
* 3 master nodes located in 3 different data centers.
* 70 data nodes split across 2 different data centers.

Each node has a quad core Xeon D-1521 CPU running at 2.40GHz and 64GB of memory. The data nodes have a RAID0 over 4 * 800GB SSD drives, formatted with XFS. The whole cluster runs a systemd-less Debian Jessie with a vanilla 3.14.32 kernel. The current version of the cluster has 218.75TB of storage and 4.68TB of memory, with 2.39TB allocated to the Elasticsearch heap. That's all for the numbers.

Elasticsearch configuration

Blackhole runs Elasticsearch 1.7.5 on Java 1.8. Indexes have 12 shards and 1 replica. We ensure each data center hosts 100% of our data using Elasticsearch's rack awareness feature. This setup allows us to lose a whole data center without data loss or downtime, which we test every month.

All the filtered queries are run with _cache=false. Elasticsearch caches filtered query results in memory, which can make the whole cluster explode at the first search. When running queries on 100GB shards, this is not something you want to see.
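
For reference, here is a minimal sketch of such a query with the Elasticsearch 1.x DSL, sent with Python's requests library; the host, index and field names are illustrative, not ours.

import requests

# Illustrative host, index and field names. The per-filter "_cache": false flag
# is what keeps Elasticsearch 1.x from building and keeping the filter bitset
# in memory.
query = {
    "query": {
        "filtered": {
            "query": {"match_all": {}},
            "filter": {
                "range": {
                    "date": {
                        "gte": "2016-01-01T00:00:00",
                        "lt": "2016-02-01T00:00:00"
                    },
                    "_cache": False
                }
            }
        }
    }
}

response = requests.post("http://blackhole-http01:9200/some-index/_search", json=query)
print(response.json()["hits"]["total"])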

When running in production, our configuration is:

routing:
  allocation:
    node_initial_primaries_recoveries: 20
    node_concurrent_recoveries: 20
    cluster_concurrent_rebalance: 20
    disk:
      threshold_enabled: true
      watermark:
        low: 60%
        high: 78%
index:
  number_of_shards: 12
  number_of_replicas: 1
  merge:
    scheduler:
      max_thread_count: 8
      type: 'concurrent'
    policy:
      type: 'tiered'
      max_merged_segment: 100gb
      segments_per_tier: 4
      max_merge_at_once: 4
      max_merge_at_once_explicit: 4
  store:
    type: niofs
  query:
    bool:
      max_clause_count: 10000
      
action:
  auto_create_index: false

indices:
  recovery:
    max_bytes_per_sec: 2048mb
  fielddata:
    breaker:
      limit: 80%
    cache:
      size: 25%
      expire: 1m
  store:
    throttle:
      type: 'none'
        
discovery:
  zen:
    minimum_master_nodes: 2
    ping:
      multicast:
        enabled: false
      unicast:
        hosts: ["master01","master02","master03"]
        
threadpool:
  bulk:
    queue_size: 3000
    type: cached
  index:
    queue_size: 3000
    type: cached
bootstrap:
  mlockall: true
memory:
  index_buffer_size: 10%
http:
  max_content_length: 1024mb

After trying both Elasticsearch's default_fs and mmapfs, we picked niofs for file system storage.

The NIO FS type stores the shard index on the file system (maps to Lucene  NIOFSDirectory) using NIO. It allows multiple threads to read from the same file concurrently.

The reason we decided to go with niofs is to let the kernel manage the file system cache, instead of relying on mmapfs, which in our experience turned out to be a broken, out-of-memory error generator.

Tuning the Java virtual machine

We launch the Java virtual machine with -Xms31g -Xmx31g. Combined with Elasticsearch's mlockall=true, this ensures Elasticsearch gets enough memory to run and never swaps. The remaining 33GB are left for the Elasticsearch threads and the file system cache.

Despite Elasticsearch's recommendations, we have replaced the Concurrent Mark Sweep (CMS) garbage collector with the Garbage First garbage collector (G1GC). With CMS, we would run into a stop-the-world garbage collection for every single query spanning more than 1 month of data.

Our configuration of G1GC is relatively simple but does the job under pressure:

JAVA_OPTS="$JAVA_OPTS -XX:-UseParNewGC"
JAVA_OPTS="$JAVA_OPTS -XX:-UseConcMarkSweepGC"
JAVA_OPTS="$JAVA_OPTS -XX:+UseCondCardMark"
JAVA_OPTS="$JAVA_OPTS -XX:MaxGCPauseMillis=200"
JAVA_OPTS="$JAVA_OPTS -XX:+UseG1GC"
JAVA_OPTS="$JAVA_OPTS -XX:GCPauseIntervalMillis=1000"
JAVA_OPTS="$JAVA_OPTS -XX:InitiatingHeapOccupancyPercent=35"

Blackhole Initial indexing

We started the initial indexing mid December 2015. It took 19 days from fetching the raw data to pushing it into ElasticSearch.

Back then, Blackhole only had 46 nodes:

  • 3 master nodes
  • 1 query node
  • 42 data nodes

This led to a cluster sized for 30 months of data with 1.29TB of memory and 134TB of storage, all SSD.

For this initial indexing, we decided to go with 1 index per month and 30 shards per index. This didn't work as expected, as each query on a month would hit 3TB of data and 1.2 billion documents. Since most queries spanned 3 to 12 months, this made the cluster impossible to scale properly.

The first part of the process took 10 days. We had to fetch 30 billion documents from our main Galera datastore, turn them into JSON and push them into a Kafka queue, each month of data going into a different Kafka partition. Since we were scanning the database incrementally, the process went pretty fast considering the amount of data we were processing.

The migration processes ran on 8 virtual machines with 4 cores and 8GB of RAM each. Each machine was running 8 processes of a homemade Scala program.

During the second part, we merged the data from Kafka with data from 2 other Galera clusters and an Elasticsearch cluster before pushing it into Blackhole.

Blackhole initial migration

The merge and indexing parts took place on 8 virtual machines, each with 4 cores and 8GB of RAM. Each machine was running 8 indexing processes, each reading from an offset of a Kafka partition.

The indexer was shard aware: it had a mapping between the index it was writing to, its shards and the data nodes hosting them. This allowed it to index directly on the right data nodes with the lowest possible network latency.
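
We have not published that indexer, but a minimal sketch of the shard awareness idea could look like the following Python: build the index-to-node mapping from the plain-text _cat/shards output of Elasticsearch 1.x, then send bulk requests straight to one of those nodes. The host names, the assumption that HTTP listens on port 9200 on every data node, and the default _cat/shards column order are all assumptions.

import random
import requests

def shard_nodes(index, seed="http://blackhole-http01:9200"):
    """Map an index to the IPs of the data nodes hosting its shards.

    Parses the default 1.x _cat/shards columns:
    index shard prirep state docs store ip node
    """
    lines = requests.get("{}/_cat/shards/{}".format(seed, index)).text.splitlines()
    ips = set()
    for line in lines:
        cols = line.split()
        if len(cols) >= 8 and cols[3] == "STARTED":
            ips.add(cols[6])
    return sorted(ips)

def bulk_index(index, bulk_body):
    """Send a bulk body directly to one of the nodes hosting the target index."""
    ip = random.choice(shard_nodes(index))
    return requests.post("http://{}:9200/{}/_bulk".format(ip, index), data=bulk_body)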

This part was not as smooth as we expected.

The first version of the indexer was developed in Scala but, for some reason, it was slow as hell, unable to index more than 30,000 documents per second. We rewrote it in Go in 2 days, and it performed much better, with an average of 60,000 indexed documents per second and peaks at 120,000 documents per second.

Surprisingly, the main bottleneck was neither one of the Galera clusters nor the Elasticsearch metadata cluster, but the Kafka queues. For some reason, we could not read more than 10,000 documents per second per Kafka partition.

The other unexpected bottleneck was the CPU: we were CPU bound, while the disks were never a problem (which makes sense since we were using SSDs).

After 9 days, the data was fully indexed and we could start playing with the data.

Blackhole reindexing

When we decided to change Blackhole mapping, we had enough experience with the cluster and its content to avoid previous mistakes and go much faster.

Instead of monthly indexes, we decided to split the cluster into daily indexes. A few tests on a migrating index showed it was the way to go.

With the new mapping dropping a bunch of data, we moved from 3GB for 1 million documents (with a replica) to 2GB for 1 million documents. Going daily reduced the average index from 3TB to 120GB, and a single shard from 100GB to 10GB. Since we have a large number of machines, this allowed us to make better use of the resources, starting with the JVM heap, when running parallel queries.

The reindexing process

Instead of pulling the data from our database clusters, we decided to reuse the data from Blackhole itself. This meant reading and writing on the same cluster simultaneously, which added some fun to the operation.

This time, we did not use separate virtual machines to host the indexing processes. Instead, we decided to run the indexers on the data nodes themselves, reading locally and writing to their counterparts in the secondary data center. Considering a 10Gb link and a 46ms network latency, that solution was acceptable. It meant we had 70 machines to both read from and write to, allowing maximum parallelism.

There are many solutions to copy an Elasticsearch index into another, but most of them allow neither splitting one index into many nor changing the data model. Unfortunately, the new mapping involved deleting some fields and moving other fields somewhere else. Since we did not have the time to build a homemade solution, we decided to go with Logstash.

Logstash has an Elasticsearch input for reading, an Elasticsearch output for writing, and filters to transform the data model. The input module accepts a classic Elasticsearch query and the output module can be parallelized.

We ran a few tests on Blackhole to determine which configuration was the best, and ended up with 5,000-document scrolls and 10 indexing workers.

Testing with 5000 documents scroll and 10 workers

For these tests, we were running with a production configuration, which explains the refreshes and the segment count madness. Indeed, running with 0 replicas was faster, but since we're using RAID0, that configuration was a no-go.

During the operation, both the source and target nodes behaved without problems, especially on the memory level.

Source node for reindexing

Target node behavior

For the first tests, we ran Logstash against a full day of data to reindex, using a simple Elasticsearch query:

query => '{ "query": { "range": { "date": { "gte": "yyyy-mm-ddT00:00.000", "lte": "yyyy-mm-dd+1T00:00.000+01:00" } } } }'

Unfortunately, we ended up with missing documents because our scroll keepalive of 5 minutes was too short. Catching up with the data then took too long, since we had to replay the whole day, so we decided to run hourly queries instead.

Logstash configuration

input {
  elasticsearch {
    hosts => [ "local elasticsearch node" ]
    index => "index to read from"
    size => 5000
    scroll => "20m" # 5 minutes initial
    docinfo => true
    query => '{ "query": { "range": { "date": { "gte": "2015-07-23T10:00.000+01:00", "lte": "2015-07-23T11:00.000+01:00" } } } }'
  }
}
output {
  elasticsearch {
    host => "remote elasticsearch node"
    index => "index to write to"
    protocol => "http"
    index_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    workers => 10
  }
  stdout {
    codec => rubydebug # because removing the timestamp field makes logstash crash
  }
}
filter {
  mutate {
    rename => { "some field" => "some other field" }
    rename => { "another field" => "somewhere else" }
    remove_field => [ "something", "something else", "another field", "some field", "@timestamp", "@version" ]
  }
}

Reindexing Elasticsearch configuration

We changed only a few settings for that reindexing.

memory:
  index_buffer_size: 50%      # instead of 10%
index:
  store:
    throttle:
      type: "none"            # as fast as your SSD can go
  translog:
    disable_flush: true
  refresh_interval: -1        # instead of 1s
indices:
  store:
    throttle:
      max_bytes_per_sec: "2gb"

We wanted to limit the Lucene refreshes as much as we could, preferring to manage hundreds of thousands of segments rather than limit our throughput with refresh-induced CPU overhead.
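
The index-level part of those settings can be toggled at runtime through the _settings API. As a minimal sketch, this is roughly what gets applied to a target daily index before Logstash starts writing to it, mirroring the values above; the host and index names are illustrative, and Yoko restores the normal values once the index is complete.

import requests

def prepare_for_bulk_indexing(index, host="http://blackhole-http01:9200"):
    """Disable refreshes and translog flushes on a target daily index."""
    settings = {
        "index": {
            "refresh_interval": "-1",
            "translog.disable_flush": "true",
        }
    }
    response = requests.put("{}/{}/_settings".format(host, index), json=settings)
    response.raise_for_status()

prepare_for_bulk_indexing("blackhole-2015-07-23")  # illustrative daily index name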

Introducing Yoko and Moulinette

To manage the indexing process, we have created 2 simple tools: Yoko and Moulinette.

Yoko and Moulinette use a simple MySQL database holding every index to process, the query to run and its status. The data model is pretty self-explanatory:

CREATE TABLE `yoko` (
  `index_from` varchar(16) NOT NULL,
  `index_to` varchar(16) NOT NULL,
  `logstash_query` text NOT NULL,
  `status` enum("todo", "processing", "done", "complete", "failed") DEFAULT "todo"
);

Before indexing, we fill in the Yoko database with every index we want to migrate along with all the logstash queries we need to run. One line contains the source index, destination index and the query to reindex 1 hour of data.
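
Filling the table is easily scripted. Below is a minimal Python sketch of the idea, generating one row per hour of a given day and printing the corresponding INSERT statements; the index naming scheme and the date field are illustrative, not the actual ones.

from datetime import datetime, timedelta

QUERY_TEMPLATE = (
    '{{ "query": {{ "range": {{ "date": '
    '{{ "gte": "{start}", "lt": "{end}" }} }} }} }}'
)

def yoko_rows(day, index_from, index_to):
    """Yield one (index_from, index_to, logstash_query) tuple per hour of `day`."""
    for hour in range(24):
        start = datetime(day.year, day.month, day.day, hour)
        end = start + timedelta(hours=1)
        query = QUERY_TEMPLATE.format(start=start.isoformat(), end=end.isoformat())
        yield (index_from, index_to, query)

# Illustrative index names; print the INSERT statements to feed to MySQL.
for row in yoko_rows(datetime(2015, 7, 23), "bh-2015-07", "bh-2015-07-23"):
    values = ", ".join("'{}'".format(value.replace("'", "''")) for value in row)
    print("INSERT INTO yoko (index_from, index_to, logstash_query) VALUES ({});".format(values))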

Yoko is a simple Python daemon that manages the global indexing processes. It:

  • Creates the daily indexes with the right mapping when they don't exist yet.
  • Checks every "done" daily index and compares its document count with the count obtained by running the Logstash query against the initial index.
  • Moves each "done" line to "complete" when the counts match, or to "failed" otherwise.
  • Deletes each monthly index when every day of that month is "complete".
  • Changes the refresh values when a daily index is "complete", as shown below:
PUT /index/_settings?master_timeout=120s
{
  "translog.disable_flush" : "false",
  "index" : {
    "refresh_interval" : "1s"
  }
}
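
The count check itself is a plain _count query run on both sides with the same body that was stored in the yoko table. A minimal sketch, assuming the 1.x _count API and illustrative host and index names:

import requests

def counts_match(host, index_from, index_to, logstash_query):
    """Compare document counts between the source index and the daily index.

    logstash_query is the same JSON body stored in the yoko table.
    """
    def count(index):
        response = requests.post(
            "http://{}:9200/{}/_count".format(host, index), data=logstash_query)
        return response.json()["count"]

    return count(index_from) == count(index_to)

# status = "complete" if counts_match(...) else "failed"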

Moulinette is the processing script. It's a small daemon written in Bash (with some ugly Bashisms) that runs on every indexing node. It fetches "todo" lines from the yoko table, generates a logstash.conf with the source and destination index, the source and destination node and the Logstash query, then runs Logstash. Once Logstash exits, it switches the line to "done" if the Logstash exit code is 0, or to "failed" otherwise.
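
The real Moulinette is a Bash script, but the claim, run and update loop it implements boils down to something like the following Python sketch. The pymysql dependency, the template path, the @@placeholder@@ markers and the logstash command line are all assumptions made for illustration.

import subprocess
import pymysql

# Assumed template path; the real Moulinette generates logstash.conf in Bash.
TEMPLATE_PATH = "/etc/moulinette/logstash.conf.tpl"

def render(template, **values):
    """Fill @@placeholder@@ markers in the logstash.conf template."""
    for key, value in values.items():
        template = template.replace("@@{}@@".format(key), value)
    return template

def run_one(conn, local_node, remote_node):
    """Claim one "todo" line from the yoko table, run Logstash, record the result."""
    with conn.cursor() as cur:
        cur.execute("SELECT index_from, index_to, logstash_query FROM yoko "
                    "WHERE status = 'todo' LIMIT 1 FOR UPDATE")
        row = cur.fetchone()
        if row is None:
            return False
        index_from, index_to, query = row
        cur.execute("UPDATE yoko SET status = 'processing' WHERE index_from = %s "
                    "AND index_to = %s AND logstash_query = %s",
                    (index_from, index_to, query))
    conn.commit()

    with open(TEMPLATE_PATH) as tpl, open("/tmp/logstash.conf", "w") as out:
        out.write(render(tpl.read(), input_host=local_node, output_host=remote_node,
                         index_from=index_from, index_to=index_to, query=query))

    # Assumed CLI invocation; the point is that the exit code drives the status.
    code = subprocess.call(["logstash", "-f", "/tmp/logstash.conf"])
    status = "done" if code == 0 else "failed"
    with conn.cursor() as cur:
        cur.execute("UPDATE yoko SET status = %s WHERE index_from = %s "
                    "AND index_to = %s AND logstash_query = %s",
                    (status, index_from, index_to, query))
    conn.commit()
    return True

A driver loop would open the connection with pymysql.connect and call run_one until it returns False.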

Reindexing in 5 days

Once again, the main problem was being CPU bound. As you can see on this Marvel screenshot, the cluster was put under heavy load during the whole indexing process. Considering that we were both reading from and writing to the same cluster, with an indexing rate of over 90,000 documents per second and peaks at 140,000 documents per second, this is not surprising at all.

Reindexing blackhole, 2 days after

Looking at the CPU graphs, there was little we could do to improve the throughput without dropping Logstash for a faster solution running on fewer nodes.

CPU usage

The disk operations clearly show the scroll / index processing. There was certainly some latency inside Logstash for the transform process, but we didn't track it.

Disk operations

The other problem was losing nodes. We had some hardware issues and lost some nodes here and there. This caused indexing from the lost node to crash, and indexing to that node to stall, since Logstash does not exit when the output endpoint crashes.

This cost us a lot of time checking logs (almost) manually on every node once or twice a day. If an hourly index took more than 3 hours to process, we would consider it lost, restart Moulinette and move the hourly index back to "todo".

Lesson learned: Yoko and Moulinette V2 will have better handling of these silent errors. When an index is blocked for more than 3 hours, Yoko will raise an alert and move the index back to "todo". The alert will let us kill the locked Logstash process and restart Moulinette as soon as there's a problem.

The next step is optimizing the indexes, moving from an average of 1,500 Lucene segments after indexing down to 24 (1 segment per shard, primaries and replicas included). This aims both at improving performance and at completely getting rid of the deleted documents left over after restarting the indexing post crash. When overwriting or deleting a document, Lucene does not actually delete it but flags it as "deleted" until an optimize is performed.

Our optimize script is extremely simple. It starts with the indexes that have the largest number of deleted documents, in order to free space first.

#!/bin/sh
# Force-merge every index down to 1 segment, starting with the indexes that
# have the most deleted documents (column 7 of _cat/indices is docs.deleted).
HOST=$1
CURL_BIN=$(which curl)
if [ -z "$HOST" ]; then
  echo "Host is missing"
  exit 1
fi
if [ -z "$CURL_BIN" ]; then
  echo "Curl binary is missing"
  exit 1
fi
for indice in $(${CURL_BIN} -XGET http://${HOST}:9200/_cat/indices | sort -rk 7 | awk '{print $3}'); do
  if [ ! -z "$indice" ]; then
    echo $(date +"%Y%m%d %H:%M") Processing indice ${indice}
    ${CURL_BIN} -XPOST http://${HOST}:9200/${indice}/_optimize?max_num_segments=1
    echo
  fi
done
exit 0

Conclusion

Reindexing a large Elasticsearch cluster with major data model changes was quite interesting. It allowed us to push the boundaries of Elasticsearch and of our hardware to reach a decent throughput. Yoko and Moulinette are now reusable for every Elasticsearch cluster we run at Synthesio, allowing reindexing within the same cluster or across clusters.

 

Original article: https://thoughts.t37.net/how-we-reindexed-36-billions-documents-in-5-days-within-the-same-elasticsearch-cluster-cd9c054d1db8#.yl7grlhlq

