[230810] core.async - 01
Table of Contents
1 dispatch
전작에서 보면, dispatch/run 을 통해 콜백을 수행하는 것을 보았을 것이다.
dispatch는 스레드 풀을 생성하고 콜백을 수행한다.
clojure.core.async.impl.dispatch 네임스페이스를 보자.
(ns ^{:skip-wiki true}
clojure.core.async.impl.dispatch
(:require [clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.exec.threadpool :as tp]))
(defonce ^:private in-dispatch (ThreadLocal.))
(defonce executor
(delay (tp/thread-pool-executor #(.set ^ThreadLocal in-dispatch true))))
(defn in-dispatch-tread?
"Returns true if the current thread is a go block dispatch pool thread"
[]
(boolean (.get ^ThreadLocal in-dispatch)))
(defn check-blocking-in-dispatch
"If the current thread is a dispatch pool thread, throw an exception"
[]
(when (.get ^ThreadLocal in-dispatch)
(throw (IllegalStateException. "Invalid blocking call in dispatch thread"))))
(defn run
"Runs Runnable r in a thread pool thread"
[^Runnable r]
(impl/exec @executor r))
코드는 아주 단순하다. run을 수행하면 초기에 만든 스레드풀(executor)에서 콜백을 수행한다.
ThreadLocal 로 각 스레드마다 in-dispatch라는 변수에 true flag를 넣어서
현재 스레드가 core.async에서 생성한 스레드인지 확인할 수 있다. ( in-dispatch-thread? )
또한 core.async에서 만든 스레드 에서는 절대 블로킹이 되면 안된다. 제한된 스레드에서 블로킹이 되면,
작업이 밀리는 현상이 발생할 수 있기 때문이다. ( check-blocking-in-dispatch )
2 protocols - Executor
protocols은 단순히 함께 사용할 기능들을 한 곳에 모아둔 곳이다. java의 interface와 비슷하다. 처음엔 장황하게 생각했으나, 이것이 있음으로서 core.async에서 주요 기능들을 한 곳에 모아둘 수 있어 좋고, 기능들이 모여있어서 무엇을 위한 것인지 알기 쉽다.
(ns clojure.core.async.impl.protocols) (defprotocol Executor (exec [e runnable] "execute runnable asynchronously"))
3 thread pool
하지만 진짜 스레드풀을 만드는 곳은 clojure.core.async.impl.exec.threadpool 이다.
pool-size 로 디본적으로 8개의 스레드를 생성한다. 물론 "clojure.core.async.pool-size" 시스템 프로퍼티를 통해
수정할 수 있다.
thread-pool-executor 는 Executor 프로토콜을 구현한 객체를 리턴한다.
이녀석이 수행하는 주요한 작업은
pool-size만큼 스레드를 생성한다.Executor프로토콜을 구현한 객체를 리턴한다.conc/counted-thread-factory를 통해 스레드 이름을 지정한다.counted-thread-factory이름을 보니 아마도 스레드 이름을 원하는 방식으로 지정할 수 있게 ThreadFactory를 구현한 것 같다.
(ns clojure.core.async.impl.exec.threadpool
(:require [clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.concurrent :as conc])
(:import [java.util.concurrent Executors]))
(def ^:private pool-size
"Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a
delay so property can be set from code after core.async namespace is loaded but before
any use of the async thread pool."
(delay (or (Long/getLong "clojure.core.async.pool-size") 8)))
(defn thread-pool-executor
([]
(thread-pool-executor nil))
([init-fn]
(let [executor-svc (Executors/newFixedThreadPool
@pool-size
(conc/counted-thread-factory "async-dispatch-%d" true
{:init-fn init-fn}))]
(reify impl/Executor
(impl/exec [_ r]
(.execute executor-svc ^Runnable r))))))
4 thread factory (concurrent)
위에서 예상 한 것처럼 counted-thread-factory 는 스레드 이름을 지정할 수 있게 해주는 것 같다.
그리고 counter (atom 0) 을 통해 각 스레드에 고유한 숫자를 부여한다.
ThreadFactory에 대한 자세한 내용은 여기를 참고하자.
또한 Brian Goetz 의 [자바 병렬 프로그래밍(Java Concurrency in Practice)] 책의 8.5(ThreadFactory interface) 장을 참고하자.
(ns ^{:skip-wiki true}
clojure.core.async.impl.concurrent
(:import [java.util.concurrent ThreadFactory]))
(set! *warn-on-reflection* true)
(defn counted-thread-factory
"Create a ThreadFactory that maintains a counter for naming Threads.
name-format specifies thread names - use %d to include counter
daemon is a flag for whether threads are daemons or not
opts is an options map:
init-fn - function to run when thread is created"
([name-format daemon]
(counted-thread-factory name-format daemon nil))
([name-format daemon {:keys [init-fn] :as opts}]
(let [counter (atom 0)]
(reify
ThreadFactory
(newThread [_this runnable]
(let [body (if init-fn
(fn [] (init-fn) (.run ^Runnable runnable))
runnable)
t (Thread. ^Runnable body)]
(doto t
(.setName (format name-format (swap! counter inc)))
(.setDaemon daemon))))))))
(defonce
^{:doc "Number of processors reported by the JVM"}
processors (.availableProcessors (Runtime/getRuntime)))