Elasticsearch 5.0.0で再インデクシングの高速化を探求する

こんにちは、アプリケーション基盤チームの渡辺です。IntelliJのコード補完はCtrl+;にバインドしています。

アプリケーション基盤チームでは、Necoプロジェクト(アーキテクチャ刷新プロジェクト)の一環として、 次世代の検索基盤を検討していて、その候補としてElasticsearchを調査しています。

先月の記事で再インデクシングと絡めてingest pluginの話をして、 びっくりするぐらい需要が低く、自分のテーマ選択のセンスのなさを痛感したのですが、 こじらせた感じで今日も再インデクシングの話をしたいと思います。

想定読者は、Elasticsearchにある程度慣れている方として、用語やAPI(インデックス, シャード, ScrollAPI, BulkAPIなど)の説明は最小限にします。

利用したElasticsearchのバージョンは5.0.0-alpha4です。2.X系だと無い機能も使っているので、 すぐに採用できる話ではないかもしれませんが、5.0.0が正式リリースされた後のアップグレードへのモチベーションの一つとなれば幸いです。

TL;DR

  • ReindexAPIの代わりにScrollAPIとBulkAPIを使ったツールを作成して再インデクシングの高速化を探求した
  • ツールをマルチスレッド化してBulkAPIを並列で実行した
  • Sliced Scrollを使ってScrollAPIを並列で実行した
  • preferenceの指定でローカルシャードのみを再インデクシングした
  • NVMeを試してみた
  • ツールでボトルネックになりがちなのはJSONの変換とHTTPの圧縮
  • リソース使いすぎには要注意なので節度を持って取り組みましょう

何をどう速くするのか

今回は再インデクシングを速くすることがテーマです。再インデクシングにも色々(DBの全件スキャンが必要な場合など)あるのですが、今回は以下の条件を前提に高速化を探求していきます。

  • Elasticsearchのクラスタは一つ
  • クラスタ内の特定のインデックスからドキュメントを抜き出して、別のインデックスにそのドキュメントをインデクシングする
  • 抜き出したドキュメントの加工はしない

つまり、Elasticsearch上の特定のインデックスのドキュメントを、まるっとそのまま別のインデックスにコピーするような場合です。 ユースケースとしては、アナライザの変更やシャード数の変更などが考えられます。

この前提を基に、Reindex APIによる再インデクシングから計測を始めます。 それを、Scroll APIBulk APIを使う自作ツール(以降、ReindexToolと呼ぶ)に変えて、 各APIの機能を駆使して並列度を高めたり、お金の力を使ったりして 高速化していきます。

elasticsearch.ymlの設定やヒープのチューニング等はまだ探求中のため、この記事では言及しません。 その辺りのパラメータチューニングに関しては、ある程度の情報がまとまったら別途記事にするかもしれません。

クラスタ構成

マシンスペック

以下のスペックを持つ8台のマシン(実際には異なる物理マシン上でlxcのコンテナとしてElasticsearchを動かしている)でElasticsearchクラスタを作成しました。 Elasticsearchのバージョンは5.0.0-alpha4です。

要素 性能 備考
CPU 2.4GHz,12core,24threads
Memory 188GB Elasticsearchのxmxは8GB
HDD書き込み 800MB/s ddでsequential writeを簡易計測
HDD読み込み 265MB/s hdparmで簡易計測
ネットワーク 10Gbps iperfで計測
OS Ubuntu 14.04

HDDに関してはRAID等が組み合わさった結果、シーケンシャルライトはそこそこ高速に動作しているように見えますが、 実際はランダムライトだったり、リードとライトが同時に実行されたりして、こんな数字はでないので、参考程度に考えてください。

ReindexToolとElasticsearchクラスタの基本構成

ReindexToolを使った場合の再インデクシングの最も単純な構成は以下の図のようになります。

f:id:cybozuinsideout:20160816171730p:plain

基本的には、Elasticsearchクラスタ外のマシン上で、単一プロセスとしてReindexToolを実行し、以下の手順により、再インデクシングを実施します。

  1. 事前に、src-indexと同じマッピングでdest-indexを作成しておく
  2. ScrollAPIを使ってクラスタからsrc-indexのドキュメントを取得する
  3. 取得したドキュメントをBulkAPIでdest-indexに追加する
  4. 2,3の処理をScrollAPIが全てのドキュメントを取得するまで続ける

これは出発地点の基本的な構成ではありますが、ScrollAPIとBulkAPIで再インデクシングをする、という仕組みは最後まで変わりません。

ReindexToolには、「そこそこ速く動いて並行処理も簡単そう」という適当な理由で、Golangを採用しました。 内容的には公開して困るものでもないので、ある程度需要がありそうなら、体裁を整えたうえでそのうち公開するかもしれません。

