map関数をStream Fusionで効率化

 Stream Fusionの例として,データ構造を作成する生産者と,データ構造を使用する消費者の両方の側面を持つmap関数を見てみましょう。stream-fusionパッケージで提供されているData.List.Streamモジュールのmap関数の定義は以下の通りです。

import qualified Data.Stream as Stream
import Data.Stream (stream ,unstream)

~ 略 ~

map :: (a -> b) -> [a] -> [b]
map _ []     = []
map f (x:xs) = f x : map f xs
{-# NOINLINE [1] map #-}

~ 略 ~

{-# RULES
"map -> fusible" [~1] forall f xs.
    map f xs = unstream (Stream.map f (stream xs))
--"map -> unfused" [1] forall f xs.
--    unstream (Stream.map f (stream xs)) = map f xs
  #-}

 map関数の本体は,PreludeやData.Listで提供される定義と何ら変わりはありません。PreludeやData.Listのmap関数と異なるのは,書き換え規則を使って融合変換が可能な形に書き換える部分です。

 "map -> fusible"規則では,map関数を使用した「map f xs」という式を「unstream (Stream.map f (stream xs))」という式に書き換えます。書き換えられたこの式では,unstreamとstreamという対となる関数が使用されています。これら二つの関数は,Data.Streamモジュールで定義されています。

-- ---------------------------------------------------------------------
-- List/Stream conversion

-- | Construct an abstract stream from a list.
stream :: [a] -> Stream a
stream xs0 = Stream next (L xs0)
  where
    {-# INLINE next #-}
    next (L [])     = Done
    next (L (x:xs)) = Yield x (L xs)
{-# INLINE [0] stream #-}

-- | Flatten a stream back into a list.
unstream :: Stream a -> [a]
unstream (Stream next s0) = unfold_unstream s0
  where
    unfold_unstream !s = case next s of
      Done       -> []
      Skip    s' -> expose s' $     unfold_unstream s'
      Yield x s' -> expose s' $ x : unfold_unstream s'
{-# INLINE [0] unstream #-}

 Data.Streamモジュールのunstreamはリストを返す関数であり,streamはリストを引数に取る関数です。「unstreamでリストを作成し,作成されたリストをstreamで使用する」というパターンが現れたときには,"STREAM stream/unstream fusion"という書き換え規則によってunstreamとstreamが除去されます。

--
-- /The/ stream fusion rule
--

{-# RULES
"STREAM stream/unstream fusion" forall s.
    stream (unstream s) = s
  #-}

 例えば「map f (map g xs)」という式は,map自身の書き換え規則である"map -> fusible"規則と,"STREAM stream/unstream fusion"規則の組み合わせによって,余分なunstreamとstreamが除去されます。

module RewriteStream where
import Prelude hiding (map)
import Data.List.Stream

fg = map (+1) . map (+2)

$ ghc -O2 -c RewriteStream.hs -ddump-simpl-stats
~ 略 ~
4 RuleFired
    1 STREAM map/map fusion
    1 STREAM stream/unstream fusion
    2 map -> fusible
~ 略 ~

    map f (map g xs)
==> { "map -> fusible"規則の適用 }
    unstream (Stream.map f
             (stream (unstream
             (Stream.map g (stream xs)))))
==> { "STREAM stream/unstream fusion"規則の適用 }
    unstream (Stream.map f
             (Stream.map g (stream xs)))

 このように,Stream Fusionでは,融合変換可能な優良生産者を定義するのにunstream関数,融合変換可能な優良消費者を定義するのにstream関数が使われます。

 それでは,unstreamが引数に取り,streamが値として返すStream型を見てみましょう。

------------------------------------------------------------------------
-- The stream data type

-- | A stream.
--
-- It is important that we never construct a bottom stream, because the
-- fusion rule is not true for bottom streams.
--
-- > (replicate 1 True) ++ (tail undefined)
--
-- The suspicion is that under fusion the append will force the bottom.
--
data Stream a = forall s. Unlifted s =>
                          Stream !(s -> Step a s)  -- a stepper function
                                 !s                -- an initial state

-- | A stream step.
--
--   A step either ends a stream, skips a value, or yields a value
--
data Step a s = Yield a !s
              | Skip    !s
              | Done

 Data.StreamモジュールのStream型では,リストに対して行うべき処理が再帰的に表現されています。Streamデータ構成子には,「s -> Step a s」型の関数と,状態「s」が格納されます。「s -> Step a s」型の関数は状態sを利用し,処理の中でsを書き換えます。「s -> Step a s」型の関数が,unstreamのような「Stream型を引数として取り,Stream以外の型を返す関数」の内部で呼ばれることで,処理が再帰的に進みます。

 stream関数とunstream関数を単純に組み合わせた「unstream (stream xs0)」という式を例に考えてみましょう。

 stream関数では,リストxs0をLデータ構成子に格納した「L xs0」を初期状態とし,状態「L xs0」を基にStep型の値を返すnext関数をStream型に格納します。

-- | Construct an abstract stream from a list.
stream :: [a] -> Stream a
stream xs0 = Stream next (L xs0)
  where
    {-# INLINE next #-}
    next (L [])     = Done
    next (L (x:xs)) = Yield x (L xs)
{-# INLINE [0] stream #-}

 Lデータ構成子は,Stream型で利用する状態を評価するための戦略を定義するものです。Lデータ構成子はL型で定義されています。L型やLデータ構成子のLは,遅延評価(Lazy)を意味します。また,これに対応する形で,正格評価(Strict)を与えるためのS型で定義されたSというデータ構成子も提供されています。

-- | Boxes for user's state. This is the gateway for user's types into unlifted
-- stream states. The L is always safe since it's lifted/lazy, exposing/seqing
-- it does nothing.
-- S is unlifted and so is only suitable for users states that we know we can
-- be strict in. This requires attention and auditing. 
--
data    L a = L a  -- lazy / lifted
newtype S a = S a  -- strict / unlifted

 Haskellでは,リストは「遅延評価されるデータ構造」として定義されています。この性質を維持するには,Stream型に格納する状態は遅延評価されなければなりません。しかし,第9回で説明したように,正格評価より遅延評価のほうが性能面で不利になる場合もあります。このため,遅延評価が必要な個所では状態を遅延評価し,性能が必要な個所では状態を正格評価する仕組みが必要になります。この目的で用意されているのが,Lデータ構成子とSデータ構成子です。

 Lデータ構成子とSデータ構成子は,Unliftedクラスで定義されているexposeメソッドを通して,状態に対する評価戦略を制御します。exposeメソッドは,unstream関数などから呼び出されます。

-- | A class of strict unlifted types. The Unlifted constraint in the
-- Stream type above enforces a separation between user's types and the
-- types used in stream states.
--
class Unlifted a where

  -- | This expose function needs to be called in folds/loops that consume
  -- streams to expose the structure of the stream state to the simplifier
  -- In particular, to SpecConstr.
  --
  expose :: a -> b -> b
  expose = seq

  ~ 略 ~

instance Unlifted (L a) where
  expose (L _) s = s
  {-# INLINE expose #-}

instance Unlifted (S a) where
  expose (S a) s = seq a s
  {-# INLINE expose #-}

 exposeメソッドでは,第1引数として与えられた状態の構成子がLである場合には,何も行いません。第1引数として与えられた状態の構成子がSである場合には,seq関数を使ってSデータ構成子に包まれた状態aを正格評価します。Haskellのデフォルトの評価戦略は遅延評価なので,このようにLデータ構成子とSデータ構成子を使うことで,状態に対する評価戦略を切り替えられます。

 では,stream関数に話を戻しましょう。

 ここで格納されるnext関数の振る舞いは,状態「L xs0」に格納されたリストxs0に要素がある場合と,リストxs0が空である場合で異なります。リストxs0に要素がある場合,next関数はリストxs0から取り出した先頭の要素xと,リストxs0から先頭の要素を取り除いたxsをLデータ構成子に格納した新しい状態「L xs」を,Yieldデータ構成子に包んで返します。一方,リストxs0の要素が空の場合には,リストから要素を取り出すことはできません。このため,next関数は処理の終了を表現するDoneデータ構成子を返します。

 stream関数は,状態を使って処理を行うnext関数と初期状態「L xs0」を格納しているだけで,実際にリストの要素をたどっていく再帰的な処理は行わない点に注意してください。リストの要素をたどる再帰的な処理を実際に行うのは,unstreamのような「Stream型を引数として取り,Stream以外の型を返す関数」の役割です。

 unstream関数の定義を見てみましょう。unstream関数では再帰関数のunfold_unstreamを使って,Stream型に格納されたnext関数と,初期状態「s0」を基にリストを作成します。

-- | Flatten a stream back into a list.
unstream :: Stream a -> [a]
unstream (Stream next s0) = unfold_unstream s0
  where
    unfold_unstream !s = case next s of
      Done       -> []
      Skip    s' -> expose s' $     unfold_unstream s'
      Yield x s' -> expose s' $ x : unfold_unstream s'
{-# INLINE [0] unstream #-}

 next関数を状態sに適用した式「next s」が返すStep型の値が「Yield x s'」である場合,unfold_unstreamはxをリストの先頭に加え,状態s'を更新して再帰的に処理を続けます。「next s」が返すStep型の値が「Skip s'」である場合には,unfold_unstreamは状態s'を更新して再帰的に処理を続けます(Skipデータ構成子については,後で説明します)。「next s」が返すStep型の値がDoneである場合には,処理がそこで終わるので,unfold_unstreamはリスト末尾の[ ]を返します。

 つまり,unstream関数の内部にあるunfold_unstream関数が,stream関数が作成した「Stream next (L xs0)」のnext関数を呼ぶことで,stream関数に引数として渡されたリストの要素を再帰的にたどる処理が行われるのです。

 streamとunstreamを単純に組み合わせた「unstream (stream xs0)」という式では,Stream型に「stream関数のnext関数」と初期状態「L xs0」が格納されていることになります。unstreamでは,再帰関数unfold_unstreamを使って,「stream関数のnext関数」と「L xs0」を使った式「next (L xs0)」を評価します。

 このとき,リスト「xs0」に要素があれば,「next (L xs0)」はxs0を先頭の要素xと先頭の要素を取り除いたxsを格納した「Yield x (L xs)」を返します。「Yield x (L xs)」のxをリストの先頭に置き,新しい状態「L xs」を使ってunfold_unstreamを再帰的に呼び出すことで新しいリストを作成します。リスト「xs0」の中に要素がない場合,「next (L xs0)」はDoneを返すので,unfold_unstreamはリスト末尾の[ ]を返します。このようにして「unstream (stream xs)」は,リストxsの要素をたどり,最終的にxsと同じ順序で要素を持つリストを返すことになります。したがって,この形になれば,unstreamとstreamを削除できるのです。

 unstreamとstreamの間で何らかの処理を行う場合についても考えてみましょう。"map -> fusible"規則によって,map関数を使った式「map f xs」は「unstream (Stream.map f (stream xs))」に書き換えられます。

 この式で使われているStream.mapは,Data.Streamモジュールのmap関数です。ただ,Data.Streamモジュールのmap関数をそのまま使ってしまうと,Data.List.Streamモジュールのmap関数と同じ名前なので両者を区別できません。そこで名前の衝突を避けるために,Data.Streamモジュールのインポート宣言でqualifiedキーワードを使い,Data.Streamモジュールからインポートされる要素では修飾された名前を使用するようにしています。ここではqualifiedキーワードに加えてas節を利用しているので,Data.Streamモジュールからインポートされたmap関数は,モジュール名であるData.Streamではなく,as節で与えられたStreamという名前で修飾されます(参考リンク1参考リンク2)。

import qualified Data.Stream as Stream

 したがって,Stream.map関数が行う処理を知りたければ,Data.Streamモジュールのmap関数の定義を見ればいいことになります。

-- map
map :: (a -> b) -> Stream a -> Stream b
map f (Stream next0 s0) = Stream next s0
  where
    {-# INLINE next #-}
    next !s = case next0 s of
        Done       -> Done
        Skip    s' -> Skip        s'
        Yield x s' -> Yield (f x) s'
{-# INLINE [0] map #-}

--
-- a convenient rule for map
--
{-# RULES
    "STREAM map/map fusion" forall f g s.
        map f (map g s) = map (\x -> f (g x)) s
 #-}

 Data.Streamモジュールのmap関数では,Stream型に格納されたnext0関数を使って新しいnext関数を作成し,Stream型に戻します。next関数では,next0関数が返す値に応じて結果を加工します。next0関数が返す値がDoneや「Skip s」のときには何もせず,「Yield x s'」のときには関数fをxに適用した値である「Yield (f x) s'」を返します。

 「unstream (Stream.map f (stream xs))」という式では,unstream関数内にあるunfold_unstream関数は,Data.Streamモジュールのmap関数に格納されたnext関数を使って再帰的に処理を行います。next関数では,stream関数に格納されたnext関数をnext0関数として呼び出し,stream関数に格納された状態「L xs0」を基に処理を行います。この結果,リストのすべての要素に関数fを適用する再帰的な処理が行われます。

 Data.Streamモジュールのmap関数のように「Stream型を引数として取り,Stream型を返す関数」では,Stream型の中の関数と状態を呼び出し,その結果を加工することで処理をつないでいきます。再帰的な処理を駆動するのは「Stream型を引数として取り,Stream以外の型を返す関数」であることに変わりはありません。したがって,unstreamとstreamの間で何か処理を行う場合でも,再帰的な処理を駆動するのはunstream関数の役割です。

 このように,Stream Fusionでは,mapのような再帰関数の処理を,「Stream型を引数として取り,Stream以外の型を返す関数」によって再帰処理を行う部分と,それ以外の部分に分離します。再帰的な処理を行うunstreamは,"STREAM stream/unstream fusion"規則によって除去できるので,streamを使って定義された優良消費者がmap関数を利用する場合には,実際の再帰処理を最終的な優良消費者まで延期できます。

 つまり,Stream Fusionでは,中間データ構造だけでなく,関数と関数の間に発生する再帰的な処理も除去できるのです。第38回で説明したように,GHCでは再帰関数はインライン化されません。これを再帰を使用しない形に変換することでインライン化が可能になります。Stream Fusionを利用することで,処理系が行うインライン化により,プログラムの効率が大きく向上することになります。