[20240817] XTDB Internals 3 - RemoteBufferPool

Table of Contents

이제 RemoteBufferPool, RemoteStorage 에 대해서 알아볼 것이다.

1 RemoteStorage

/**
 * Implementation for the storage module that persists data remotely within a specified [objectStore],
 * while maintaining a local cache of the working set cache under the [localDiskCache] directory.
 *
 * Any implementer of [ObjectStoreFactory] can be used as the [objectStore]. We currently offer:
 * * AWS S3 (under **xtdb-aws**)
 * * Azure Blob Storage (under **xtdb-azure**)
 * * Google Cloud Storage (under **xtdb-google-cloud**)
 * 
 * Example usage, as part of a node config:
 * ```kotlin
 * Xtdb.openNode {
 *    remoteStorage(
 *       objectStore = objStoreImpl(...) { ... },
 *       localDiskCache = Paths.get("test-path")
 *    ) {
 *       maxCacheEntries = 1024,
 *       maxCacheBytes = 536870912,
 *       maxDiskCachePercentage = 75,
 *       maxDiskCacheBytes = 10737418240
 *    },
 *    ...
 * }
 * ```
 *
 * @property objectStore configuration of the object store to use for remote storage.
 * @property localDiskCache local directory to store the working-set cache in.
 */
@Serializable
@SerialName("!Remote")
data class RemoteStorageFactory(
    val objectStore: ObjectStoreFactory,
    val localDiskCache: Path,
    var maxCacheEntries: Long = 1024,
    var maxCacheBytes: Long = 536870912,
    var maxDiskCachePercentage: Long = 75,
    var maxDiskCacheBytes: Long? = null
) : Factory {

    fun maxCacheEntries(maxCacheEntries: Long) = apply { this.maxCacheEntries = maxCacheEntries }
    fun maxCacheBytes(maxCacheBytes: Long) = apply { this.maxCacheBytes = maxCacheBytes }
    fun maxDiskCachePercentage(maxDiskCachePercentage: Long) = apply { this.maxDiskCachePercentage = maxDiskCachePercentage }
    fun maxDiskCacheBytes(maxDiskCacheBytes: Long) = apply { this.maxDiskCacheBytes = maxDiskCacheBytes }

    override fun openStorage(allocator: BufferAllocator) =
	requiringResolve("xtdb.buffer-pool/open-remote-storage").invoke(allocator, this) as IBufferPool
}

@JvmStatic
fun remoteStorage(objectStore: ObjectStoreFactory, localDiskCachePath: Path) =
    RemoteStorageFactory(objectStore, localDiskCachePath)

@JvmSynthetic
fun Xtdb.Config.remoteStorage(
    objectStore: ObjectStoreFactory,
    localDiskCachePath: Path,
    configure: RemoteStorageFactory.() -> Unit,
) = storage(RemoteStorageFactory(objectStore, localDiskCachePath).also(configure))  

RemoteStorage는 XTDB(Extensible Transaction Database)의 데이터 원격 저장 및 관리를 위한 구현체이다. 주요 특징은 다음과 같다:

  1. 원격 객체 저장소를 사용하여 데이터를 저장한다.
  2. 작업 세트의 로컬 캐시를 유지한다.
  3. AWS S3, Azure Blob Storage, Google Cloud Storage 등 다양한 객체 저장소를 지원한다.
  4. 캐시 크기, 디스크 캐시 퍼센티지 등 다양한 매개변수 조정이 가능하다.

RemoteStorageFactory 클래스의 주요 속성은 다음과 같다:

  1. objectStore: 원격 저장소 구현체
  2. localDiskCache: 로컬 캐시 디렉토리 경로
  3. maxCacheEntries: 최대 캐시 항목 수 (기본값: 1024)
  4. maxCacheBytes: 최대 캐시 크기 (바이트, 기본값: 536,870,912)
  5. maxDiskCachePercentage: 최대 디스크 캐시 비율 (기본값: 75%)
  6. maxDiskCacheBytes: 최대 디스크 캐시 크기 (바이트, 기본값: null)

알고 싶은 것이 여러개 있지만 일단, 이것을 실행하는 코드를 보자.

2 xtn/apply-config!

integrant 설정같은 듯?

(defmethod xtn/apply-config! ::remote [^Xtdb$Config config _ {:keys [object-store local-disk-cache max-cache-bytes max-cache-entries max-disk-cache-bytes max-disk-cache-percentage]}]
  (.storage config (cond-> (Storage/remoteStorage (let [[tag opts] object-store]  ;; Storage/remoteStorage를 호출하여 기본 원격 저장소 설정을 생성
						    (->object-store-factory tag opts))  ;; 객체 저장소 팩토리로 변환
						  (util/->path local-disk-cache))  ;; local-disk-cache를 path로 변환
		     max-cache-bytes (.maxCacheBytes max-cache-bytes)
		     max-cache-entries (.maxCacheEntries max-cache-entries)
		     max-disk-cache-bytes (.maxDiskCacheBytes max-disk-cache-bytes)
		     max-disk-cache-percentage (.maxDiskCachePercentage max-disk-cache-percentage))))

구성 옵션:

  • object-store: 원격 객체 저장소 설정
  • local-disk-cache: 로컬 디스크 캐시 경로
  • max-cache-bytes: 최대 캐시 크기 (바이트)
  • max-cache-entries: 최대 캐시 항목 수
  • max-disk-cache-bytes: 최대 디스크 캐시 크기 (바이트)
  • max-disk-cache-percentage: 최대 디스크 캐시 비율

2.1 Configuration 방법

[공식 문서]​

A persistent storage implementation that:

  • Persists data remotely to a provided object store.
  • Maintains an local-disk cache and in-memory cache of the working set.
storage: !Remote
  # -- required

  # Configuration of the Object Store to use for remote storage
  # Each of these is configured separately - see below for more information.
  objectStore: <ObjectStoreImplementation>

  # Local directory to store the working-set cache in.
  # (Can be set as an !Env value)
  localDiskCache: /var/lib/xtdb/remote-cache

  ## -- optional

  # The maximum number of entries to store in the in-memory cache.
  # maxCacheEntries: 1024

  # The maximum number of bytes to store in the in-memory cache.
  # maxCacheBytes: 536870912

  # The max percentage of space to use on the filesystem for the localDiskCache directory (overriden by maxDiskCacheBytes, if set).
  # maxDiskCachePercentage: 75

  # The upper limit of bytes that can be stored within the localDiskCache directory (unset by default).
  # maxDiskCacheBytes: 107374182400

여기서 내가 좀 더 파보고 싶은 것은

  • objectStore 의 구현체들
  • localDiskCache 구현체

2.1.1 Configuration 에서 사용할 objectStore S3 구현체

[xtdb s3 사용 설명서]​

S3 모듈은 SNS 토픽과 SQS 대기열을 사용하여 S3에 있는 파일 목록의 로컬 복사본을 유지하므로 S3에 개체를 나열하는 데 드는 비용과 시간이 절약됩니다.
CloudFormation 스택을 사용하지 않는 경우, 버킷에서 알림을 받도록 SNS 토픽 설정과 유사한 설정을 하고, XTDB가 S3, SNS 및 SQS에 필요한 모든 관련 권한을 가지고 있는지 확인합니다.