対象インデックス

再インデクシングの対象となるインデックスについて、保持するデータ(ドキュメント)とマッピングについて説明します。 このインデックスをいかに速くコピーするかが、今回の探求の目的です。

データ

使用するデータは最新の日本語Wikipediaの全記事とします。 https://dumps.wikimedia.org/jawiki/latest/ からダウンロードでき、stream2esを使うことで全件インポートすることができます。 ドキュメントの総数は200万件強です。

./stream2es wiki --source /<path>/<to>/jawiki-latest-pages-articles.xml --log debug --target http://localhost:9200/src
# targetのsrc indexは次節のmappingで予め作成しておく

マッピング

wikipediaのドキュメントをインポートするインデックスには、以下の設定のマッピングを使用します。

  • _all: disable
  • レプリカ数: 0
  • シャード数: 16
  • refresh_interval: -1
  • index.merge.scheduler.max_thread_count: 1

_allは今回は使わないので無効にしています。レプリカ数を0にするのは大規模なインデクシングにおける定石です。 シャード数は特に根拠はないのですが、マシンの台数よりは多くしたいのでマシン台数×2の16シャードにしています。 index.refresh_intervalとindex.merge.scheduler.max_thread_countは公式ドキュメントのindexing-performanceのtipsに従って追加しました。

{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 16,
    "index.refresh_interval": "-1",
    "index.merge.scheduler.max_thread_count" : 1
  },
  "mappings": {
    "_default_": {
      "_all": {"enabled": false }
    }
  }
}

インデックスの規模

インポートが完了した後のインデックスの状態をcat indices APIを使って確認します。

$ curl 'es-watanabe-1:9200/_cat/indices?v'
health status index   pri rep docs.count docs.deleted store.size pri.store.size
green  open   src     16   0    2050717            0      8.2gb          8.2gb

約200万ドキュメントで合計のサイズは8.2GBほどになりました。

測定方法

基本的には、再インデクシング完了までの実行時間を計測して比較します。この実行時間を短くすることが今回の目的です。

性能測定ということで、いい感じの性能のグラフとかを出せればよかったんですが、それをやるにはSAN値が足りなかったので、 以下のコマンドの結果を1秒毎にターミナルに流して、目視で性能を確認しながら、ボトルネックっぽいところを探して改善していきました。原始的ですね。

  • dstat
$ dstat -tclmdrn
----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total-
     time     |usr sys idl wai hiq siq| 1m   5m  15m | used  buff  cach  free| read  writ| read  writ| recv  send
12-08 01:55:31|  1   0  99   0   0   0|0.09 0.21 0.36|38.0G 1139M 24.7G  125G|1797B  279k|0.05  23.9 |   0     0
12-08 01:55:32|  0   0 100   0   0   0|0.09 0.21 0.36|38.0G 1139M 24.7G  125G|   0   316k|   0  76.0 |  37k   22k
12-08 01:55:33|  0   0 100   0   0   0|0.09 0.21 0.36|38.0G 1139M 24.7G  125G|   0   308k|   0  72.0 |  37k   21k
$ while true; do curl 'es-watanabe-1:9200/_cat/thread_pool?v&h=id,host,bt,ba,bq,bs,bc,st,sa,sq,ss,sc'; sleep 1;done
id   host          bt    ba bq bs    bc st    sa sq ss    sc
vM4E es-watanabe-1 fixed  0  0 24   288 fixed  0  0 37   584
btge es-watanabe-2 fixed  0  0 24 13524 fixed  0  0 37 24143
zPof es-watanabe-3 fixed  0  0 24 13497 fixed  0  0 37 24115
3y0A es-watanabe-4 fixed  0  0 24 13609 fixed  0  0 37 24226
IyM- es-watanabe-5 fixed  0  0 24 12979 fixed  0  0 37 24616
iXmJ es-watanabe-6 fixed  0  0 24 13538 fixed  0  0 37 25176
iBLd es-watanabe-7 fixed  0  0 24 13388 fixed  0  0 37 25029
cKjt es-watanabe-8 fixed  0  0 24 13539 fixed  0  0 37 25173

dstatでリソース使用率を見てシステム全体のボトルネックを探しつつ、 bulk APIとsearch APIのthread_poolでどの程度並列に処理が実行されているかを確認できます。

前準備の説明が長くなりましたが、次節より実際に再インデクシングを始めていきます。

Reindex API

