[20240817] XTDB Internals 2 - LocalBufferPool

Table of Contents

이번에는 MemoryBufferPool 에 이어 LocalBufferPool 을 알아보자. [bufferpool.clj]​

  1. 구성 요소:
    • allocator : Arrow 메모리 할당기
    • memory-store : 메모리 내 캐시 (Caffeign 라이브러리 사용)
    • disk-store : 로컬 디스크 저장소 경로
    • max-size : 최대 캐시 크기
  2. 주요 기능 :
    • getBuffer : 주어진 키에 해당하는 버퍼를 가져온다. 먼저 메모리 캐시를 확인하고, 없으면 디스크에서 로드한다.
    • putObject : 버퍼를 디스크에 저장한다.
    • listAllObjects, listObjects : 저장된 객체들의 목록을 반환한다.
    • openArrowWriter : Arrow 파일 작성을 위한 ArrowWriter 인터페이스를 제공한다.
  3. 캐싱 전략 :
    • 메모리 캐시는 Caffeign 라이브러리를 사용하여 구현, 최대 크기와 가중치 기반 제거 정책을 가짐.
    • 디스크 저장소는 파일 시스템을 사용하여 구현
  4. 동시성 처리 :
    • 메모리 스토어에 대한 접근은 동기화되어 있어 스레드 안전성을 보장함.
  5. 리소스 관리 :
    • Closeable 인터페이스를 구현하여 사용이 끝나면 리소스를 정리
  6. 예외 처리 :
    • 객체가 존재하지 않을 경우 적절한 예외를 발생시킨다.

LocalBufferPool은 로컬 환경에서 XTDB의 성능을 최적화 하기 위한 중요한 컴포넌트로, 메모리와 디스크를 효율적으로 활용하여 데이터 접근 속도를 향상시키는 역할을 한다.

