前回は,Haskellで並列プログラミングを行うための方法の一つとして,並行プログラミングについて語りました。「広く使われている並行/並列プログラミング技術」と「並列処理専用のプログラミング技術」の違い,「Haskellでの並列プログラミング専用の機能」と「スレッドを使った並行プログラミング機能」の違い,といったことを把握するうえで必要な説明でした。

 ただ,ネットワーク・プログラミングやWebプログラミングなど,並行プログラミング機能を使うことが前提である状況では,理論的な説明よりも実際の使い方のほうが重要です。特にSTMは,トランザクション・メモリーという特殊性から,とても取っつきにくいように見えます。実際には条件によって分岐する通常のI/O処理とたいして変わらないのですが,そうした感覚でSTMをとらえるには,STMを利用して実際にプログラミングを行った経験が必要になります。

 そこで今回は,スレッドやSTMを利用した実際の並行プログラミングの手法について説明したいと思います。

前回の補足

 前回の説明で,本来,newEmptyTMVarIOの定義を紹介するべきところが,誤ってnewTMVarIOの定義になっていました。newEmptyTMVarIOの定義は以下の通りです。

newEmptyTMVarIO :: IO (TMVar a)
newEmptyTMVarIO = do
  t <- newTVarIO Nothing
  return (TMVar t)

 newTMVarIOとは異なり,初期値が空であることを示すNothingであることがわかると思います。

 また,並列論理和演算子の定義が間違っていました。前回示した定義は,第1引数と第2引数のうち,無限ループになることなく先に答を返したほうの値を採用するようになっていました。

a |||| b = unsafePerformIO $ parSTM (return a) (return b)

parIO :: IO a -> IO a -> IO a
parIO a1 a2
 = do m <- newEmptyTMVarIO
      c1 <- forkIO (child m a1)
      c2 <- forkIO (child m a2)
      r <- atomically $ takeTMVar m
      killThread c1
      killThread c2
      return r
 where
     child m a = catch (do
         b <- a
         atomically $ putTMVar m b)
       (\e -> return ())

 この定義だと,第1引数と第2引数のどちらかが無限ループである場合には正常に動作します。しかし,どちらも無限ループでない場合には,先に出たほうの答を採用するため,第1引数と第2引数の間に論理和の関係が成立しません。

*Por> False |||| True
False

 したがって,正しくは以下のように実装する必要があります(参考リンク)。

import Control.Monad

a |||| b = por [a,b]

por :: [Bool] -> Bool
por xs = unsafePerformIO $ do
    r <- newEmptyTMVarIO
    n <- newTMVarIO 0
    ts <- flip mapM xs $ \x -> forkIO $ do
        x' <- catch (evaluate x) (\e -> return False)
        when x' $ atomically (putTMVar r ())
        atomically (takeTMVar n >>= putTMVar n . (+1))
    b <- atomically $ 
        (takeTMVar r >> return True) `orElse`
        (takeTMVar n >>= \x -> guard (x == length xs) >> return False)
    mapM_ killThread ts
    return b

 それぞれのスレッドで引数に渡された値の評価が終わったとき,結果がTrueであればrに処理の終了を意味する( )という値を設定し,Falseや例外を発するものであればnの値を加算します。一方,porの動作するスレッドでは,rとnの状態を検査しています。rが空ではなく( )という値を持っていれば,引数のうちどれかがTrueになっているので,全体としてTrueを返します。nがリストの大きさ(この場合は2)と同じであれば,どの引数もFalseなので,全体としてFalseを返しています(なお,この実装にはいくつか未説明の機能を使っています。そうした機能については後で説明します)。

 このように実装すれば,引数のどれも無限ループでなくても論理和関係が成り立ちます。

*Por> False |||| True
True
*Por> True |||| False
True
*Por> f |||| g
True
*Por> g |||| f
True
*Por> g |||| f |||| g |||| g
True

