[230810] core.async - 01
Table of Contents
1 dispatch
전작에서 보면, dispatch/run
을 통해 콜백을 수행하는 것을 보았을 것이다.
dispatch는 스레드 풀을 생성하고 콜백을 수행한다.
clojure.core.async.impl.dispatch
네임스페이스를 보자.
(ns ^{:skip-wiki true} clojure.core.async.impl.dispatch (:require [clojure.core.async.impl.protocols :as impl] [clojure.core.async.impl.exec.threadpool :as tp])) (defonce ^:private in-dispatch (ThreadLocal.)) (defonce executor (delay (tp/thread-pool-executor #(.set ^ThreadLocal in-dispatch true)))) (defn in-dispatch-tread? "Returns true if the current thread is a go block dispatch pool thread" [] (boolean (.get ^ThreadLocal in-dispatch))) (defn check-blocking-in-dispatch "If the current thread is a dispatch pool thread, throw an exception" [] (when (.get ^ThreadLocal in-dispatch) (throw (IllegalStateException. "Invalid blocking call in dispatch thread")))) (defn run "Runs Runnable r in a thread pool thread" [^Runnable r] (impl/exec @executor r))
코드는 아주 단순하다. run을 수행하면 초기에 만든 스레드풀(executor)에서 콜백을 수행한다.
ThreadLocal
로 각 스레드마다 in-dispatch라는 변수에 true
flag를 넣어서
현재 스레드가 core.async에서 생성한 스레드인지 확인할 수 있다. ( in-dispatch-thread?
)
또한 core.async에서 만든 스레드 에서는 절대 블로킹이 되면 안된다. 제한된 스레드에서 블로킹이 되면,
작업이 밀리는 현상이 발생할 수 있기 때문이다. ( check-blocking-in-dispatch
)
2 protocols - Executor
protocols은 단순히 함께 사용할 기능들을 한 곳에 모아둔 곳이다. java의 interface와 비슷하다. 처음엔 장황하게 생각했으나, 이것이 있음으로서 core.async에서 주요 기능들을 한 곳에 모아둘 수 있어 좋고, 기능들이 모여있어서 무엇을 위한 것인지 알기 쉽다.
(ns clojure.core.async.impl.protocols) (defprotocol Executor (exec [e runnable] "execute runnable asynchronously"))
3 thread pool
하지만 진짜 스레드풀을 만드는 곳은 clojure.core.async.impl.exec.threadpool
이다.
pool-size
로 디본적으로 8개의 스레드를 생성한다. 물론 "clojure.core.async.pool-size"
시스템 프로퍼티를 통해
수정할 수 있다.
thread-pool-executor
는 Executor
프로토콜을 구현한 객체를 리턴한다.
이녀석이 수행하는 주요한 작업은
pool-size
만큼 스레드를 생성한다.Executor
프로토콜을 구현한 객체를 리턴한다.conc/counted-thread-factory
를 통해 스레드 이름을 지정한다.counted-thread-factory
이름을 보니 아마도 스레드 이름을 원하는 방식으로 지정할 수 있게 ThreadFactory를 구현한 것 같다.
(ns clojure.core.async.impl.exec.threadpool (:require [clojure.core.async.impl.protocols :as impl] [clojure.core.async.impl.concurrent :as conc]) (:import [java.util.concurrent Executors])) (def ^:private pool-size "Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a delay so property can be set from code after core.async namespace is loaded but before any use of the async thread pool." (delay (or (Long/getLong "clojure.core.async.pool-size") 8))) (defn thread-pool-executor ([] (thread-pool-executor nil)) ([init-fn] (let [executor-svc (Executors/newFixedThreadPool @pool-size (conc/counted-thread-factory "async-dispatch-%d" true {:init-fn init-fn}))] (reify impl/Executor (impl/exec [_ r] (.execute executor-svc ^Runnable r))))))
4 thread factory (concurrent)
위에서 예상 한 것처럼 counted-thread-factory
는 스레드 이름을 지정할 수 있게 해주는 것 같다.
그리고 counter (atom 0)
을 통해 각 스레드에 고유한 숫자를 부여한다.
ThreadFactory에 대한 자세한 내용은 여기를 참고하자.
또한 Brian Goetz 의 [자바 병렬 프로그래밍(Java Concurrency in Practice)] 책의 8.5(ThreadFactory interface) 장을 참고하자.
(ns ^{:skip-wiki true} clojure.core.async.impl.concurrent (:import [java.util.concurrent ThreadFactory])) (set! *warn-on-reflection* true) (defn counted-thread-factory "Create a ThreadFactory that maintains a counter for naming Threads. name-format specifies thread names - use %d to include counter daemon is a flag for whether threads are daemons or not opts is an options map: init-fn - function to run when thread is created" ([name-format daemon] (counted-thread-factory name-format daemon nil)) ([name-format daemon {:keys [init-fn] :as opts}] (let [counter (atom 0)] (reify ThreadFactory (newThread [_this runnable] (let [body (if init-fn (fn [] (init-fn) (.run ^Runnable runnable)) runnable) t (Thread. ^Runnable body)] (doto t (.setName (format name-format (swap! counter inc))) (.setDaemon daemon)))))))) (defonce ^{:doc "Number of processors reported by the JVM"} processors (.availableProcessors (Runtime/getRuntime)))