Clojure

エージェントと非同期処理

目次

Refと同様に、エージェントは変更可能な状態への共有アクセスを提供します。 Ref複数の場所の協調的な同期的な変更をサポートするのに対し、エージェントは個々の場所の独立した非同期的な変更を提供します。エージェントはそのライフタイムの間、単一の格納場所にバインドされ、その場所の変更(新しい状態への変更)のみをアクションの結果として許可します。アクションは関数(オプションで追加の引数付き)であり、エージェントの状態に非同期的に適用され、その戻り値がエージェントの新しい状態になります。アクションは関数であるため、マルチメソッドにもなり得るため、アクションは多様な可能性があります。また、関数のセットはオープンであるため、エージェントによってサポートされるアクションのセットもオープンであり、これは他の言語によって提供されるパターンマッチングメッセージ処理ループとは大きく対照的です。

Clojureのエージェントは、反応的であり、自律的ではありません。命令的なメッセージループやブロッキング受信はありません。エージェントの状態自体は不変である必要があり(できればClojureの永続的コレクションのインスタンス)、エージェントの状態は、常に任意のスレッドから(deref関数またはreaderマクロ@を使用して)メッセージなしで読み取ることができます。つまり、観測には協力や調整は必要ありません。

エージェントのアクションディスパッチは、(send agent fn args*) の形式を取ります。send(およびsend-off)は常にすぐに返ります。後で、別のスレッドで、次のことが行われます。

  1. 指定されたfnは、エージェントの状態と、引数が提供された場合はargsに適用されます。

  2. fnの戻り値は、エージェントに設定されている場合はバリデータ関数に渡されます。set-validator!の詳細を参照してください。

  3. バリデータが成功した場合、またはバリデータが提供されていない場合、指定されたfnの戻り値がエージェントの新しい状態になります。

  4. エージェントにウォッチャが追加されている場合、それらは呼び出されます。add-watchの詳細を参照してください。

  5. 関数の実行中に他のディスパッチが行われた場合(直接的または間接的に)、それらはエージェントの状態が変更されたまで保持されます。

アクション関数によって例外がスローされた場合、ネストされたディスパッチは発生せず、例外はエージェント自体にキャッシュされます。エージェントにエラーがキャッシュされている場合、後続の操作は、エージェントのエラーがクリアされるまで、すぐに例外をスローします。エージェントエラーはagent-errorで調べることができ、restart-agentでエージェントを再開できます。

すべてのアクションのエージェントは、スレッドプール内のスレッド間でインターリーブされます。任意の時点では、各エージェントに対して最大1つのアクションが実行されています。他の単一のエージェントまたはスレッドから同じエージェントにディスパッチされたアクションは、送信された順序で発生しますが、他のソースから同じエージェントにディスパッチされたアクションとインターリーブされる可能性があります。sendはCPUに制限されたアクションに使用し、send-offはIOでブロックする可能性のあるアクションに適しています。

エージェントはSTMと統合されています。トランザクションで行われたディスパッチは、コミットされるまで保持され、再試行または中止された場合は破棄されます。

Clojureのすべての同時実行サポートと同様に、ユーザーコードのロックは関与しません。

エージェントの使用は、JVMのシャットダウンを防ぐデーモンではないバックグラウンドスレッドのプールを開始することに注意してください。shutdown-agentsを使用してこれらのスレッドを終了し、シャットダウンを許可してください。

この例は、メッセージをリング状に送るテストの実装です。m個のエージェントのチェーンを作成し、次にn個のアクションのシーケンスをチェーンの先頭にディスパッチし、それを介して中継します。

(defn relay [x i]
  (when (:next x)
    (send (:next x) relay i))
  (when (and (zero? i) (:report-queue x))
    (.put (:report-queue x) i))
  x)

(defn run [m n]
  (let [q (new java.util.concurrent.SynchronousQueue)
        hd (reduce (fn [next _] (agent {:next next}))
                   (agent {:report-queue q}) (range (dec m)))]
    (doseq [i (reverse (range n))]
      (send hd relay i))
    (.take q)))

; 1 million message sends:
(time (run 1000 1000))
->"Elapsed time: 2959.254 msecs"

エージェントの作成:agent

エージェントの検査:deref (@ readerマクロも参照) agent-error error-handler error-mode

エージェントの状態の変更:send send-off restart-agent

エージェントの待機:await await-for

Refバリデータ:set-validator! get-validator

ウォッチャ:add-watch remove-watch

エージェントスレッドの管理:shutdown-agents