(defrecord LocalBufferPool [allocator, ^Cache memory-store, ^Path disk-store, ^long max-size]
  IBufferPool
  (getBuffer [_ k]
    (when k
      (or (cache-get memory-store k)
	  (locking disk-store
	    (let [buffer-cache-path (.resolve disk-store k)]
	      ;; TODO could this not race with eviction? e.g exists for this cond, but is evicted before we can map the file into the cache?
	      (when-not (util/path-exists buffer-cache-path)
		(throw (os/obj-missing-exception k)))

	      (try
		(let [nio-buffer (util/->mmap-path buffer-cache-path)]
		  (cache-compute memory-store k #(util/->arrow-buf-view allocator nio-buffer)))
		(catch ClosedByInterruptException _
		  (throw (InterruptedException.)))))))))

  (putObject [_ k buffer]
    (try
      (let [tmp-path (create-tmp-path disk-store)]
	(util/write-buffer-to-path buffer tmp-path)

	(let [file-path (.resolve disk-store k)]
	  (util/create-parents file-path)
	  (util/atomic-move tmp-path file-path)))
      (catch ClosedByInterruptException _
	(throw (InterruptedException.)))))

  (listAllObjects [_]
    (util/with-open [dir-stream (Files/walk disk-store (make-array FileVisitOption 0))]
      (vec (sort (for [^Path path (iterator-seq (.iterator dir-stream))
		       :let [relativized (.relativize disk-store path)]
		       :when (and (Files/isRegularFile path (make-array LinkOption 0))
				  (not (.startsWith relativized ".tmp")))]
		   relativized)))))

  (listObjects [_ dir]
    (let [dir (.resolve disk-store dir)]
      (if (Files/exists dir (make-array LinkOption 0))
	(util/with-open [dir-stream (Files/newDirectoryStream dir)]
	  (vec (sort (for [^Path path dir-stream]
		       (.relativize disk-store path)))))
	[])))

  (openArrowWriter [_ k rel]
    (let [tmp-path (create-tmp-path disk-store)]
      (util/with-close-on-catch [file-ch (util/->file-channel tmp-path util/write-truncate-open-opts)
				 unl (.startUnload rel file-ch)]
	(reify ArrowWriter
	  (writeBatch [_]
	    (try
	      (.writeBatch unl)
	      (catch ClosedByInterruptException e
		(throw (InterruptedException.)))))

	  (end [_]
	    (.endFile unl)
	    (.close file-ch)

	    (let [file-path (.resolve disk-store k)]
	      (util/create-parents file-path)
	      (util/atomic-move tmp-path file-path)))

	  (close [_]
	    (util/close unl)
	    (when (.isOpen file-ch)
	      (.close file-ch)))))))

  EvictBufferTest
  (evict-cached-buffer! [_ k]
    (.invalidate memory-store k)
    (.awaitQuiescence (ForkJoinPool/commonPool) 100 TimeUnit/MILLISECONDS))

  Closeable
  (close [_]
    (free-memory memory-store)
    (util/close allocator)))

1 open-local-storage (초기화)

실제로 초기화하는 코드는 아래와 같다.

(defn ->memory-buffer-cache ^Cache [^long max-cache-bytes]
  (-> (Caffeine/newBuilder)
      (.maximumWeight max-cache-bytes)
      (.weigher (reify Weigher
		  (weigh [_ _k v]
		    (.capacity ^ArrowBuf v))))
      (.removeListener (reify RemovalListener
			 (onRemoval [_ _k v _]
			   (util/close v))))
      (.build)))

(defn open-local-storage ^xtdb.IBufferPool [^BufferAllocator allocator, ^Storage$LocalStorageFactory factory]
  (->LocalBufferPool (.newChildAllocator allocator "buffer-pool" 0 Long/MAX_VALUE)
		     (->memory-buffer-cache (.getMaxCacheBytes factory))
		     (doto (-> (.getPath factory) (.resolve storage-root)) util/mkdirs)
		     (.getMaxCacheBytes factory)))

  1. 주요동작
    • 새로운 LocalBufferPool 인스턴스 생성
    • Arrow 메모리 할당기의 자식 할당기를 생성
    • 메모리 버퍼 캐시 초기화
    • 로컬 디스크 저장소 경로를 설정하여 필요한 디렉토리를 생성함
  2. 메모리 관리
    • allocator.newChildAllocator 를 사용하여 버퍼 풀을 위한 전용 메모리 할당기를 생성함.
    • 최대 메모리 크기는 Long/MAXVALUE 로 설정되어 있음. 실질적으로 시스템 메모리에 의해서만 제한됨
  3. 캐시 설정
    • ->memory-buffer-cache 함수를 호출하여 Caffeign 기반의 메모리 캐시를 초기화한다.
    • 캐시의 최대 크기는 factory.getMaxCacheBytes() 로 설정됩니다.
  4. 디스크 저장소
    • factory.getPath() 로 지정된 경로에 storage-root 를 추가하여 디스크 저장소 경로를 설정함.
    • util/mkdirs 함수로 필요한 디렉토리 구조 생성
  5. 버전 관리
    • storage-root 에는 버전 정보가 포함되어 있어, 스토리지 포맷의 변경을 관리할 수 있다.

2 Storage$LocalStorageFactory

  1. 구조:
    • @Serializable 어노테이션이 붙어 있어 직렬화가 가능합니다.
    • @SerialName("!Local")로 명명되어 있어, 직렬화 시 이 이름으로 식별됩니다.
    • Storage.Factory 인터페이스를 구현합니다.
  2. 주요 속성:
    • path: 데이터가 저장될 디렉토리 경로
    • maxCacheEntries: 최대 캐시 항목 수 (기본값: 1024)
    • maxCacheBytes: 최대 캐시 크기 (바이트 단위, 기본값: 536,870,912 바이트 또는 512MB)
  3. 저장소 생성:
    • openStorage 메서드는 BufferAllocator를 받아 IBufferPool 인스턴스를 반환합니다.
  4. 사용예시:

     Xtdb.openNode {
       localStorage(path = Paths.get("test-path")) {
         maxCacheEntries = 1024
         maxCacheBytes = 536870912
       }
       // 기타 설정...
    }
    
  @file:UseSerializers(PathWithEnvVarSerde::class)

package xtdb.api.storage

object Storage {
  /**
    * Represents a factory interface for creating storage instances.
    * The default implementation is [InMemoryStorageFactory] which stores data in memory
    */
  @Serializable
  sealed interface Factory {
    fun openStorage(allocator: BufferAllocator): IBufferPool
  }

  /**
   * Implementation for the storage module that persists data to the local file system, under the **path** directory.
   *
   * Example usage, as part of a node config:
   * ```kotlin
   * Xtdb.openNode {
   *    localStorage(path = Paths.get("test-path")) {
   *        maxCacheEntries = 1024,
   *        maxCacheBytes = 536870912
   *    },
   *    ...
   * }
   * ```
   * 
   * @property path The directory path where data will be stored.
   */
  @Serializable
  @SerialName("!Local")
  data class LocalStorageFactory(
    val path: Path,
    var maxCacheEntries: Long = 1024,
    var maxCacheBytes: Long = 536870912,
  ) : Factory {
    fun maxCacheEntries(maxCacheEntries: Long) = apply { this.maxCacheEntries = maxCacheEntries }
    fun maxCacheBytes(maxCacheBytes: Long) = apply { this.maxCacheBytes = maxCacheBytes }
    override fun openStorage(allocator: BufferAllocator) =
	requiringResolve("xtdb.buffer-pool/open-local-storage").invoke(allocator, this) as IBufferPool
  }

  @JvmStatic
  fun localStorage(path: Path) = LocalStorageFactory(path)

  @JvmSynthetic
  fun Xtdb.Config.localStorage(path: Path, configure: LocalStorageFactory.() -> Unit) =
      storage(LocalStorageFactory(path).also(configure))
}

3 requireResolve

[xtdb.util.Require.kt]​

@file:JvmSynthetic
package xtdb.util

import clojure.java.api.Clojure
import clojure.lang.Symbol
import clojure.lang.Var

private val REQUIRING_RESOLVE = Clojure.`var`("clojure.core", "requiring-resolve")

fun requiringResolve(nsname: String) = REQUIRING_RESOLVE.invoke(Symbol.intern(nsname)) as Var
  1. 파일수준 어노테이션
    • @file:JvmSynthetic 이 어노테이션은 이 파일의 모든 선언이 Java에서 사용되지 않을 것임을 나타냅니다. Kotlin에서만 사용될 내용임을 컴파일러에게 알려줍니다.
  2. Clojure 함수 참조
    • private val REQUIRING_RESOLVE = Clojure.`var`("clojure.core", "requiring-resolve")
    • 이 줄은 Clojure의 clojure.core/requiring-resolve 함수에 대한 참조를 가져옵니다. 이 함수는 Clojure에서 네임스페이스를 동적으로 로드하고 심볼을 해결하는 데 사용됩니다.
  3. requiringResolve 함수 정의:
    • fun requiringResolve(nsname: String) = REQUIRING_RESOLVE.invoke(Symbol.intern(nsname)) as Var
    • 이 함수는 문자열 nsname을 받아 Clojure Var 객체를 반환합니다.
    • Symbol.intern(nsname)은 주어진 문자열을 Clojure 심볼로 변환합니다.
    • REQUIRINGRESOLVE.invoke(…)는 Clojure의 requiring-resolve 함수를 호출합니다.
    • 결과를 Var 타입으로 캐스팅합니다.

이것으로 이전에 본 openStorage 메서드가 어떻게 동작하는지 더 잘 이해할 수 있다.

  1. requiringResolve("xtdb.buffer-pool/open-local-storage") 가 호출되면, 이는 Clojure의 requiring-resolve 함수를 사용하여 xtdb.buffer-pool 네임스페이스를 로드하고 open-local-storage 함수에 대한 참조를 가져온다.
  2. 반환된 Var 객체의 invoke 메서드를 호출하여 실제 Clojure 함수를 실행

이 방식은 XTDB가 Kotlin/Java와 Clojure 코드를 효과적으로 연결하고, 필요한 기능을 동적으로 로드할 수 있게 해줍니다. 이는 XTDB의 모듈화된 아키텍처와 확장성을 지원하는 핵심 메커니즘입니다.

3.1 requiring-resolve

clojure 함수 requiring-resolve 가 하는 일은 그럼 무엇인가?

  1. 심볼 해결: 주어진 네임스페이스-한정 심볼(namespace-qualified symbol)을 해결하려고 시도합니다.
  2. 자동 require: 만약 초기 해결 시도가 실패하면, 해당 심볼의 네임스페이스를 자동으로 require(로드)하고 다시 해결을 시도합니다.
  3. 지연 로딩: 필요한 시점에 네임스페이스를 로드할 수 있게 해줍니다.
  4. 에러 처리: 심볼 해결에 실패하면 예외를 발생시킵니다.
(defn requiring-resolve
  "Resolves namespace-qualified sym per 'resolve'. If initial resolve
fails, attempts to require sym's namespace and retries."
  {:added "1.10"}
  [sym]
  (if (qualified-symbol? sym)
    (or (resolve sym)
	(do (-> sym namespace symbol serialized-require)
	    (resolve sym)))
    (throw (IllegalArgumentException. (str "Not a qualified symbol: " sym)))))

사용예시:

(defn debug? [] (= (env :debug?) "true"))

;; If you need to do conditional requires such as the one below:
(if (debug?)
  (do
    (require '[nrepl.cmdline :refer [-main] :rename {-main -nrepl-main}])
    (resolve 'nrepl.cmdline))
  (declare -nrepl-main))

(defn -main [& args]
  (when (debug?)
    (future (apply -nrepl-main args))))

;; This is strictly better!
(defn -main [& args]
  (when (debug?)
    ;; Swap to require and resolve in one step!
    (future (apply (requiring-resolve 'nrepl.cmdline/-main) args))))

기본적으로 resolve 함수는 주어진 심볼에 해당하는 var 를 찾아 반환하는 것이다.

여기서 require 는 네임스페이스의 코드를 읽어 로드하는 것이다. requiring-resolve 는 코드를 로드하는 과정까지 함께 한다.

XTDB와 같은 시스템에서는 이 두 개념이 동적으로 버퍼풀을 초기화할 때 아래 두 기능이 필요한 것이다.

  • require는 필요한 모듈이나 라이브러리를 로드하기
  • resolve는 런타임에 필요한 특정 기능을 동적으로 찾아내기

requiring-resolve는 이 둘을 결합하여, 필요한 네임스페이스를 자동으로 로드하면서 동시에 특정 심볼을 해결할 수 있게 해준다.

4 LocalStorage 를 호출하는 곳

우리는 위에서 LocalStorageFactory 에 대해 알아보았다.

그러면 처음 LocalStorage 를 생성하는 곳은 어디인가?

내가 보기엔 일반적으로 이곳으로 시작할 것 같다.

(defmethod xtn/apply-config! ::local [^Xtdb$Config config _ {:keys [path max-cache-bytes max-cache-entries]}]
  (.storage config (cond-> (Storage/localStorage (util/->path path))
		     max-cache-bytes (.maxCacheBytes max-cache-bytes)
		     max-cache-entries (.maxCacheEntries max-cache-entries))))

path 값은 어떻게 가져오는 것일까? 공식문서에 보면 설정을 하는 방법이 제시되어있다.

[xtdb/storage.html]​

storage: !Local
  # -- required

  # The path to the local directory to persist the data to.
  # (Can be set as an !Env value)
  path: /var/lib/xtdb/storage

  # -- optional

  # The maximum number of entries to store in the in-memory cache.
  # maxCacheEntries: 1024

  # The maximum number of bytes to store in the in-memory cache.
  # maxCacheBytes: 536870912

4.1 (.storage config ,,,)

(.storage config ,,,) 함수는 딱봐도 storage 를 이곳에 저장하는 기능 같지만 혹시 모르니 한번 확인해보자.

package xtdb.api

object Xtdb {

    @Serializable
    data class Config(
	var txLog: Log.Factory = inMemoryLog(),
	var storage: Storage.Factory = inMemoryStorage(),
	var metrics: Metrics.Factory? = null,
	var defaultTz: ZoneId = ZoneOffset.UTC,
	@JvmField val indexer: IndexerConfig = IndexerConfig()
    ) {
	private val modules: MutableList<XtdbModule.Factory> = mutableListOf()

	fun storage(storage: Storage.Factory) = apply { this.storage = storage }

	fun getModules(): List<XtdbModule.Factory> = modules
	fun module(module: XtdbModule.Factory) = apply { this.modules += module }
	fun modules(vararg modules: XtdbModule.Factory) = apply { this.modules += modules }
	fun modules(modules: List<XtdbModule.Factory>) = apply { this.modules += modules }

	@JvmSynthetic
	fun indexer(configure: IndexerConfig.() -> Unit) = apply { indexer.configure() }

	fun open() = requiringResolve("xtdb.node.impl/open-node").invoke(this) as IXtdb
	fun txLog(txLog: Log.Factory) = apply { this.txLog = txLog }
	fun defaultTz(defaultTz: ZoneId) = apply { this.defaultTz = defaultTz }

	@JvmSynthetic
	fun metrics(metrics: Metrics.Factory) = apply { this.metrics = metrics }
    }

    @JvmStatic
    fun configure() = Config()

    @JvmStatic
    @JvmOverloads
    fun openNode(config: Config = Config()) = config.open()

    /**
     * Opens a node using a YAML configuration file - will throw an exception if the specified path does not exist
     * or is not a valid `.yaml` extension file.
     */
    @JvmStatic
    fun openNode(path: Path): IXtdb {
	if (path.extension != "yaml") {
	    throw IllegalArgumentException("Invalid config file type - must be '.yaml'")
	} else if (!path.toFile().exists()) {
	    throw IllegalArgumentException("Provided config file does not exist")
	}

	val yamlString = Files.readString(path)
	val config = nodeConfig(yamlString)

	return config.open()
    }

    @JvmSynthetic
    fun openNode(configure: Config.() -> Unit) = openNode(Config().also(configure))
}

그리고 아래 localStorage 함수를 이용하여 LocalStorageFactory를 만드는 함수가 있긴하지만 실제로 사용하는 코드인지는 모르겠다. 테스트용으로 만든 코드 같기도 하다.

(defn dir->buffer-pool
  "Creates a local storage buffer pool from the given directory."
  ^xtdb.IBufferPool [^BufferAllocator allocator, ^Path dir]
  (let [bp-path (util/tmp-dir "tmp-buffer-pool")
	storage-root (.resolve bp-path storage-root)]
    (util/copy-dir dir storage-root)
    (.openStorage (Storage/localStorage bp-path) allocator)))

5 getBuffer

  1. 키(k) 가 있는지 확인
  2. cache에 있는지 확인
  3. 디스크 저장소 동기화 접근: locking 으로 disk-store(Path) 동기화
  4. 버퍼 캐시 경로 생성 : disk-store에서 키 k에 해당하는 경로를 생성합니다.
  5. 파일 존재 확인: 없으면 예외
  6. 파일을 메모리에 매핑 : (util/->mmap-path buffer-cache-path)
  7. 메모리 캐시에 버퍼 저장 : (cache-compute memory-store k #(util/->arrow-buf-view allocator nio-buffer)) 메모리 매핑된 버퍼를 Arrow 버퍼 뷰로 변환하고 메모리 캐시에 저장합니다.

TODO 주석은 중요한 고려사항:

  • 버퍼 확인과 실제 로딩 사이에 경쟁 조건(race condition)이 발생할 수 있음을 지적.
  • 파일 존재 확인 후 실제 매핑 전에 파일이 제거될 수 있는 상황을 고려해야 함을 나타냄.
(getBuffer [_ k]
    (when k
      (or (cache-get memory-store k)
	  (locking disk-store
	    (let [buffer-cache-path (.resolve disk-store k)]
	      ;; TODO could this not race with eviction? e.g exists for this cond, but is evicted before we can map the file into the cache?

	      (when-not (util/path-exists buffer-cache-path)
		(throw (os/obj-missing-exception k)))

	      (try
		(let [nio-buffer (util/->mmap-path buffer-cache-path)]
		  (cache-compute memory-store k #(util/->arrow-buf-view allocator nio-buffer)))
		(catch ClosedByInterruptException _
		  (throw (InterruptedException.)))))))))

5.1 ->mmap-path

->mmap-path 함수는 Java NIO(New I/O)의 메모리 매핑 기능을 사용하여 파일을 메모리에 매핑하는 유틸리티 함수.

(defn ->mmap-path
  (^java.nio.MappedByteBuffer [^Path path]
   (->mmap-path path FileChannel$MapMode/READ_ONLY))

  (^java.nio.MappedByteBuffer [^Path path ^FileChannel$MapMode map-mode]
   (with-open [in (->file-channel path (if (= FileChannel$MapMode/READ_ONLY map-mode)
					 #{:read}
					 #{:read :write}))]
     (.map in map-mode 0 (.size in))))

  (^java.nio.MappedByteBuffer [^Path path ^FileChannel$MapMode map-mode ^long start ^long len]
   (with-open [in (->file-channel path (if (= FileChannel$MapMode/READ_ONLY map-mode)
					 #{:read}
					 #{:read :write}))]
     (.map in map-mode start len))))

Direct Memory와 Memory-Mapped 파일의 차이점 정리함.

Direct Memory:

  • JVM 힙 외부 메모리임
  • 네이티브 I/O 작업용
  • ByteBuffer.allocateDirect()로 할당
  • GC 영향 없음, 네이티브 I/O에 효율적
  • 명시적 메모리 관리 필요

Memory-Mapped 파일 (->mmap-path 생성):

  • 파일을 가상 메모리에 매핑
  • 대용량 파일 고속 읽기/쓰기용
  • FileChannel.map()으로 생성
  • 대용량 파일 처리 효율적
  • 가상 메모리 사용량 주의 필요

주요 차이:

  1. 용도: Direct Memory는 네트워크 I/O, Memory-Mapped는 파일 처리
  2. 데이터 소스: Direct Memory는 순수 메모리, Memory-Mapped는 파일 기반
  3. 지속성: Direct Memory는 휘발성, Memory-Mapped는 지속적
  4. 크기 제한: Direct Memory는 시스템 메모리, Memory-Mapped는 파일 시스템 용량 기준

XTDB 등 데이터베이스 시스템에서 활용:

  • Direct Memory: 캐시, 임시 데이터 구조에 적합
  • Memory-Mapped 파일: 데이터 파일, 로그 파일 등 영구 저장소에 적합

결론: 둘 다 효율적 I/O 처리 메커니즘 제공. 사용 사례와 요구사항에 따라 선택 필요.

5.2 getBuffer 는 어떻게 사용되는가.

테스트 코드를 구경해보자.

이렇게 getBuffer 는 가져올 때, 문자열 값을 Path 로 변환하는 로직이 있다. 아마도 키 값을 우리는 keyword로 요청하면 그것을 문자열로 바꾸고 또 Path 로 변경해서 사용하는 것인가? 싶다.

(util/with-open [buf (.getBuffer bp (util/->path "aw"))]
  (let [{:keys [root]} (util/read-arrow-buf buf)]
    (util/close root)))

실제로 사용하는 코드는 어디있을까?

5.2.1 xtdb.metadata

load-chunks-metadata 함수를 그대로 설정.

(defn- get-bytes ^bytes [^IBufferPool buffer-pool, ^Path obj-key]
  (util/with-open [buffer (.getBuffer buffer-pool obj-key)]
    (let [bb (.nioBuffer buffer 0 (.capacity buffer))
	  ba (byte-array (.remaining bb))]
      (.get bb ba)
      ba)))

(defn- load-chunks-metadata ^java.util.NavigableMap [{:keys [^IBufferPool buffer-pool]}]
  (let [cm (TreeMap.)]
    (doseq [cm-obj-key (.listObjects buffer-pool chunk-metadata-path)]
      (with-open [is (ByteArrayInputStream. (get-bytes buffer-pool cm-obj-key))]
	(let [rdr (transit/reader is :json {:handlers (merge serde/transit-read-handlers
							     arrow-read-handlers)})]
	  (.put cm (obj-key->chunk-idx cm-obj-key) (transit/read rdr)))))
    cm))

이 함수는 load-chunks-metadata 초기화할 때 바로 사용함.

(defmethod ig/prep-key ::metadata-manager [_ opts]
  (merge {:buffer-pool (ig/ref :xtdb/buffer-pool)}
	 opts))

(defmethod ig/init-key ::metadata-manager [_ {:keys [cache-size ^IBufferPool buffer-pool], :or {cache-size 128} :as deps}]
  (let [chunks-metadata (load-chunks-metadata deps)
	table-metadata-cache (-> (Caffeine/newBuilder)
				 (.maximumSize cache-size)
				 (.removalListener (reify RemovalListener
						     (onRemoval [_ _path table-metadata _reason]
						       (util/close table-metadata))))
				 (.build))]
    (MetadataManager. buffer-pool
		      table-metadata-cache
		      chunks-metadata
		      (->> (vals chunks-metadata) (reduce merge-fields {})))))

5.2.2 util/->path

역시 변환만 하는 녀석이다.

(defn ->path ^Path [path-ish]
  (cond
    (instance? Path path-ish) path-ish
    (instance? File path-ish) (.toPath ^File path-ish)
    (uri? path-ish) (Paths/get ^URI path-ish)
    (string? path-ish) (let [uri (URI. path-ish)]
			 (if (.getScheme uri)
			   (Paths/get uri)
			   (Paths/get path-ish (make-array String 0))))
    :else ::s/invalid))

6 putObject

동작:

  1. 임시 경로 생성 (tmp-path)
  2. 버퍼를 임시 경로에 쓰기 (write-buffer-to-path)
  3. 최종 파일 경로 생성 (file-path)
  4. 필요시 부모 디렉토리 생성
  5. 임시 파일을 최종 위치로 원자적 이동

주요 특징:

  1. 임시 파일 사용으로 데이터 무결성 보장
  2. 원자적 파일 이동으로 동시성 문제 방지
(putObject [_ k buffer]
  (try
    (let [tmp-path (create-tmp-path disk-store)]
      (util/write-buffer-to-path buffer tmp-path)

      (let [file-path (.resolve disk-store k)]
	(util/create-parents file-path)
	(util/atomic-move tmp-path file-path)))
    (catch ClosedByInterruptException _
      (throw (InterruptedException.)))))

임시 파일을 먼저 생성하는 이유:

  1. 원자성 보장:
    • 전체 쓰기 작업이 완료될 때까지 기존 데이터 보존
    • 중간에 실패해도 기존 데이터 손상 없음
  2. 동시성 관리:
    • 다른 프로세스나 스레드의 간섭 방지
    • 읽기 작업과의 충돌 예방
  3. 데이터 무결성:
    • 쓰기 중 시스템 충돌 시에도 일관성 유지
    • 부분적으로 기록된 파일로 인한 문제 방지
  4. 성능 최적화:
    • 메모리에서 디스크로 한 번에 쓰기 가능
    • 파일 시스템 캐시 효율적 사용
  5. 롤백 용이성:
    • 오류 발생 시 임시 파일만 삭제하면 됨
    • 복잡한 복구 과정 불필요

이런 방식으로 안전하고 효율적인 파일 쓰기 구현.

6.1 atomic-move

목적: 파일의 원자적 이동

인자:

  • from-path: 원본 경로 (Path)
  • to-path: 목적지 경로 (Path)

동작:

  • Java NIO의 Files.move 메소드 사용
  • ATOMICMOVE 옵션 적용

특징:

  • 원자성 보장: 이동 작업이 완전히 성공하거나 실패
  • 중간 상태 없음: 부분적 이동 방지
  • 파일 시스템 레벨에서 지원 (가능한 경우)

주의: 파일 시스템이 원자적 이동을 지원하지 않으면 예외 발생 가능

(defn atomic-move [^Path from-path ^Path to-path]
  (Files/move from-path to-path (into-array CopyOption [StandardCopyOption/ATOMIC_MOVE]))
  to-path)

6.1.1 java.nio.file.StandardCopyOption

[공식문서]​

주요 StandardCopyOption 값들:

  • REPLACEEXISTING: 기존 파일 대체 (Replace an existing file if it exists.)
  • COPYATTRIBUTES: 파일 속성 복사 (Copy attributes to the new file.)
  • ATOMICMOVE: 원자적 이동 수행 (Move the file as an atomic file system operation.)

7 listAllObjects

디스크 저장소의 모든 객체 목록화

동작:

  1. Files.walk로 디렉토리 순회
  2. iterator-seq로 Path 객체 시퀀스 생성
  3. for 구문으로 각 Path 처리:
    1. relativize로 상대 경로 생성
    2. isRegularFile로 일반 파일 확인
    3. ".tmp"로 시작하지 않는 파일만 선택
  4. 결과를 정렬하고 벡터로 변환

리턴: 상대 경로의 정렬된 벡터

특징:

  • 재귀적 디렉토리 탐색
  • 임시 파일 제외
  • 상대 경로 사용
(listAllObjects [_]
  ;; FileVisitOption                   
  (util/with-open [dir-stream (Files/walk disk-store (make-array FileVisitOption 0))]
    (vec (sort (for [^Path path (iterator-seq (.iterator dir-stream))
		     :let [relativized (.relativize disk-store path)]
		     :when (and (Files/isRegularFile path (make-array LinkOption 0))
				(not (.startsWith relativized ".tmp")))]
		 relativized)))))

7.1 walk 함수

(make-array FileVisitOption 0) 의미가 궁금해서 공식문서를 찾아봄

varargs 로 넘기기 위해 FileVisitOption 타입의 빈배열(길이 0) 으로 넣어버림.

public static Stream<Path> walk(Path start,
				FileVisitOption... options)
			 throws IOException
Return a Stream that is lazily populated with Path by walking the file tree rooted at a given starting file.
The file tree is traversed depth-first, the elements in the stream are Path objects that are obtained as if by resolving the relative path against start.
This method works as if invoking it were equivalent to evaluating the expression:

 walk(start, Integer.MAX_VALUE, options)

In other words, it visits all levels of the file tree.
The returned stream encapsulates one or more DirectoryStreams. If timely disposal of file system resources is required, the try-with-resources construct should be used to ensure that the stream's close method is invoked after the stream operations are completed. Operating on a closed stream will result in an IllegalStateException.

Parameters:
start - the starting file
options - options to configure the traversal

Returns:
the Stream of Path

Throws:
SecurityException - If the security manager denies access to the starting file. In the case of the default provider, the checkRead method is invoked to check read access to the directory.
IOException - if an I/O error is thrown when accessing the starting file.

Since:
1.8
See Also:
walk(Path, int, FileVisitOption...)

[java nio File Visitor]​

7.2 .relativize

이는 읽는 파일의 path를 루트 패스(disk-store) 기준으로 상대경로로 만드는 것이다. 아마 이것을 키 값으로 사용하는 듯?

XTDB의 listAllObjects에서 relativize 사용 예시:

가정:

  • disk-store 경로: "/data/xtdb"
  • 실제 파일 경로: "/data/xtdb/chunks/001.data"

코드:

(let [path (Paths/get "/data/xtdb/chunks/001.data")
      relativized (.relativize disk-store path)]
  relativized)

리턴 : relativized 값: "chunks/001.data"

설명:

  1. disk-store ("/data/xtdb")를 기준으로
  2. 전체 경로 ("/data/xtdb/chunks/001.data")의 상대 경로 계산
  3. 공통 부분 제거 후 나머지 경로 반환

목적:

  • 저장소 루트 기준 파일 위치 표현
  • 이동 가능한 상대 경로 유지
  • 저장소 구조 독립적인 파일 참조

7.3 isRegularFile

설명으로는 불투명한 파일이 있는 일반파일인지 확인한다?

아마도 불투하다는 것은 데이터가 저장되어 있는 데이터만 읽겠다는 것으로 보임.

public static boolean isRegularFile(Path path,
				    LinkOption... options)
Tests whether a file is a regular file with opaque content.

The options array may be used to indicate how symbolic links are handled for the case that the file is a symbolic link.
By default, symbolic links are followed and the file attribute of the final target of the link is read.
If the option NOFOLLOW_LINKS is present then symbolic links are not followed.

Where it is required to distinguish an I/O exception from the case that the file is not a regular file
then the file attributes can be read with the readAttributes method and the file type tested with the BasicFileAttributes.isRegularFile() method.

Parameters:
path - the path to the file
options - options indicating how symbolic links are handled

Returns:
true if the file is a regular file; false if the file does not exist, is not a regular file, or it cannot be determined if the file is a regular file or not.

Throws:
SecurityException - In the case of the default provider, and a security manager is installed, its checkRead method denies read access to the file.

[isRegularFile DOC]​

일단 느낌상 그렇다지만 스택오버플로우에 좀 더 정리된 내용이 있어서 기록함. (맞는 것인지는 모르겠음)

  1. 응답 1

    For example in UNIX, a regular file is one that is not special in some way.
    Special files include symbolic links and directories.
    A regular file is a sequence of bytes stored permanently in a file system.
    
    ---
    
    I figure rm -i is an alias, possibly rm -i. The "regular" part doesn't mean anything in particular,
    it only means that it's not a pipe, device, socket or anything other "special".
    
    it means the file is not a symlink, pipe, rand, null, cpu, etc. Perhaps you have heard the linux philosophy everything is a text.
    This isn't literally true, but it suggests a dominant operational context where string processing tools can be applied to filesystem elements directly.
    In this case, it means that in a more literal fashion. To see the detection step in isolation, try the command file, as in file /etc/passwd or file /dev/null.
    
    
  2. 응답 2

    From Files Reference - AIX IBM
    
    A file is a collection of data that can be read from or written to. A file can be a program you create, text you write, data you acquire, or a device you use. Commands, printers, terminals, and application programs are all stored in files. This allows users to access diverse elements of the system in a uniform way and gives the operating system great flexibility. No format is implied when a file is created.
    
    There are three types of files
    
    Regular - Stores data (text, binary, and executable).
    Directory - Contains information used to access other files.
    Special - Defines a FIFO (first-in, first-out) file or a physical device.
    Regular files are the most common. When a word processing program is used to create a document, both the program and the document are contained in regular files.
    
    Regular files contain either text or binary information. Text files are readable by the user.
    Binary files are readable by the computer. Binary files can be executable files that instruct the system to accomplish a job.
    Commands, shell scripts, and other programs are stored in executable files.
    
    Directories contain information the system needs to access all types of files, but they do not contain the actual file data.
    As a result, directories occupy less space than a regular file and give the file-system structure flexibility and depth.
    Each directory entry represents either a file or subdirectory and contains the name of a file and the file's i-node (index node reference) number.
    The i-node number represents the unique i-node that describes the location of the data associated with the file. Directories are created and controlled by a separate set of commands. See "Directories" in Operating system and device management for more information.
    
    Special files define devices for the system or temporary files created by processes.
    There are three basic types of special files: FIFO (first-in, first-out), block, and character.
    FIFO files are also called pipes. Pipes are created by one process to temporarily allow communication with another process.
    These files cease to exist when the first process finishes. Block and character files define devices.
    
    All this above is from the first link. I've checked in many other sources regarding Operational Systems differences and it seems this one is the most common definition on all sources i've found.
    

뭐 java nio 와 완벽하게 맞춰진 데이터는 아니지만 regular 파일이란, 일반적으로 데이터를 저장하는 용도의 파일을 말하는 듯?

[stack over flow link]​

8 listObjects

(listObjects [_ dir]
  (let [dir (.resolve disk-store dir)]
    (if (Files/exists dir (make-array LinkOption 0))
      (util/with-open [dir-stream (Files/newDirectoryStream dir)]
	(vec (sort (for [^Path path dir-stream]
		     (.relativize disk-store path)))))
      [])))

이 함수는 주어진 디렉토리 내의 파일 목록을 반환합니다.

디렉토리가 존재하는 경우

  • (Files/newDirectoryStream dir) : 디렉토리의 내용을 읽기 위한 스트림을 생성합니다.
  • (for [^Path path dir-stream] ...) : 디렉토리 내의 각 항목에 대해 반복합니다.
  • (.relativize disk-store path) : 각 경로를 disk-store에 대한 상대 경로로 변환합니다.
  • (vec (sort ...)) : 결과를 정렬하고 벡터로 변환합니다.

9 openArrowWriter

(openArrowWriter [_ k rel]
  (let [tmp-path (create-tmp-path disk-store)]
    (util/with-close-on-catch [file-ch (util/->file-channel tmp-path util/write-truncate-open-opts)
			       unl (.startUnload rel file-ch)]
      (reify ArrowWriter
	(writeBatch [_]
	  (try
	    (.writeBatch unl)
	    (catch ClosedByInterruptException e
	      (throw (InterruptedException.)))))

	(end [_]
	  (.endFile unl)
	  (.close file-ch)

	  (let [file-path (.resolve disk-store k)]
	    (util/create-parents file-path)
	    (util/atomic-move tmp-path file-path)))

	(close [_]
	  (util/close unl)
	  (when (.isOpen file-ch)
	    (.close file-ch)))))))

Arrow 포맷의 데이터를 디스크에 쓰는 역할

  • 임시 파일 생성: create-tmp-path로 임시 경로를 생성합니다.
  • 리소스 관리: util/with-close-on-catch로 파일 채널과 언로드 객체를 안전하게 관리합니다.
  • ArrowWriter 인터페이스 구현: 익명 객체로 ArrowWriter를 구현합니다.
  • writeBatch 메소드: 배치 데이터를 쓰며, 인터럽트 발생 시 예외를 던집니다.
  • end 메소드:
    • 언로드 작업을 종료합니다.
    • 파일 채널을 닫습니다.
    • 최종 파일 경로를 생성합니다.
    • 임시 파일을 최종 위치로 원자적으로 이동시킵니다.
  • close 메소드: 언로드 객체와 열린 파일 채널을 안전하게 닫습니다.

이 메소드는 데이터를 안전하게 디스크에 쓰고, 임시 파일 사용으로 데이터 일관성을 보장하며, 리소스를 적절히 관리합니다.

9.1 writeBatch

writeBatch 는 이전 xtdb-internals-1 에 이미 설명이 있다. 코드만 옮겨놓자. 채널을 받아서 그곳에 vectors 를 저장하는 듯.

이렇게 구성된 데이터는 Arrow 포맷으로 쉽게 직렬화되어 파일에 저장되거나 네트워크로 전송될 수 있습니다.

// Apache Arrow 파일 포맷에서 사용하는 매직 바이트 인듯?
private val MAGIC = "ARROW1".toByteArray()

inner class RelationUnloader(private val ch: WriteChannel) : AutoCloseable {

  private val vectors = this@Relation.vectors.values
  private val schema = Schema(vectors.map { it.field })
  private val recordBlocks = mutableListOf<ArrowBlock>()

  init {
    // 파일 시작에 매직 바이트 쓰기.
    ch.write(MAGIC)
    ch.align()
    MessageSerializer.serialize(ch, schema)
  }

  fun writeBatch() {
    // 각 필드(컬럼)의 메타데이터를 저장
    // 주로 각 필드의 유효한 데이터 개수(null이 아닌 값)를 포함합니다.
    // 데이터의 논리적 구조
    val nodes = mutableListOf<ArrowFieldNode>()
    // 실제 데이터 값을 저장합니다.
    // Arrow의 컬럼나 포맷에 따라 여러 버퍼가 사용될 수 있습니다.
    // 예를 들어, 가변 길이 데이터는 오프셋과 값 버퍼가 필요합니다.
    val buffers = mutableListOf<ArrowBuf>()

    vectors.forEach { it.unloadBatch(nodes, buffers) }

    ArrowRecordBatch(rowCount, nodes, buffers).use { recordBatch ->
	MessageSerializer.serialize(ch, recordBatch)  // 직렬화
	    .also { recordBlocks.add(it) }
    }
  }

  fun endStream() {
    ch.writeIntLittleEndian(MessageSerializer.IPC_CONTINUATION_TOKEN)
    ch.writeIntLittleEndian(0)
  }

  fun endFile() {
    endStream()

    val footerStart = ch.currentPosition
    ch.write(ArrowFooter(schema, emptyList(), recordBlocks), false)

    val footerLength = ch.currentPosition - footerStart
    check(footerLength > 0) { "Footer length must be positive" }
    ch.writeIntLittleEndian(footerLength.toInt())
    // 파일 마지막에 다시 매직 바이트 쓰기 
    ch.write(MAGIC)
  }

  override fun close() {
      ch.close()
  }
}

Date: 2024-08-17 Sat 00:00

Author: Younghwan Nam

Created: 2024-12-21 Sat 16:39

Emacs 27.2 (Org mode 9.4.4)

Validate