まずは、標準のReindexAPIを使って再インデクシングします。ReindexAPIでは内部でScrollAPIとBulkAPIを実行していて、 ScrollAPIのSize(以降、ScrollSizeと呼ぶ)を指定できるので、ScrollSizeを幾つか変更して実行時間を計測しました。

$ curl -X POST 'es-watanabe-1:9200/_reindex' -d '{
  "source":{"index":"src", "size": 1000},
  "dest"  :{"index":"dest"}
}'

結果は以下のとおりです。ScrollSizeにより変動はあるものの、最速で大体250秒ぐらいです。

ScrollSize 実行時間
1000 353秒
10000 297秒
100000 261秒
300000 248秒
500000 257秒

dstatを眺めていても、CPU Usageは数%、ディスク書き込みも数MB/sで、idleの時間も見受けられたため、 処理の並列度を高めてリソース使用率を向上させれば、まだまだ速くなる余地はありそうです。

ReindexToolでScrollAPIとBulkAPIを別スレッドで動かす

ReindexAPIは内部的にScrollAPIとBulkAPIを使っていると書きましたが、より詳細には、 ScrollAPIで取得したドキュメントをBulkAPIでインデクシングして、 それが完了したらScrollAPIの続きのドキュメントを取得してBulkAPIで、、、というように、逐次処理を行っています。

タスクの性質的に、ScrollAPIの処理とBulkAPIの処理は別々に進行しても問題ないはずなので、 ScrollAPIとBulkAPIを別のスレッドで実行するようにすれば、お互いの待ち時間が減少して速くなりそうです。

f:id:cybozuinsideout:20160816171717p:plain

そこで、ReindexToolにドキュメントのキューを導入し、ScrollAPIとBulkAPIを別スレッドで実行するようにしました。

f:id:cybozuinsideout:20160816171740p:plain

この構成でScrollSizeを変更した場合の実行時間は以下のようになりました。

ScrollSize 実行時間
5000 409秒
10000 397秒
20000 378秒
50000 399秒

残念なことに、ReindexAPIを用いた場合よりも遅くなってしまいました。 これは、直列部分を並列化した効果よりも、今までElasticsearchクラスタ内部で完了していた処理に対して ReindexToolを導入したことによるオーバヘッドの方が大きいためと考えられます。

しかし、自作のReindexToolを作成したことにより、ScrollAPIとBulkAPIを別々に最適化できるようにはなりました。 次は、これらのAPIにさらなる並列性を導入して、高速化を目指します。

Bulk APIを並列化する

BulkAPIのインデクシング処理は、シャード毎に分割されて並列で実行されます。 つまり、各ノードのBulkAPI用のスレッドが、スレッドプールからノード内のシャード数分払いだされます。 今回のクラスタにおいて、各ノードのBulkAPI用のスレッドプールのサイズは24でした。(cat thread_pool APIのbsの値)

$ curl 'es-watanabe-1:9200/_cat/thread_pool?v&h=id,host,bt,ba,bq,bs,bc,st,sa,sq,ss,sc'
id   host          bt    ba bq bs    bc st    sa sq ss    sc
vM4E es-watanabe-1 fixed  0  0 24   288 fixed  0  0 37   584

インデックスのシャード数が16で、ノード数が8であることを考えると、ReindexToolがBulkAPIをシングルスレッドで実行している場合、 各ノードのBulkAPI用のアクティブなスレッド(cat thread_pool APIのbaの値)は、たかだか2になるぐらいです。 bsが24なので、少なくともあと12倍はいけるはず、ということでReindexToolのBulkAPIをマルチスレッドで実行するように修正して、並列度を高めます。

また、ScrollAPIで取得したドキュメントをキューに追加する際の単位(BulkSize)も変更できるようにしました。 例えば、ScrollAPIで10000ドキュメントを取得したとして、BulkSizeを1000とすると、 1000ドキュメント毎の塊を10個、キューにエントリとして追加します。

ReindexToolのBulkAPIを実行するスレッド数(以降Bulk並列度と呼ぶ)を4とした場合、 4つのスレッドが各々、キューから1000ドキュメントの塊を取得し、BulkAPIを実行します。

f:id:cybozuinsideout:20160816171802p:plain

Bulk並列度,BulkSize,ScrollSizeを幾つか変化させて、再インデクシングの実行時間を計測しました。結果は以下のようになります。

Bulk並列度 BulkSize ScrollSize 実行時間
12 1000 1000 86秒
12 2000 2000 84秒
12 2000 5000 92秒
24 1000 1000 86秒
24 2000 2000 83秒
24 2000 5000 92秒

ReindexAPIが250秒程度だったことを考えると、大分速くなりました。 dstatを見ても、ScrollAPIとBulkAPIを受け付けるes-watanabe-1では、30%程度のCPU使用率で、 それ以外のノードも10%程度のCPU使用率なので、1スレッドに比べればリソースを消費できるようになってきました。

