この初版はLua 5.0向けに書かれました。後のバージョンでも大部分は関連性がありますが、いくつかの違いがあります。
第4版はLua 5.3を対象としており、Amazonやその他の書店で入手可能です。
書籍を購入することで、Luaプロジェクトの支援にもなります。


9.4 – 非プリエンプティブマルチスレッディング

以前見たように、コルーチンは一種の協調型マルチスレッディングです。各コルーチンはスレッドと同等です。`yield`-`resume`のペアは、あるスレッドから別のスレッドへ制御を切り替えます。しかし、「実際」のマルチスレッディングとは異なり、コルーチンは非プリエンプティブです。コルーチンが実行中の間は、外部から停止させることはできません。明示的に(`yield`の呼び出しを通じて)要求した場合にのみ、実行を中断します。いくつかのアプリケーションでは、これは問題ではなく、むしろその逆です。プリエンプションがない方がプログラミングははるかに簡単です。スレッド間の同期はプログラムで明示的に行われるため、同期バグについて神経質になる必要はありません。コルーチンがクリティカルリージョン外にある場合にのみ中断するようにする必要があります。

しかし、非プリエンプティブマルチスレッディングでは、いずれかのスレッドがブロッキング操作を呼び出すと、操作が完了するまでプログラム全体がブロックされます。ほとんどのアプリケーションでは、これは受け入れられない動作であり、多くのプログラマーがコルーチンを従来のマルチスレッディングの現実的な代替手段として無視する原因となっています。ここで見るように、その問題には興味深い(そして後知恵ながら明白な)解決策があります。

典型的なマルチスレッディングの状況を想定してみましょう。HTTPを介して複数のリモートファイルをダウンロードしたいとします。もちろん、複数のリモートファイルをダウンロードするには、1つのリモートファイルをダウンロードする方法を知っている必要があります。この例では、Diego Nehab氏が開発したLuaSocketライブラリを使用します。ファイルをダウンロードするには、そのサイトへの接続を開き、ファイルへのリクエストを送信し、(ブロック単位で)ファイルを受信し、接続を閉じる必要があります。Luaでは、このタスクを次のように記述できます。まず、LuaSocketライブラリをロードします。

    require "luasocket"

次に、ダウンロードしたいホストとファイルを定義します。この例では、World Wide Web ConsortiumサイトからHTML 3.2リファレンス仕様をダウンロードします。

    host = "www.w3.org"
    file = "/TR/REC-html32.html"

次に、そのサイトの80番ポート(HTTP接続の標準ポート)へのTCP接続を開きます。

    c = assert(socket.connect(host, 80))
この操作は接続オブジェクトを返し、これを使用してファイルリクエストを送信します。
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")

receiveメソッドは、常に読み取った内容を含む文字列と、操作の状態を示す別の文字列を返します。ホストが接続を閉じると、受信ループを中断します。

最後に、接続を閉じます。

    c:close()

これで1つのファイルをダウンロードする方法がわかったので、複数のファイルをダウンロードするという問題に戻りましょう。簡単なアプローチは、一度に1つずつダウンロードすることです。しかし、このシーケンシャルなアプローチでは、前のファイルの読み取りを終了してから次のファイルの読み取りを開始するため、遅すぎます。リモートファイルを読み取るとき、プログラムはほとんどの時間をデータの到着を待って費やします。具体的には、ほとんどの時間を`receive`の呼び出しでブロックして費やします。したがって、すべてのファイルを同時にダウンロードすれば、プログラムの実行速度は大幅に向上する可能性があります。次に、接続にデータがない場合、プログラムは別の接続から読み取ることができます。明らかに、コルーチンはこれらの同時ダウンロードを構造化するための便利な方法を提供します。ダウンロードタスクごとに新しいスレッドを作成します。スレッドに利用可能なデータがない場合、スレッドはシンプルなディスパッチャーに制御を譲り、ディスパッチャーは別のスレッドを呼び出します。

コルーチンでプログラムを書き直すために、まず前のダウンロードコードを関数として書き直しましょう。

    function download (host, file)
      local c = assert(socket.connect(host, 80))
      local count = 0    -- counts number of bytes read
      c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
      while true do
        local s, status = receive(c)
        count = count + string.len(s)
        if status == "closed" then break end
      end
      c:close()
      print(file, count)
    end