뭐 그렇다고 한다. 그래서 인프라 설정에는 다음과 같은 요구사항이 있다.

  1. S3 버킷
  2. 개체 생성/삭제에 대한 S3 버킷의 알림을 받기 위한 SNS 토픽
  3. 버킷에 SNS 토픽에 게시할 수 있는 권한이 부여되어야 한다:

    SNSTopicPolicy:
      Type: AWS::SNS::TopicPolicy
      Properties:
        PolicyDocument:
          Statement:
    	- Effect: Allow
    	  Principal:
    	    Service: 's3.amazonaws.com'
    	  Action: sns:Publish
    	  Resource: !Ref SNSTopic
    	  Condition:
    	    ArnEquals:
    	      aws:SourceArn: !Ref S3BucketArn
    	    StringEquals:
    	      aws:SourceAccount: !Ref 'AWS::AccountId'
        Topics:
          - !Ref SNSTopic
    
  4. XTDB에 권한을 부여하는 IAM 정책:
    • S3 버킷
    • SNS 토픽 구독
    • SQS 큐 생성 및 운영

설정방법은 아래와 같다.

storage: !Remote
  objectStore: !S3
    # -- required

    # The name of the S3 bucket to use for the object store
    # (Can be set as an !Env value)
    bucket: "my-s3-bucket"

    # The ARN of the SNS topic which is collecting notifications from the S3 bucket.
    # (Can be set as an !Env value)
    snsTopicArn: "arn:aws:sns:region:account-id:my-sns-topic"

    # -- optional
    # A file path to prefix all of your files with
    # - for example, if "foo" is provided, all XTDB files will be located under a "foo" sub-directory
    # (Can be set as an !Env value)
    # prefix: my-xtdb-node

  localDiskCache: /var/cache/xtdb/object-store

2.1.2 S3 objectStore 구현체 찾아보기

[S3.kt]​

뭐 별거는 없다.

버킷 이름과 SNS 토픽 ARN을 필수로 받는다.

Registration 클래스를 통해 XTDB 모듈 시스템에 통합된다.

@file:UseSerializers(PathWithEnvVarSerde::class, StringWithEnvVarSerde::class)

package xtdb.aws

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import kotlinx.serialization.UseSerializers
import xtdb.api.PathWithEnvVarSerde
import xtdb.api.StringWithEnvVarSerde
import xtdb.api.module.XtdbModule
import xtdb.api.storage.ObjectStore
import xtdb.api.storage.ObjectStoreFactory
import xtdb.aws.s3.DefaultS3Configurator
import xtdb.aws.s3.S3Configurator
import xtdb.util.requiringResolve
import java.nio.file.Path

/**
 * Used to set configuration options for an S3 Object Store, which can be used as implementation of objectStore within a [xtdb.api.storage.Storage.RemoteStorageFactory].
 *
 * Requires at least [bucket][S3.Factory.bucket] and an [snsTopicArn][S3.Factory.snsTopicArn] to be provided - these will need to be accessible to whichever authentication credentials you use.
 * Authentication is handled via the Default AWS Credential Provider Chain.
 * See the [AWS documentation](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default) on the various methods which you can handle authentication to be able to make use of the operations inside the modules.
 *
 * For more info on setting up the necessary infrastructure on AWS to be able to use S3 as an XTDB object store, see the section on infrastructure & setting up the AWS Cloudformation Stack within our [S3 Module Reference](https://docs.xtdb.com/config/storage/s3.html).
 *
 * Example usage, as part of a node config:
 * ```kotlin
 * Xtdb.openNode {
 *    remoteStorage(
 *       objectStore = s3(bucket = "xtdb-bucket", snsTopicArn = "example-arn") {
 *           prefix = Path.of("my/custom/prefix")
 *       },
 *       localDiskCache = Paths.get("test-path")
 *    ),
 *    ...
 * }
 * ```
 */
object S3 {
    @JvmStatic
    fun s3(bucket: String, snsTopicArn: String) = Factory(bucket, snsTopicArn)

    /**
     * Used to set configuration options for an S3 Object Store, which can be used as implementation of objectStore within a [xtdb.api.storage.Storage.RemoteStorageFactory].
     *
     * Requires at least [bucket] and an [snsTopicArn] to be provided - these will need to be accessible to whichever authentication credentials you use.
     * Authentication is handled via the Default AWS Credential Provider Chain.
     * See the [AWS documentation](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default) on the various methods which you can handle authentication to be able to make use of the operations inside the modules.
     *
     * For more info on setting up the necessary infrastructure on AWS to be able to use S3 as an XTDB object store, see the section on infrastructure & setting up the AWS Cloudformation Stack within our [S3 Module Reference](https://docs.xtdb.com/config/storage/s3.html).
     *
     * Example usage, as part of a node config:
     * ```kotlin
     * Xtdb.openNode {
     *    remoteStorage(
     *       objectStore = s3(bucket = "xtdb-bucket", snsTopicArn = "example-arn") {
     *           prefix = Path.of("my/custom/prefix")
     *       },
     *       localDiskCache = Paths.get("test-path")
     *    ),
     *    ...
     * }
     * ```
     * @param bucket The name of the [S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingBucket.html) to be used as an object store
     * @param snsTopicArn The [ARN](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html) of the [SNS topic](https://aws.amazon.com/sns/) which is collecting notifications from the S3 bucket you are using.
     */
    @JvmSynthetic
    fun s3(bucket: String, snsTopicArn: String, configure: Factory.() -> Unit = {}) =
	s3(bucket, snsTopicArn).also(configure)

    @Serializable
    @SerialName("!S3")
    data class Factory(
	@Serializable(StringWithEnvVarSerde::class) val bucket: String,
	@Serializable(StringWithEnvVarSerde::class) val snsTopicArn: String,
	@Serializable(PathWithEnvVarSerde::class) var prefix: Path? = null,
	@Transient var s3Configurator: S3Configurator = DefaultS3Configurator,
    ) : ObjectStoreFactory {

	fun prefix(prefix: Path) = apply { this.prefix = prefix }

	/**
	 * @param s3Configurator An optional [xtdb.aws.s3.S3Configurator] instance with extra S3 configuration options to be used by the object store.
	 */
	fun s3Configurator(s3Configurator: S3Configurator) = apply { this.s3Configurator = s3Configurator }

	override fun openObjectStore() = requiringResolve("xtdb.aws.s3/open-object-store")(this) as ObjectStore
    }

    /**
     * @suppress
     */
    class Registration : XtdbModule.Registration {
	override fun register(registry: XtdbModule.Registry) {
	    registry.registerObjectStore(Factory::class)
	}
    }
}

이 코드를 사용하는 곳은?

2.1.3 s3.clj

[여기인듯 하다.]​