# dstat
# es-watanabe-1
----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total-
     time     |usr sys idl wai hiq siq| 1m   5m  15m | used  buff  cach  free| read  writ| read  writ| recv  send
15-08 10:03:12| 28   1  70   0   0   1|3.58 4.39 3.80|66.0G 1123M 87.8G 33.7G|   0    17M|   0   614 | 191M  134M
15-08 10:03:13| 30   1  68   0   0   1|3.58 4.39 3.80|66.0G 1123M 87.8G 33.7G|   0    19M|   0   612 | 172M  119M
15-08 10:03:14| 32   1  66   0   0   1|3.86 4.44 3.82|66.0G 1123M 87.9G 33.6G|   0    53M|   0   778 | 171M  128M

# other node
----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total-
     time     |usr sys idl wai hiq siq| 1m   5m  15m | used  buff  cach  free| read  writ| read  writ| recv  send
15-08 10:03:00| 10   1  89   0   0   0|0.72 1.31 1.20|70.4G  896M 84.5G 32.8G|   0    34M|   0   571 |7057k   16M
15-08 10:03:01| 10   1  88   0   0   0|1.14 1.38 1.23|70.4G  896M 84.5G 32.8G|   0    24M|   0   671 |5870k   10M
15-08 10:03:02|  7   1  92   0   0   0|1.14 1.38 1.23|70.4G  896M 84.5G 32.7G|   0    24M|   0   610 |4788k   12M

cat thread_pool APIの出力結果を見ると、Bulk並列度が24の場合も、baの値は10に届かない程度なので、 まだまだElasticsearchの並列処理能力は余力がありそうです。

# cat thread_pool
id   host          bt    ba bq bs    bc st    sa sq ss    sc
vM4E es-watanabe-1 fixed  3  0 24 30930 fixed  0  0 37 39803
btge es-watanabe-2 fixed  0  0 24 44118 fixed  0  0 37 63329
zPof es-watanabe-3 fixed  1  0 24 44135 fixed  0  0 37 63326
3y0A es-watanabe-4 fixed  1  0 24 44254 fixed  0  0 37 63444
IyM- es-watanabe-5 fixed  3  0 24 43603 fixed  0  0 37 63832
iXmJ es-watanabe-6 fixed  0  0 24 44175 fixed  0  0 37 64380
iBLd es-watanabe-7 fixed  3  0 24 44051 fixed  0  0 37 64256
cKjt es-watanabe-8 fixed  6  0 24 44186 fixed  0  0 37 64391

Scroll APIを並列化する

ここまでくると、ScrollAPIも並列に実行して、ドキュメントの取得も高速化したくなるのが人情というものです。 しかし、通常のScrollAPIは、全ドキュメントのスナップショットをとり、そのスナップショットを先頭からScrollSize毎に逐次的に取得する手段を提供するだけです。(RDBのカーソルのようなもの)

例えば、全ドキュメント数が6000でScrollSizeが1000の場合は、スナップショットは6つに分割され、先頭から順に1000ドキュメントずつ取得することができます。(下図の1,2,3,4,5,6の順で取得する)

f:id:cybozuinsideout:20160816171859p:plain

スナップショットの先頭から順番に取得していかなければならないため、このままでは並列化できません。 そこで、Elasticsearch5.0.0から追加されたSliced Scrollの出番になります。

Sliced Scrollでは、ドキュメント全体のスナップショットを指定した数のSliceに分割し、個々のSliceを別々のScrollとして利用することができます。 例えば、Slice数を2とした場合は、下図のように2つのScrollに分割され、それぞれに0,1という識別子が与えられます。

f:id:cybozuinsideout:20160816171830p:plain

Sliced Scrollを利用するためには、ScrollAPIのパラメータとしてsliceを指定します。 idが各Sliceの識別子でmaxは分割数です。例えば、ScrollAPIを2スレッドで並列に実行するには、 maxを2に指定して、一方のスレッドのidを0に、もう一方のスレッドのidを1にして、ScrollAPIを実行します。

# Thread1
curl -XGET 'localhost:9200/src/_search?scroll=1m' -d '{
    "slice": {
        "id": 0,
        "max": 2
    }
}'

# Thread2
curl -XGET 'localhost:9200/src/_search?scroll=1m' -d '{
    "slice": {
        "id": 1,
        "max": 2
    }
}'

これで、ScrollAPIも並列で実行できるようになりました。結果として構成は下図のようになります。

f:id:cybozuinsideout:20160816171805p:plain

