他のスレッドに例外を発生させるthrowTo関数

 Control.Exceptionモジュールでは,対象として指定したスレッドで例外を発生させるthrowToという関数を提供しています。

Prelude Control.Exception> :t throwTo
throwTo :: (Exception e) => GHC.Conc.ThreadId -> e -> IO ()

 throwToを使って他のスレッドに例外を発生させることで,そのスレッドの処理に割り込んで中断・終了できるようになります。

 試してみましょう。

module AsyncException where
import Control.Concurrent
import Control.Exception
import Control.Monad

foreverTest'' = do
    tid <- forkIO $ forever $ print "infinite loop."
    threadDelay 5
    throwTo tid (ErrorCall $ "throw exception to " ++ show tid ++ ".")
    threadDelay 1000000
    mid <- myThreadId
    print $ show mid ++ " finished."

 第12回で説明したように,親スレッドが終了すれば子スレッドも終了します。そこで,子スレッドの終了が,親スレッドの終了によるものではなくthrowToによるものであることを保証するため,「threadDelay 1000000」で親スレッドの終了を引き伸ばしています。

 実行すると,例外の発生によって子スレッドの処理が中断・終了し,しばらくして最後のメッセージが表示されます。

Prelude AsyncException> foreverTest''
"infinite loop."
"infinite loop."
"infinite loop."
"infinite loop."
~ 略 ~
"infinite loop."
"infinite loop."
"infinite lo<interactive>: throw exception to ThreadId 24.
"ThreadId 23 finished."

 なお,すでに終了したといった理由でその時点では存在しないスレッドに対して例外が通知される場合があります。そうした通知は,単に無視されます。

module AsyncException where
import Control.Concurrent
import Control.Exception
import Control.Monad

foreverTest''' = do
    tid <- forkIO $ forever $ print "infinite loop."
    threadDelay 5
    throwTo tid (ErrorCall $ "throw exception to " ++ show tid ++ ".")
    threadDelay 1000000
    throwTo tid (ErrorCall $ "throw exception to " ++ show tid ++ ".")
    mid <- myThreadId
    print $ show mid ++ " finished."

*AsyncException> foreverTest'''
"infinite loop."
"infinite loop."
"infinite loop."
~ 略 ~
"infinite loop."
"infinite loop."
"infinite loop."
"i<interactive>: throw exception to ThreadId 30.
"ThreadId 29 finished."

 処理の中断・終了手段としての例外は,第12回で説明したkillThreadを実装するのにも使われています。実は,killThreadはthrowTo関数を使う構文糖衣だったのです。

killThread :: ThreadId -> IO ()
killThread tid = throwTo tid ThreadKilled

Prelude Control.Exception> :i ThreadKilled
data AsyncException = ... | ThreadKilled | ...
        -- Defined in GHC.IOBase

 同様に,一定時間内に処理が完了しなければ実行を打ち切る「タイムアウト処理」を記述することもできます。GHC 6.10.1のSystem.Timeoutモジュールでは,以下のような形でtimeout関数が定義されています。数字の単位はマイクロ秒です。

data Timeout = Timeout Unique deriving Eq
INSTANCE_TYPEABLE0(Timeout,timeoutTc,"Timeout")

instance Show Timeout where
    show _ = "<<timeout>>"

instance Exception Timeout

~ 略 ~

timeout :: Int -> IO a -> IO (Maybe a)
timeout n f
    | n <  0    = fmap Just f
    | n == 0    = return Nothing
    | otherwise = do
        pid <- myThreadId
        ex  <- fmap Timeout newUnique
        handleJust (\e -> if e == ex then Just () else Nothing)
                   (\_ -> return Nothing)
                   (bracket (forkIO (threadDelay n >> throwTo pid ex))
                            (killThread)
                            (\_ -> fmap Just f))

 handleJust関数を使っている部分が,この関数の核になる処理です。handleJustは,特定の例外に対する処理を記述するcatchJustという関数の引数を入れ替えたものです。catch関数は,例外の種類を判別するのに例外型を直接使うのに対し,catchJustやhandleJustは,例外型をMaybe型に変換する関数を利用します。

catchJust
        :: Exception e
        => (e -> Maybe b)         -- ^ Predicate to select exceptions
        -> IO a                   -- ^ Computation to run
        -> (b -> IO a)            -- ^ Handler
        -> IO a
catchJust p a handler = catch a handler'
  where handler' e = case p e of
                        Nothing -> throw e
                        Just b  -> handler b

~ 略 ~
handleJust :: Exception e => (e -> Maybe b) -> (b -> IO a) -> IO a -> IO a
handleJust p =  flip (catchJust p)

 timeout関数の中でhandleJust関数により処理すべき例外は,bracket関数で覆われているI/Oアクションの内部で発生します。そのI/Oアクションでは,forkIOで新しいスレッドを作成し,元のスレッドでの処理の実行を管理しています。一定時間内に処理が終われば,新しく作ったスレッドをkillThreadで終了させることで,例外の発生を防ぎます。一方,一定時間内に処理が終わらなければ,新しいスレッドがTimeout例外を通知し,handleJustを使ってNothingを返すよう例外処理を行います。つまりtimeout関数は,一定時間内に処理が終われば結果の値をJustに包んで返し,処理が終わらなければNothingを返すようにを定義されているのです。

 なお,timeout関数の最初のガード節で,第1引数の値が負の数であれば時間無制限で処理を行い,0であればNothingを返すよう定義されています。

