[230810] core.async - 00 - channel

Table of Contents

1 Motivation

core.async 는 나에게 많이 어려운 느낌이든다.

전부 이해할 수는 없지만 기초 수준이라도 이해를 하고 있는 것과 아닌 것은 큰 차이가 있다고 생각한다.

나를 포함한 초보자들에게 도움이 되었으면 한다.

2 buffer

buffer는 채널에 데이터를 저장하는 방법이다. buffer를 둘러보면 core.async가 데이터를 어떻게 바라보는지 알 수 있다.

buffer의 구현체는 FixedBuffer, DroppingBuffer, SlidingBuffer 그리고 PromisingBuffer가 있다.

(ns clojure.core.async.impl.buffers
  (:require [clojure.core.async.impl.protocols :as impl])
  (:import [java.util LinkedList]
	   [clojure.lang Counted]))

(deftype FixedBuffer [^LinkedList buf ^long n]
  impl/Buffer
  (full? [_this]
    (>= (.size buf) n))
  (remove! [_this]
    (.removeLast buf))
  (add!* [_this val]
    (.addFirst buf val))
  (close-buf! [_this])
  Counted
  (count [_this]
    (.size buf)))

(defn fixed-buffer [^long n]
  (FixedBuffer. (LinkedList.) n))

(deftype DroppingBuffer [^LinkedList buf ^long n]
  impl/UnblockingBuffer
  impl/Buffer
  (full? [_this]
    false)
  (remove! [_this]
    (.removeLast buf))
  (add!* [_this val]
    (when (< (.size buf) n)
      (.addFirst buf val)))
  (close-buf! [_this])
  Counted
  (count [_this]
    (.size buf)))

impl/UnblockingBuffer 는 자바의 마커 인터페이스와 동일한 역할을 한다.

impl/Buffer는 core.async의 프로토콜이다. clojure는 기본적으로 FixedBuffer를 사용한다. 그러므로 buffer가 꽉 찬 상태에서 데이터를 넣으면 블로킹이 발생할 수 있다. (즉, 끝까지 기다리다가 계속 줄을 서다보면 예외가 발생한다.) 이럴 때 DroppingBuffer나 SlidingBuffer로 블로킹을 피할 수 있다.

(ns clojure.core.async.impl.protocols)

(defprotocol ReadPort
  (take! [port fn1-handler]))
(defprotocol WritePort
  (put! [port val fn1-handler]))

(defprotocol Buffer
  (full? [b] "returns true if buffer cannot accept put")
  (remove! [b] "remove and return next item from buffer, called under chan mutex")
  (add!* [b itm] "if room, add item to the buffer, returns b, called under chan mutex")
  (close-buf! [b] "called on chan closed under chan mutex, return ignored"))

(defprotocol Handler
  (active? [h] "returns true if has callback. Must work w/o lock")
  (blockable? [h] "returns true if this handler may be blocked, otherwise it must not block")
  (lock-id [h] "a unique id for lock acquisition order, 0 if no lock")
  (commit [h] "commit to fulfilling its end of the transfer, returns cb. Must be called within lock"))

(defprotocol UnblockingBuffer)

3 channel

지극히 나의 생각에 core.async에서 가장 중요한 기능은 channel에 있다. go macro도 중요한 기능이지만 이것은 상태머신을 만들어 주는 것이지만 그것만으로 core.async가 동작하지는 않는다.

채널은 데이터를 주고 받는 통로이다.

해당 코드를 보면 엄청나게 긴 clojure 코드가 있다. core.async channels source code

여기서 중요한 것은 ManyToManyChannel이다.

(deftype ManyToManyChannel [^LinkedList takes ^LinkedList puts ^Queue buf closed ^Lock mutex add!])

일단 몇가지는 무시하고자 한다. add!, closed 는 무시하자. add! 는 버퍼에 값을 넣을 때 사용하고, closed는 채널이 닫혔는지 확인할 때 사용한다. 채널이 닫히면 그대로 종료되고 대기하는 taker들은 nil을 리턴받고 그대로 종료된다.

3.1 mutex

mutex는 자바의 Lock 인터페이스를 구현한 것이다. 자바의 Lock 인터페이스는 여기에서 확인할 수 있다.

mutex는 채널의 상태를 보호하기 위해 사용된다. 채널에 ReadPort(take!), WritePort(put!) 이 수행될 때 곧바로 mutex를 획득해서 동기화한다.

