Elasticsearch 5.0.0のIngest Node用プラグインを書いた話

こんにちは、アプリケーション基盤チームの渡辺です。好きなライブラリはLombokです。

アプリケーション基盤チームでは、Necoプロジェクト(アーキテクチャ刷新プロジェクト)の一環として、 次世代の検索基盤を検討していて、その候補としてElasticsearchを調査しています。

調査をしている中で、Ingest Node用のプラグインを試作しました。 Ingest Nodeとは、Elasticsearch 5.0.0 (2016/7時点ではalpha4までmaven centralにpublishされている)で追加された機能で、 インデクシング時にドキュメントに変換処理を適用できるなんだかすごいやつです。Ingest Node自体の説明はElastic{ON}のスライドが参考になります。


せっかくプラグインの作り方を調べたので、公開して似たようなことをする人の参考になれば良いな、ということでこの記事を書いています。 内容としては、私のようなElasticsearch初心者の方が、初めてプラグイン開発をする際に参考にできる記事を目指します。

なお、この記事はElasticsearch5.0.0-alpha4の動作を元に書いています。 alphaなので今後変更が入る部分もあるかと思いますが、予めご了承ください。

背景

最初に軽く、なぜIngest Nodeを調べることになったのか、という話をします。

まず、次世代検索基盤の要件として、再インデクシングを簡単にしたい、というものがありました。

例えば、ドキュメントのフィールドの型を変えたり、ネストの構造を変えたり、Analyzerを変えたりするには、マッピングを変更してドキュメントを再インデクシングする必要があります。 検索がより良い結果を返すように、継続的にマッピングを改善するためには、再インデクシングは気軽に実施したいです。

再インデクシングの前後で、ドキュメントに変更がない場合(Analyzerの変更など)は、 Elasticsearch 2.3.3から追加されたReindex APIをそのまま実行すれば良いのですが、フィールドの型などが変わる場合はドキュメントの加工が必要になります。

そこで、Elasticsearch 5.0.0で追加されたIngest Nodeの出番です。 Ingest Nodeを使えば、組み込みの機能を組み合わせるだけでドキュメントの加工ができ、より複雑なことをしたい場合はプラグインを書くことで対応できます。

事前準備

事前準備として、Elasticsearchの公式サイトから、5.0.0-alpha4をダウンロードして、起動してください。 https://www.elastic.co/downloads/elasticsearch

curlでlocalhostの9200ポートにGETして起動を確認できます。

# request
$ curl localhost:9200

# response
{
  "name" : "Smart Alec",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "5.0.0-alpha4",
    "build_hash" : "3f5b994",
    "build_date" : "2016-06-27T16:23:46.861Z",
    "build_snapshot" : false,
    "lucene_version" : "6.1.0"
  },
  "tagline" : "You Know, for Search"
}

Ingest Node

Ingest Nodeでの変換処理における重要な要素として、ProcessorPipelineがあります。 プラグインの開発に進む前に、まずは組み込みの機能を使って、これらの要素の理解を深めます。

Processor

実際の変換処理を行うのがProcessorです。組み込みでいくつか用意されています。 今回は、フィールドの値を大文字にするUppercase Processorを使ってみます。

Pipeline

インデクシングの際に変換を指定するには、Pipelineの定義が必要です。 Pipelineは複数のProcessorで構成され、各Processorによる変換が順番にドキュメントに適用されます。 Uppercase Processorを要素としてPipelineを定義してみます。

$ curl -XPUT localhost:9200/_ingest/pipeline/upper_sample -d '{
  "description": "upper processor sample",
  "processors": [
    {"uppercase": {"field": "foo"}}
  ]
}'

これで、ドキュメントのfooフィールドを大文字に変換するPipelineを定義できました。 URIのパスの末尾の「upper_sample」はこのPipelineのIDで、他のAPIからこのPipelineを指定する際に使用されます。

動作確認をするために、まずはSimulate APIを叩いてみます。

Simulate API

Simulate APIでは、Pipelineとドキュメントを指定して、変換の結果を確認することができます。 先ほど定義したupper_sample Pipelineに対して試してみましょう。

# request
$ curl -XPOST localhost:9200/_ingest/pipeline/upper_sample/_simulate -d '{
  "docs": [
    {"_source": {"foo": "bar"}}
  ]
}'

# response
{"docs":[
  {"doc":{
    "_id":"_id",
    "_type":"_type",
    "_index":"_index",
    "_source":{"foo":"BAR"},
    "_ingest":{"timestamp":"2016-07-01T05:55:38.108+0000"}
  }}
]}

ドキュメントのfooフィールドの値が、barからBARに変換されていることが確認できます。