Slice数を16として16スレッドで並列にScrollAPIを実行した場合、再インデクシングの実行時間は以下のようになります。

Bulk並列度 BulkSize Slice数 ScrollSize 実行時間
24 500 16 1000 66秒
24 1000 16 1000 68秒
24 2000 16 2000 68秒
24 2000 16 5000 71秒

そこそこ速くはなってきたのですが、dstatやcat thread_poolを見ると、まだまだリソースを使い切れていないようです。 もっとリソースを使いきってすっきりしたいところです。

# dstat
# es-watanabe-1
----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total-
     time     |usr sys idl wai hiq siq| 1m   5m  15m | used  buff  cach  free| read  writ| read  writ| recv  send
16-08 00:49:09| 33   1  65   0   0   1|2.67 1.67 1.24|64.2G  957M 87.5G 36.0G|   0    13M|   0   127 | 220M  163M
16-08 00:49:10| 41   2  57   0   0   1|2.67 1.67 1.24|64.2G  957M 87.5G 35.9G|   0    28M|   0   182 | 214M  132M
16-08 00:49:11| 42   1  56   0   0   1|2.67 1.67 1.24|64.2G  958M 87.6G 35.9G|   0    40M|   0   284 | 229M  169M

# other node
----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total-
     time     |usr sys idl wai hiq siq| 1m   5m  15m | used  buff  cach  free| read  writ| read  writ| recv  send
16-08 00:49:03| 10   0  90   0   0   0|0.35 0.34 0.29|53.1G  750M  101G 33.4G|   0    21M|   0   170 |8458k   18M
16-08 00:49:04| 13   1  87   0   0   0|0.35 0.34 0.29|53.1G  750M  101G 33.4G|   0    29M|   0   202 |6299k   13M
16-08 00:49:05| 10   1  90   0   0   0|0.35 0.34 0.29|53.1G  750M  101G 33.4G|   0    31M|   0   257 |5250k   17M
# cat thread_pool
id   host          bt    ba bq bs    bc st    sa sq ss    sc
vM4E es-watanabe-1 fixed  4  0 24 43111 fixed  0  0 37 62488
btge es-watanabe-2 fixed  1  0 24 56264 fixed  0  0 37 85930
zPof es-watanabe-3 fixed  2  0 24 56315 fixed  0  0 37 85945
3y0A es-watanabe-4 fixed  5  0 24 56430 fixed  0  0 37 86064
IyM- es-watanabe-5 fixed  2  0 24 55774 fixed  0  0 37 86454
iXmJ es-watanabe-6 fixed  1  0 24 56351 fixed  1  0 37 86993
iBLd es-watanabe-7 fixed  3  0 24 56244 fixed  0  0 37 86884
cKjt es-watanabe-8 fixed  2  0 24 56379 fixed  0  0 37 87017

ローカルシャードのみを再インデクシングする

ドキュメントの取得とインデクシングを並列化したので、並列度を高めるという方向性はお腹いっぱいな感じです。 しかし、分散システムであるがゆえのブロック(複数のノードからのドキュメントの取得をまとめる処理など)や、 ネットワークを介したドキュメントの転送、ReindexTool自体の負荷分散などはまだまだ手付かずなので、ここに改善の余地はありそうです。

出発地点に立ち返ると、やりたいことはsrcからdestにドキュメントをコピーするだけでした。 また、構成図を見てわかるように、srcのシャードもdestのシャードも同じノード内に存在しています。

となると、各ノードでReindexToolを動かして、ドキュメントの移動はノード内だけに止めておけば、ノード間の通信も発生せず、なんだか幸せになれそうな気がします。

これを実現するためには、まず、自ノード内のシャードのドキュメントのみを取得する方法が必要です。 その方法として、検索APIのpreferenceパラメータが利用できます。 Elasticsearchの検索APIでは、preferenceパラメータによって、実行するシャードを指定できるので、 以下の手順でローカルシャードに対してのみScrollAPIを実行できます。(微妙感あるので他に良いやり方あったら教えてください!)

  1. ローカルノードのIDを取得 (Node Info API)
  2. ローカルノードのプライマリシャードを取得 (Search Shard API)
  3. 2で取得したシャードをpreferenceパラメータに指定してScrollAPIを実行

2の結果、ローカルシャードはshard1, shard2だったとすると、3のScrollAPIは以下のようになります。

curl -XGET 'localhost:9200/src/_search?scroll=1m&preference=_shards:1,2'

また、通信を無くすためには、ノード内のsrcシャードとdestシャードは同じidを持つように揃える必要があります。 今回は単一回の実験だったため、クラスタのRebalanceを無効化した後、 手動でCluster Reroute APIを叩いて、destシャードを移動して揃えました。

