[20240814] XTDB Internals 1 - MemoryBufferPool

Table of Contents

1 xtdb.api.storage.Storage.kt

인메모리 스토리지 팩토리가 이곳에 있음.

[GITHUB SOURCE CODE]​

2 xtdb.api.Xtdb.kt

XTDB 핵심 객체

[GITHUB SOURCE CODE]​

3 Buffer Pool

4 IBufferPool

[IBufferPool.kt]​

package xtdb

import org.apache.arrow.memory.ArrowBuf
import xtdb.arrow.Relation
import java.nio.ByteBuffer
import java.nio.file.Path

interface IBufferPool : AutoCloseable {
    fun getBuffer(key: Path): ArrowBuf

    fun putObject(k: Path, buffer: ByteBuffer)

    /**
     * Recursively lists all objects in the buffer pool.
     *
     * Objects are returned in lexicographic order of their path names.
     */
    fun listAllObjects(): Iterable<Path>

    /**
     * Lists objects directly within the specified directory in the buffer pool.
     *
     * Objects are returned in lexicographic order of their path names.
     */
    fun listObjects(dir: Path): Iterable<Path>

    fun openArrowWriter(k: Path, rel: Relation): ArrowWriter
}
  • getBuffer : 각 키(Path) 는 하나의 버퍼(ArrowBuf) 를 가진다.
  • putObject : 주어진 키에 ByteBuffer를 ArrowBuf로 변환해서 쌍으로 연결하여 저장.

5 BufferAllocator

org.apache.arrow.memory.BufferAllocator 는 인터페이스이다. 버퍼를 새성 및 관리해주는 인터페이스이다.

[GITHUB SOURCE CODE]​

어디서 이것을 만드는 것일까?

bufferpool.clj 를 보면 integrant 설정이 있다.

(defmethod ig/prep-key :xtdb/buffer-pool [_ factory]
  {:allocator (ig/ref :xtdb/allocator)
   :factory factory})

(defmethod ig/init-key :xtdb/buffer-pool [_ {:keys [allocator ^Storage$Factory factory]}]
  (.openStorage factory allocator))

(defmethod ig/halt-key! :xtdb/buffer-pool [_ ^IBufferPool buffer-pool]
  (util/close buffer-pool))

:xtdb/allocator 의존성을 가지는데 어디서 만드는지 찾아보자.

[clojure.xtdb.node.impl.clj]​

(ns xtdb.node.impl
  (:import (org.apache.arrow.memory BufferAllocator RootAllocator))
  )

(defmethod ig/init-key :xtdb/allocator [_ _] (RootAllocator.))
(defmethod ig/halt-key! :xtdb/allocator [_ ^BufferAllocator a]
  (util/close a))

(defmethod ig/init-key :xtdb/default-tz [_ default-tz] default-tz)

RootAllocator는 뭘까?

5.1 RootAllocator

  • [공식문서]​
  • [GITHUB SOURCE CODE]​ : 별기능은 없고 BaseAllocator 를 확장함.
  • [BaseAllocator SOURCE CODE]​

    BaseAllocator의 wrapForeignAllocation(ForeignAllocation allocation) 에 대해 조금만 보면 ByteBuffer를 ArrowBuf 로 변환하고 있다. 자세한 내용은 아래설명에서 찾아볼 것.

    final ArrowBuf buf =
        new ArrowBuf(ledger, /*bufferManager=*/ null, size, allocation.memoryAddress());
    return buf;
    

6 MemoryBufferPool/putObject

가장 기본적인 MemoryBufferPool에 대해 보자.

Allocator 는 BufferAllocator 라고 Apache Arrow 구현체에 대해 종속적이다.

