これまで,4回にわたって並行/並列プログラミングについて説明しました。その最後を締めくくる話題として,メッセージ通信を取り上げたいと思います。
メッセージ通信は第10回で分類した並行計算・並列計算・分散計算のうち,主に分散計算を行うことに焦点を当てた技術です。ただ,分散計算以外にも有用な局面はあります。
メッセージ通信の利点とは?
まず,分散処理とメッセージ通信の関係について説明しましょう。
これまで紹介してきた並行/並列処理のための機能では,並行Haskellのような「状態」を持つものにせよ,並列Haskellのような「状態」を持たないものにせよ,「一つの共有されたメモリーを使用する」という考えのもとでメモリーを扱ってきました。これに対し,メッセージ通信ではメモリーは共有せず,その名の通り,メッセージの送受信を通して値をやり取りします。
ネットワーク上の通信は,同一コンピュータ上での計算よりも高くつくため,たいていの場合,データ並列Haskellのような自動並列化機能によって隠ぺいするのは適しません(参考リンク1,参考リンク2)。つまり,メッセージ通信を隠ぺいすることなく,陽に表現するような機能が必要になります。このため,分散処理ではメッセージ通信が広く使われているのです。
では,分散処理以外で,メッセージ通信が有用なのはどんな場合でしょうか? まず,分散処理と同様に,共有メモリーを使用できないか,共有メモリーを使用することが適さない場合です。例えば,実行単位の異なる別のプロセスとやり取りをする場合がこれに相当します。
また,「共有メモリーではなくメッセージ通信を使う」というイメージを持つこと自体が役に立つ場合もあります。例えば,スレッド間でメモリーを共有することに伴うバグを回避するには,あえて「状態を共有する」というイメージを捨てて,メッセージ通信で陽に値をやり取りするのも一つの手です。ただ,現在のHaskellで並列プログラミングを行う場合,並列Haskellやデータ並列Haskellのようにそもそも状態を持たせないか,あるいは第12回で紹介したSTMを使って状態を処理するのが常道です。つまり,Haskellを使っていれば,こうした意味でメッセージ通信が有用になることはありません。
ただ,メッセージ通信のイメージは,「状態の共有の排除」だけでなく,バッファを利用する際に役立つこともあります。メッセージ通信が相手の処理の終了を待たない非同期的なものである場合,後に処理するべきメッセージを蓄えておくバッファを持つよう実装されているのが普通です。Haskellでメッセージ通信を利用すると,バッファをイメージしやすいという利点があります。
System.Processとメッセージ通信
では,実際にメッセージ通信を利用する例を見てみましょう。最初に紹介するのは,実行単位の異なる別のプロセスとメッセージをやり取りする例です。Haskellでは,外部プログラムをプロセスとして利用するためのモジュールとして,System.Processが用意されています。
System.Processはすでに第5回で説明しました。ただ,スレッドとプロセスの違いやメッセージ通信の概念には触れていなかったため,かなり大雑把な説明になっていました,まずは,このあたりを詳しく見ていきましょう。
スレッドは共有メモリーを使うのに対し,プロセスはメッセージ通信を使うという違いがあります。そこで,スレッドとプロセスを対比させて説明しましょう。
Haskellでは,スレッドを生成するのにfork**,スレッドを終了させるのにkillThread,スレッドの終了待ちをするのにMVarを利用します。同様に,System.Processでは,外部のプログラムを使ってプロセスを生成するのにrunInteractive**,プロセスの終了にterminateProcess,プロセスの終了待ちをするのにwaitForProcessを使います(runInteractive**には,第5回で説明したrunInteractiveProcessのほかに,シェルにコマンドを送り込むrunInteractiveCommandがあります,参考リンク)。
Prelude System.Process> :t runInteractiveProcess runInteractiveProcess :: FilePath -> [String] -> Maybe FilePath -> Maybe [(String, String)] -> IO (GHC.IOBase.Handle, GHC.IOBase.Handle, GHC.IOBase.Handle, ProcessHandle) Prelude System.Process> :t runInteractiveCommand runInteractiveCommand :: String -> IO (GHC.IOBase.Handle, GHC.IOBase.Handle, GHC.IOBase.Handle, ProcessHandle) Prelude System.Process> :t terminateProcess terminateProcess :: ProcessHandle -> IO () Prelude System.Process> :t waitForProcess waitForProcess :: ProcessHandle -> IO GHC.IOBase.ExitCode
並行処理 | 生成 | 終了 | 同期 |
---|---|---|---|
スレッド | forkIO,forkOS | killThread | MVar |
プロセス | runInteractiveProcess,runInteractiveCommand | terminateProcess | waitForProcess |
具体的にスレッドとプロセスの違いを見ていきましょう。スレッドは共有メモリーを使うため,I/Oアクションは,何の制約も受けず,現在の状態を取得することが可能でした。一方,System.ProcessではrunInteractive**の返り値である標準入力(stdin),標準出力(stdout),標準エラー(stderr)へのハンドル越しに値(メッセージ)をやり取りします。
標準出力・標準エラーの受信側になるハンドルを使用したプロセスは,中身が出力されるまでI/Oアクションを阻止(ブロック)します。このとき,標準出力・標準エラーが結果を返せば何の問題もありません。しかし,もしrunInteractive**を使って実行している処理が終了しなければどうでしょうか? 標準出力・標準エラーはいつまでも結果を返さず,ハンドルを使用した処理を永久にブロックし続けることになります。
標準入力のハンドルにも同様の欠点があります。たとえ相手先のプロセスが終了していても,無駄にメッセージの送信を試みようとしてしまいます。この結果,プログラム上でエラーが生じてしまうかもしれません。
これでは不便です。どうにかならないでしょうか?
こんな場合に役に立つのが,waitForProcessとgetProcessExitCodeの二つの関数です。
Prelude System.Process> :t waitForProcess waitForProcess :: ProcessHandle -> IO GHC.IOBase.ExitCode) Prelude System.Process> :t getProcessExitCode getProcessExitCode :: ProcessHandle -> IO (Maybe GHC.IOBase.ExitCode)
waitForProcessとgetProcessExitCodeは,それぞれ別の方法で問題を解決しようとします。
waitForProcessは,プロセスの終了まで処理をブロックして待ちます(現在のGHCの実装では-threadedオプションをつけてマルチスレッド版の実行時システムを使うようにしないと,waitForProcessは全スレッドをブロックしてしまいます)。つまり,waitForProcess以降は,プロセスがすでに終了していることがコード的に保証されることになります。プロセスが終了していることが明らかなら,プログラマがメッセージを送らないようにできます。この方法は,残念ながら標準出力・標準エラーの問題は解決できませんが,標準入力の問題には有効です。
もう一つの方法は,getProcessExitCodeを使うものです。プロセスのハンドルを使ってプロセスが終了しているかどうかを問い合わせます。getProcessExitCodeでは対象のプロセスが終了していればJustに包む形で終了コードを表すExitCode型の値を,プロセスが終了していなければNothingを返します。この方法を使えば,外部プログラムが処理の実行状況を問い合わせるための機能を持っていなくても,プロセスが終了しているかどうかを問い合わせることが可能になります。一つのプロセスに一つの処理を行わせている場合,つまりプロセスの終了と処理の終了が直接結びついている場合には,getProcessExitCodeを使うことで,必要に応じて処理を切り替えられます。
例えば,プロセスが終了していない場合には,そのプロセスの終了に依存しない別の代替処理を進められるようになります。また,プロセスが終了できずその結果を利用できない場合には,結果に依存する処理を打ち切るような振る舞いを記述することもできます。プロセスの終了を愚直に待ち続ける必要はなくなります。
ただ,プロセスの終了時に答えを返す場合ばかりではありません。一つのメッセージの入力つき一つ(あるいは複数)のメッセージを返す対話的(interactive)なプロセスを使うこともあります。このようなときには,runInteractive**は何を行うのでしょうか?
結論から言うと,runInteractive**は標準入出力をそれぞれバッファとして扱い,他のプロセスとは非同期的に仕事をこなしていくことになります。バッファの中身が空になれば,その間入力に依存するrunInteractive**上のプロセス,またはrunInteractive**の結果である出力に依存するプロセスがブロックされますが,そうでない間はお互いのメッセージのやり取りを気にせずに独立して動作します。ちょうど,パイプライン並列化を行ったり,コマンドラインでパイプ処理を行うのと同様です。外部プログラムと処理をつなぐというSystem.Processの役割を考えると当然でしょう。
このように,メッセージ通信には同期式のものと非同期式のものの2種類があります。一般に,メッセージ通信を同期式と非同期式に分ける場合には,メッセージの送信側に着目するが普通です。つまり,メッセージ送信時に送信側のプロセスをブロックするものを同期式,送信側をブロックすることなく継続してI/Oアクションの実行を行わせるものを非同期式と呼びます。送信側で定義しているのは,受信側のバッファが空のときには必然的にブロックすることになるため,分類が難しいためでしょう。
ただし,waitForProcessのように「受信側のプロセスをブロックすることで同期式のメッセージを通信を実現しようとする」特殊な例も存在します。それぞれのプロセスが独立して動作可能であるためブロックしなくてもよいのか,使用する各プロセスが一つのプロセスのように一体になって動作する必要があるのか,といった点を見極めることが大切です。
System.Processにおける同期式のメッセージと非同期式のメッセージを表にすると,以下のようになります。標準入出力は,一つの処理だけを行うときには同期式であり,外部プログラムとの間で対話的な処理を行うときには非同期式です。
メッセージ | 同期式 | 非同期式 |
---|---|---|
送信 | waitForProcess,標準入力 | getProcessExitCode,terminateProcess,標準入力 |
受信 | ExitCode,標準出力,標準エラー | Maybe ExitCode,標準出力,標準エラー |