여기에 S3ObjectStore 라는 record가 있고, ObjectStore 인터페이스를 구현하고 있다.

  1. ObjectStore interface

    네, 이 Kotlin 코드에 대해 설명해 드리겠습니다.

    이 코드는 `xtdb.api.storage` 패키지에 있는 객체 저장소(Object Store)를 위한 인터페이스를 정의하고 있습니다. 주요 내용은 다음과 같습니다:

    1. `ObjectStore` 인터페이스:
      • `AutoCloseable`을 상속받아 자원 관리를 용이하게 합니다.
      • 객체 저장소의 기본적인 작업들을 비동기 메서드로 정의합니다.
    2. 주요 메서드:
      • `getObject`: 객체를 가져오는 메서드 (ByteBuffer 반환 또는 파일로 저장)
      • `getObjectRange`: 객체의 특정 범위만 가져오는 메서드
      • `putObject`: 객체를 저장하는 메서드
      • `listAllObjects`: 모든 객체를 재귀적으로 나열하는 메서드
      • `listObjects`: 특정 디렉토리 내의 객체들을 나열하는 메서드
      • `deleteObject`: 객체를 삭제하는 메서드
    3. 비동기 처리:
      • 대부분의 메서드가 `CompletableFuture`를 반환하여 비동기 작업을 지원합니다.
    4. 예외 처리:
      • 객체가 존재하지 않을 경우 `IllegalStateException`을 발생시킵니다.
    5. `Path` 사용:
      • 객체의 식별자로 `java.nio.file.Path`를 사용합니다.
    6. `ObjectStoreFactory` 인터페이스:
      • `ObjectStore` 인스턴스를 생성하는 팩토리 메서드를 정의

    이 인터페이스는 XTDB에서 사용되는 객체 저장소의 추상화를 제공 이를 통해 다양한 백엔드(예: 파일 시스템, 클라우드 스토리지 등)에 대한 일관된 인터페이스를 제공할 수 있으며, 비동기 작업을 통해 성능을 최적화할 수 있음. 또한 `ObjectStoreFactory`를 통해 객체 저장소의 생성을 추상화하여 의존성 주입 등의 패턴을 쉽게 적용할 수 있게 함.

    package xtdb.api.storage
    
    import java.nio.ByteBuffer
    import java.nio.file.Path
    import java.util.concurrent.CompletableFuture
    
    interface ObjectStore : AutoCloseable {
        /**
         * Asynchronously returns the given object in a ByteBuffer.
         *
         * If the object doesn't exist, the CompletableFuture completes with an IllegalStateException.
         */
        fun getObject(k: Path): CompletableFuture<ByteBuffer>
    
        /**
         * Asynchronously returns a specified range of bytes from the object in a ByteBuffer.
         *
         * If the object doesn't exist, the CompletableFuture completes with an IllegalStateException.
         */
        fun getObjectRange(k: Path, start: Long, len: Long): CompletableFuture<ByteBuffer>
    
        /**
         * Asynchronously writes the object to the given path.
         *
         * If the object doesn't exist, the CompletableFuture completes with an IllegalStateException.
         */
        fun getObject(k: Path, outPath: Path): CompletableFuture<Path>
    
        /**
         * Stores an object in the object store.
         */
        fun putObject(k: Path, buf: ByteBuffer): CompletableFuture<*>
    
        /**
         * Recursively lists all objects in the object store.
         *
         * Objects are returned in lexicographic order of their path names.
         */
        fun listAllObjects(): Iterable<Path>
    
        /**
         * Lists objects directly within the specified directory in the object store.
         *
         * Objects are returned in lexicographic order of their path names.
         */
        fun listObjects(dir: Path): Iterable<Path>
    
        /**
         * Deletes the object with the given path from the object store.
         */
        fun deleteObject(k: Path): CompletableFuture<*>
    
        override fun close() {
        }
    }
    
    interface ObjectStoreFactory {
        fun openObjectStore(): ObjectStore
    }  
    

2.2 localDiskCache : Path

  1. Local Disk Cache:
    • 목적 : localDiskCache 는 Path 타입이다. 이름처럼 원격저장소(예:S3)에서 자주 접근하는 데이터를 로컬 디스크에 캐싱하여 성능을 향상시킨다.
    • 작동 방식:
      • 원격 저장소에서 데이터를 가져올 때, 로컬 디스크에 복사본을 저장ㅎ나다.
      • 이후 같은 데이터에 접근할 때, 원격 저장소 대신 로컬 캐시에서 빠르게 읽어온다.
    • 장점 :
      • 네트워크 지연 감소
      • 원격 저장소 접근 비용 절감
      • 전반적인 시스템 응답 시간 개선
  2. Local Disk Cache Evictor:
    • 목적 : 로컬 디스크 캐시의 크기를 관리하고 오래된 또는 덜 사용되는 데이터를 제거한다.
    • 작동 방식:
      • 주기적으로 또는 특정 조건에 따라 캐시를 검사
      • 미리 정의된 정책(LRU, LFU)에 따라 제거할 항목을 선택한다.
      • 선택된 항목을 캐시에서 삭제하여 공간을 확보.
    • 중요성:
      • 디스크 공간 관리
      • 캐시 효율성 유지
      • 최신 데이터 유지 보장

아래 open-remote-storage에서 관련 변수를 직접적으로 사용한다.

3 open-remote-storage

[source code]​

(defn open-remote-storage ^xtdb.IBufferPool [^BufferAllocator allocator, ^Storage$RemoteStorageFactory factory]
  (util/with-close-on-catch [object-store (.openObjectStore (.getObjectStore factory))]
    (let [^Path local-disk-cache (.getLocalDiskCache factory)
	  local-disk-size-limit (or (.getMaxDiskCacheBytes factory)
				    (calculate-limit-from-percentage-of-disk local-disk-cache (.getMaxDiskCachePercentage factory)))
	  ^AsyncCache local-disk-cache-evictor (->local-disk-cache-evictor local-disk-size-limit local-disk-cache)
	  !evictor-max-weight (atom local-disk-size-limit)]

      ;; Add watcher to max-weight atom - when it changes, update the cache max weight
      (add-watch !evictor-max-weight :update-cache-max-weight
		 (fn [_ _ _ ^long new-size]
		   (let [sync-cache (.synchronous local-disk-cache-evictor)
			 ^Policy$Eviction eviction-policy (.get (.eviction (.policy sync-cache)))]
		     (.setMaximum eviction-policy (max 0 new-size)))))


      (->RemoteBufferPool (.newChildAllocator allocator "buffer-pool" 0 Long/MAX_VALUE)
			  (->memory-buffer-cache (.getMaxCacheBytes factory))
			  local-disk-cache
			  local-disk-cache-evictor
			  object-store
			  !evictor-max-weight))))

매개변수:

  • allocator: BufferAllocator 타입으로, 메모리 할당을 관리
  • factory: Storage$RemoteStorageFactory 타입으로, 원격 저장소 구성 정보를 포함

3.1 local-disk-cache-evictor : AsyncCache

이걸 만드는 함수는 ->local-disk-cache-evictor 다.

로컬 디스크 캐시를 만들고 관리하는 함수를 정의한다.

  1. 함수 정의 : 캐시크기(size), 로컬 디스크 캐시 경로(local-disk-cache)를 인자로 받는다.
  2. 캐시 생성 : Caffeine 라이브러리를 사용하여 비동기 캐시를 생성.
  3. 캐시 설정 :
    • 최대 무게(크기)를 설정한다.
    • Weigher를 구현하여 각 항목의 무게를 결정한다. 고정된(pinned) 항목은 무게가 0, 그렇지 않으면 파일의 크기가 무게가 된다.
    • RemovalListener를 구현하여 캐시에서 항목이 제거될 때 로컬 디스크에서도 파일을 삭제한다.
  4. 로컬 디스크 캐시 로딩 :
    • 로컬 디스크 캐시 디렉토리의 모든 파일을 읽는다.
    • 각 파일의 정보(경로, 마지막 접근 시간)을 수집한다.
    • 파일들을 마지막 접근 시간 순으로 정렬한다.
  5. 캐시 초기화 :
    • 정렬된 파일 목록을 순회하여 각 파일을 캐시에 추가한다.
    • 각 파일은 고정되지 않은 상태로, 파일 크기 정보와 함께 캐시에 저장된다.
  6. 리턴 : 생성된 비동기 캐시(AsyncCache)를 반환한다.
    • AsyncCache는 Caffeine 라이브러리에서 제공하는 비동기 캐시 구현체이다. 주요특징은 다음과 같다.
      • 비동기 연산 지원: 캐시 미스 시 비동기적으로 값을 로드할 수 있다. 이는 CompletableFuture를 통해 구현한다.
      • 논블로킹 동작: 캐시 조회 및 갱신 작업이 메인 스레드를 차단하지 않는다.
      • 높은 동시성: 여러 스레드에서 동시에 안전하게 접근할 수 있다.
      • 유연한 설정: 최대 크기, 만료 시간, 제거 리스너 등 다양한 옵션 제공
      • 통계 제공: 히트율, 미스율 등의 성능 지표를 확인할 수 있다.
