コンテンツにスキップ

Haskell/Concurrency

出典: フリー教科書『ウィキブックス(Wikibooks)』

並列性

[編集]

Haskell での並列性は,主に Haskell スレッドで実現されます。Haskell スレッドは,ユーザー空間のスレッドで,実行時に実装されます。Haskell スレッドは,OSが備えているスレッドに比べ,空間的にも時間的にもともにより効率的です。セマフォ(信号方式)のような伝統的な同期と異なり,Haskell が採用している STM (ソフトウェア・トランザクショナル・メモリ)は,共有メモリへの並列アクセスを大変簡単にしています。

並列性を実現するモジュールは,Control.Concurrent.* と Control.Monad.STM です。

どんな場面で使われるか

[編集]

どんな場面で使われるかよりも重要なのは,どんな場合に使うべきでないかです。Haskell での並列性は,マルチプロセッサ・コアを使うという目的には使われません。そういう場合,別の概念,「並行 ( parallelism )」が使われます。「並列性 ( concurrency) 」は,入出力(IO)のように,シングルコアが様々な事柄を切り替えて処理していく場合に使われます。

例えば,「静的」な Web サーバ(静的な Web サーバとは,画像のような静的なコンテンツのみを扱うサーバのことです)を考えてみましょう。理想的には,このような Web サーバは,計算機の計算資源の消費ができるだけ少ないことが望ましい一方で,できるだけ迅速にデータを送受信できることが求められます。全体の速度を決める律速段階は入出力( I/O )であり,速度を早めるためにはハードウェアへの投資が必要です。ハードウェアへの投資は高くつくので,一つのプロセッサコアが複数の接続を扱うやりかたをできるだけ効率的にすることが求められます。

C言語で実装するなら,それぞれの select() 関数で順番に接続(コネクション)を選択したり監視するソケットを切り替える,大きなループ処理が使われるでしょう。それぞれの接続は,その接続の状況(例えばHTTPヘッダを受信しているのか,ヘッダ情報を解析しているのか,ファイルを送信しているかなど)を表す構造体を持っていることでしょう。しかしながら,Concurrent Haskell の場合,もっと小さなループで受信ソケットのみを監視し,受け入れた接続それぞれに新しいスレッドを割り当てて起動するだけの小さなループで処理できます。新しいスレッドは IO モナドとして記述され,順番に,HTTPヘッダ情報の受信,ヘッダ情報の解析,適切なファイルの送信といった処理をこなしていきます。

内部的には,Haskell のコンパイラは新しいスレッドの割当て・起動をスレッドの状況を特定する小さな記憶領域割当てに変換しますので,C言語でやっていることと違いはありません。様々なスレッドを使っての並行処理は,結局は一つの大きな繰り返し処理と考えることができます。それゆえ,スレッドがそれぞれ独立しているようにプログラムを記述したとしても,内部では,コンパイラはこれを大きな繰り返し処理に変換し,select() 関数,あるいはその他システムに応じて適切に選ばれた機能呼び出しなどが順番に処理されていくことになります。

例: ファイルを並行的にダウンロードする

[編集]
 downloadFile :: URL -> IO ()
 downloadFile = undefined
 downloadFiles :: [URL] -> IO ()
 downloadFiles = mapM_ (forkIO . downloadFile)

STM( Software Transactional Memory )

[編集]

STM ( Software Transactional Memory ) とは,データベースでのトランザクション処理に似た処理を,メモリを対象に行う仕組みのことです。 STM を使うことで,ロック処理を考えずに済むようになります。

STM を使うには,Control.Monad.STM. をインポートする必要があります。STM モナドに切り替えるには,低級関数が使われます。STM は,(複数のスレッド間での)コミュニケーションを実現するための複数の方式( TVar, TMVar, TChan および TArray )を提供しています。

以下の例では,TChan を使い,2つのスレッドが互いにコミュニケーションをとっています。main 関数でチャネルがつくられ,このチャネルが reader/wirterThread 関数で使われています。readerThread は TChan で新しい入力を待ち,受け取ったらこれを表示します。writerThread はいつくかの Int 値をチャネルに書き込み,自ら停止しています。

例: TChan をつかったコミュニケーション

[編集]
import Control.Monad.STM
import Control.Concurrent
import Control.Concurrent.STM.TChan

oneSecond = 1000000

writerThread :: TChan Int -> IO ()
writerThread chan = do
        atomically $ writeTChan chan 1
        threadDelay oneSecond
        atomically $ writeTChan chan 2
        threadDelay oneSecond
        atomically $ writeTChan chan 3
        threadDelay oneSecond

readerThread :: TChan Int -> IO ()
readerThread chan = do
        newInt <- atomically $ readTChan chan
        putStrLn $ "read new value: " ++ show newInt
        readerThread chan

main = do
        chan <- atomically $ newTChan
        forkIO $ readerThread chan
        forkIO $ writerThread chan
        threadDelay $ 5 * oneSecond