Clojure

core.async ウォークスルー

はじめに

core.asyncライブラリは、チャネルを使用して非同期プログラミングをサポートします。

core.asyncを使用するには、Clojure 1.10.0以上と最新のcore.asyncライブラリへの依存関係を宣言します。

{:deps
 {org.clojure/clojure {:mvn/version "1.11.2"}
  org.clojure/core.async {:mvn/version "1.6.673"}}}

core.asyncでの作業を開始するには、REPLでclojure.core.async名前空間をrequireします。

(require '[clojure.core.async :as a :refer [<!! >!! <! >!]])

または、名前空間に含めます。

(ns my.ns
  (:require [clojure.core.async :as a :refer [<!! >!! <! >!]]))

チャネル

値はキューのようなチャネルで伝達されます。デフォルトでは、チャネルはバッファリングされていません(長さ0)。チャネルを介して値を転送するには、プロデューサーとコンシューマーがランデブーする必要があります。

バッファリングされていないチャネルを作成するには、chanを使用します。

(a/chan)

固定バッファサイズのチャネルを作成するには、数値を渡します。

(a/chan 10)

close!は、putを受け付けなくなるようにチャネルを閉じます。残りの値はまだtakeできます。ドレインされたチャネルはtakeでnilを返します。nilを明示的にチャネルに送信することはできません!

(let [c (a/chan)]
  (a/close! c))

チャネルでは、「満杯」の場合に異なるポリシーを持つカスタムバッファも使用できます。APIには2つの有用な例が用意されています。

;; Use `dropping-buffer` to drop newest values when the buffer is full:
(a/chan (a/dropping-buffer 10))

;; Use `sliding-buffer` to drop oldest values when the buffer is full:
(a/chan (a/sliding-buffer 10))

スレッド

通常のスレッドでは、チャネルを介して通信するために>!!(ブロッキングput)および<!!(ブロッキングtake)を使用します。

(let [c (a/chan 10)]
  (>!! c "hello")
  (assert (= "hello" (<!! c)))
  (a/close! c))

これらはブロッキング呼び出しであるため、バッファリングされていないチャネルにputしようとすると、メインスレッドがブロックされます。threadfutureと同様)を使用して、プールスレッドで本体を実行し、結果をチャネルで返すことができます。ここでは、バックグラウンドタスクを起動してチャネルに「hello」をputし、現在のスレッドでその値を読み取ります。

(let [c (a/chan)]
  (a/thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (a/close! c))

GoブロックとIOCスレッド

goマクロは、特別なスレッドプールでその本体を非同期的に実行します。ブロックする可能性のあるチャネル操作は、スレッドをブロックする代わりに実行を一時停止します。このメカニズムは、イベント/コールバックシステムで外部にある制御の反転をカプセル化します。goブロック内では、>!(put)および<!(take)を使用します。

ここでは、以前のチャネルの例をgoブロックを使用するように変換します。

(let [c (a/chan)]
  (a/go (>! c "hello"))
  (assert (= "hello" (<!! (a/go (<! c)))))
  (a/close! c))

明示的なスレッドとブロッキング呼び出しの代わりに、プロデューサーにgoブロックを使用します。コンシューマーは、goブロックを使用してtakeし、結果チャネルを返し、そこからブロッキングtakeを行います。

Alts

キュー上のチャネルの1つのキラー機能は、同時に多くのチャネルを待機できる機能です(ソケットのselectのように)。これは、alts!!(通常のスレッド)またはgoブロック内のalts!で行われます。

2つのチャネルのいずれかの入力を結合するaltsを使用してバックグラウンドスレッドを作成できます。alts!!は実行する一連の操作(takeするチャネルまたはputする[チャネル値]のいずれか)を受け取り、成功した値(putの場合はnil)とチャネルを返します。

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/thread (while true
              (let [[v ch] (a/alts!! [c1 c2])]
                (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

出力(stdoutで、replでは表示されない可能性があります)

Read hi from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Read there from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]

goブロックで同じことを行うには、alts!を使用できます。

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/go (while true
          (let [[v ch] (a/alts! [c1 c2])]
            (println "Read" v "from" ch))))
  (a/go (>! c1 "hi"))
  (a/go (>! c2 "there")))

goブロックはスレッドにバインドされていない軽量プロセスであるため、非常に多く持つことができます!ここでは、1000個のチャネルで挨拶をする1000個のgoブロックを作成します。準備ができたら、alts!!を使用してそれらを読み取ります。

(let [n 1000
      cs (repeatedly n a/chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (a/go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (a/alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeoutは、指定されたミリ秒待機してから閉じるチャネルを作成します。

(let [t (a/timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

alts!とタイムアウトを組み合わせて、時間指定されたチャネル待機を行うことができます。ここでは、チャネルに値が到着するまで100ミリ秒待ってから諦めます。

(let [c (a/chan)
      begin (System/currentTimeMillis)]
  (a/alts!! [c (a/timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))

詳細情報

詳細については、以下を参照してください。

オリジナルの作成者: Alex Miller