(defrecord MemorybufferPool [allocator, ^NavigableMap memory-store]
  IBufferPool
  (getBuffer [_ k] ,,,)
  (putObject [_ k buffer] ,,,)
  (listAllObjects [_] ,,,)
  (listObjects [_ dir] ,,,)
  (openArrowWriter [this k vsr] ,,,)

  EvictBufferTest
  (evict-cached-buffer! [_ _k] ,,,)

  Closeable
  (close [_] ,,,)

기본적으로 IBufferPool 인터페이스로 버퍼에 데이터를 넣고 조회하는 듯 하다.

이를 생성하는 함수는

(defn open-in-memory-storage [^BufferAllocator allocator]
  (->MemoryBufferPool (.newChildAllocagtor  allocator "buffer-pool" 0 Long/MAX_VALUE)
		      (TreeMap .)))

allocator를 사용하는 부분은 putObject 이다

(ns xtdb.buffer-pool
  (:require [xtdb.util as util])
  (:import [java.util NavigableMap TreeMap])

(defrecord MemoryBufferPool [allocator, memory-store]
  IBufferPool
  (putObject [_ k buffer]
    (locking memory-store
      ;; k : 키값(Path)
      ;; buffer : ByteBuffer
      ;; (util/->arrow-buf-view allocator buffer) : ArrowBuf 로 변환하여 리턴.
      (.put memory-store k (util/->arrow-buf-view allocator buffer)))

여기서 궁금한 것이 있을 것이다. locking 의 구현, util/->arrow-buf-view 의 구현

6.1 locking

locking 은 기본 clojure api 이다. (https://clojuredocs.org/clojure.core/locking)

이건 동기화를 위해 사용하는 기능이고, 내부구현은 special form인 monitor-enter 를 사용한다. (https://clojure.org/reference/special_forms#monitor-enter)

6.2 util/->arrow-buf-view

java.nio.ByteBuffer 를 ArrowBuf 로 변환하기 위한 함수.

중요한 것은 .wrapForeignallocation 이다. 외부 할당된 녀석(ByteBuffer) 를 ArrowBuf 구현으로 가져오는 것이다.

BufferAllocator 는 Arrow 에서 사용하는 메모리를 할당하여 버퍼를 생성하는 녀석이 아닌가 싶다.

https://github.com/xtdb/xtdb/blob/main/core/src/main/clojure/xtdb/util.clj#L434

(defn ->arrow-buf-view
  (^org.apache.arrow.memory.ArrowBuf [^BufferAllocator allocator ^ByteBuffer nio-buffer]
   (->arrow-buf-view allocator nio-buffer nil))

  (^org.apache.arrow.memory.ArrowBuf [^BufferAllocator allocator ^ByteBuffer nio-buffer release-fn]
 (let [nio-buffer (if (and (.isDirect nio-buffer) (zero? (.position nio-buffer)))
		    ;; 빈 nio-buffer 다이렉트버퍼면 그대로 리턴.
		    nio-buffer
		    ;; 비어있지 않거나, 다이렉트 버퍼가 아니면 다이렉트버퍼로 다시 만들어서 리턴.
		    (-> (ByteBuffer/allocateDirect (.remaining nio-buffer))
			(.put (.duplicate nio-buffer))
			(.clear)))]
   ;; BufferAllocator를 이용하여 nio-buffer(ArrowBuf) 의 주소와 사이즈를 리턴하면, 그것으로 ArrowBuf로 할당하는 듯.
   (.wrapForeignAllocation allocator
			   ;; ForeignAllocation 클래스는 거의 데이터 클래스와 같음. 별거 없음.
			   ;; 주소와 사이즈를 머금고 있고 release0 을 이용해서 free 방법만 위임해준다.
			   (proxy [ForeignAllocation] [(.remaining nio-buffer) (MemoryUtil/getByteBufferAddress nio-buffer)]
			     (release0 []
			      (try-free-direct-buffer nio-buffer)
			       (when release-fn
				 (release-fn))))))))
  1. 입력으로 BufferAllocator와 ByteBuffer를 받는다.
  2. 입력 ByteBuffer가 다이렉트 버퍼가 아니거나 position이 0이 아닌 경우, 새로운 다이렉트 버퍼를 생성하고 데이터를 복사함.

    (let [nio-buffer (if (and (.isDirect nio-buffer) (zero? (.position nio-buffer)))
    	      nio-buffer
    	      (-> (ByteBuffer/allocateDirect (.remaining nio-buffer))
    		  (.put (.duplicate nio-buffer))
    		  (.clear)))]
    
  3. allocator.wrapForeignAllocation을 사용하여 ArrowBuf를 생성합니다. 이는 외부 메모리를 Arrow 버퍼로 래핑.
  4. ForeignAllocation 프록시를 사용하여 버퍼 해제 로직을 구현합니다.
  5. 선택적으로 release-fn을 받아 버퍼 해제 시 추가 정리 작업을 수행할 수 있게 합니다.
  6. 네티(Netty)의 PlatformDependent.freeDirectBuffer를 사용하여 다이렉트 버퍼를 해제합니다.

    네티 기능은 try-free-direct-buffer 함수에서 사용함. Netty로 이용하는 함수는 (io.netty.util.internal.PlatformDependent/freeDirectBuffer nio-buffer) 이다.

    (.wrapForeignAllocation allocator
    		   (proxy [ForeignAllocation] [(.remaining nio-buffer) (MemoryUtil/getByteBufferAddress nio-buffer)]  ;;[size memroy-addres]
    		     (release0 []
    		      (try-free-direct-buffer nio-buffer)
    		       (when release-fn
    			 (release-fn)))))
    

6.3 ArrowBuf

Arrow의 메모미 관리 핵심 부분.

  1. 구조
    • ArrowBuf는 기본적으로 off-heap 메모리를 래핑함.
    • 내부적으로 nio.ByteBuffer를 사용하여 실제 메모리를 관리(?)
  2. 메모리 관리
    • 참조 카운팅 방식 이용.
    • BufferAllocator를 통해 생성, 관리

6.4 ByteBuffer

https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html

java.nio 에 있는 ByteBuffer 또한 핵심 개념임.

특이 Direct 여부가 중요하다.

  1. 다이렉트 버퍼 (Direct Buffer):
    • `isDirect()` 메소드가 true를 반환합니다.
    • JVM은 이 버퍼에 대해 네이티브 I/O 작업을 직접 수행하려고 노력합니다.
    • 중간 버퍼로의 복사 과정을 피하여 I/O 성능을 향상시킵니다.
    • `ByteBuffer.allocateDirect()` 메소드로 생성할 수 있습니다.
    • 일반적으로 할당과 해제 비용이 더 높습니다.
    • 내용이 일반적인 가비지 컬렉션 힙 외부에 있을 수 있어, 메모리 사용량 파악이 어려울 수 있습니다.
    • 파일의 특정 영역을 메모리에 직접 매핑하여 생성할 수도 있습니다.
    • 주로 크고 수명이 긴 버퍼에 사용되며, 특히 네이티브 I/O 작업에 적합합니다.
  2. 비다이렉트 버퍼 (Non-direct Buffer):
    • `isDirect()` 메소드가 false를 반환합니다.
    • 일반적인 JVM 힙 메모리에 할당됩니다.
    • 다이렉트 버퍼에 비해 할당과 해제 비용이 낮습니다.
  3. 사용 지침:
    • 다이렉트 버퍼는 주로 크고 수명이 긴 버퍼에 사용하는 것이 좋습니다.
    • 프로그램 성능에 측정 가능한 이득이 있을 때만 다이렉트 버퍼를 할당하는 것이 좋습니다.
    • 성능이 중요한 코드에서는 `isDirect()` 메소드를 사용하여 명시적인 버퍼 관리를 할 수 있습니다.
  4. 주의사항:
    • 다이렉트 버퍼가 접근 불가능한 메모리 영역을 참조하는 경우, 해당 영역에 접근하려고 하면 버퍼의 내용은 변경되지 않고 특정되지 않은 예외가 발생할 수 있습니다.
  5. 추가 생성 방법:
    • JNI를 통해 네이티브 코드에서 다이렉트 바이트 버퍼를 생성할 수도 있습니다(Java 플랫폼 구현에 따라 선택적으로 지원).

이러한 특성들로 인해, 다이렉트 버퍼는 특히 대용량 데이터 처리나 빈번한 네이티브 I/O 작업이 필요한 경우에 성능상의 이점을 제공할 수 있습니다.

우리가 보는 코드에서는 Direct Buffer를 만든다.

6.5 wrapForeignAllocation

그리고 allocator.wrapForeignAllocation 으로 아까 만든 DirectByteBuffer 를 Apache Arrow 메모리 관리 시스템으로 통합한다.

메모리 관리 통합:

Arrow의 BufferAllocator가 외부 메모리를 관리할 수 있게 합니다. 이를 통해 Arrow의 메모리 추적 및 참조 카운팅 메커니즘을 활용할 수 있습니다.

  1. 성능 최적화: 다이렉트 버퍼의 네이티브 메모리를 직접 사용하여 추가적인 복사를 방지합니다. I/O 작업이나 네이티브 코드와의 상호 작용 시 효율성을 높입니다.
  1. 리소스 해제 관리: ForeignAllocation 프록시를 통해 버퍼가 더 이상 필요하지 않을 때 적절히 해제될 수 있도록 합니다. try-free-direct-buffer 함수를 통해 다이렉트 버퍼의 메모리를 명시적으로 해제합니다.
  1. Arrow 생태계와의 호환성: 외부 메모리를 Arrow의 ArrowBuf로 감싸 Arrow의 다른 컴포넌트들과 원활하게 사용할 수 있게 합니다.
  1. 유연한 메모리 소스 지원: NIO 버퍼뿐만 아니라 다양한 외부 메모리 소스를 Arrow 시스템에 통합할 수 있는 방법을 제공합니다.

5.제로-카피 연산 지원: 가능한 경우 메모리 복사를 피하고 직접 참조를 사용하여 성능을 최적화합니다.

이 접근 방식은 Java NIO의 다이렉트 버퍼의 이점(네이티브 I/O 성능)과 Apache Arrow의 메모리 관리 기능을 결합하여, 고성능 데이터 처리 파이프라인에서 효율적인 메모리 사용을 가능하게 합니다.

allocator.wrapForeignallocation 구현에 대해 알아보자.

  • ForeignAllocation 프록시 객체를 생성한다. (https://arrow.apache.org/docs/dev/java/reference/org/apache/arrow/memory/ForeignAllocation.html)
    • 생성자 : protected ForeignALlocation(long size, long memoryAddress)
    • 메소드 :
      • public long getSize() : get the size of this allocation.
      • protected long memoryAddress : get the address of this allocation
      • protected abstract void release0() : free this allocation. will only called once.
    • 코드상 생성자에 버퍼의 크기와 주소를 전달한다. (proxy [ForeignAllocation] [(.remaining nio-buffer) (MemoryUtil/getByteBufferAddress nio-buffer)]
    • [소스코드 링크]​

       /*
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements.  See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License.  You may obtain a copy of the License at
       *
       *    http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      package org.apache.arrow.memory;
      
      /**
       * EXPERIMENTAL: a memory allocation that does not come from a BufferAllocator, but rather an
       * outside source (like JNI).
       *
       * <p>To use this, subclass this class and implement {@link #release0()} to free the allocation.
       */
      public abstract class ForeignAllocation {
        private final long memoryAddress;
        private final long size;
      
        /**
         * Create a new AllocationManager representing an imported buffer.
         *
         * @param size The buffer size.
         * @param memoryAddress The buffer address.
         */
        protected ForeignAllocation(long size, long memoryAddress) {
          this.memoryAddress = memoryAddress;
          this.size = size;
        }
      
        /** Get the size of this allocation. */
        public long getSize() {
          return size;
        }
      
        /** Get the address of this allocation. */
        protected long memoryAddress() {
          return memoryAddress;
        }
      
        /** Free this allocation. Will only be called once. */
        protected abstract void release0();
      }
      
  • 메모리 해제 로직 구현
    • try-free-direct-buffer 함수를 호출하여 다이렉트 버퍼의 메모리를 해제
    • 추가적인 release-fn 이 제공되는 경우 이를 호출.

이 코드는 Java NIO의 ByteBuffer를 Apache Arrow의 ArrowBuf로 효율적으로 변환하며, 메모리 관리를 Arrow 시스템에 위임합니다. 이를 통해 네이티브 메모리 사용의 이점을 유지하면서 Arrow의 메모리 관리 기능을 활용할 수 있습니다.

7 MemoryBufferPool/listAllObjects

이는 TreeMap, NavigableMap 구현체의 키(Path)를 모두 vector로 리턴.

(listAllObjects [_]
    (locking memory-store (vec (.keySet ^NavigableMap memory-store))))

8 MemoryBufferPool/getBuffer

getBuffer는 키 값(Path)를 이용해서 키 값인 ArrowBuf 를 리턴한다.

memory-store는 단순하게 TreeMap 일 것이다.

IBufferPool
(getBuffer [_ k]
  (when k
    (or (locking memory-store
	  (some-> (.get memory-store k) retain))
	(throw (os/obj-missing-exception k)))))
  1. 함수 구조 : getBuffer 는 두 개의 인자를 받음 _this
  2. 키 검사: (when k …): 키 k가 nil이 아닌 경우에만 실행됩니다.
  3. 버퍼 검색 및 반환: (or … …): 두 개의 표현식 중 첫 번째가 truthy면 그 값을 반환하고, 아니면 두 번째를 실행합니다.
  4. 동기화된 메모리 접근: (locking memory-store …): memory-store에 대한 동기화된 접근을 보장합니다. 이는 스레드 안전성을 위한 것입니다.
  5. 버퍼 검색 및 유지:
    • (.get memory-store k) : memory-store에서 키 k에 해당하는 값을 가져옵니다.
    • some->: nil이 아닌 경우에만 다음 연산을 수행합니다.
    • retain: 버퍼를 찾으면 retain 함수를 호출하여 참조 카운트를 증가시킵니다.

getBuffer 메서드는 MemoryBufferPool 내부에서 직접 사용되지는 않지만, 외부에서 중요한 역할을 합니다. 주요 용도는 다음과 같습니다:

  • 외부 접근: 다른 컴포넌트나 클라이언트 코드에서 특정 키에 해당하는 버퍼를 요청할 때 사용됩니다.
  • 인터페이스 구현: IBufferPool 인터페이스의 일부로, 버퍼 풀의 일관된 API를 제공합니다.
  • 데이터 검색: 저장된 데이터에 접근할 때 사용됩니다. 예를 들어, Arrow 파일을 읽거나 처리할 때 필요합니다.
  • 메모리 관리: 버퍼를 가져올 때 참조 카운트를 증가시켜 메모리 관리를 돕습니다.
  • 스레드 안전성: 동기화된 접근을 제공하여 멀티스레드 환경에서의 안전성을 보장합니다.
  • 코드에서 getBuffer의 사용 예: 이 함수들은 getBuffer를 사용하여 특정 경로의 데이터에 접근하고, Arrow 파일의 구조를 읽거나 특정 레코드 배치를 열 때 사용됩니다. 따라서 getBuffer는 버퍼 풀의 핵심 기능을 외부에 노출시키는 중요한 인터페이스 메서드입니다. 내부 구현에서 직접 사용되지 않더라도, 전체 시스템에서 데이터 접근의 핵심 경로로 작용합니다.

8.1 MemoryUtil/getByteBufferAddress

org.apache.arrow.memory.util.MemoryUtil/getByteBufferAddress 는 nio의 ByteBuffer 에서 주소를 가져오는 함수.

메모리 주소 관리: MemoryUtil은 Java의 직접 메모리(Direct Memory)와 네이티브 메모리 사이의 상호 작용을 용이하게 합니다.

안전한 메모리 접근: 이 클래스는 JVM의 경계를 넘어 직접 메모리에 접근할 때 필요한 안전 장치를 제공합니다.

플랫폼 독립성: 다양한 플랫폼에서 일관된 방식으로 메모리 주소를 다룰 수 있게 해줍니다.

API 문서의 설명을 보자.

public static long getByteBufferAddress(ByteBuffer buf)

Given a ByteBuffer, gets the address the underlying memory space.

Parameters:
  buf - the byte buffer.

Returns:
  address of the underlying memory.

소스코드를 보자. GITHUB SOURCE

/** The offset of the address field with the {@link java.nio.ByteBuffer} object. */
private static final long BYTE_BUFFER_ADDRESS_OFFSET;

static {
  // ...
  // get the offset of the address field in a java.nio.Buffer object
  Field addressField = java.nio.Buffer.class.getDeclaredField("address");
  addressField.setAccessible(true);
  // ...
}
/**
 * Given a {@link ByteBuffer}, gets the address the underlying memory space.
 *
 * @param buf the byte buffer.
 * @return address of the underlying memory.
 */
 public static long getByteBufferAddress(ByteBuffer buf) {
   return UNSAFE.getLong(buf, BYTE_BUFFER_ADDRESS_OFFSET);
 }

UNSAFE 라는 기능을 쓴다. 오랫만에 본다. UNSAFE 는 예전에 블로그를 쓴 적이 있다. 다른 플랫폼에 적었던 것인데 찾아서 이곳에도 옮겨놓아야겠다.

Unsafe 는 Java 버전이 올라가면서 쓰지 말라고 권고하는 클래스 중에 하나이다. [GITHUB SOURCE CODE LINK]​

/**
  * @deprecated Use {@link java.lang.foreign} to access off-heap memory.
  *
  * @see #getByte(long)
  */
 @Deprecated(since="23", forRemoval=true)
 @ForceInline
 public long getLong(long address) {
   beforeMemoryAccess();
   return theInternalUnsafe.getLong(address);
}

// Reports the location of a given field in the storage allocation of its class.
// Do not expect to perform any sort of arithmetic on this offset;
// it is just a cookie which is passed to the unsafe heap memory accessors.
public long objectFieldOffset(Field f) {
  return objectFieldOffset0(f);
}

private native long objectFieldOffset0(Field f);


theInternalUnsafe.getLong 은 native 기능이다.

/** @see #getInt(Object, long) */
@IntrinsicCandidate
public native long    getLong(Object o, long offset);

이 메소드는 주어진 java 객체의 offset 에서 값을 가져온다.

9 listObjects(dir: Path): Iterable<Path>

특정 디렉토리(Path)에 있는 키(Path)를 리턴한다.

(listObjects [_ dir]
  (locking memory-store
    (let [dir-depth (.getNameCount dir)]
      (->> (.keySet (.tailMap ^NavigableMap memory-store dir))
	   (take-while #(.startsWith ^Path % dir))
	   (keep (fn [^Path path]
		   (when (> (.getNameCount path) dir-depth)
		     (.subpath path 0 (inc dir-depth)))))
	   (distinct)
	   (vec)))))
  1. 동기화(locking)

    스레드 안전성을 보장하기 위해 memory-store에 대한 락을 획득합니다.

    (locking memory-store ...)
    
  2. 디렉토리 깊이 계산:

    (let [dir-depth (.getNameCount dir)]
    

    주어진 디렉토리의 깊이(경로 요소의 수)를 계산합니다.

  3. 키 집합 가져오기:

    (.keySet (.tailMap ^NavigableMap memory-store dir))
    

    memory-store에서 dir보다 크거나 같은 모든 키를 가져온다. (이것을 위해서 NavigableMap 이 필요했던 것)

  4. 필터링:

    (take-while #(.startsWith ^Path % dir))
    

    dir로 시작하는 경로만 선택.

  5. 경로 처리:

    (keep (fn [^Path path]
      (when (> (.getNameCount path) dir-depth)
        (.subpath path 0 (inc dir-depth)))))
    

    dir보다 깊은 경로만 선택하고, 그 경로의 상위 부분(dir의 바로 아래 수준)만 추출.

    keep 은 참이면 남기는 함수.

    (when (> (.getNameCount path) dir-depth) ...)path 의 길이가 dir-depth 보다 긴 경우만 처리. 이는 dir 자체나 그보다 얕은 경로는 무시하고 더 깊은 경로만 고려.

    (.subpath path 0 (inc dir-depth)) 로 부분경로만 가져온다. subpath의 (0, dir-dpeth + 1) 부분을 제외한 subpath를 리턴. 이는 dir 바로 아래 수준의 경로만을 가져오는 것.

  6. 중복제거 및 벡터 변환

    (distinct)
    (vec)
    

10 MemoryBufferPool/openArrowWriter

openArrowWriter의 인터페이스 시그니처

fun openArrowWriter(k: Path, vsr: VectorSchemaRoot): ArrowWriter

Apache Arrow 파일을 메모리에 쓰기 위한 writer를 생성.

(openArrowWriter [this k vsr]
    ;; 초기화                
    (let [baos (ByteArrayOutputStream.)]
      ;; with-close-on-catch 예외발생시 리소스를 안전하게 닫는다.
      ;; Channels/newChannel로 ByteArrayOutputStream에 쓸 채널을 생성합니다
      ;; ArrowFileWriter를 생성하여 Arrow 파일을 쓸 준비를 합니다.
      ;; (Arrow 파일 형식의 데이터를 메모리에 생성한다는 의미)
      (util/with-close-on-catch [write-ch (Channels/newChannel baos)
				 aw (ArrowFileWriter. vsr nil write-ch)]
	(try
	  ;; 쓰지시작
	  (.start aw)
	  (catch ClosedByInterruptException e
	    (throw (InterruptedException.))))

	;;IArrowWriter 즉시 구현.
	(reify ArrowWriter
	  ;; Arrow writer의 writeBatch 메서드를 호출하여 데이터 배치를 씁니다.
	  (writeBatch [] (.writeBatch aw))
	  ;; Arrow 파일 쓰기를 완료하고, 채널을 닫음.
	  ;; 작성된 데이터를 ByteBuffer로 변환하여 putObject로 저장.
	  (end []
	    (.end aw)
	    (.close write-ch)
	    ;; MemoryBufferPool의 putObject 메서드를 호출하여 데이터를 저장합니다.
	    ;; ByteArrayOutputStream의 모든 내용을 바이트 배열로 변환 후 ByteBuffer로 감싼다.
	    (.putObject this k (ByteBuffer/wrap (.toByteArray baos))))
	  ;; Arrow writer와 채널을 안전하게 닫습니다.
	  (close [_]
	    (close-arrow-writer aw)
	    (when (.isOpen write-ch)
	      (.close write-ch)))))))
  1. openArrowWriter 의 역할:
    • ArrowWriter 인터페이스를 구현하는 클로저를 생성
    • 이 클로저는 데이터를 메모리(ByteArrayOutputStream)에 쓰기 위한 준비를 한다.
  2. writeBatch 실행:
    • ~writeBatch~를 실행하면 데이터가 ByteArrayOutputStream에 쓰여집니다.
    • 이 시점에서는 아직 MemoryBufferPool에 데이터가 저장되지 않습니다.
  3. MemoryBufferPool에 데이터 저장:
    • 데이터는 end 메서드가 호출될 때 MemoryBufferPool에 저장됩니다.
  4. MemoryBufferPool 참조:
    • MemoryBufferPool의 참조는 this 를 통해 사용.
    • openArrowWriter 메서드는 MemoryBufferPool 레코드 내부에 정의되어 있으므로, this 는 현재 MemoryBufferPool 인스턴스를 가리킴.

코드 구조를 다시 보면:

(defrecord MemoryBufferPool [allocator, ^NavigableMap memory-store]
  IBufferPool
  ;; 다른 메서드들...

  (openArrowWriter [this k vsr]
    (let [baos (ByteArrayOutputStream.)]
      (util/with-close-on-catch [write-ch (Channels/newChannel baos)
				 aw (ArrowFileWriter. vsr nil write-ch)]
	;; ... 생략 ...
	(reify ArrowWriter
	  (writeBatch [_] (.writeBatch aw))
	  (end [_]
	    (.end aw)
	    (.close write-ch)
	    (.putObject this k (ByteBuffer/wrap (.toByteArray baos))))
	  ;; ... 생략 ...
	))))

여기서 this 는 MemoryBufferPool 인스턴스를 참조합니다. end 메서드 내에서 (.putObject this k ...) 를 호출할 때 이 참조를 사용하여 데이터를 MemoryBufferPool에 저장.

정리하면:

  • openArrowWriter는 데이터를 메모리에 쓰기 위한 준비를 합니다.
  • writeBatch는 데이터를 메모리(ByteArrayOutputStream)에 씁니다.
  • end가 호출될 때 비로소 데이터가 MemoryBufferPool에 저장됩니다.

10.1 openArrowWriter 가 왜 필요한 거지?

putObject와는 다른 것인가?

이는 뭔가 Writer를 이용해서 저장해야 하는 값이 있는 것 같다. 하지만 이는 여러개를 한번에 저장하는 기능은 아닌 것 같다.

시작은 아래 이슈때문인 것 같다.

10.1.1 Issue 1 - 대용량 저장에서 에러가 나고 있다.

[XTDB GITHUB ISSUE]​

2023년 10월 5일 이슈가 생성되었다.

  1. 문제정의
    1. .putObject 메서드가 1GB이상의 대용량 파일을 효과적으로 처리하지 못함 (메모리 사용량 및 네트워크 시간 초과)
    2. 주로 두 가지 상황에서 .putObject 호출이 발생: L0 쓰기(인덱싱 중 청크 플러시) 및 압축(L1+ 파일 생성)

      보고자의 생각에는 L0 쓰기는 여기 #2816에 설명된 제한에 묶일 가능성이 높으므로 테이블당 심각한 메모리 또는 시간 초과 문제를 일으키지 않을 것으로 생각되므로 덜 걱정할 수 있다고 함.(#2816도 확인해봐야할듯)

  2. 메모리

    .putObject는 현재 java nio ByteBuffer를 사용한다. 이는 전체 객체가 메모리에 있거나 메모리에 매핑되어 있다고 가정. 메모리 매핑을 하지 않으면 버퍼가 커져 사용 가능한 메모리가 소진되거나 메모리가 부족한 쿼리에 과도한 압력이 가해질 수 있다.

    압축(Compaction)은 현재 배열이 .putObject를 통해 스토리지로 전송되는 heap ByteBuffers 로 구현되는 ByteArrayOutputStream에 쓰는데, 이를 위해서는 기껏해야 전체 객체가 힙으로 구현되거나 ByteArrayOutputStream의 .toArray 복사 의미론으로 인해 2배의 메모리가 할당되어야 합니다. (뭔가 잘못되어있긴하군)

  3. 제안된 해결방안
    • Require a memory mapped buffer, or have it be our accepted way of writing large files
      • 메모리 매핑 여부와 관계없이 전체 버퍼를 미리 작성해야 함
      • 멀티파트를 사용할 경우 업로드를 분할하여 병렬로 전송 가능
      • SDK는 매핑된 메모리에서 직접 작업하여 복사를 피할 수 있어야 함 (이 가정은 테스트 필요)
      • 타임아웃 정책이 적절히 설정된다면 멀티파트가 반드시 필요하지 않을 수 있음
      • 메모리 매핑 사용 시 파일 사용을 암시함
    • Require a file e.g putFile rather than putObject
      • 전체 파일을 미리 작성해야 함
      • 디스크 공간 필요
      • 멀티파트가 필요하지 않을 수 있음
      • 클라우드 SDK는 일반적으로 파일 핸들을 직접 처리할 수 있어 우리가 직접 코드를 작성할 필요성 감소
      • 작은 버퍼의 경우 파일 열기/닫기 및 복사와 관련된 지연 시간이 크게 중요하지 않을 수 있음
      • 여전히 타임아웃 정책을 검토하고 파일 크기에 따른 타임아웃 조정이 필요한지 테스트해야 함
      • 병렬 처리 조정 기회가 줄어들 수 있음
      • 임시 파일 정리를 기억해야 할 수 있음
    • Expose an OutputStream or nio.WritableByteChannel
      • 파이프라이닝과 제한된 병렬 처리 가능
      • 전체 스트림을 미리 실현할 필요 없음
      • 상대적으로 더 정교할 수 있으며, 멀티파트가 필요할 가능성 높음

그래서 일단 멀티파트 업로드를 구현하게 된다.

10.1.2 MultiPart Upload

일단 여러 클라우드 서비스를 지원하려고 하지만 S3 관련 이슈를 공유한다.

  1. xtdb.object-store.clj

    잠깐 PR을 보면 xtdb.object-store.cljIMultipartUpload 라는 인터페이스를 만들었다.

    #_{:clj-kondo/ignore [:unused-binding :clojure-lsp/unused-public-var]}
    (definterface IMultipartUpload
      (^java.util.concurrent.CompletableFuture #_<Void> uploadPart [^java.nio.ByteBuffer buf] 
       "Asynchonously uploads a part to the multipart request - adds to the internal completed parts list")
      (^java.util.concurrent.CompletableFuture #_<Void> complete []
       "Asynchonously completes the multipart-request")
      (^java.util.concurrent.CompletableFuture #_<?> abort []
       "Asynchonously cancels the multipart-request - useful/necessary to cleanup any parts of the multipart upload on an error"))
    

    그리고 ObjectStore 인터페이스에 startMultipart 메소드가 추가되었다.

    #_{:clj-kondo/ignore [:unused-binding :clojure-lsp/unused-public-var]}
    (definterface ObjectStore
      (^java.util.concurrent.CompletableFuture #_<ByteBuffer> getObject [^String k]
    								    "Asynchonously returns the given object in a ByteBuffer
        If the object doesn't exist, the CF completes with an IllegalStateException.")
    
      (^java.util.concurrent.CompletableFuture #_<ByteBuffer> getObjectRange
       [^String k ^long start ^long len]
       "Asynchonously returns the given len bytes starting from start (inclusive) of the object in a ByteBuffer
        If the object doesn't exist, the CF completes with an IllegalStateException.
    
        Out of bounds `start` cause the returned future to complete with an Exception, the type of which is implementation dependent.
        If you supply a len that exceeds the number of bytes remaining (from start) then you will receive less bytes than len.
    
        Exceptions are thrown immediately if the start is negative, or the len is zero or below. This is
        to ensure consistent boundary behaviour between different object store implementations. You should check for these conditions and deal with them
        before calling getObjectRange.
    
        Behaviour for a start position at or exceeding the byte length of the object is undefined. You may or may not receive an exception.")
    
      (^java.util.concurrent.CompletableFuture #_<Path> getObject [^String k, ^java.nio.file.Path out-path]
    							      "Asynchronously writes the object to the given path.
        If the object doesn't exist, the CF completes with an IllegalStateException.")
    
      (^java.util.concurrent.CompletableFuture #_<?> putObject [^String k, ^java.nio.ByteBuffer buf])
    
      (^java.util.concurrent.CompletableFuture #_<IMultipartUpload> startMultipart [^String k])
      (^java.lang.Iterable #_<String> listObjects [])
      (^java.lang.Iterable #_<String> listObjects [^String dir])
      (^java.util.concurrent.CompletableFuture #_<?> deleteObject [^String k]))
    
  2. s3-stack.yml

    S3 버킷에 대한 IAM 정책에 멀티파트 업로드 관련 권한(AbortMultipartUpload, ListBucketMultipartUploads)이 추가되었다.

  3. s3.clj
    • MultipartUpload 레코드가 추가되어 IMultipartUpload 인터페이스를 구현
    • S3ObjectStore 레코드에 멀티파트 업로드 관련 메서드들이 추가
    • 단일 객체 업로드와 멀티파트 업로드를 구분하여 처리하는 로직이 추가

    뭐 여튼 openArrowwriter는 대용량 업로드를 위해서는 필요한 녀석인 듯 하다.

10.2 ArrowWriter

ArrowWriter 는 xtdb 내부에서 쓰는 BufferPool 구현체들이 사용하는 ArrowWriter 를 일관성있게 사용하려고 만든 인터페이스 같음.

우리가 이전에 IBufferPool.kt 의 인터페이스 중에 openArrowWriter 메소드의 리턴값이다.

package xtdb

interface ArrowWriter : AutoCloseable {
  fun writeBatch()
  fun end()
}

[stdb/core/src/main/kotlin/xtdb/ArrowWriter.kt]​

그런데 이 구현을 MemoryBefferPool 구현체에서 보면 이상한 것이 있다.

(openArrowWriter [this k rel]
  (let [baos (ByteArrayOutputStream.)]
    (util/with-close-on-catch [write-ch (Channels/newChannel baos)
			       unl (.startUnload rel write-ch)]
      (reify ArrowWriter
	(writeBatch [_] (.writeBatch unl))

	(end [_]
	  (.endFile unl)
	  (.close write-ch)
	  (.putObject this k (ByteBuffer/wrap (.toByteArray baos))))

	(close [_]
	  (util/close unl)
	  (when (.isOpen write-ch)
	    (.close write-ch)))))))

writeBatch 를 구현하라고 했더니, 그대로 (.writeBatch unl) 에 위임한다. 이미 writeBatch 를 구현해놓아져있다. 돌아가보면 rel 이 대체뭔지를 알아봐야겠다고 생각이 들었다.

10.3 xtdb.arrow.Relation (optional)

[xtdb.arrow.Relation]​

Relation 은 RelationReader 인터페이스를 구현한다.

그냥 봐선 별기능이 있는 것 같지 않다. 그저 데이터를 Arrow 와 연결이 쉽도록 하기 위한 코드같아보인다.

궁금했던 writeBatch() 에 대해 알아보자.

// Apache Arrow 파일 포맷에서 사용하는 매직 바이트 인듯?
private val MAGIC = "ARROW1".toByteArray()

inner class RelationUnloader(private val ch: WriteChannel) : AutoCloseable {

	private val vectors = this@Relation.vectors.values
	private val schema = Schema(vectors.map { it.field })
	private val recordBlocks = mutableListOf<ArrowBlock>()

	init {
	    // 파일 시작에 매직 바이트 쓰기.
	    ch.write(MAGIC)
	    ch.align()
	    MessageSerializer.serialize(ch, schema)
	}

	fun writeBatch() {
	    val nodes = mutableListOf<ArrowFieldNode>()
	    val buffers = mutableListOf<ArrowBuf>()

	    vectors.forEach { it.unloadBatch(nodes, buffers) }

	    ArrowRecordBatch(rowCount, nodes, buffers).use { recordBatch ->
		MessageSerializer.serialize(ch, recordBatch)
		    .also { recordBlocks.add(it) }
	    }
	}

	fun endStream() {
	    ch.writeIntLittleEndian(MessageSerializer.IPC_CONTINUATION_TOKEN)
	    ch.writeIntLittleEndian(0)
	}

	fun endFile() {
	    endStream()

	    val footerStart = ch.currentPosition
	    ch.write(ArrowFooter(schema, emptyList(), recordBlocks), false)

	    val footerLength = ch.currentPosition - footerStart
	    check(footerLength > 0) { "Footer length must be positive" }
	    ch.writeIntLittleEndian(footerLength.toInt())
	    // 파일 마지막에 다시 매직 바이트 쓰기 
	    ch.write(MAGIC)
	}

	override fun close() {
	    ch.close()
	}
    }

단순히 arrow 파일로 쓰기 위한 것인듯. 매직바이트를 쓰는 것이 특이하다. 아마 읽을 때도 이 매직바이트를 읽을 것이다.

10.4 xtdb.arrow.RelationReader (optional)

[xtdb.arrow.RelationReader.kt]​

주요구현을 보자.

  • get(colName: String) : 컬럼 이름을 통한 데이터 접근과 반복자를 제공
  • select 메서드를 통해 데이터 부분 집합을 쉽게 생성할 수 있음.
  • from 메서드로 OldRelationReader 를 새로운 RelationReader 로 변환한다.
package xtdb.arrow

import org.apache.arrow.vector.types.pojp.Schema
import java.util.*
import xtdb.vector.RelationReader as OldRelationReader

interface RelationReader : Interable<VectorReader>, AutoCloseable {
  val schema: Schema
  val rowCount: Int

  operator fun get(colName: String): VectorReader?

  private class IndirectRelation(
    private val vecs: SequencedMap<String, VectorReader>,
    override val rowCount: Int
  ) : RelationReader {
      override val schema get() = Schema(vecs.map { it.value field })
      override fun get(colName: String) = vecs[colName]
      override fun iterator() = vecs.values.iterator()
      override fun close() = vecs.forEach { it.value.close() }
  }

  fun select(idxs: IntArray): RelationReader =
      IndirectRelation(associateTo(linkedMapOf()) { it.name to it.select(idxs) }, idxs.size)

  override fun close() = forEach { it.close() }

  companion object {
    @JvmStatic
    fun from(oldReader: OldRelationReader) = object : RelationReader {
	override val schema = Schema(oldReader.map { it.field })
	override val rowCount: Int get() = oldReader.rowCount()

	override operator fun get(colName: String) =
	    oldReader.readerForName(colName)?.let { VectorReader.from(it) }

	override fun iterator() = oldReader.asSequence().map { VectorReader.from(it) }.iterator()
    }
  }
}

Date: 2024-08-13 Tue 00:00

Author: Younghwan Nam

Created: 2024-09-25 Wed 07:42

Emacs 27.2 (Org mode 9.4.4)

Validate