Prelude System.Timeout Control.Monad> timeout 10 $ return ()
Just ()
Prelude System.Timeout Control.Monad> timeout 10 $ forever $ return ()
Nothing
Prelude System.Timeout Control.Monad> timeout 0 $ return ()
Nothing
Prelude System.Timeout Control.Monad> timeout (-10) $ forever $ return ()
Interrupted.

 killThread関数やtimeout関数のような処理は便利ですが,いくつか注意点があります。まず,throwToの対象であるスレッドがFFIで外部関数を呼び出している場合,呼び出しが完了するまでは例外が発生しません。また,2008年11月時点でのthrowTo関数は,完全な非同期動作を行うわけではありません。対象のスレッドへの例外の通知が完了して例外が発生するまでは,元のスレッドの実行に戻ってこないという実装になっています(参考リンク)。

 このため外部関数に依存する処理では,思った通りに処理を終了させることができなかったり,その影響で他のスレッドがしばらくの間停止することになるかもしれません。throwToやそれを利用したkillThread,timeoutなどを使う場合には,こうした点に留意してください。

非同期例外と例外安全性

 throwToを使った非同期例外の導入は,例外処理に大きな影響を及ぼします。ある関数を例外安全なものとして実装しようとする場合,少なくとも「例外が発生してもリソース漏れが発生しない」という基本的な保証を満たす必要があります。

 逐次処理でこの保証を満たすには,関数が例外を送出しないよう注意すれば大丈夫でした。しかし,非同期例外ではこれだけでは不十分です。他のスレッドからの例外の通知が,関数内部には存在しなかった例外を持ち込んでくるからです。並行処理の環境下では,例外処理を失敗させるようなあらゆる可能性を防ぐ「無失敗(no-fail)保証」を持った処理が必要になります。

 他のスレッドからの例外の通知による失敗を防ぐには,具体的にはどうすればよいでしょうか? その手段として,Control.Exceptionモジュールでblockという関数が提供されています。block関数は,throwToから送られた非同期例外をブロックし,その間に必要な処理を行うためのものです。逆にブロックを解除するためのunblockという関数もあります。

Prelude Control.Exception> :t block
block :: IO a -> IO a
Prelude Control.Exception> :t unblock
unblock :: IO a -> IO a

 ブロック漏れやブロックの解除漏れを防ぐため,blockやunblockはアクションではなくアクションを対象とした「組み合わせ子(combinator,または結合子)」の形で提供されています(参考リンク)。また,あるアクションがブロック・モード(blocked mode)で動作しているか非ブロック・モード(unblocked mode)で動作しているかを調べるblockedという関数も提供されています。

Prelude Control.Exception> :t blocked
blocked :: IO Bool

Prelude Control.Exception> blocked
False
Prelude Control.Exception> block blocked
True
Prelude Control.Exception> unblock blocked
False
Prelude Control.Exception> block $ unblock blocked
False

 blockedを使って調べればわかるように,入れ子になったblockやunblockは,単なるblockやunblockと同じものとみなされます。

Prelude Control.Exception> block $ block $ unblock blocked
False
Prelude Control.Exception> block $ block $ blocked
True
Prelude Control.Exception> block $ block $ unblock blocked
False
Prelude Control.Exception> block $ unblock $ block blocked
True
Prelude Control.Exception> unblock $ unblock blocked
False
Prelude Control.Exception> unblock $ unblock $ block blocked
True

 現行のGHCのbracket*関数やfinally関数は,他のスレッドから送られてきた非同期例外によって前処理や後処理が妨げられないように,blockを使って実装されています。

bracket
        :: IO a         -- ^ computation to run first (\"acquire resource\")
        -> (a -> IO b)  -- ^ computation to run last (\"release resource\")
        -> (a -> IO c)  -- ^ computation to run in-between
        -> IO c         -- returns the value from the in-between computation
bracket before after thing =
  block (do
    a <- before
    r <- unblock (thing a) `onException` after a
    after a
    return r
 )

~ 略 ~

finally :: IO a         -- ^ computation to run first
        -> IO b         -- ^ computation to run afterward (even if an exception
                        -- was raised)
        -> IO a         -- returns the value from the first computation
a `finally` sequel =
  block (do
    r <- unblock a `onException` sequel
    sequel
    return r
  )

~ 略 ~

bracket_ :: IO a -> IO b -> IO c -> IO c
bracket_ before after thing = bracket before (const after) (const thing)

~ 略 ~

bracketOnError
        :: IO a         -- ^ computation to run first (\"acquire resource\")
        -> (a -> IO b)  -- ^ computation to run last (\"release resource\")
        -> (a -> IO c)  -- ^ computation to run in-between
        -> IO c         -- returns the value from the in-between computation
bracketOnError before after thing =
  block (do
    a <- before
    unblock (thing a) `onException` after a
  )

 現在の処理だけ,unblockを使って明示的に割り込み可能にしているのがわかりますね。onExceptionは,例外が発生したときにだけ行う処理を記述するための関数です。blockedと同じくControl.Exceptionモジュールで提供されています。

onException :: IO a -> IO b -> IO a
onException io what = io `catch` \e -> do what
                                          throw (e :: SomeException)

 上の例では,onExceptionを使って,例外発生時にも同じ後処理を行わせるようにしています。ただしbracketOnErrorだけは,例外発生時にのみ後処理を行うようにしています。

 このように,bracket*やfinallyは非同期例外の存在を考慮して実装されています。このため,これらの関数を使えば並行処理の環境下でも比較的簡単に例外安全な処理を書けます。例外安全なI/Oアクションを記述する場合には,まずbracket*やfinallyを使うことを考えましょう。これらの関数では行き届かない部分があれば,blockやunblockの利用を検討してみてください。