Scala とか Play の Future と ExecutionContext について
背景
Scala や Play! framework で Future 使うときに、ExecutionContext を渡せと言われる。よく分からない場合は、とりあえず
import scala.concurrent.ExecutionContext.Implicits.global
とやっておけば、とりあえずコンパイルは通るんだけど、よく分からずにやっている人も多いと思う(ノ)。
今回は、そっから先の話。
Future はどうやって実行される?
Future の処理は、ExecutionContext というものによって実行される。ExecutionContext については、以下のドキュメントに記載がある。
Futures and Promises – Scala Documentation
(ちなみに、日本語訳は古いせいか、英語のドキュメント内容が異なる。)
1つの Future がどう実行されるかは ExecutionContext 次第で、以下のようなケースがありうる。ドキュメントの説明をだいたいそのまま書くと、
- 新規スレッドが立ち上がる
- スレッドプールから空いているスレッドを取ってきて、そのスレッド上で実行される
- 実行元のスレッドと同じスレッドで実行される(非推奨)
という感じ。
ま、ここまでは知ってる人も多いと思う。
ExecutionContext の種類
大雑把に以下の3つに分けて説明。
- scala.concurrent.ExecutionContext.Implicits.global
- play.api.libs.concurrent.Execution.Implicits.defaultContext (Play! framework の場合)
- 自分で作る
scala.concurrent.ExecutionContext.Implicits.global
Scala の標準の? ExecutionContext 。長いので、以下 global と呼ぶ。わからない場合はこれを使う。(ただし、Play! framework の場合は、後述の play.api.libs.concurrent.Execution.Implicits.defaultContext を使うと良い。)
この global は、どんな場面でもそれなりに無難に動く設計がされている。この global の挙動も先ほどのドキュメントに記載があるが、具体的には
- ForkJoinPool を使用している
- 並列度は、デフォルトではCPUコア数と同じに設定されている
- 変更する場合は、前回書いた並列コレクションと同様、scala.concurrent.context.* をいじる
play.api.libs.concurrent.Execution.Implicits.defaultContext
Play! の場合は、play.api.libs.concurrent.Execution.Implicits.defaultContex がデフォルトの ExecutionContext で、「大抵のケースではこれを使えばOK」(筆者超訳)と以下のドキュメントに書いてある。
これの設定を変更する場合は、application.conf をいじる。詳細は上のドキュメント参照。
(Play! の場合は、 global は使わないほうが良い、ってのはどっかに書いてあったけど見つからず。公式ドキュメントではなかったかも。理由としては、設定変更とかの管理がしづらい(application.conf では設定できないので)とかそういった理由だったはず。)
自分で作る
Scala 編
例えば、長くかかる処理もすぐに終わる処理も、同じスレッドプールを使われてしまうと困る場合もあると思う。そういった場合などには、自前で ExecutionContext を作るという方法がある。
作り方は、生の Scala の場合、前述の Futures and Promises に書いてあるけど、以下のとおりにする。
ExecutionContext.fromExecutor(new ThreadPoolExecutor( /* your configuration */ ))
Play 編
Play の場合、ExecutionContext の実装は、Akka の dispatcher なので、application.conf に、デフォルトの設定とは別の Akka の設定を書いておき、それを lookup して使うという方法をする。
詳細は、ドキュメントの Using other thread pools の項を参照。
Akka
ちょっと本題からはずれるけど、Akka の場合は、MessageDispatcher が ExecutionContext の働きをする。詳細は以下のドキュメントを参照。
Actor 内からであれば、context.dispatcher を import すればよい。
class A extends Actor { import context.dispatcher val f = Future("hello") def receive = { case _ => } }
Future 関連の注意点とかその他
ExecutionContext の渡し方
分かるとは思うけど一応。
val ec = ... // ExecutionContext Future { // なんかの処理 }(ec)
blocking って何?
Future 内部でブロッキング処理を行う場合は、以下のように blocking というのを書いておくと良い。
Future { blocking { // なんかのブロック処理 } }
理由としては以下の2つ。
- ExecutionContext 内部で、別扱いで処理してくれる場合がある
- コードを読む人にとって、「あ、ここはブロックするのね」、とわかりやすい
前者に関しては、こちらもドキュメントに記載があるが、global とかは blocking とそうでないのは違った処理がされるが、Executors.newFixedThreadPool で作ったものとかだと、 blocking と書いてあっても無視される。Stack Overflow に詳しい解説を書いてる人がいた。
Await 使う場合はタイムアウト設定したほうが良い
Await.result とかは極力使うなとかドキュメントに書いてあるけど、場合によっては使わざるを得ない。で、その場合は、 Duration.Inf は使わず、適切なタイムアウト値を設定すべきだと思う。
Await するのが複数箇所あった場合、スレッドプールからのスレッド取得待ちでデッドロックしてしまう可能性があり、タイムアウトが設定されていないと詰む。以下のページに、簡単なデッドロックの例が書いてあるので参考まで。
How to avoid thread pool induced deadlocks? – Adam Sznajder – personal blog
関連する情報
以下、自分用メモ。
- Scala School – Concurrency in Scala
- scala-best-practices/4-concurrency-parallelism.md at master · alexandru/scala-best-practices
- Being aware Scala 2.10.0 Futures conceal Fatal exceptions – Mandubian Blog
- concurrency – Scala Futures – built in timeout? – Stack Overflow
まとめ
二流プログラマにとってはこの辺は難しいわー。