Reindex API

PipelineはReindex APIにも適用できます。まずは、Reindex APIの対象となるインデックスを用意します。

# request
$ curl -XPOST localhost:9200/reindex_src/sample -d '{"foo": "bar baz"}'

# response
{
  "_index":"reindex_src",
  "_type":"sample",
  "_id":"AVWlB8IxgIfAWOLZwcdI",
  "_version":1,
  "forced_refresh":false,
  "_shards":{"total":2,"successful":1,"failed":0},
  "created":true
}

reindex_srcをreindex_destに再インデクシングします。 destのpipelineにupper_sample Pipelineを指定します。

# request
$ curl -XPOST localhost:9200/_reindex -d '{
  "source": {
    "index": "reindex_src"
  },
  "dest": {
    "index": "reindex_dest",
    "pipeline": "upper_sample"
  }
}'

# response
{
  "took":100,
  "timed_out":false,
  // ... 長いので省略
}

reindex_destのドキュメントを取得すると、fooフィールドの値が大文字に変換されていることが確認できます。

# request
$ curl localhost:9200/reindex_dest/_search

# respone
{
  "took":2,
  // ... 長いので省略
  "hits":[
    {
      "_index":"reindex_dest",
      "_type":"sample",
      "_id":"AVWgQkdgX1vV7c4s6MzR",
      "_score":1.0,
      "_source":{"foo":"BAR BAZ"}}
  ]
}

プラグインを作る

Ingest Nodeでの変換の仕方をざっくり理解したところで、本題であるプラグインの作成について説明します。

作成したプラグインをElasticsearchに追加することで、独自の変換ロジックを実装したProcessorを追加することができます。 プラグインを作成するには以下の3つが必要です。

  • Pluginクラスの実装
    NodeModuleにProcessorを登録する
  • Processorクラスの実装
    変換ロジックを実装する
  • plugin-descriptor.properties
    プラグインのクラス名の指定などを行う

これらの成果物をelasticsearchというディレクトリにまとめて、zip化したものがプラグインになります。 今回はJava8とgradleで開発します。作成するサンプルの最終的なソースコードはこちらのリポジトリにあります。
https://github.com/mwatanabe/ingest-plugin-sample

変換仕様

変換仕様は何でもよいのですが、単一の組み込みProcessorでは難しいものが良いと思うので、以下の仕様で作ります。

  • 変換対象となるフィールドをPipelineの定義時に指定できる
  • 特定の文字列を別の文字列で置き換える。それぞれの文字列はPipelineの定義時に指定できる
  • 変換した文字列の個数をtypo_countというフィールドに保存する

仕様のイメージが掴みづらい場合は、先にSimulate APIでの動作確認を見ていただくのが良いかと思います。

build.gradle

build.gradleは以下のようになり、gradle buildを実行すると成果物のzipが生成されます。 plugin-descriptor.propertiesはsrc/main/resources/plugin-metadataディレクトリに配置することを想定しています。

apply plugin: 'java'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.elasticsearch:elasticsearch:5.0.0-alpha4'
}

task build(type: Zip, dependsOn: [':jar']) {
    from files(libsDir)
    from 'src/main/resources/plugin-metadata'
    into 'elasticsearch'
}

Processorクラスの実装

変換処理を行うクラスを実装します。

package sample;

import java.util.Map;

import org.elasticsearch.common.Strings;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;

public class TypoReplaceProcessor extends AbstractProcessor {
    public static final String TYPE = "typo";

    private final String field;
    private final String target;
    private final String replacement;

    public TypoReplaceProcessor(String tag, String field, String target, String replacement) {
        super(tag);
        this.field = field;
        this.target = target;
        this.replacement = replacement;
    }

    @Override
    public void execute(IngestDocument document) throws Exception {
        String value = document.getFieldValue(field, String.class);
        int count = Strings.countOccurrencesOf(value, target);
        String replaced = value.replaceAll(target, replacement);

        document.setFieldValue(field, replaced);
        document.setFieldValue("typo_count", count);
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static class Factory extends AbstractProcessorFactory<TypoReplaceProcessor> {
        @Override
        public TypoReplaceProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            String target = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target");
            String replacement = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "replacement");

            return new TypoReplaceProcessor(processorTag, field, target, replacement);
        }
    }
}

コードの細かい話は置いておいて、ProcessorとFactoryについて説明すると、以下のようになります。

TypoReplaceProcessor

  • AbstractProcessorを継承してexeuteメソッドとgetTypeメソッド実装する。
  • executeメソッドの引数として変換対象のドキュメント(IngestDocument)が渡ってくる。つまりはここが変換ロジックを書くメソッド。
  • getTypeメソッドはProcessorのtypeを返す。これはPipelineの定義において、Processorを指定するための識別子になる。

