Python3.4 の新機能 asyncio を使ってみる

山本泰宇です。こんにちは。

運用本部では最近 Python3 への移行を進めています。そちらの話は別途ご紹介する予定ですが、今回は Python3.4 で追加された asyncio モジュールの使いかた(というか落し穴)を解説します。先に結論を書いておくと、Python3.5 の改良を待つのが吉という話です。

サンプルとして複数の SSH を一斉に呼び出してうまいこと処理してくれる passh を GitHub で公開しています。実際に社内で利用しているものをデチューンして使いやすくまとめたものです。

本題の asyncio ですが、Python3 で非同期 I/O 処理を実現するために追加されたイベント駆動処理のフレームワークです。最初に書いておきますが、以下で言及する難ありな点のいくつかは Python3.5 で改良される予定のようです。[PEP-0492, PEP 492 vs. PEP 3152, new round]

1. asyncio は Python の中の独自世界

言語ネイティブに非同期処理が実装されている Go のような言語と違い、ライブラリとして後付けされているため通常の Python の書き方ができません。例えば子プロセスの処理に subprocess を使うと同期 I/O になってしまいます。asyncio 流の書き方をしなければいけないので、Python に習熟している人でも学習コストが高めです。

実際、18.5 asyncio の目次を見るだけでも怯むのではないでしょうか。冒頭に書かれていますが、API はまだ安定しているとは言いがたく、Python3.4.4 や Python3.5 で変更される予定です。

2. コルーチンは generator

asyncio ではコルーチン(coroutine, 協調的マルチタスクの仕組み)という概念が導入されています。公式な解説を読んでも良くわからないと思いますが、重要なのは冒頭の "A coroutine is a generator" です。付け加えるなら yield from でサブの generator に処理を移譲することが多い(必須ではない) generator です。

yield from が含まれる関数やメソッドは暗黙的に generator になるので、普通に呼び出しても本体が実行されません。例えば以下のコードがあるとします。

def B():
    ...

@asyncio.coroutine
def A():
    B()

B が普通の関数であれば問題なく A から B の処理が呼び出されます。が、あるとき B の中でスリープする処理を追加するとします。

def B():
    ...
    yield from asyncio.sleep(1)
    ...

この変更を加えると、A から B の処理は呼び出されなくなってしまいます。yield from があると B は generator になってしまうからです。B に yield from が加わると、呼び出し元の A も以下のように修正が必要になります。

@asyncio.coroutine
def A():
    yield from B()

これはまず最初にはまる罠です。リファクタリングなどでうっかり yield from が入ると、通常の関数が generator になってしまうからです。さすがに罠すぎるので、PEP-0492 で改良が予定されています。

3. loop.close() は呼ぶべきではない

公式のサンプルコードのいたるところで、末尾にイベントループをクローズするコードが出てきます。

そこでプログラムが終了するのであればいいのですが、ライブラリ等でプログラムがその後も継続する場合、loop.close() を呼んではいけません。イベントループはスレッド毎に自動的に作られるデフォルトのものがあるのですが、一度クローズすると、その後使えなくなります。

例えば passh.PAssh.run() がもし loop.close() していたら、そのスレッドで再度 passh.PAssh を使おうとすると例外が発生するようになってしまいます。

対案としては asyncio.new_event_loop() でスレッドデフォルトではないイベントループを作って使うことも考えられますが、実際にやると茨の道です。asyncio は各所で暗黙的に Future や Task を作るのですが、その際にスレッドデフォルトのイベントループが使われてしまうためです。できなくはないけど...といったところでしょうか。

4. asyncio.wait の例外処理

複数のコルーチンを実行して結果を待つという典型的な処理で使われるのが asyncio.wait ですが、例外の処理に罠があります。公式のサンプルコード では例外が特に処理されていませんが、これを真似するとインタプリタ終了時にようやく例外が表示されるといったことになります。

import asyncio, sys

@asyncio.coroutine
def A():
    print('hoge', file=sys.stder)  # sys.stderr の間違い
    yield True

asyncio.get_event_loop().run_until_complete(asyncio.wait([A()]))
# 例外が飛ばない

asyncio は色々な箇所で暗黙的にコルーチンを asyncio.Future でくるみます。コルーチンで発生した例外は Future に格納され、後で asyncio.Future.result() を呼ぶと例外が再 raise されます。run_until_completeresult() を暗黙的に呼び出すので、大抵の場合は例外が発生します。

が、asyncio.wait は複数のコルーチンの Future の集合を返す Future を返す(not typo)ので、個々のコルーチンの Future.result() は呼び出されません。そのため公式のサンプルコード通りだと例外が発生せず、インタプリタを抜ける段でようやくデストラクタ処理が作動して気付くというわけです。

async.wait を使用するコードが無事動いたように見えても、実は例外が発生して何も処理されていないかもしれません。正しくは以下のように処理します。

done, _ = asyncio.get_event_loop().run_until_complete(asyncio.wait([A()]))
for future in done:
    future.result()  # 例外があれば raise する

5. 最初の例外で止めたい

前述のように asyncio.wait は例外が発生しても全部のコルーチンを実行します。非同期処理でシングルスレッドなのですから、最初の例外で止めたいですね。

すぐ思いつく loop.stop() は確かにイベントループを止めますが、止めたコルーチンがゴミとして残ってしまい、エラー処理後に再度イベントループを使おうとするとエラーになってしまいます。

正しい方法は、以下のようにキャンセルすることです。

@asyncio.coroutine
def A():
    try:
        ...
    except Exception as e:
        ...
        for task in asyncio.Task.all_tasks():
            task.cancel()

タスクをキャンセルすると、run_until_completeasyncio.CancelledError を投げるので、注意してください。

まとめ

以上のような罠にはまりながら作ったのが passh です。注意点を annotate してありますので、参考になれば幸いです。

使ってみた所感としては、asyncio を使いたければ Python3.5 を待つのが良いのではないかと感じました。以前月読のために自作のイベント駆動フレームワークを作ったのですが、自作するより少し楽はできます。