こんにちは、アプリケーション基盤チームの渡辺です。IntelliJのコード補完はCtrl+;にバインドしています。
アプリケーション基盤チームでは、Necoプロジェクト(アーキテクチャ刷新プロジェクト)の一環として、 次世代の検索基盤を検討していて、その候補としてElasticsearchを調査しています。
先月の記事で再インデクシングと絡めてingest pluginの話をして、 びっくりするぐらい需要が低く、自分のテーマ選択のセンスのなさを痛感したのですが、 こじらせた感じで今日も再インデクシングの話をしたいと思います。
想定読者は、Elasticsearchにある程度慣れている方として、用語やAPI(インデックス, シャード, ScrollAPI, BulkAPIなど)の説明は最小限にします。
利用したElasticsearchのバージョンは5.0.0-alpha4です。2.X系だと無い機能も使っているので、 すぐに採用できる話ではないかもしれませんが、5.0.0が正式リリースされた後のアップグレードへのモチベーションの一つとなれば幸いです。
TL;DR
- ReindexAPIの代わりにScrollAPIとBulkAPIを使ったツールを作成して再インデクシングの高速化を探求した
- ツールをマルチスレッド化してBulkAPIを並列で実行した
- Sliced Scrollを使ってScrollAPIを並列で実行した
- preferenceの指定でローカルシャードのみを再インデクシングした
- NVMeを試してみた
- ツールでボトルネックになりがちなのはJSONの変換とHTTPの圧縮
- リソース使いすぎには要注意なので節度を持って取り組みましょう
何をどう速くするのか
今回は再インデクシングを速くすることがテーマです。再インデクシングにも色々(DBの全件スキャンが必要な場合など)あるのですが、今回は以下の条件を前提に高速化を探求していきます。
- Elasticsearchのクラスタは一つ
- クラスタ内の特定のインデックスからドキュメントを抜き出して、別のインデックスにそのドキュメントをインデクシングする
- 抜き出したドキュメントの加工はしない
つまり、Elasticsearch上の特定のインデックスのドキュメントを、まるっとそのまま別のインデックスにコピーするような場合です。 ユースケースとしては、アナライザの変更やシャード数の変更などが考えられます。
この前提を基に、Reindex APIによる再インデクシングから計測を始めます。 それを、Scroll APIと Bulk APIを使う自作ツール(以降、ReindexToolと呼ぶ)に変えて、 各APIの機能を駆使して並列度を高めたり、お金の力を使ったりして 高速化していきます。
elasticsearch.ymlの設定やヒープのチューニング等はまだ探求中のため、この記事では言及しません。 その辺りのパラメータチューニングに関しては、ある程度の情報がまとまったら別途記事にするかもしれません。
クラスタ構成
マシンスペック
以下のスペックを持つ8台のマシン(実際には異なる物理マシン上でlxcのコンテナとしてElasticsearchを動かしている)でElasticsearchクラスタを作成しました。 Elasticsearchのバージョンは5.0.0-alpha4です。
要素 | 性能 | 備考 |
---|---|---|
CPU | 2.4GHz,12core,24threads | |
Memory | 188GB | Elasticsearchのxmxは8GB |
HDD書き込み | 800MB/s | ddでsequential writeを簡易計測 |
HDD読み込み | 265MB/s | hdparmで簡易計測 |
ネットワーク | 10Gbps | iperfで計測 |
OS | Ubuntu 14.04 |
HDDに関してはRAID等が組み合わさった結果、シーケンシャルライトはそこそこ高速に動作しているように見えますが、 実際はランダムライトだったり、リードとライトが同時に実行されたりして、こんな数字はでないので、参考程度に考えてください。
ReindexToolとElasticsearchクラスタの基本構成
ReindexToolを使った場合の再インデクシングの最も単純な構成は以下の図のようになります。
基本的には、Elasticsearchクラスタ外のマシン上で、単一プロセスとしてReindexToolを実行し、以下の手順により、再インデクシングを実施します。
- 事前に、src-indexと同じマッピングでdest-indexを作成しておく
- ScrollAPIを使ってクラスタからsrc-indexのドキュメントを取得する
- 取得したドキュメントをBulkAPIでdest-indexに追加する
- 2,3の処理をScrollAPIが全てのドキュメントを取得するまで続ける
これは出発地点の基本的な構成ではありますが、ScrollAPIとBulkAPIで再インデクシングをする、という仕組みは最後まで変わりません。
ReindexToolには、「そこそこ速く動いて並行処理も簡単そう」という適当な理由で、Golangを採用しました。 内容的には公開して困るものでもないので、ある程度需要がありそうなら、体裁を整えたうえでそのうち公開するかもしれません。
対象インデックス
再インデクシングの対象となるインデックスについて、保持するデータ(ドキュメント)とマッピングについて説明します。 このインデックスをいかに速くコピーするかが、今回の探求の目的です。
データ
使用するデータは最新の日本語Wikipediaの全記事とします。 https://dumps.wikimedia.org/jawiki/latest/ からダウンロードでき、stream2esを使うことで全件インポートすることができます。 ドキュメントの総数は200万件強です。
./stream2es wiki --source /<path>/<to>/jawiki-latest-pages-articles.xml --log debug --target http://localhost:9200/src # targetのsrc indexは次節のmappingで予め作成しておく
マッピング
wikipediaのドキュメントをインポートするインデックスには、以下の設定のマッピングを使用します。
- _all: disable
- レプリカ数: 0
- シャード数: 16
- refresh_interval: -1
- index.merge.scheduler.max_thread_count: 1
_allは今回は使わないので無効にしています。レプリカ数を0にするのは大規模なインデクシングにおける定石です。 シャード数は特に根拠はないのですが、マシンの台数よりは多くしたいのでマシン台数×2の16シャードにしています。 index.refresh_intervalとindex.merge.scheduler.max_thread_countは公式ドキュメントのindexing-performanceのtipsに従って追加しました。
{ "settings": { "number_of_replicas": 0, "number_of_shards": 16, "index.refresh_interval": "-1", "index.merge.scheduler.max_thread_count" : 1 }, "mappings": { "_default_": { "_all": {"enabled": false } } } }
インデックスの規模
インポートが完了した後のインデックスの状態をcat indices APIを使って確認します。
$ curl 'es-watanabe-1:9200/_cat/indices?v' health status index pri rep docs.count docs.deleted store.size pri.store.size green open src 16 0 2050717 0 8.2gb 8.2gb
約200万ドキュメントで合計のサイズは8.2GBほどになりました。
測定方法
基本的には、再インデクシング完了までの実行時間を計測して比較します。この実行時間を短くすることが今回の目的です。
性能測定ということで、いい感じの性能のグラフとかを出せればよかったんですが、それをやるにはSAN値が足りなかったので、 以下のコマンドの結果を1秒毎にターミナルに流して、目視で性能を確認しながら、ボトルネックっぽいところを探して改善していきました。原始的ですね。
- dstat
$ dstat -tclmdrn ----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total- time |usr sys idl wai hiq siq| 1m 5m 15m | used buff cach free| read writ| read writ| recv send 12-08 01:55:31| 1 0 99 0 0 0|0.09 0.21 0.36|38.0G 1139M 24.7G 125G|1797B 279k|0.05 23.9 | 0 0 12-08 01:55:32| 0 0 100 0 0 0|0.09 0.21 0.36|38.0G 1139M 24.7G 125G| 0 316k| 0 76.0 | 37k 22k 12-08 01:55:33| 0 0 100 0 0 0|0.09 0.21 0.36|38.0G 1139M 24.7G 125G| 0 308k| 0 72.0 | 37k 21k
$ while true; do curl 'es-watanabe-1:9200/_cat/thread_pool?v&h=id,host,bt,ba,bq,bs,bc,st,sa,sq,ss,sc'; sleep 1;done id host bt ba bq bs bc st sa sq ss sc vM4E es-watanabe-1 fixed 0 0 24 288 fixed 0 0 37 584 btge es-watanabe-2 fixed 0 0 24 13524 fixed 0 0 37 24143 zPof es-watanabe-3 fixed 0 0 24 13497 fixed 0 0 37 24115 3y0A es-watanabe-4 fixed 0 0 24 13609 fixed 0 0 37 24226 IyM- es-watanabe-5 fixed 0 0 24 12979 fixed 0 0 37 24616 iXmJ es-watanabe-6 fixed 0 0 24 13538 fixed 0 0 37 25176 iBLd es-watanabe-7 fixed 0 0 24 13388 fixed 0 0 37 25029 cKjt es-watanabe-8 fixed 0 0 24 13539 fixed 0 0 37 25173
dstatでリソース使用率を見てシステム全体のボトルネックを探しつつ、 bulk APIとsearch APIのthread_poolでどの程度並列に処理が実行されているかを確認できます。
前準備の説明が長くなりましたが、次節より実際に再インデクシングを始めていきます。
Reindex API
まずは、標準のReindexAPIを使って再インデクシングします。ReindexAPIでは内部でScrollAPIとBulkAPIを実行していて、 ScrollAPIのSize(以降、ScrollSizeと呼ぶ)を指定できるので、ScrollSizeを幾つか変更して実行時間を計測しました。
$ curl -X POST 'es-watanabe-1:9200/_reindex' -d '{ "source":{"index":"src", "size": 1000}, "dest" :{"index":"dest"} }'
結果は以下のとおりです。ScrollSizeにより変動はあるものの、最速で大体250秒ぐらいです。
ScrollSize | 実行時間 |
---|---|
1000 | 353秒 |
10000 | 297秒 |
100000 | 261秒 |
300000 | 248秒 |
500000 | 257秒 |
dstatを眺めていても、CPU Usageは数%、ディスク書き込みも数MB/sで、idleの時間も見受けられたため、 処理の並列度を高めてリソース使用率を向上させれば、まだまだ速くなる余地はありそうです。
ReindexToolでScrollAPIとBulkAPIを別スレッドで動かす
ReindexAPIは内部的にScrollAPIとBulkAPIを使っていると書きましたが、より詳細には、 ScrollAPIで取得したドキュメントをBulkAPIでインデクシングして、 それが完了したらScrollAPIの続きのドキュメントを取得してBulkAPIで、、、というように、逐次処理を行っています。
タスクの性質的に、ScrollAPIの処理とBulkAPIの処理は別々に進行しても問題ないはずなので、 ScrollAPIとBulkAPIを別のスレッドで実行するようにすれば、お互いの待ち時間が減少して速くなりそうです。
そこで、ReindexToolにドキュメントのキューを導入し、ScrollAPIとBulkAPIを別スレッドで実行するようにしました。
この構成でScrollSizeを変更した場合の実行時間は以下のようになりました。
ScrollSize | 実行時間 |
---|---|
5000 | 409秒 |
10000 | 397秒 |
20000 | 378秒 |
50000 | 399秒 |
残念なことに、ReindexAPIを用いた場合よりも遅くなってしまいました。 これは、直列部分を並列化した効果よりも、今までElasticsearchクラスタ内部で完了していた処理に対して ReindexToolを導入したことによるオーバヘッドの方が大きいためと考えられます。
しかし、自作のReindexToolを作成したことにより、ScrollAPIとBulkAPIを別々に最適化できるようにはなりました。 次は、これらのAPIにさらなる並列性を導入して、高速化を目指します。
Bulk APIを並列化する
BulkAPIのインデクシング処理は、シャード毎に分割されて並列で実行されます。 つまり、各ノードのBulkAPI用のスレッドが、スレッドプールからノード内のシャード数分払いだされます。 今回のクラスタにおいて、各ノードのBulkAPI用のスレッドプールのサイズは24でした。(cat thread_pool APIのbsの値)
$ curl 'es-watanabe-1:9200/_cat/thread_pool?v&h=id,host,bt,ba,bq,bs,bc,st,sa,sq,ss,sc' id host bt ba bq bs bc st sa sq ss sc vM4E es-watanabe-1 fixed 0 0 24 288 fixed 0 0 37 584
インデックスのシャード数が16で、ノード数が8であることを考えると、ReindexToolがBulkAPIをシングルスレッドで実行している場合、 各ノードのBulkAPI用のアクティブなスレッド(cat thread_pool APIのbaの値)は、たかだか2になるぐらいです。 bsが24なので、少なくともあと12倍はいけるはず、ということでReindexToolのBulkAPIをマルチスレッドで実行するように修正して、並列度を高めます。
また、ScrollAPIで取得したドキュメントをキューに追加する際の単位(BulkSize)も変更できるようにしました。 例えば、ScrollAPIで10000ドキュメントを取得したとして、BulkSizeを1000とすると、 1000ドキュメント毎の塊を10個、キューにエントリとして追加します。
ReindexToolのBulkAPIを実行するスレッド数(以降Bulk並列度と呼ぶ)を4とした場合、 4つのスレッドが各々、キューから1000ドキュメントの塊を取得し、BulkAPIを実行します。
Bulk並列度,BulkSize,ScrollSizeを幾つか変化させて、再インデクシングの実行時間を計測しました。結果は以下のようになります。
Bulk並列度 | BulkSize | ScrollSize | 実行時間 |
---|---|---|---|
12 | 1000 | 1000 | 86秒 |
12 | 2000 | 2000 | 84秒 |
12 | 2000 | 5000 | 92秒 |
24 | 1000 | 1000 | 86秒 |
24 | 2000 | 2000 | 83秒 |
24 | 2000 | 5000 | 92秒 |
ReindexAPIが250秒程度だったことを考えると、大分速くなりました。 dstatを見ても、ScrollAPIとBulkAPIを受け付けるes-watanabe-1では、30%程度のCPU使用率で、 それ以外のノードも10%程度のCPU使用率なので、1スレッドに比べればリソースを消費できるようになってきました。
# dstat # es-watanabe-1 ----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total- time |usr sys idl wai hiq siq| 1m 5m 15m | used buff cach free| read writ| read writ| recv send 15-08 10:03:12| 28 1 70 0 0 1|3.58 4.39 3.80|66.0G 1123M 87.8G 33.7G| 0 17M| 0 614 | 191M 134M 15-08 10:03:13| 30 1 68 0 0 1|3.58 4.39 3.80|66.0G 1123M 87.8G 33.7G| 0 19M| 0 612 | 172M 119M 15-08 10:03:14| 32 1 66 0 0 1|3.86 4.44 3.82|66.0G 1123M 87.9G 33.6G| 0 53M| 0 778 | 171M 128M # other node ----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total- time |usr sys idl wai hiq siq| 1m 5m 15m | used buff cach free| read writ| read writ| recv send 15-08 10:03:00| 10 1 89 0 0 0|0.72 1.31 1.20|70.4G 896M 84.5G 32.8G| 0 34M| 0 571 |7057k 16M 15-08 10:03:01| 10 1 88 0 0 0|1.14 1.38 1.23|70.4G 896M 84.5G 32.8G| 0 24M| 0 671 |5870k 10M 15-08 10:03:02| 7 1 92 0 0 0|1.14 1.38 1.23|70.4G 896M 84.5G 32.7G| 0 24M| 0 610 |4788k 12M
cat thread_pool APIの出力結果を見ると、Bulk並列度が24の場合も、baの値は10に届かない程度なので、 まだまだElasticsearchの並列処理能力は余力がありそうです。
# cat thread_pool id host bt ba bq bs bc st sa sq ss sc vM4E es-watanabe-1 fixed 3 0 24 30930 fixed 0 0 37 39803 btge es-watanabe-2 fixed 0 0 24 44118 fixed 0 0 37 63329 zPof es-watanabe-3 fixed 1 0 24 44135 fixed 0 0 37 63326 3y0A es-watanabe-4 fixed 1 0 24 44254 fixed 0 0 37 63444 IyM- es-watanabe-5 fixed 3 0 24 43603 fixed 0 0 37 63832 iXmJ es-watanabe-6 fixed 0 0 24 44175 fixed 0 0 37 64380 iBLd es-watanabe-7 fixed 3 0 24 44051 fixed 0 0 37 64256 cKjt es-watanabe-8 fixed 6 0 24 44186 fixed 0 0 37 64391
Scroll APIを並列化する
ここまでくると、ScrollAPIも並列に実行して、ドキュメントの取得も高速化したくなるのが人情というものです。 しかし、通常のScrollAPIは、全ドキュメントのスナップショットをとり、そのスナップショットを先頭からScrollSize毎に逐次的に取得する手段を提供するだけです。(RDBのカーソルのようなもの)
例えば、全ドキュメント数が6000でScrollSizeが1000の場合は、スナップショットは6つに分割され、先頭から順に1000ドキュメントずつ取得することができます。(下図の1,2,3,4,5,6の順で取得する)
スナップショットの先頭から順番に取得していかなければならないため、このままでは並列化できません。 そこで、Elasticsearch5.0.0から追加されたSliced Scrollの出番になります。
Sliced Scrollでは、ドキュメント全体のスナップショットを指定した数のSliceに分割し、個々のSliceを別々のScrollとして利用することができます。 例えば、Slice数を2とした場合は、下図のように2つのScrollに分割され、それぞれに0,1という識別子が与えられます。
Sliced Scrollを利用するためには、ScrollAPIのパラメータとしてsliceを指定します。 idが各Sliceの識別子でmaxは分割数です。例えば、ScrollAPIを2スレッドで並列に実行するには、 maxを2に指定して、一方のスレッドのidを0に、もう一方のスレッドのidを1にして、ScrollAPIを実行します。
# Thread1 curl -XGET 'localhost:9200/src/_search?scroll=1m' -d '{ "slice": { "id": 0, "max": 2 } }' # Thread2 curl -XGET 'localhost:9200/src/_search?scroll=1m' -d '{ "slice": { "id": 1, "max": 2 } }'
これで、ScrollAPIも並列で実行できるようになりました。結果として構成は下図のようになります。
Slice数を16として16スレッドで並列にScrollAPIを実行した場合、再インデクシングの実行時間は以下のようになります。
Bulk並列度 | BulkSize | Slice数 | ScrollSize | 実行時間 |
---|---|---|---|---|
24 | 500 | 16 | 1000 | 66秒 |
24 | 1000 | 16 | 1000 | 68秒 |
24 | 2000 | 16 | 2000 | 68秒 |
24 | 2000 | 16 | 5000 | 71秒 |
そこそこ速くはなってきたのですが、dstatやcat thread_poolを見ると、まだまだリソースを使い切れていないようです。 もっとリソースを使いきってすっきりしたいところです。
# dstat # es-watanabe-1 ----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total- time |usr sys idl wai hiq siq| 1m 5m 15m | used buff cach free| read writ| read writ| recv send 16-08 00:49:09| 33 1 65 0 0 1|2.67 1.67 1.24|64.2G 957M 87.5G 36.0G| 0 13M| 0 127 | 220M 163M 16-08 00:49:10| 41 2 57 0 0 1|2.67 1.67 1.24|64.2G 957M 87.5G 35.9G| 0 28M| 0 182 | 214M 132M 16-08 00:49:11| 42 1 56 0 0 1|2.67 1.67 1.24|64.2G 958M 87.6G 35.9G| 0 40M| 0 284 | 229M 169M # other node ----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total- time |usr sys idl wai hiq siq| 1m 5m 15m | used buff cach free| read writ| read writ| recv send 16-08 00:49:03| 10 0 90 0 0 0|0.35 0.34 0.29|53.1G 750M 101G 33.4G| 0 21M| 0 170 |8458k 18M 16-08 00:49:04| 13 1 87 0 0 0|0.35 0.34 0.29|53.1G 750M 101G 33.4G| 0 29M| 0 202 |6299k 13M 16-08 00:49:05| 10 1 90 0 0 0|0.35 0.34 0.29|53.1G 750M 101G 33.4G| 0 31M| 0 257 |5250k 17M
# cat thread_pool id host bt ba bq bs bc st sa sq ss sc vM4E es-watanabe-1 fixed 4 0 24 43111 fixed 0 0 37 62488 btge es-watanabe-2 fixed 1 0 24 56264 fixed 0 0 37 85930 zPof es-watanabe-3 fixed 2 0 24 56315 fixed 0 0 37 85945 3y0A es-watanabe-4 fixed 5 0 24 56430 fixed 0 0 37 86064 IyM- es-watanabe-5 fixed 2 0 24 55774 fixed 0 0 37 86454 iXmJ es-watanabe-6 fixed 1 0 24 56351 fixed 1 0 37 86993 iBLd es-watanabe-7 fixed 3 0 24 56244 fixed 0 0 37 86884 cKjt es-watanabe-8 fixed 2 0 24 56379 fixed 0 0 37 87017
ローカルシャードのみを再インデクシングする
ドキュメントの取得とインデクシングを並列化したので、並列度を高めるという方向性はお腹いっぱいな感じです。 しかし、分散システムであるがゆえのブロック(複数のノードからのドキュメントの取得をまとめる処理など)や、 ネットワークを介したドキュメントの転送、ReindexTool自体の負荷分散などはまだまだ手付かずなので、ここに改善の余地はありそうです。
出発地点に立ち返ると、やりたいことはsrcからdestにドキュメントをコピーするだけでした。 また、構成図を見てわかるように、srcのシャードもdestのシャードも同じノード内に存在しています。
となると、各ノードでReindexToolを動かして、ドキュメントの移動はノード内だけに止めておけば、ノード間の通信も発生せず、なんだか幸せになれそうな気がします。
これを実現するためには、まず、自ノード内のシャードのドキュメントのみを取得する方法が必要です。 その方法として、検索APIのpreferenceパラメータが利用できます。 Elasticsearchの検索APIでは、preferenceパラメータによって、実行するシャードを指定できるので、 以下の手順でローカルシャードに対してのみScrollAPIを実行できます。(微妙感あるので他に良いやり方あったら教えてください!)
- ローカルノードのIDを取得 (Node Info API)
- ローカルノードのプライマリシャードを取得 (Search Shard API)
- 2で取得したシャードをpreferenceパラメータに指定してScrollAPIを実行
2の結果、ローカルシャードはshard1, shard2だったとすると、3のScrollAPIは以下のようになります。
curl -XGET 'localhost:9200/src/_search?scroll=1m&preference=_shards:1,2'
また、通信を無くすためには、ノード内のsrcシャードとdestシャードは同じidを持つように揃える必要があります。 今回は単一回の実験だったため、クラスタのRebalanceを無効化した後、 手動でCluster Reroute APIを叩いて、destシャードを移動して揃えました。
以上の前準備を行った後に、この構成のノードを図で表すと、下図のようになります。 今までの並列実行の仕組みはそのままで、ローカルシャードのみを再インデクシングの対象にしています。
この構成で、再インデクシングの実行時間を計測すると以下のようになりました。
Bulk並列度 | BulkSize | Slice数 | ScrollSize | 実行時間 |
---|---|---|---|---|
24 | 500 | 16 | 1000 | 21秒 |
24 | 1000 | 16 | 1000 | 21秒 |
24 | 2000 | 16 | 2000 | 22秒 |
# dstat ----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total- time |usr sys idl wai hiq siq| 1m 5m 15m | used buff cach free| read writ| read writ| recv send 16-08 04:08:51| 80 4 16 0 0 0|7.02 3.80 1.76|54.1G 755M 102G 31.8G| 0 248M| 0 1371 |3544B 6761B 16-08 04:08:52| 83 3 13 0 0 0|8.46 4.15 1.88|54.1G 755M 102G 31.7G| 0 174M| 0 983 |3650B 6769B 16-08 04:08:53| 84 3 13 0 0 0|8.46 4.15 1.88|54.1G 755M 102G 31.6G| 0 114M| 0 748 |3544B 6773B # cat thread_pool id host bt ba bq bs bc st sa sq ss sc N1Ly es-watanabe-1 fixed 23 0 24 2588 fixed 2 0 37 10645 CN8- es-watanabe-2 fixed 20 0 24 10878 fixed 1 0 37 25810 AR56 es-watanabe-3 fixed 24 0 24 10943 fixed 1 0 37 25817 7lai es-watanabe-4 fixed 23 0 24 10932 fixed 1 0 37 25817 CXUS es-watanabe-5 fixed 24 0 24 10067 fixed 1 0 37 25838 p6oN es-watanabe-6 fixed 24 0 24 9883 fixed 1 0 37 25819 Q1u2 es-watanabe-7 fixed 23 0 24 10052 fixed 1 0 37 25856 0Ged es-watanabe-8 fixed 23 0 24 10808 fixed 1 0 37 25857
結果を見ると、実行時間も20秒程度でReindexAPIと比べて10倍以上高速になり、 CPUとディスクのリソース使用率も100%には届かないものの、かなり高まってきています。 これは、通信によるオーバーヘッドやブロックを取り除いたことも一因ですが、 ReindexTool自体の負荷を複数ノードに分散したことも大きいと考えられます。
ローカルシャードを他ノードのシャードに再インデクシングする
「ローカルシャードのみを再インデクシングする」の節では、ローカルシャードからローカルシャードへのドキュメントのコピーを想定して、ノード間の通信が発生しないようにしました。 しかし、この方法は、srcとdestのシャード数が同数の場合のみに適用できる方法です。
シャードを分割するなどして、srcとdestのシャード数が異なる場合は、 ローカルシャードへコピーすることもあれば、別ノードのシャードにコピーすることもあり、 BulkAPIを実行する際のノード間の通信は避けられません。
そこで、Cluster Reroute APIで、ノード間の通信が必ず発生するようにシャードを移動して、再インデクシングの実行時間を計測しました。 結果は以下です。
Bulk並列度 | BulkSize | Slice数 | ScrollSize | 実行時間 |
---|---|---|---|---|
24 | 500 | 16 | 1000 | 24秒 |
24 | 1000 | 16 | 1000 | 24秒 |
24 | 2000 | 16 | 2000 | 24秒 |
先ほどの結果が21秒だったことを考えると、若干の劣化があるものの、十分高速に動作するようです。 ということで、シャード数を変更するようなユースケースにおいても、今回の再インデクシングの仕組みは有効と考えられます。
NVMe SSDのマシンで実行する
ディスクアクセスが大量に発生するワークロードにも関わらず、HDDはよく頑張ってくれて、かなり高速に動作するようになりました。 しかし、秒間数百MB程度の書き込みが実行されている状況なので、お金の力でHDDをSSDにしたら、さっくり速くなるのでは、という淡い期待が持てます。 ちょうどいいことに、検証用のNVMeを搭載したマシンがあったので、これを使って実行時間を計測してみます。
CPUとディスクのみですが、マシンスペックは以下です。マシンの関係上、CPUもかなりパワーアップしています。
要素 | 性能 | 備考 |
---|---|---|
CPU | 2.6GHz,20core,40threads | |
SSD書き込み | 1.1GB/s | ddでsequential writeを簡易計測 |
SSD読み込み | 584MB/s | hdparmで簡易計測 |
HDDの場合と同様に、マシン8台でクラスタを組んで、ローカルシャードの再インデクシングを実施したところ、実行時間は以下のようになりました。
CPUがパワーアップしたので並列度も上げて試しました。また、マッピングのindex.merge.scheduler.max_thread_count:1
はHDD用の設定なので、今回はデフォルト値を使っています。
Bulk並列度 | BulkSize | Slice数 | ScrollSize | 実行時間 |
---|---|---|---|---|
24 | 500 | 16 | 1000 | 16秒 |
24 | 1000 | 16 | 1000 | 16秒 |
48 | 500 | 32 | 1000 | 15秒 |
48 | 1000 | 32 | 1000 | 16秒 |
速くはなりましたが、十数秒で終わってしまうので、ツールの立ち上げ処理などの時間比率が高くなり、 再インデクシングの時間が正しく計測できているか怪しいです。
そこで、srcのドキュメントを複製することで、ドキュメント数を増やして再度計測してみます。 srcのドキュメント数を5倍に複製した結果のインデックスの状態は以下のようになりました。 1000万ドキュメント、44GBです。
$ curl 'nvmenode-1:9200/_cat/indices' green open src 16 0 10253585 0 44.4gb 44.4gb
これを同様の方法で再インデクシングします。データサイズが大きくなったので、シャード数も変化させてみました。 実行時間は以下です。
シャード数 | Bulk並列度 | BulkSize | Slice数 | ScrollSize | 実行時間 |
---|---|---|---|---|---|
16 | 24 | 500 | 16 | 1000 | 71秒 |
16 | 24 | 1000 | 16 | 1000 | 72秒 |
16 | 48 | 500 | 32 | 1000 | 73秒 |
16 | 48 | 1000 | 32 | 1000 | 72秒 |
32 | 24 | 500 | 16 | 1000 | 62秒 |
32 | 24 | 1000 | 16 | 1000 | 62秒 |
44GBの再インデクシングが約60秒で終わりました。 今回のデータセットに対しては、約700MB/s(16万5千docs/s)程度の速度でインデクシングできています。 秒間1GBも見えてきて、Elasticsearch5.0.0の夢が広がりますね。
参考までに、32シャード、1000万ドキュメントのHDDの場合も計測したところ以下のようになりました。 結果として、112秒かかったので、お金の力によって2倍近く速くなったことになります。
シャード数 | Bulk並列度 | BulkSize | Slice数 | ScrollSize | 実行時間 |
---|---|---|---|---|---|
32 | 24 | 1000 | 16 | 1000 | 112秒(HDD) |
ReindexToolのチューニング
最後に、Elasticsearchに関することではないのですが、ReindexToolを作成する中で効果が高かったチューニングが2つほどあったので紹介します。 ReindexToolはGoで書いているので、pprofというプロファイラを使ってチューニングしました。
まず、初期段階のツールでは、ScrollAPIから取得したレスポンスのJSONをパースしてstructとして扱い、 その中から_sourceを抜き出す、という処理を行っていました。 その時のプロファイル結果(開発用のローカル環境で計測)は以下です。
(pprof) top 10 77s of 135.57s total (56.80%) Dropped 364 nodes (cum <= 0.68s) Showing top 10 nodes out of 115 (cum >= 10.31s) flat flat% sum% cum cum% 10.43s 7.69% 7.69% 15.63s 11.53% encoding/json.(*decodeState).scanWhile 10.38s 7.66% 15.35% 10.42s 7.69% encoding/json.stateInString 8.54s 6.30% 21.65% 16.40s 12.10% runtime.scanobject 8.04s 5.93% 27.58% 16.03s 11.82% compress/flate.(*decompressor).huffSym 7.95s 5.86% 33.44% 14.20s 10.47% encoding/json.checkValid 7.60s 5.61% 39.05% 35.35s 26.08% compress/flate.(*decompressor).huffmanBlock 7.12s 5.25% 44.30% 7.12s 5.25% runtime.memmove 6.25s 4.61% 48.91% 14.39s 10.61% encoding/json.unquoteBytes 5.46s 4.03% 52.94% 5.46s 4.03% unicode/utf8.DecodeRune 5.23s 3.86% 56.80% 10.31s 7.60% compress/flate.(*decompressor).moreBits
ほぼ、JSONの処理です。やりたいことはドキュメントのコピーだけなので、JSON的な処理を頑張らずにどうにかしたいものです。 そこで、jsonparserというライブラリを使い、structへの変換などを省略するようにしました。結果は以下です。
(pprof) top10 53.29s of 58.24s total (91.50%) Dropped 229 nodes (cum <= 0.29s) Showing top 10 nodes out of 91 (cum >= 1.29s) flat flat% sum% cum cum% 9.39s 16.12% 16.12% 9.39s 16.12% github.com/buger/jsonparser.stringEnd 8.62s 14.80% 30.92% 17.18s 29.50% compress/flate.(*decompressor).huffSym 7.83s 13.44% 44.37% 36.64s 62.91% compress/flate.(*decompressor).huffmanBlock 6.77s 11.62% 55.99% 6.77s 11.62% runtime.memmove 5.35s 9.19% 65.18% 10.71s 18.39% compress/flate.(*decompressor).moreBits 4.75s 8.16% 73.33% 5.36s 9.20% bufio.(*Reader).ReadByte 4.09s 7.02% 80.36% 9.61s 16.50% compress/flate.(*decompressor).copyHist 2.88s 4.95% 85.30% 5.52s 9.48% compress/flate.forwardCopy 2.39s 4.10% 89.41% 2.39s 4.10% runtime.memclr 1.22s 2.09% 91.50% 1.29s 2.21% syscall.Syscall
2倍以上速くなりました。だいぶいい感じです。あとはhttpのgzipの処理も重そうです。 ローカルシャードのコピーだけなら、gzipする必要はなさそうなので、gzipの処理も外しました。結果は以下です。
(pprof) top10 20470ms of 22450ms total (91.18%) Dropped 213 nodes (cum <= 112.25ms) Showing top 10 nodes out of 88 (cum >= 4380ms) flat flat% sum% cum cum% 10410ms 46.37% 46.37% 10410ms 46.37% github.com/buger/jsonparser.stringEnd 4250ms 18.93% 65.30% 4250ms 18.93% runtime.memmove 2480ms 11.05% 76.35% 2480ms 11.05% runtime.memclr 1600ms 7.13% 83.47% 1650ms 7.35% syscall.Syscall 780ms 3.47% 86.95% 11150ms 49.67% github.com/buger/jsonparser.blockEnd 300ms 1.34% 88.29% 300ms 1.34% runtime.heapBitsForObject 180ms 0.8% 89.09% 210ms 0.94% runtime.greyobject 170ms 0.76% 89.84% 170ms 0.76% runtime.procyield 160ms 0.71% 90.56% 2990ms 13.32% runtime.mallocgc 140ms 0.62% 91.18% 4380ms 19.51% github.com/buger/jsonparser.EachKey
初期の6倍程度速くなりました。おそらくまだチューニングできるとは思うのですが、Go力が足りていないのでここまでにしておきます。
ということで、再インデクシングという単純なツールにおいては、以下の処理がボトルネックになりがちなので注意しましょう。
- HTTPの圧縮展開
- JSONのEncode/Decode
注意事項
Elasticsearchクラスタの性能を振り絞ることで、再インデクシングを高速化してきましたが、以下の点には注意が必要です。
- ユーザー操作の検索リクエストなどを停止できないような状況では、再インデクシングでマシンリソースは使いきらないようにする
- Sliced ScrollのSlice数はシャード数までに押さえておくと高速に動作する。(リファレンスのNoteを参照)
- Bulk並列度を上げ過ぎると、BulkAPIタスクのキューが溢れてrejectが発生する。rejectが発生するとそのタスクが捨てられるので、ドキュメントのコピーが不完全になる。
reject対策の方法は、processorsの設定をしたり、thread_poolの設定をいじったりなど色々あるのですが、 かなりピーキーな設定になるので、限界まで性能を引き出す必要がなければBulk並列度を抑えるのが無難だと思います。節度のあるリソース利用を心がけましょう!
まとめ
本記事では、特定ユースケースにおける再インデクシングの高速化を探求しました。 結果として、ScrollAPIとBulkAPIを使い倒すことによって、ReindexAPIをそのまま使うよりも10倍以上高速に動作するようになりました。 また、wikipediaの日本語ドキュメントというデータセットにおいて、NVMeを使った8ノードのクラスタで、約700MB/s(16万5千docs/s)のスループットを実現しました。
今回は、ドキュメントを加工しないケースの再インデクシングを対象としましたが、 ユースケースが異なる場合は、個々の高速化テクニックを状況に合わせて部分的に採用して頂ければと思います。 また、SlicedScrollなど、5.0.0からの機能もあるので、バージョンアップを検討する際の要因の一つと考えて頂ければ幸いです。
検索システムを運用していくうえで、再インデクシングは何かと頭を悩ませる問題です。 私達も現状の検索システムでこの問題に悩まされてきたので、本記事がこの種の問題に頭を悩ませている方々の何らかの参考になれば嬉しいです。
We are hiring!!!
サイボウズでは Elasticsearch 大好きなエンジニアを募集しています!
キャリア採用 募集要項/イベント | サイボウズ株式会社
参考
How we reindexed 36 billion documents in 5 days within the same Elasticsearch cluster