[230810] core.async - 01

Table of Contents

1 dispatch

전작에서 보면, dispatch/run 을 통해 콜백을 수행하는 것을 보았을 것이다. dispatch는 스레드 풀을 생성하고 콜백을 수행한다.

clojure.core.async.impl.dispatch 네임스페이스를 보자.

(ns ^{:skip-wiki true} 
  clojure.core.async.impl.dispatch
  (:require [clojure.core.async.impl.protocols :as impl]
	    [clojure.core.async.impl.exec.threadpool :as tp]))  

(defonce ^:private in-dispatch (ThreadLocal.))

(defonce executor
  (delay (tp/thread-pool-executor #(.set ^ThreadLocal in-dispatch true))))

(defn in-dispatch-tread?
  "Returns true if the current thread is a go block dispatch pool thread"
  []
  (boolean (.get ^ThreadLocal in-dispatch)))

(defn check-blocking-in-dispatch
  "If the current thread is a dispatch pool thread, throw an exception"
  []
  (when (.get ^ThreadLocal in-dispatch)
    (throw (IllegalStateException. "Invalid blocking call in dispatch thread"))))

(defn run
  "Runs Runnable r in a thread pool thread"
  [^Runnable r]
  (impl/exec @executor r))

코드는 아주 단순하다. run을 수행하면 초기에 만든 스레드풀(executor)에서 콜백을 수행한다.

ThreadLocal 로 각 스레드마다 in-dispatch라는 변수에 true flag를 넣어서 현재 스레드가 core.async에서 생성한 스레드인지 확인할 수 있다. ( in-dispatch-thread? )

또한 core.async에서 만든 스레드 에서는 절대 블로킹이 되면 안된다. 제한된 스레드에서 블로킹이 되면, 작업이 밀리는 현상이 발생할 수 있기 때문이다. ( check-blocking-in-dispatch )

2 protocols - Executor

protocols은 단순히 함께 사용할 기능들을 한 곳에 모아둔 곳이다. java의 interface와 비슷하다. 처음엔 장황하게 생각했으나, 이것이 있음으로서 core.async에서 주요 기능들을 한 곳에 모아둘 수 있어 좋고, 기능들이 모여있어서 무엇을 위한 것인지 알기 쉽다.

(ns clojure.core.async.impl.protocols)

(defprotocol Executor
  (exec [e runnable] "execute runnable asynchronously"))

3 thread pool

하지만 진짜 스레드풀을 만드는 곳은 clojure.core.async.impl.exec.threadpool 이다.

pool-size 로 디본적으로 8개의 스레드를 생성한다. 물론 "clojure.core.async.pool-size" 시스템 프로퍼티를 통해 수정할 수 있다.

thread-pool-executorExecutor 프로토콜을 구현한 객체를 리턴한다.

이녀석이 수행하는 주요한 작업은

  • pool-size 만큼 스레드를 생성한다.
  • Executor 프로토콜을 구현한 객체를 리턴한다.
  • conc/counted-thread-factory 를 통해 스레드 이름을 지정한다. counted-thread-factory 이름을 보니 아마도 스레드 이름을 원하는 방식으로 지정할 수 있게 ThreadFactory를 구현한 것 같다.
(ns clojure.core.async.impl.exec.threadpool
  (:require [clojure.core.async.impl.protocols :as impl]
	    [clojure.core.async.impl.concurrent :as conc])
  (:import [java.util.concurrent Executors]))

(def ^:private pool-size
  "Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a
   delay so property can be set from code after core.async namespace is loaded but before
   any use of the async thread pool."
  (delay (or (Long/getLong "clojure.core.async.pool-size") 8)))

(defn thread-pool-executor
  ([]
    (thread-pool-executor nil))
  ([init-fn]
   (let [executor-svc (Executors/newFixedThreadPool
			@pool-size
			(conc/counted-thread-factory "async-dispatch-%d" true
			  {:init-fn init-fn}))]
     (reify impl/Executor
       (impl/exec [_ r]
	 (.execute executor-svc ^Runnable r))))))

4 thread factory (concurrent)

위에서 예상 한 것처럼 counted-thread-factory 는 스레드 이름을 지정할 수 있게 해주는 것 같다. 그리고 counter (atom 0) 을 통해 각 스레드에 고유한 숫자를 부여한다. ThreadFactory에 대한 자세한 내용은 여기를 참고하자. 또한 Brian Goetz 의 [자바 병렬 프로그래밍(Java Concurrency in Practice)] 책의 8.5(ThreadFactory interface) 장을 참고하자.

(ns ^{:skip-wiki true}
  clojure.core.async.impl.concurrent
  (:import [java.util.concurrent ThreadFactory]))

(set! *warn-on-reflection* true)

(defn counted-thread-factory
  "Create a ThreadFactory that maintains a counter for naming Threads.
     name-format specifies thread names - use %d to include counter
     daemon is a flag for whether threads are daemons or not
     opts is an options map:
       init-fn - function to run when thread is created"
  ([name-format daemon]
    (counted-thread-factory name-format daemon nil))
  ([name-format daemon {:keys [init-fn] :as opts}]
   (let [counter (atom 0)]
     (reify
       ThreadFactory
       (newThread [_this runnable]
	 (let [body (if init-fn
		      (fn [] (init-fn) (.run ^Runnable runnable))
		      runnable)
	       t (Thread. ^Runnable body)]
	   (doto t
	     (.setName (format name-format (swap! counter inc)))
	     (.setDaemon daemon))))))))

(defonce
  ^{:doc "Number of processors reported by the JVM"}
  processors (.availableProcessors (Runtime/getRuntime)))

Date: 2023-08-10 Thu 00:00

Author: 남영환

Created: 2024-08-31 Sat 15:59

Emacs 27.2 (Org mode 9.4.4)

Validate