リモートファイルの内容には関心がないため、この関数はファイルを標準出力に書き込むのではなく、ファイルサイズをカウントするだけです。(複数のスレッドが複数のファイルを読み取ると、出力はすべてのファイルが混ざり合います。)この新しいコードでは、接続からデータを受信する補助関数(`receive`)を使用します。シーケンシャルアプローチでは、そのコードは次のようになります。
    function receive (connection)
      return connection:receive(2^10)
    end
同時実行の実装では、この関数はブロックせずにデータを受信する必要があります。代わりに、利用可能なデータが十分でない場合は、中断します。新しいコードは次のようになります。
    function receive (connection)
      connection:timeout(0)   -- do not block
      local s, status = connection:receive(2^10)
      if status == "timeout" then
        coroutine.yield(connection)
      end
      return s, status
    end
timeout(0)の呼び出しにより、接続に対するすべての操作が非ブロッキング操作になります。操作の状態が`"timeout"`である場合、操作が完了せずに返されたことを意味します。この場合、スレッドは中断します。`yield`に渡される非偽の引数は、スレッドがまだタスクを実行していることをディスパッチャーに知らせます。(後で、ディスパッチャーがタイムアウトした接続を必要とする別のバージョンを示します。)タイムアウトの場合でも、接続はタイムアウトまで読み取ったものを返すため、`receive`は常に呼び出し元に`s`を返すことに注意してください。

次の関数は、各ダウンロードが個別のスレッドで実行されることを保証します。

    threads = {}    -- list of all live threads
    function get (host, file)
      -- create coroutine
      local co = coroutine.create(function ()
        download(host, file)
      end)
      -- insert it in the list
      table.insert(threads, co)
    end
テーブル`threads`は、ディスパッチャーのために、すべてのアクティブなスレッドのリストを保持します。

ディスパッチャーはシンプルです。主にすべてのスレッドを調べて、1つずつ呼び出すループです。タスクを完了したスレッドはリストから削除する必要があります。実行するスレッドがなくなると、ループを停止します。

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          end
        end
      end
    end

最後に、メインプログラムは必要なスレッドを作成し、ディスパッチャーを呼び出します。たとえば、W3Cサイトから4つのドキュメントをダウンロードするために、メインプログラムは次のようになります。

    host = "www.w3.org"
    
    get(host, "/TR/html401/html40.txt")
    get(host,"/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
    get(host,"/TR/REC-html32.html")
    get(host,
        "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
    
    dispatcher()   -- main loop
私のマシンでは、コルーチンを使用してこれら4つのファイルをダウンロードするのに6秒かかります。シーケンシャルな実装では、その2倍以上(15秒)かかります。

高速化にもかかわらず、この最後の実装は最適からは程遠いです。少なくとも1つのスレッドに読み取るものがある間は、すべてがうまくいきます。しかし、読み取るデータを持つスレッドがない場合、ディスパッチャーはビジーウェイトを行い、スレッドからスレッドへと移動して、まだデータがないことを確認するだけです。その結果、このコルーチン実装は、シーケンシャルソリューションよりもほぼ30倍多くのCPUを使用します。

この動作を回避するために、LuaSocketの`select`関数を使用できます。これにより、プログラムはソケットのグループのステータス変更を待機しながらブロックできます。実装の変更はわずかです。ディスパッチャーを変更するだけで済みます。新しいバージョンは次のようになります。

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        local connections = {}
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          else    -- timeout
            table.insert(connections, res)
          end
        end
        if table.getn(connections) == n then
          socket.select(connections)
        end
      end
    end
内部ループに沿って、この新しいディスパッチャーは、タイムアウトした接続をテーブル`connections`に収集します。`receive`がこのような接続を`yield`に渡すことを思い出してください。したがって、`resume`はそれらを返します。すべての接続がタイムアウトすると、ディスパッチャーは`select`を呼び出して、それらの接続のいずれかのステータスが変更されるのを待ちます。この最終的な実装は、コルーチンを使用した最初の実装と同じ速度で実行されます。さらに、ビジーウェイトを行わないため、シーケンシャルな実装よりもわずかに多くのCPUを使用するだけです。