f:id:cybozuinsideout:20160816171820p:plain

以上の前準備を行った後に、この構成のノードを図で表すと、下図のようになります。 今までの並列実行の仕組みはそのままで、ローカルシャードのみを再インデクシングの対象にしています。

f:id:cybozuinsideout:20160816171807p:plain

この構成で、再インデクシングの実行時間を計測すると以下のようになりました。

Bulk並列度 BulkSize Slice数 ScrollSize 実行時間
24 500 16 1000 21秒
24 1000 16 1000 21秒
24 2000 16 2000 22秒
# dstat
----system---- ----total-cpu-usage---- ---load-avg--- ------memory-usage----- -dsk/total- --io/total- -net/total-
     time     |usr sys idl wai hiq siq| 1m   5m  15m | used  buff  cach  free| read  writ| read  writ| recv  send
16-08 04:08:51| 80   4  16   0   0   0|7.02 3.80 1.76|54.1G  755M  102G 31.8G|   0   248M|   0  1371 |3544B 6761B
16-08 04:08:52| 83   3  13   0   0   0|8.46 4.15 1.88|54.1G  755M  102G 31.7G|   0   174M|   0   983 |3650B 6769B
16-08 04:08:53| 84   3  13   0   0   0|8.46 4.15 1.88|54.1G  755M  102G 31.6G|   0   114M|   0   748 |3544B 6773B


# cat thread_pool
id   host          bt    ba bq bs    bc st    sa sq ss    sc
N1Ly es-watanabe-1 fixed 23  0 24  2588 fixed  2  0 37 10645
CN8- es-watanabe-2 fixed 20  0 24 10878 fixed  1  0 37 25810
AR56 es-watanabe-3 fixed 24  0 24 10943 fixed  1  0 37 25817
7lai es-watanabe-4 fixed 23  0 24 10932 fixed  1  0 37 25817
CXUS es-watanabe-5 fixed 24  0 24 10067 fixed  1  0 37 25838
p6oN es-watanabe-6 fixed 24  0 24  9883 fixed  1  0 37 25819
Q1u2 es-watanabe-7 fixed 23  0 24 10052 fixed  1  0 37 25856
0Ged es-watanabe-8 fixed 23  0 24 10808 fixed  1  0 37 25857

結果を見ると、実行時間も20秒程度でReindexAPIと比べて10倍以上高速になり、 CPUとディスクのリソース使用率も100%には届かないものの、かなり高まってきています。 これは、通信によるオーバーヘッドやブロックを取り除いたことも一因ですが、 ReindexTool自体の負荷を複数ノードに分散したことも大きいと考えられます。

ローカルシャードを他ノードのシャードに再インデクシングする

「ローカルシャードのみを再インデクシングする」の節では、ローカルシャードからローカルシャードへのドキュメントのコピーを想定して、ノード間の通信が発生しないようにしました。 しかし、この方法は、srcとdestのシャード数が同数の場合のみに適用できる方法です。

シャードを分割するなどして、srcとdestのシャード数が異なる場合は、 ローカルシャードへコピーすることもあれば、別ノードのシャードにコピーすることもあり、 BulkAPIを実行する際のノード間の通信は避けられません。

f:id:cybozuinsideout:20160816171810p:plain

そこで、Cluster Reroute APIで、ノード間の通信が必ず発生するようにシャードを移動して、再インデクシングの実行時間を計測しました。 結果は以下です。

Bulk並列度 BulkSize Slice数 ScrollSize 実行時間
24 500 16 1000 24秒
24 1000 16 1000 24秒
24 2000 16 2000 24秒

先ほどの結果が21秒だったことを考えると、若干の劣化があるものの、十分高速に動作するようです。 ということで、シャード数を変更するようなユースケースにおいても、今回の再インデクシングの仕組みは有効と考えられます。

NVMe SSDのマシンで実行する

ディスクアクセスが大量に発生するワークロードにも関わらず、HDDはよく頑張ってくれて、かなり高速に動作するようになりました。 しかし、秒間数百MB程度の書き込みが実行されている状況なので、お金の力でHDDをSSDにしたら、さっくり速くなるのでは、という淡い期待が持てます。 ちょうどいいことに、検証用のNVMeを搭載したマシンがあったので、これを使って実行時間を計測してみます。

CPUとディスクのみですが、マシンスペックは以下です。マシンの関係上、CPUもかなりパワーアップしています。

要素 性能 備考
CPU 2.6GHz,20core,40threads
SSD書き込み 1.1GB/s ddでsequential writeを簡易計測
SSD読み込み 584MB/s hdparmで簡易計測