(defn ->local-disk-cache-evictor ^com.github.benmanes.caffeine.cache.AsyncCache [size ^Path local-disk-cache]
  (log/debugf "Creating local-disk-cache with size limit of %s bytes" size)
  (let [cache (-> (Caffeine/newBuilder)
		  (.maximumWeight size)
		  (.weigher (reify Weigher
			      (weigh [_ _k {:keys [pinned? file-size]}]
				(if pinned? 0 file-size))))
		  (.evictionListener (reify RemovalListener
				       (onRemoval [_ k {:keys [file-size]} _]  ;; Caffeine 캐시가 사라지면 로컬 디스크에서도 파일을 제거함.
					 (log/debugf "Removing file %s from local-disk-cache - exceeded size limit (freed %s bytes)" k file-size)
					 (util/delete-file (.resolve local-disk-cache ^Path k)))))
		  (.buildAsync))
	synced-cache (.synchronous cache)]

    ;; Load local disk cache into cache, 로컬 디스크 캐시 로딩
    (let [files (filter #(.isFile ^File %) (file-seq (.toFile local-disk-cache)))
	  file-infos (map (fn [^File file]
			    (let [^BasicFileAttributes attrs (Files/readAttributes (.toPath file) BasicFileAttributes  ^"[Ljava.nio.file.LinkOption;" (make-array LinkOption 0))
				  last-accessed-ms (.toMillis (.lastAccessTime attrs))
				  last-modified-ms (.toMillis (.lastModifiedTime attrs))]
			      {:file-path (util/->path file)
			       :last-access-time (max last-accessed-ms last-modified-ms)}))
			  files)
	  ordered-files (sort-by #(:last-access-time %) file-infos)]
      (doseq [{:keys [file-path]} ordered-files]
	(let [k (.relativize local-disk-cache file-path)]
	  (.put synced-cache k {:pinned? false
				:file-size (util/size-on-disk file-path)}))))
    cache))

특이한 점은 메모리 캐시(Caffeine) 과 로컬 디스크 캐시와 동기되어 있다는 것이다.

그렇다면 캐시 레이어를 위한 것이 아닌 것이다.

그렇다면 나의 예상으로는 로컬 디스크 캐시가 존재하는 이유는 새로 어플이 재시작 하거나 할 때, 빠른 로딩을 위한 것이 주요 이유중 하나가 아닐까 싶다.

해서 Load local disk cache into cache 부분 코드를 좀 더 자세히 보자.

  1. 파일 목록 생성: (filter #(.isFile ^ File %) (file-seq (.toFile local-disk-cache)))
    • local-disk-cache 디렉토리의 모든 파일을 찾는다.
    • 디렉토리는 제외하고 파일만 선택한다.
  2. 파일 정보 수집:
    • 각 파일에 대해 다음 정보를 수집한다.
      • 파일 경로
      • 마지막 접근 시간(마지막 접근 시간과 마지막 수정 시간 중 더 최근 값)
    • Files/readAttributes 를 사용하여 파일 속성을 읽는다.
  3. 파일 정렬: (sort-by #(:last-access-time %) file-infos)
    • 수집된 파일 정보를 마지막 접근 시간을 기준으로 정렬한다.
  4. 캐시의 파일 정보 추가:
    • 정렬된 파일 목록을 순회하여 각 파일을 캐시에 추가
    • 캐시 키는 local-disk-cache 를 기준으로 한 상대 경로
    • 캐시 값은 다음 정보를 포함한다:
      • pinned? : false (고정되지 않음)
      • file-size : 디스크상의 파일 크기

이 구현의 중요한 특징들:

  1. 초기 상태 복원 : 시스템 재시작 시 이전 상태를 빠르게 복원
  2. 접근 시간 기반 정렬 : 가장 최근에 사용된 파일부터 캐시에 로드: LRU(Least Recently Used)캐싱 전략과 유사
  3. 메모리-디스크 동기화
  4. 유연한 캐시 관리 : pinned? 플래그를 통해 일부 항목을 고정할 수 있는 가능성을 제공
  5. 크기 기반 관리 : file-size 를 통해 캐시의 전체 크기를 관리할 수 있음.

3.1.1 add-watch !evictor-max-weight :update-cache-max-weight

이 코드가 아주 중요하다. 이 코드로 caffeine 의 AsyncCache의 캐시최대값을 가져오고, 방출까지 하는 것이다.

(add-watch !evictor-max-weight :update-cache-max-weight
	   (fn [_ _ _ ^long new-size]
	     (let [sync-cache (.synchronous local-disk-cache-evictor)
		   ^Policy$Eviction eviction-policy (.get (.eviction (.policy sync-cache)))]
	       (.setMaximum eviction-policy (max 0 new-size)))))
  1. add-watch 함수
    • add-watch 는 atom 값의 변경을 감시한다.
    • 여기서는 !evictor-max-weight atom 을 감시한다.
  2. 감시 키
    • :update-cache-max-weight 는 이 감시자를 식별하는 키(그냥 이 이벤트를 식별하기 위한 녀석)
  3. 감시 함수
    • (fn [_ _ _ ^long new-size] ...) 이 부분은 atom의 값이 변경될 때 실행될 함수
    • new-size 는 atom의 새로운 값을 나타냄.
  4. 함수 내부 로직
    • local-disk-cache-evictor 의 동기 버전을 가져온다. AsyncCache 값을 동기화해서 Cache 값을 가져온다.
    • 해당 캐시의 제거 정책(eviction policy)을 가져온다.
    • 제거 정책의 최대값을 새로운 크기로 설정한다. (단, 음수가 되지 않도록 0과의 최대값을 사용)
    • 이렇게 바꾸면, 실제 local-disk-cache-evictor 의 캐시 최대값을 조절하는 것이고, 이것이 0이 되면 아마도 캐시방출이 될 것 같다.

이 코드의 목적은 !evictor-max-weight atom 값이 변경될 때마다 캐시의 최대 크기를 자동으로 업데이트하는 것. 이렇게 하면 런타임에 캐시 크기를 동적으로 조절할 수 있다.

그렇다면 어딘가에서 이 녀석을 swap! 같은 함수로 수정을 해야 메모리 캐시를 의미있게 사용할 것이다.

드디어 open-remote-storage 에서 리턴하는 값

(->RemoteBufferPool (.newChildAllocator allocator "buffer-pool" 0 Long/MAX_VALUE)
		    (->memory-buffer-cache (.getMaxCacheBytes factory))
		    local-disk-cache
		    local-disk-cache-evictor
		    object-store
		    !evictor-max-weight))))

RemoteBufferPool 에 대해서 알아보자.

3.1.2 ->memory-buffer-cache : Cache

이 함수는 캐시를 리턴해서 바로 RemoteBufferPool 에서 사용하는 메모리 캐시다.

(defn ->memory-buffer-cache ^Cache [^long max-cache-bytes]
  (-> (Caffeine/newBuilder)
      (.maximumWeight max-cache-bytes)
      (.weigher (reify Weigher
		  (weigh [_ _k v]
		    (.capacity ^ArrowBuf v))))
      (.removalListener (reify RemovalListener
			  (onRemoval [_ _k v _]
			    (util/close v))))
      (.build)))

여기서 removalListener 가 중요해보인다.

Caffeine 에게 removal 이란? [Caffeine/wiki/Removal]​

Terminology:

  • eviction means removal due to the policy
  • invalidation means manual removal by the caller
  • removal occurs as a consequence of invalidation or eviction

즉, removal 은 invalidation 혹은 eviction 의 결과로 일어나는 것.

removalListener 는 어떤 정책으로 Caffeine 에서 캐시를 제거하면 리스너가 수행되는 것같다.

이때 실행되는 것이 util/close 이다.

  1. util/close
    (defn close [c]
      (letfn [(close-all [closables]
    	    (let [{:keys [toplevel-ex]}
    
    		  (reduce (fn [{:keys [^Throwable toplevel-ex] :as res} closable]
    			    (try
    			      (close closable)
    			      res
    			      (catch Throwable t
    				(if toplevel-ex
    				  (do (.addSuppressed toplevel-ex t) res)
    				  {:toplevel-ex t}))))
    			  {:toplevel-ex nil}
    			  closables)]
    	      (when toplevel-ex
    		(throw toplevel-ex))))]
        (cond
          (nil? c) nil
          (instance? AutoCloseable c) (.close ^AutoCloseable c)
          (instance? Map c) (close-all (.values ^Map c))
          (seqable? c) (close-all c)
          :else (throw (ClassCastException. (format "could not close '%s'" (.getName (class c))))))))
    

    재귀적으로 무언가를 하지만 결국 중요한 것은 cond 에 있는 (instance? AutoCloseable c) (.close ^AutoCloseable c) 이다.

    그렇다면 이 값이 AutoCloseable 을 구현한 구현체를 구경해야한다.

    하지만 우리가 보는 것은 Caffeine 의 값이다. Caffeine 의 v 값은 ArrowBuf 이다. 우리는 ArrowBuf에 대해 이미 이전에 공부를 했다. 하지만 이제 우리는 도메인에 좀 더 알았으니 다시 좀 더 알아보자.

  2. ArrowBuf's AutoCloseable

    ArrowBuf 는 close 메소드를 구현하고 있다.

    @Override void close() {
        referenceManager.release();
    }
    

    이 referenceManager.release 는 뭘까?

4 RemoteBufferPool

(defrecord RemoteBufferPool [allocator
			     ^Cache memory-store
			     ^Path local-disk-cache
			     ^AsyncCache local-disk-cache-evictor
			     ^ObjectStore object-store
			     !evictor-max-weight]
  IBufferPool

  ,,,
  )

5 RemoteBufferPool/getBuffer

(getBuffer [_ k]
    (when k
      (or (cache-get memory-store k)
	  (let [buffer-cache-path (.resolve local-disk-cache k)
		{:keys [file-size ctx]} @(update-evictor-key local-disk-cache-evictor k
							     (fn [^CompletableFuture fut]
							       (-> (if (util/path-exists buffer-cache-path) ;; 버퍼가 로컬 디스크 캐시에 존재하는지 확인
								     ;; 로컬 캐시에 존재하는 경우
								     (-> (or fut (CompletableFuture/completedFuture {:pinned? false}))  ;; 기존 CompletableFuture 사용하던가 새로 생성
									 (.thenApply (fn [{:keys [pinned?]}]
										       (log/tracef "Key %s found in local-disk-cache - returning buffer" k)  ;; 로그를 남기고 버퍼 경로와 이전 고정 상태를 리턴.
										       {:path buffer-cache-path :previously-pinned? pinned?})))
								     ;; 로컬 캐시에 존재하지 않는 경우.
								     ;; 필요한 디렉토리를 생성 
								     (do (util/create-parents buffer-cache-path)
									 (log/debugf "Key %s not found in local-disk-cache, loading from object store" k)
									 ;; 객체 저장소에서 객체를 로드
									 (-> (.getObject object-store k buffer-cache-path)
									     ;; 로드된 path와 previously-pinned? 를 false로 리턴한다.
									     (.thenApply (fn [path]
											   {:path path :previously-pinned? false})))))
								   ;; 이후 공통 로직
								   (.thenApply (fn [{:keys [path previously-pinned?]}]
										 (let [file-size (util/size-on-disk path)  ;; 파일크기 계산
										       nio-buffer (util/->mmap-path path)  ;; 메모리 맵 버퍼 계산
										       buffer-release-fn (fn []  ;; 버퍼 해제 함수 정의, 버퍼가 해제될 때 호출됨.
													   (update-evictor-key local-disk-cache-evictor k
													     (fn [^CompletableFuture fut]
													       (some-> fut
														       (.thenApply (fn [{:keys [pinned? file-size]}]
																     ;; buffer 를 release 할때는 !evictor-max-weight의 크기를 키운다.
																     ;; pinned인 경우만 빼기를 했을테니까 원래대로 되돌리는 것.(덧셈으로)
																     (when pinned?
																       (swap! !evictor-max-weight + file-size))
																     ;; 고정 상태를 해제하고 필요한 경우 전체 무게를 조정함.
																     {:pinned? false :file-size file-size}))))))
										       ;; ArrowBuf 생성
										       create-arrow-buf #(util/->arrow-buf-view allocator nio-buffer buffer-release-fn)
										       ;; 메모리 캐시에 저장.
										       buf (cache-compute memory-store k create-arrow-buf)]
										   ;; 해당 버퍼를 pinned? true 로 변경, 파일 크기, 버퍼, previously-pinned? 정보 포함.
										   {:pinned? true :file-size file-size :ctx {:buf buf :previously-pinned? previously-pinned?}}))))))
		{:keys [previously-pinned? buf]} ctx]

	    (when-not previously-pinned?
	      (swap! !evictor-max-weight - file-size))
	    buf))))
  1. 키 존재 여부 검사
  2. 메모리 캐시 확인 (cache-get ,,,)
  3. 로컬 디스크 캐시 처리 : 만약 매모리에 없다면, 로컬 디스크 캐시를 확인하고 필요한 경우 객체 저장소에 로드한다. (.resolve local-disk-cache k)

pinned란?

  • pinned 상태랑 캐시에서 고정되었다는 의미로 캐시에서 강제로 유지된다. 일반적인 제거 정책에 의해 제거되지 않기도함.
  • 중요하거나 자주 사용되는 데이터는 캐시로 유지한다. 특정 데이터의 빠른 접근을 보장한다.
  • pinned 항목은 캐시의 크기 제한에 포함되지 않을 수 있다. 캐시 전체 크기를 증가시킬 수 있다.
  • 사용사례 : 빈번하게 접근되는 데이터베이스 레코드, 애플리케이션 구성정보, 실시간 처리가 필요한 데이터

근데 이 pinned가 위에서도 동일한 의미인지는 모르겠음.

왜냐하면 마지막에 공통적으로 pinned? true 값을 리턴함.

하지만 여기서 pinned 는 고정할 가치가 있는 것을 말하는 것 같다. 그래서 getBuffer 는 하나의 값(Path) 를 가져온 것이므로, 이에 대한 히트로 pinned true 의 플래그를 넣는 것 같다.

previously-pinned!evictor-max-weight 를 다루는데 중요한 값이 된다. 이전에 조회를 했으면 (pinned=true) max-weight 를 변경하지 않고, 이전에 조회하지 않았으면(pinned=false) 이번에 조회한 것이므로 캐시의 크기를 조절한다.

다음 이슈를 한번 읽어봐야겠다. [github issue]​

5.1 buffer-release-fn

버퍼를 릴리즈 하는 함수다. 이것은 대체 어디서 실행하는 것일까?

!evictor-max-weight 가 0이 되었을 때, 어디서 무슨 일이 일어나는 거지?

일단 buffer-release-fn#(util/->arrow-buf-view allocator nio-buffer buffer-release-fn) 에 인자로 들어가있다.

원래 MemoryBufferPool 의 경우 release-fn 이 nil 이었을 것이다. 하지만 이번에는 지정해서 넣는다.

(defn ->arrow-buf-view
  (^org.apache.arrow.memory.ArrowBuf [^BufferAllocator allocator ^ByteBuffer nio-buffer]
   (->arrow-buf-view allocator nio-buffer nil))

  (^org.apache.arrow.memory.ArrowBuf [^BufferAllocator allocator ^ByteBuffer nio-buffer release-fn]
   (let [nio-buffer (if (and (.isDirect nio-buffer) (zero? (.position nio-buffer)))
		      nio-buffer
		      (-> (ByteBuffer/allocateDirect (.remaining nio-buffer))
			  (.put (.duplicate nio-buffer))
			  (.clear)))]
     (.wrapForeignAllocation allocator
			     (proxy [ForeignAllocation] [(.remaining nio-buffer) (MemoryUtil/getByteBufferAddress nio-buffer)]
			       (release0 []
				(try-free-direct-buffer nio-buffer)
				 ;; 여기서 실행한다. 대체 일단 이것은 어떻게 연결되는 거지?
				 (when release-fn
				   (release-fn))))))))

일단 이 release가 util/close 와 연결이 될까?

일단 .wrapForeignallocation 으로 ArrowBuf 를 만들었다. 이곳에 close 를 수행하면 release0 도 수행되어야 하는 것이다. 그렇다는 것은 FoeignAllocation 의 release0 을 쓰는 곳이 어딘지 소스코드에서 찾아봐야한다.

[소스코드 wrapForeignAllocation]​

여기서 우리가 proxy로 넣은 allocation 은? 바로 이 코드에 들어간다.

try {
    final AllocationManager manager = new ForeignAllocationManager(this, allocation);
    final BufferLedger ledger = manager.associate(this);
    final ArrowBuf buf =
	new ArrowBuf(ledger, /*bufferManager=*/ null, size, allocation.memoryAddress());
    buf.writerIndex(size);
    listener.onAllocation(size);
    return buf;
    ,,,

자. 이제 ForeignAllocationManger, BufferLedger 를 보면 될 것 같다.

5.1.1 ForeignAllocation.java

[ForeignAllocationManager source code]​

class ForeignAllocationManager extends AllocationManager {
  private final ForeignAllocation allocation;

  protected ForeignAllocationManager(
      BufferAllocator accountingAllocator, ForeignAllocation allocation) {
    super(accountingAllocator);
    this.allocation = allocation;
  }

  @Override
  public long getSize() {
    return allocation.getSize();
  }

  @Override
  protected long memoryAddress() {
    return allocation.memoryAddress();
  }

  @Override
  protected void release0() {
    allocation.release0();
  }
}

별거 없음 아까 우리가 proxy로 만든 release0 임. 여기서 중요한 것은. BufferLedger 를 만들때 사용한 associate 이다. 아래 코드에는 없으니 Allocationmanager 를 봐야한다.

5.1.2 AllocationManger

[AllocationManager source code]​

너무 김. 나중에 봐야하나? 하지만 일단 associate 코드를 집중해보자.

BufferLedger associate(final BufferAllocator allocator) {
  return associate(allocator, true);
}

private BufferLedger associate(final BufferAllocator allocator, final boolean retain) {
  ,,,
  // 여기서 this가 ForeignAllocationManager이다!!!!
  ledger = new BufferLedger(allocator, this);
  return ledger
}

release0 는 release 라는 함수 안에 있다.

void release(final BufferLedger ledger) {
  final BufferAllocator allocator = ledger.getAllocator();
  ...
  // free the memory chunk associated with the allocation manager
  release();
}

근데 이 release 라는 함수를 잘 보자. BufferLedger 를 받고 있다. 아까 BufferLedger 는 ArrowBuf를 만들 때 사용했다. 즉, 우리는 ForeignAllocation 을 만들었고 그 안에 release0 이 있다. 그리고 ForeignAllocation 으로 BufferLedger 를 만드는데 그 안에 ForeignAllocation 이 존재한다.

자 이제 거의다 왔다. ArrowBuf의 구현체를 보자.

public final  class ArrowBuf implements AutoCloseable {
  public ArrowBuf(
    final ReferenceManager referenceManager,
    final BufferManager bufferManager,
    ,,,
    ) { }

  @Override
  public void close() {
      referenceManager.release();
  }
}

여기서 referenceManager는 BufferLedger다!

이제 BufferLedger의 release를 보자.

class BufferLedger {
  BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) {
    this.allocator = allocator;
    // 이것이 ForeignAllocationManager로 보임.
    this.allocationManager = allocationManager;
  }

  @Override
  public ovid release() { return release(1); }

  @Override
  public boolean release(int decrement) {
    ,,,
    final int refCnt = decrement(decrement);
    return refCnt == 0;
  }
  private int decrement(int decrement) {
    ,,,
    allocationManager.release(this);
  }

즉, allocationManager.release 는 ForeignAllocationManager의 release를 수행하고 (this로 보낸거) 그 release 는 우리가 커스텀으로 넣은 release0을 실행한다.

5.2 update-evictor-key

(defn update-evictor-key {:style/indent 2} [^AsyncCache local-disk-cache-evictor ^Path k update-fn]
  (-> (.asMap local-disk-cache-evictor)
      (.compute k (reify BiFunction
		    (apply [_ _k fut] (update-fn fut))))))

6 RemoteBufferPool/putObject

특이하게 코드가 아주 짧다.

(putObject [_ k buffer]
  @(.putObject object-store k buffer))

object-storeputObject 는 비동기작업의 반환일 확률이 높다. (CompletableFuture)

@(deref) 를 이용해서 비동기 작업이 반환할 때까지 기다린다.

주요 특징:

  • 이 메소드는 원격 저장소(object-store)에 직접 객체를 저장합니다.
  • 로컬 디스크 캐시나 메모리 캐시에는 저장하지 않습니다.
  • 저장 작업이 완료될 때까지 기다립니다 (동기적으로 동작).
  • 에러 처리는 명시적으로 되어 있지 않지만, object-store의 putObject 메소드나 그 결과를 기다리는 과정에서 발생하는 예외는 호출자에게 전파될 것입니다.

object-storeObjectStore 타입을 가진다.

7 ObjectStore.kt

ObjectStore 는 원격저장소 구현체이다.

package xtdb.api.storage

import java.nio.ByteBuffer
import java.nio.file.Path
import java.util.concurrent.CompletableFuture

interface ObjectStore : AutoCloseable {
    /**
     * Asynchronously returns the given object in a ByteBuffer.
     *
     * If the object doesn't exist, the CompletableFuture completes with an IllegalStateException.
     */
    fun getObject(k: Path): CompletableFuture<ByteBuffer>

    /**
     * Asynchronously returns a specified range of bytes from the object in a ByteBuffer.
     *
     * If the object doesn't exist, the CompletableFuture completes with an IllegalStateException.
     */
    fun getObjectRange(k: Path, start: Long, len: Long): CompletableFuture<ByteBuffer>

    /**
     * Asynchronously writes the object to the given path.
     *
     * If the object doesn't exist, the CompletableFuture completes with an IllegalStateException.
     */
    fun getObject(k: Path, outPath: Path): CompletableFuture<Path>

    /**
     * Stores an object in the object store.
     */
    fun putObject(k: Path, buf: ByteBuffer): CompletableFuture<*>

    /**
     * Recursively lists all objects in the object store.
     *
     * Objects are returned in lexicographic order of their path names.
     */
    fun listAllObjects(): Iterable<Path>

    /**
     * Lists objects directly within the specified directory in the object store.
     *
     * Objects are returned in lexicographic order of their path names.
     */
    fun listObjects(dir: Path): Iterable<Path>

    /**
     * Deletes the object with the given path from the object store.
     */
    fun deleteObject(k: Path): CompletableFuture<*>

    override fun close() {
    }
}

interface ObjectStoreFactory {
    fun openObjectStore(): ObjectStore
}  

[source code]​

7.1 ObjectStore/putObject

역시 putObject는 CompletableFuture를 리턴하고 있다.

위에 있는 S3ObjectStore의 putObject 구현을 구경해보자.

[S3ObjectStore source code]​

(putObject [this k buf]
    ;; 먼저 주어진 키 k에 prefix를 추가하여 prefixed-key를 생성 
    (let [prefixed-key (util/prefix-key prefix k)]
      ;; S3 버킷에 해당 객체가 이미 존재하는지 확인하기 위해 headObject 요청을 보냄.
      (-> (.headObject client                       
		       (-> (HeadObjectRequest/builder)
			   (.bucket bucket)
			   (.key (str prefixed-key))
			   (->> (.configureHead configurator))
			   ^HeadObjectRequest (.build)))
	  ;; 성공하면(객체가 존재하면 true)
	  (.thenApply (fn [_resp] true))
	  ;; NoSuchKeyException 발생하면 false
	  (.exceptionally (fn [^Exception e]
			    (let [e (.getCause e)]
			      (if (instance? NoSuchKeyException e)
				false
				(throw e)))))
	  (.thenCompose (fn [exists?]
			  (if exists?
			    ;; 존재하면 nil 리턴(중복업로드방지)
			    (CompletableFuture/completedFuture nil)
			    ;; 없으면 단건 업로드 수행
			    (single-object-upload this k buf))))
	  (.thenApply (fn [_]
			;; Add file name to the local cache as the last thing we do (ie - if PUT
			;; fails, shouldnt add filename to the cache)
			(.add file-name-cache k))))))

7.1.1 single-object-upload

putObject 유효성을 다 패스하면 실행되는 실제로 업로드하는 함수

(defn single-object-upload
  [{:keys [^S3AsyncClient client ^S3Configurator configurator bucket ^Path prefix]} ^Path k ^ByteBuffer buf]
  (let [prefixed-key (util/prefix-key prefix k)]
    (.putObject client
		(-> (PutObjectRequest/builder)
		    (.bucket bucket)
		    (.key (str prefixed-key))
		    (->> (.configurePut configurator))
		    ^PutObjectRequest (.build))
		(AsyncRequestBody/fromByteBuffer buf))))
  1. 파라미터:
    • S3AsyncClient, S3Configurator, bucket, prefix 등의 설정 정보
    • 저장할 객체의 키 k
    • 저장할 데이터가 들어있는 ByteBuffer buf
  2. 동작: a. prefix-key 함수를 사용하여 주어진 키 k 에 prefix를 추가합니다.

    b. S3AsyncClient의 putObject 메소드를 호출하여 객체를 업로드합니다.

    c. PutObjectRequest를 생성합니다:

    • 버킷 이름 설정
    • 생성한 prefixed-key를 객체의 키로 설정
    • configurator를 사용하여 추가 설정 적용

    d. AsyncRequestBody를 사용하여 ByteBuffer의 내용을 비동기적으로 업로드합니다.

  3. 특징:
    • 비동기 작업: S3AsyncClient를 사용하여 비동기적으로 업로드를 수행합니다.
    • 설정 가능성: S3Configurator를 통해 요청을 추가로 구성할 수 있습니다.
    • 단일 객체 업로드: 이름에서 알 수 있듯이, 한 번에 하나의 객체만 업로드합니다.
  4. 반환값:
    • CompletableFuture를 반환합니다. 이를 통해 업로드 작업의 완료를 비동기적으로 처리할 수 있습니다.

이 함수는 S3에 단일 객체를 업로드하는 기본적인 작업을 수행합니다. 비동기 방식을 사용하여 효율적인 I/O 처리를 가능하게 하며, 설정을 통해 유연성을 제공합니다. putObject 메소드에서 객체가 존재하지 않을 때 이 함수를 호출하여 실제 업로드를 수행합니다.

7.2 ObjectStore/getObject

  1. 첫 번째 버전 (객체를 ByteBuffer로 반환):
(getObject [this k]
  (-> (.getObject client (get-obj-req this k) (AsyncResponseTransformer/toBytes))
      (.thenApply (reify Function
		    (apply [_ bs]
		      (.asByteBuffer ^ResponseBytes bs))))
      (with-exception-handler k)))
  • S3 클라이언트의 `getObject` 메소드를 호출하여 객체를 비동기적으로 가져옵니다.
  • AsyncResponseTransformer/toBytes (aws 모듈) 를 사용하여 응답을 바이트 배열로 변환합니다.
  • 응답을 ByteBuffer로 변환합니다.
  • 예외 처리를 위해 `with-exception-handler`를 적용합니다.
  • 두 번째 버전 (객체를 파일로 저장, RemoteBufferPool/getBuffer 에서 사용함):
(getObject [this k out-path]
  (-> (.getObject client (get-obj-req this k) out-path)
      (.thenApply (reify Function
		    (apply [_ _]
		      out-path)))
      (with-exception-handler k)))

이 버전은 다음과 같이 동작합니다:

  • S3 클라이언트의 `getObject` 메소드를 호출하여 객체를 비동기적으로 가져와 지정된 out-path 에 저장합니다.
  • 작업이 완료되면 out-path 를 반환합니다.
  • 예외 처리를 위해 with-exception-handler 를 적용합니다.

getObject는 S3Client.getObject 함수를 쓰는 거서 같다. 이중에 아래 인자 2개인 녀석같다.

공식문서에서 두 인자 설명은 다음과 같다.

Parameters:

getObjectRequest - A Consumer that will call methods on GetObjectRequest.Builder to create a request.
destinationPath - Path to file that response contents will be written to.
    The file must not exist or this method will throw an exception.
    If the file is not writable by the current user then an exception will be thrown.
    The service documentation for the response content is as follows '
Object data.

[공식문서링크]​

 default GetObjectResponse getObject(Consumer<GetObjectRequest.Builder> getObjectRequest,
Path destinationPath)

이 path 값은 RemoteBufferPool 에 buffer-cache-path 라는 이름이 값이 던져지고 있다.

(defrecord RemoteBufferPool [allocator
			   ^Cache memory-store
			   ^Path local-disk-cache
			   ^AsyncCache local-disk-cache-evictor
			   ^ObjectStore object-store
			   !evictor-max-weight]
IBufferPool
(getBuffer [_ k]
   (let [buffer-cache-path (.resolve local-disk-cache k)
      ,,,
      (-> (.getObject object-store k buffer-cache-path)

여튼 뭔가

두 버전 모두 다음과 같은 특징을 가집니다:

  1. 비동기 작업: CompletableFuture를 사용하여 비동기적으로 동작합니다.
  2. 예외 처리: `with-exception-handler`를 통해 NoSuchKeyException을 ObjectMissingException으로 변환합니다.
  3. S3 요청 구성: `get-obj-req` 함수를 사용하여 S3 GetObjectRequest를 구성합니다.

get-obj-req 함수는 다음과 같이 정의되어 있습니다:

(defn- get-obj-req
  ^GetObjectRequest [{:keys [^S3Configurator configurator bucket ^Path prefix]} ^Path k]
  (let [prefixed-key (util/prefix-key prefix k)]
    (-> (GetObjectRequest/builder)
	(.bucket bucket)
	(.key (str prefixed-key))
	(->> (.configureGet configurator))
	^GetObjectRequest (.build))))

이 함수는 S3 버킷, 키(prefix 포함), 그리고 추가 설정을 적용하여 GetObjectRequest를 생성합니다.

요약하면, 이 getObject 구현은 S3에서 객체를 효율적으로 검색하고, 비동기 처리와 적절한 예외 처리를 제공하며, 메모리나 파일 시스템으로 객체를 가져오는 유연성을 제공합니다.

8 RemoteBufferPool/openArrowWriter

putObject 와 다르게 멀티파트로 여러번 올림.

(openArrowWriter [_ k rel]
    (let [tmp-path (create-tmp-path local-disk-cache)]
      (util/with-close-on-catch [file-ch (util/->file-channel tmp-path util/write-truncate-open-opts)
				 unl (.startUnload rel file-ch)]

	(reify ArrowWriter
	  (writeBatch [_] (.writeBatch unl))

	  (end [_]
	    (.endFile unl)
	    (.close file-ch)

	    (upload-arrow-file allocator object-store k tmp-path)

	    (let [buffer-cache-path (.resolve local-disk-cache k)]
	      (update-evictor-key local-disk-cache-evictor k
				  (fn [^CompletableFuture fut]
				    (-> (or fut (CompletableFuture/completedFuture {:pinned? false}))
					(.thenApply (fn [{:keys [pinned?]}]
						      (log/tracef "Writing arrow file, %s, to local disk cache" k)
						      (util/create-parents buffer-cache-path)
						      ;; see #2847
						      (util/atomic-move tmp-path buffer-cache-path)
						      {:pinned? pinned? :file-size (util/size-on-disk buffer-cache-path)})))))))

	  (close [_]
	    (util/close unl)
	    (when (.isOpen file-ch)
	      (.close file-ch)))))))

8.1 create-tmp-path

뭐 일단 .tmpupload[disk-store].arrow 이름의 파일을 만드는 것인가?

(defn- create-tmp-path ^Path [^Path disk-store]
  (Files/createTempFile (doto (.resolve disk-store ".tmp") util/mkdirs)
			"upload" ".arrow"
			(make-array FileAttribute 0)))

8.2 utils/->file-channel

함수이름처럼 FileChannel/open 으로 해당 파일에 채널을 열어버린다.

(defn ->file-channel
  (^java.nio.channels.FileChannel [^Path path]
   (->file-channel path #{:read}))
  (^java.nio.channels.FileChannel [^Path path open-opts]
   (FileChannel/open path (into-array OpenOption (map #(standard-open-options % %) open-opts)))))

8.3 (.startUnload rel file-ch)

relRelation.kt 구현체이다. [Relation.kt]​

여기서 startUnload 코드를 보자.

fun startUnload(ch: WritableByteChannel): RelationUnloader = RelationUnloader(WriteChannel(ch))  

WriteChannel : ApacheArrow 구현체, FileChannel 을 받아서 Arrow 로 쓰기 위한 파일인듯?

RelationUnloader : Arrow 파일 형식으로 데이터를 저장한다. Unload 하기위한 구현체가 따로 있다.

Loader 추상 클래스와 그 구현체들: Arrow 파일에서 데이터를 로드합니다. (나중에 다시 보도록 하자)

inner class RelationUnloader(private val ch: WriteChannel) : AutoCloseable {
	// vectors 는 Relation 생성될 때 받는 인자.
	private val vectors = this@Relation.vectors.values
	private val schema = Schema(vectors.map { it.field })
	private val recordBlocks = mutableListOf<ArrowBlock>()

	init {
	    ch.write(MAGIC)
	    ch.align()
	    MessageSerializer.serialize(ch, schema)
	}

	fun writeBatch() {
	    val nodes = mutableListOf<ArrowFieldNode>()
	    val buffers = mutableListOf<ArrowBuf>()

	    vectors.forEach { it.unloadBatch(nodes, buffers) }

	    ArrowRecordBatch(rowCount, nodes, buffers).use { recordBatch ->
		MessageSerializer.serialize(ch, recordBatch)
		    .also { recordBlocks.add(it) }
	    }
	}

	fun endStream() {
	    ch.writeIntLittleEndian(MessageSerializer.IPC_CONTINUATION_TOKEN)
	    ch.writeIntLittleEndian(0)
	}

	fun endFile() {
	    endStream()

	    val footerStart = ch.currentPosition
	    ch.write(ArrowFooter(schema, emptyList(), recordBlocks), false)

	    val footerLength = ch.currentPosition - footerStart
	    check(footerLength > 0) { "Footer length must be positive" }
	    ch.writeIntLittleEndian(footerLength.toInt())
	    ch.write(MAGIC)
	}

	override fun close() {
	    ch.close()
	}
    }
  1. 초기화: Arrow 파일 헤더를 작성합니다. 여기에는 매직 넘버("ARROW1")와 스키마 정보가 포함
  2. 배치 쓰기: `writeBatch()` 메서드를 통해 데이터 배치를 파일에 기록합니다. 이 과정에서 각 벡터의 데이터를 `ArrowRecordBatch`로 변환하여 직렬화
  3. 스트림 종료: `endStream()` 메서드로 데이터 스트림의 끝을 표시
  4. 파일 종료: `endFile()` 메서드로 Arrow 파일을 마무리. 여기서는 파일 푸터를 작성하고, 푸터의 길이와 종료 매직 넘버를 기록
  5. 리소스 관리: `AutoCloseable`을 구현하여 사용 후 리소스를 적절히 해제

이 클래스를 사용하면 `Relation` 객체의 데이터를 표준 Arrow 파일 형식으로 효율적으로 저장할 수 있어, 다른 Arrow 호환 시스템과의 데이터 교환이 용이

뭔가 일이 커진 것 같다. 내 생각에는 Relation 이라는 클래스를 제대로 알아봐야 할 것 같다.

Date: 2024-08-17 Sat 00:00

Author: Younghwan Nam

Created: 2024-12-21 Sat 16:39

Emacs 27.2 (Org mode 9.4.4)

Validate