ハンドルを使ったI/O処理の制御

 前回は,STMを使うことで共有メモリーにより安全に処理を行えることを示しました。しかし,複数スレッドの中で行われるI/O処理は,STMによってキャンセル可能な共有メモリーに対する操作だけとは限りません。実際にはファイルの読み書きやGUIの操作など,いったん行うとキャンセルできないアクションを,複数のスレッドを使って行いたいこともあるでしょう。そんなときにはどうすればよいでしょうか?

 解決策の一つは,I/O処理の制御をファイル記述子(FD:file descriptor,ファイル・ディスクリプタ)やハンドルを使って行うのと同じように,共有メモリー上の「状態」を使ってI/O処理の制御を行うことです。

 こうした処理を行ったことのない人には,ピンと来ないかもしません。まずは,どのようにしてハンドルを利用するのかを見てみましょう。

 ファイルに対してある操作を行うプログラムを作成しようと思ったとき,最初に思いつくのは以下のようなコードでしょう。

import System.Environment
import System.IO

changeFile process file = do
    dat <- readFile file
    writeFile file $ process dat

main = do
    args <- getArgs
    let file = head args
        process = unlines . map (drop 2) . lines
    changeFile process file

 しかし,このプログラムはうまく動作しません。第7回 入出力と遅延評価の間を取り持つIOモナドで説明したように,readFileは必要な分だけ処理を行う遅延I/Oとして定義されています。このため,ファイルのすべての内容を読み込むまでは,ハンドルを解放することはありません。結果として,readFileが終わらないうちに始まったwriteFileを使う操作は拒絶されることになります(参考リンク)。

*Main> :main sample.txt
*** Exception: sample.txt: openFile: permission denied (Permission denied)

 こうしたエラーを回避するには,ファイルを読み込んでから新しい中身を書き込むまでのタイミングを変える必要があります。これは,return $! xとNFDataクラスのrnfメソッドを使い,writeFileよりも先にファイルの中身を読み込ませることで実現できます。

import Control.Parallel.Strategies

changeFile process file = do
    dat <- readFile file
    return $! rnf dat
    writeFile file $ process dat

*Main> :main sample.txt
*Main>

 これですべての問題が解決したでしょうか? よく考えると,そうではないことがわかります。このプログラムは,readFileでファイルのすべての内容を読み込んでから,ファイルに対する操作を行うようになっています。

 プログラムで使っているlinesは「文字列中の行区切り記号(line delimiter)に基づき,一つの文字列を複数の文字列のリストに分解する関数」であり,unlinesは「文字列の末尾に行区切り記号を加え,一つの文字列に統合する関数」です(参考リンク)。

*Main> :t lines
lines :: String -> [String]
*Main> lines "1 line\n2line\n3 line"
["1 line","2line","3 line"]
*Main> :t unlines
unlines :: [String] -> String
*Main> unlines it
"1 line\n2line\n3 line\n"

 linesとunlinesに挟まれたmap (drop 2)は,linesで分解されたそれぞれの行すべてに対して操作を行うことを前提にしています。

 しかし,もし操作で必要なのがファイルの内容のごく一部で,なおかつファイルがあまりにも巨大である場合には,これはあまり良い定義ではありません。そこで,遅延評価を生かして,必要な内容だけを読み取るように書き換えてみましょう。

changeFile process file = do
    dat <- readFile file
    let modDat = process dat
    return $! rnf modDat
    writeFile file modDat

main = do
    args <- getArgs
    let file = head args
        process = unlines . map (drop 2) . take 5 . lines
    changeFile process file

*Main> :main sample.txt
*** Exception: sample.txt: openFile: permission denied (Permission denied)

 再び処理が衝突してしまいました。何が悪かったのでしょうか? もう一度readFileの定義について考えてみてください。readFileはファイルのすべての内容を取得するまでハンドルを解放しません。つまり,ファイルの内容を必要なだけ読み取るように書き換えたことで,readFileがハンドルを解放しなくなってしまったのです。

 この問題を回避するには,readFileによるハンドルの暗黙の利用をやめて,ハンドルを明示的に使用するようにプログラムを書き換える必要があります。

changeFile process file = do
    h <- openFile file ReadMode
    dat <- hGetContents h
    let modDat = process dat
    return $! rnf modDat
    hClose h
    writeFile file modDat

 こうすれば,hCloseを使ったハンドル解放のタイミングを自分の手で制御できるので,特に問題が生じることはありません。

*Main> :main sample.txt
*Main>

 このように,ハンドルを直接使用するのではなく,ハンドルがどう利用されるかを推測して処理を制御するのは限界があります。ここまでの説明で,I/O処理の制御の本質が「状態」の操作にあることを理解してもらえたと思います。