HDDの場合と同様に、マシン8台でクラスタを組んで、ローカルシャードの再インデクシングを実施したところ、実行時間は以下のようになりました。 CPUがパワーアップしたので並列度も上げて試しました。また、マッピングのindex.merge.scheduler.max_thread_count:1はHDD用の設定なので、今回はデフォルト値を使っています。

Bulk並列度 BulkSize Slice数 ScrollSize 実行時間
24 500 16 1000 16秒
24 1000 16 1000 16秒
48 500 32 1000 15秒
48 1000 32 1000 16秒

速くはなりましたが、十数秒で終わってしまうので、ツールの立ち上げ処理などの時間比率が高くなり、 再インデクシングの時間が正しく計測できているか怪しいです。

そこで、srcのドキュメントを複製することで、ドキュメント数を増やして再度計測してみます。 srcのドキュメント数を5倍に複製した結果のインデックスの状態は以下のようになりました。 1000万ドキュメント、44GBです。

$ curl 'nvmenode-1:9200/_cat/indices'
green open src   16 0  10253585 0  44.4gb  44.4gb

これを同様の方法で再インデクシングします。データサイズが大きくなったので、シャード数も変化させてみました。 実行時間は以下です。

シャード数 Bulk並列度 BulkSize Slice数 ScrollSize 実行時間
16 24 500 16 1000 71秒
16 24 1000 16 1000 72秒
16 48 500 32 1000 73秒
16 48 1000 32 1000 72秒
32 24 500 16 1000 62秒
32 24 1000 16 1000 62秒

44GBの再インデクシングが約60秒で終わりました。 今回のデータセットに対しては、約700MB/s(16万5千docs/s)程度の速度でインデクシングできています。 秒間1GBも見えてきて、Elasticsearch5.0.0の夢が広がりますね。

参考までに、32シャード、1000万ドキュメントのHDDの場合も計測したところ以下のようになりました。 結果として、112秒かかったので、お金の力によって2倍近く速くなったことになります。

シャード数 Bulk並列度 BulkSize Slice数 ScrollSize 実行時間
32 24 1000 16 1000 112秒(HDD)

ReindexToolのチューニング

最後に、Elasticsearchに関することではないのですが、ReindexToolを作成する中で効果が高かったチューニングが2つほどあったので紹介します。 ReindexToolはGoで書いているので、pprofというプロファイラを使ってチューニングしました。

まず、初期段階のツールでは、ScrollAPIから取得したレスポンスのJSONをパースしてstructとして扱い、 その中から_sourceを抜き出す、という処理を行っていました。 その時のプロファイル結果(開発用のローカル環境で計測)は以下です。

(pprof) top 10
77s of 135.57s total (56.80%)
Dropped 364 nodes (cum <= 0.68s)
Showing top 10 nodes out of 115 (cum >= 10.31s)
      flat  flat%   sum%        cum   cum%
    10.43s  7.69%  7.69%     15.63s 11.53%  encoding/json.(*decodeState).scanWhile
    10.38s  7.66% 15.35%     10.42s  7.69%  encoding/json.stateInString
     8.54s  6.30% 21.65%     16.40s 12.10%  runtime.scanobject
     8.04s  5.93% 27.58%     16.03s 11.82%  compress/flate.(*decompressor).huffSym
     7.95s  5.86% 33.44%     14.20s 10.47%  encoding/json.checkValid
     7.60s  5.61% 39.05%     35.35s 26.08%  compress/flate.(*decompressor).huffmanBlock
     7.12s  5.25% 44.30%      7.12s  5.25%  runtime.memmove
     6.25s  4.61% 48.91%     14.39s 10.61%  encoding/json.unquoteBytes
     5.46s  4.03% 52.94%      5.46s  4.03%  unicode/utf8.DecodeRune
     5.23s  3.86% 56.80%     10.31s  7.60%  compress/flate.(*decompressor).moreBits

ほぼ、JSONの処理です。やりたいことはドキュメントのコピーだけなので、JSON的な処理を頑張らずにどうにかしたいものです。 そこで、jsonparserというライブラリを使い、structへの変換などを省略するようにしました。結果は以下です。