impl/WritePort
(put! [port val fn1-handler]
  (when (nil? val)
    (throw (Exception. "Can't put nil on channel")))
  (.ock mutex)  ;; mutex 획득

,,,
impl/ReadPort
(take! [port fn1-handler]
  (.lock mutex) ;; mutex 획득

3.2 puts

puts와 타입힌트로 유추할 수 있듯이 LinkedList를 사용한다. puts는 채널에 값을 넣을 때 사용한다. impl/WritePort의 put! 함수의 마지막에 보면 puts에 값을 넣는 것이 보인다.

(do
  (when (and (impl/active? handler) (impl/blockable? handler))
    (assert-unlock mutex
		   (< (.size puts) impl/MAX-QUEUE-SIZE)
		   (str "No more than " impl/MAX-QUEUE-SIZE
			" pending puts are allowed on a single channel."
			" Consider using a windowed buffer."))
    (.add puts [handler val]))
  (.unlock mutex) ;; mutex 해제

특이한 것이 MAX-QUEUE-SIZE가 1024로 정해져있다. 이를 넘으면 예외가 발생한다.

(def ^:const ^{:tag 'int} MAX-QUEUE-SIZE 1024)
#_END_SRC

그럼 버퍼는 언제 사용하는가?

put! 함수를 한정해서 보면 buf에 값을 넣는 경우는 아래와 같다. 

#+BEGIN_SRC clojure
(if (and buf (not (impl/full? buf)) (not (.isEmpty takes)))
  ,,,
  (let [done? (reduced (add! buf val))]

즉, buf에 자리가 있고, takes가 비어있지 않으면 buf에 값을 넣는다. 이렇게 buf의 값이 추가되면 taker들을 가져다가 value를 넘겨준다.

(if (pos? (count buf))
  (let [iter (.iterator takes)
	take-cbs (loop [takers []]
		  (if (and (.hasNext iter) (pos? (count buf)))
		    (let [^Lock taker (.next iter)]  ;; taker를 가져온다. 
		      (.lock taker)
		      (let [ret (and (impl/active? taker) (impl/commit taker))]
			(.unlock taker)
			(if ret
			  (let [val (impl/remove! buf)]
			    (.remove iter)
			    (recur (conj takers (fn [] (ret val)))))  ;; value를 넘겨준다.
			  (recur takers))))
		    takers))]

take-cbs 는 이후에 executor에 의해 실행된다.

(doseq [f take-cbs]
  (dispatch/run f))

만약 위 조건이 false라면 이제 takes가 있는지 간접적으로 다시 테스트한다. 없으면, put-cb take-cb 는 nil이 될 것이다.

(let [iter (.iterator takes)
      [put-cb take-cb] (when (.hasNext iter)  ;; takes를 순회함
			(loop ,,,))]

만약에 put-cb와 take-cb가 있다면 put한 것을 take로 받아간 것이니까 성공시킨다.

(if (and put-cb take-cb)
  (do
    (.unlock mutex)
    (dispatch/run (fn [] (take-cb val)))
    (box true))

아니라면 좀 더 복잡하다.

우선 buf가 있는데 꽉찬게 아닌지 테스트한다. (그럼 take가 없다는 말이 될 것이다. 이미 간점적으로 위에서 테스트했으니까)

(if (and buf (not (impl/full? buf)))
  (do 
    (.lock handler)
    (let [put-cb (and (impl/active? handler) (impl/commit handler))]
      (.unlock handler)
      (if put-cb
	(let [done? (reduced? (add! buf val))]  ;; buf에 값을 넣는다.
	  (when done?
	    (abort this))
	  (.unlock mutex)
	  (box true))
	(do (.unlock mutex)
	  nil)

	)))))

마지막으로 buf가 있는데 꽉찬 경우이다. 이 경우에는 buf에 값을 넣을 수 없으니까 puts에 넣는다. 아까 본 코드이지만 다시 보자.

(do
  (when (and (impl/active? handler) (impl/blockable? handler))
    (assert-unlock mutex
		  (< (.size puts) impl/MAX-QUEUE-SIZE)
		  (str "No more than " impl/MAX-QUEUE-SIZE
			" pending puts are allowed on a single channel."
			" Consider using a windowed buffer."))
    (.add puts [handler val]))
  (.unlock mutex)
  nil)

Date: 2023-08-10 Thu 00:00

Author: 남영환

Created: 2024-01-04 Thu 09:13

Emacs 27.2 (Org mode 9.4.4)

Validate