Factory

  • Processorのインスタンスを生成する。
  • FactoryはPipeline定義時のProcessorへのconfigを参照できる。

Pluginクラスの実装

PluginではNodeModuleにProcessorのFactoryを登録します。 Pluginクラスを継承して、onModuleという名前のpublicメソッドを定義すると、Elasticsearchがreflectionで良い感じに処理してくれるようです。

package sample;

import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin;

public class SamplePlugin extends Plugin {

    public void onModule(final NodeModule nodeModule) {
        nodeModule.registerProcessor(TypoReplaceProcessor.TYPE, (registry) -> new TypoReplaceProcessor.Factory());
    }
}

plugin-descriptor.properties

プラグインをelasticsearchに取り込むために必要な設定ファイルを書きます。読み込みたいPluginのクラス名を指定しています。

description=sample plugin for Elasticsearch
version=1.0.0
name=sample
classname=sample.SamplePlugin
java.version=1.8
elasticsearch.version=5.0.0-alpha4

ビルド

gradle buildを実行するとbuild/distributionsディレクトリにプラグインのzipが作成されます。

動作確認

まずは作成したプラグインをElasticsearchにインストールして再起動します。

# プラグインのインストール
bin/elasticsearch-plugin install /<path>/<to>/ingest-plugin-sample.zip

# Elasticsearchの起動
bin/elasticsearch

これでTypoReplaceProcessorが使えるようになるはずです。 getTypeメソッドは"typo"を返すようにしたので、Processorの識別子は"typo"です。 cybouzuをcybozuに修正するProcessorを定義して、Simulate APIで確認します。

# request
curl -XPOST localhost:9200/_ingest/pipeline/_simulate -d '{
  "pipeline": {
    "description": "cybozu typo pipeline",
    "processors": [{
      "typo": {
        "field": "foo",
        "target": "cybouzu",
        "replacement": "cybozu"}
      }
    ]
  },
  "docs": [{
    "_source": {"foo": "cybouzu cybouzu cybouzu"}
  }]
}'

# response
{"docs":[
  {"doc":
    {
      "_id":"_id",
      "_index":"_index",
      "_type":"_type",
      "_source":{"typo_count":3,"foo":"cybozu cybozu cybozu"},
      "_ingest":{"timestamp":"2016-07-01T02:33:01.130+0000"}
    }
  }
]}

responseの_sourceフィールドを見ると、3つのtypoが修正されてcount用のフィールドも追加されているのが確認できました。 これで、プラグインの開発、インストール、動作確認は完了です。

デバッグ

動作確認はできましたが、デバッグのために変換対象のドキュメント(IngestDocument)の中身を見たいこともあると思います。 Elasticsearchの起動時にjdwp用のデバッグポートを指定すれば、IntelliJなどのIDEからattachできます。(よくあるリモートデバッグですね。)

ES_JAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000" bin/elasticsearch

小ネタ

Reindex APIは内部的にはBulkRequestを利用します。 そうなると、Ingest Nodeが変換対象のドキュメントを複数受け取った場合、変換処理は並列に行われるのか、という点が気になりました。

そこで、Processor内でブロッキング処理(Thread.sleep)を書いて、Reindex APIにかかる時間を計測してみたところ、 ドキュメント数×sleep時間だけ処理時間が増えることが確認できました。

thread_pool APIで再インデクシング中のスレッド数を確認したところ、bulk.activeが1を示していました。 ということで、BulkRequestのドキュメントの変換処理はシングルスレッドで行われているようなので、 少なくともalpha4ではProcessor内でブロッキング処理を書くには注意が必要そうです。(alphaなのでGAになる時には変わってるかも。)

BulkRequestをPipelineが処理するコードはこの辺りでした。

おわりに

ElasticsearchのIngest Nodeについて、簡単な動作確認とプラグインの書き方の説明をしました。 2つのJavaクラスを実装して、設定ファイルを書いて、ビルドしてzipにまとめればプラグインを作成できます。

Ingest Nodeはalpha段階である5.0.0の機能なので、今後も色々変更はあるかと思いますが、 再インデクシングという文脈においてとても有用に感じているので、今後も情報を追っていきたいと思います。

少し長くなりましたが、この記事が読んでくださった方々のプラグイン開発の参考になれば幸いです。

We are hiring!!!

サイボウズでは Elasticsearch 大好きなエンジニアを募集しています!
キャリア採用 募集要項/イベント | サイボウズ株式会社

参考