(pprof) top10
53.29s of 58.24s total (91.50%)
Dropped 229 nodes (cum <= 0.29s)
Showing top 10 nodes out of 91 (cum >= 1.29s)
      flat  flat%   sum%        cum   cum%
     9.39s 16.12% 16.12%      9.39s 16.12%  github.com/buger/jsonparser.stringEnd
     8.62s 14.80% 30.92%     17.18s 29.50%  compress/flate.(*decompressor).huffSym
     7.83s 13.44% 44.37%     36.64s 62.91%  compress/flate.(*decompressor).huffmanBlock
     6.77s 11.62% 55.99%      6.77s 11.62%  runtime.memmove
     5.35s  9.19% 65.18%     10.71s 18.39%  compress/flate.(*decompressor).moreBits
     4.75s  8.16% 73.33%      5.36s  9.20%  bufio.(*Reader).ReadByte
     4.09s  7.02% 80.36%      9.61s 16.50%  compress/flate.(*decompressor).copyHist
     2.88s  4.95% 85.30%      5.52s  9.48%  compress/flate.forwardCopy
     2.39s  4.10% 89.41%      2.39s  4.10%  runtime.memclr
     1.22s  2.09% 91.50%      1.29s  2.21%  syscall.Syscall

2倍以上速くなりました。だいぶいい感じです。あとはhttpのgzipの処理も重そうです。 ローカルシャードのコピーだけなら、gzipする必要はなさそうなので、gzipの処理も外しました。結果は以下です。

(pprof) top10
20470ms of 22450ms total (91.18%)
Dropped 213 nodes (cum <= 112.25ms)
Showing top 10 nodes out of 88 (cum >= 4380ms)
      flat  flat%   sum%        cum   cum%
   10410ms 46.37% 46.37%    10410ms 46.37%  github.com/buger/jsonparser.stringEnd
    4250ms 18.93% 65.30%     4250ms 18.93%  runtime.memmove
    2480ms 11.05% 76.35%     2480ms 11.05%  runtime.memclr
    1600ms  7.13% 83.47%     1650ms  7.35%  syscall.Syscall
     780ms  3.47% 86.95%    11150ms 49.67%  github.com/buger/jsonparser.blockEnd
     300ms  1.34% 88.29%      300ms  1.34%  runtime.heapBitsForObject
     180ms   0.8% 89.09%      210ms  0.94%  runtime.greyobject
     170ms  0.76% 89.84%      170ms  0.76%  runtime.procyield
     160ms  0.71% 90.56%     2990ms 13.32%  runtime.mallocgc
     140ms  0.62% 91.18%     4380ms 19.51%  github.com/buger/jsonparser.EachKey

初期の6倍程度速くなりました。おそらくまだチューニングできるとは思うのですが、Go力が足りていないのでここまでにしておきます。

ということで、再インデクシングという単純なツールにおいては、以下の処理がボトルネックになりがちなので注意しましょう。

  • HTTPの圧縮展開
  • JSONのEncode/Decode

注意事項

Elasticsearchクラスタの性能を振り絞ることで、再インデクシングを高速化してきましたが、以下の点には注意が必要です。

  • ユーザー操作の検索リクエストなどを停止できないような状況では、再インデクシングでマシンリソースは使いきらないようにする
  • Sliced ScrollのSlice数はシャード数までに押さえておくと高速に動作する。(リファレンスのNoteを参照)
  • Bulk並列度を上げ過ぎると、BulkAPIタスクのキューが溢れてrejectが発生する。rejectが発生するとそのタスクが捨てられるので、ドキュメントのコピーが不完全になる。

reject対策の方法は、processorsの設定をしたり、thread_poolの設定をいじったりなど色々あるのですが、 かなりピーキーな設定になるので、限界まで性能を引き出す必要がなければBulk並列度を抑えるのが無難だと思います。節度のあるリソース利用を心がけましょう!

まとめ

本記事では、特定ユースケースにおける再インデクシングの高速化を探求しました。 結果として、ScrollAPIとBulkAPIを使い倒すことによって、ReindexAPIをそのまま使うよりも10倍以上高速に動作するようになりました。 また、wikipediaの日本語ドキュメントというデータセットにおいて、NVMeを使った8ノードのクラスタで、約700MB/s(16万5千docs/s)のスループットを実現しました。

今回は、ドキュメントを加工しないケースの再インデクシングを対象としましたが、 ユースケースが異なる場合は、個々の高速化テクニックを状況に合わせて部分的に採用して頂ければと思います。 また、SlicedScrollなど、5.0.0からの機能もあるので、バージョンアップを検討する際の要因の一つと考えて頂ければ幸いです。

検索システムを運用していくうえで、再インデクシングは何かと頭を悩ませる問題です。 私達も現状の検索システムでこの問題に悩まされてきたので、本記事がこの種の問題に頭を悩ませている方々の何らかの参考になれば嬉しいです。

We are hiring!!!

サイボウズでは Elasticsearch 大好きなエンジニアを募集しています!
キャリア採用 募集要項/イベント | サイボウズ株式会社

参考

How we reindexed 36 billion documents in 5 days within the same Elasticsearch cluster