[20240822] XTDB Internals 4 - Relation

Table of Contents

1 Relation.kt

[Relation.kt source code]​

Relation은 vectors, rowCount 두개를 인자로 받는다.

class Relation(val vectors: SequencedMap<String, Vector>, override var rowCount: Int = 0) : RelationReader {

    override val schema get() = Schema(vectors.sequencedValues().map { it.field })

    @JvmOverloads
    constructor(vectors: List<Vector>, rowCount: Int = 0)
	    : this(vectors.associateByTo(linkedMapOf()) { it.name }, rowCount)

    @JvmOverloads
    constructor(allocator: BufferAllocator, schema: Schema, rowCount: Int = 0)
	    : this(allocator, schema.fields, rowCount)

    @JvmOverloads
    constructor(allocator: BufferAllocator, fields: List<Field>, rowCount: Int = 0)
	    : this(fields.map { fromField(allocator, it) }, rowCount)

이것은 누가 초기에 실행할까?

내 생각에는 metadata.clj 이다.

1.1 metadata.clj 는 Relation 을 초기화한다.

[metadata.clj]​

metadata 에 TableMetadataRelation 을 인자로 받고 있다. 그리고 이것을 실행하는 ->table-metadata 함수가 relation 을 처음 만드는 것 같다. (metadata도 나중에 자세히 알아보자.)

(defn ->table-metadata ^xtdb.metadata.ITableMetadata [^IBufferPool buffer-pool ^Path file-path]
  (util/with-close-on-catch [buf (.getBuffer buffer-pool file-path)]
    (util/with-open [loader (Relation/loader buf)]
      (util/with-close-on-catch [rel (Relation. (.getAllocator (.getReferenceManager buf))
						(.getSchema loader))]
	(let [nodes-vec (.get rel "nodes")]
	  (.loadBatch loader 0 rel)
	  (let [rdr (.getOldRelReader rel)
		^IVectorReader metadata-reader (-> (.readerForName rdr "nodes")
						   (.legReader "leaf"))
		{:keys [col-names page-idx-cache]} (->table-metadata-idxs metadata-reader)
		temporal-col-types-rdr (some-> (.structKeyReader metadata-reader "columns")
					       (.listElementReader)
					       (.structKeyReader "types")
					       (.structKeyReader temporal-col-type-leg-name))

		min-rdr (some-> temporal-col-types-rdr (.structKeyReader "min"))
		max-rdr (some-> temporal-col-types-rdr (.structKeyReader "max"))]
	    (->TableMetadata (ArrowHashTrie. nodes-vec) rel buf metadata-reader col-names page-idx-cache (AtomicInteger. 1)
			     min-rdr max-rdr)))))))

->table-metadata 는 버퍼풀과 파일패스를 받아서 TableMetadata 를 만든다.

  1. 버퍼 풀에서 파일에 대한 버퍼를 가져옵니다.
  2. Relation 로더를 생성하고 첫 번째 배치 데이터를 로드합니다.
  3. 메타데이터 정보(컬럼 이름, 페이지 인덱스 캐시 등)를 추출합니다.
  4. 시간 관련 컬럼 타입에 대한 리더를 설정합니다.
  5. 추출한 정보를 바탕으로 TableMetadata 인스턴스를 생성하여 반환합니다.

여기서 우리가 볼 것은 Relation 로더를 생성해서 데이터를 로드하는 부분이다.

@JvmStatic
fun loader(buf: ArrowBuf): Loader {
  buf.referenceManager.retain()
  return BufferLoader(buf, readFooter(buf))
}

    private class BufferLoader(
	private val buf: ArrowBuf,
	footer: ArrowFooter
    ) : Loader() {
	override val schema: Schema = footer.schema

	inner class Batch(private val idx: Int, private val block: ArrowBlock): Loader.Batch {

	    override fun load(rel: Relation) {
		val prefixSize =
		    if (buf.getInt(block.offset) == MessageSerializer.IPC_CONTINUATION_TOKEN) 8L else 4L

		val metadataBuf = buf.nioBuffer(block.offset + prefixSize, block.metadataLength - prefixSize.toInt())

		val bodyBuf = buf.slice(block.offset + block.metadataLength, block.bodyLength)
		    .also { it.referenceManager.retain() }

		val msg = Message.getRootAsMessage(metadataBuf.asReadOnlyBuffer())
		val recordBatchFB = RecordBatch().also { msg.header(it) }

		(MessageSerializer.deserializeRecordBatch(recordBatchFB, bodyBuf)
		    ?: error("Failed to deserialize record batch $idx, offset ${block.offset}"))

		    .use { batch -> rel.load(batch) }
	    }
	}

	override val batches = footer.recordBatches.mapIndexed(::Batch)

	override fun close() = buf.close()
    }

load 함수로 몇가지 구조를 유추할 수 있다.

  1. 프리픽스 (Prefix)

크기: 4바이트 또는 8바이트 목적: 레코드 배치의 시작을 나타냄

  1. 메타데이터 섹션

위치: 프리픽스 직후 크기: block.metadataLength - prefixSize로 결정됨 내용: 레코드 배치에 대한 직렬화된 메타데이터 접근 방법: buf.nioBuffer() 사용

  1. 본문 섹션

위치: 메타데이터 섹션 이후 크기: block.bodyLength로 결정됨 내용: 실제 레코드 배치 데이터 접근 방법: buf.slice() 사용

  1. 전체 구조

[프리픽스][메타데이터][본문]

여기서 나의 눈에 띄는 것은 schema 가 footer에 존재한다는 것이다. readFooter 함수가 궁금하다.

1.1.1 readFooter

private fun readFooter(buf: ArrowBuf): ArrowFooter {
    val magicBytes = ByteArray(Int.SIZE_BYTES + MAGIC.size)
    val footerLengthOffset = buf.capacity() - magicBytes.size
    buf.getBytes(footerLengthOffset, magicBytes)

    require(MAGIC.contentEquals(magicBytes.copyOfRange(Int.SIZE_BYTES, magicBytes.size))) {
	"missing magic number at end of Arrow file"
    }

    val footerLength = MessageSerializer.bytesToInt(magicBytes)
    require(footerLength > 0) { "Footer length must be positive" }
    require(footerLength + MAGIC.size * 2 + Int.SIZE_BYTES <= buf.capacity()) { "Footer length exceeds file size" }

    val footerBuffer = ByteBuffer.allocate(footerLength)
    buf.getBytes(footerLengthOffset - footerLength, footerBuffer)
    footerBuffer.flip()
    return ArrowFooter(Footer.getRootAsFooter(footerBuffer))
}

주요 단계:

  1. 파일의 마지막 8바이트를 읽습니다:
    1. 4바이트: 푸터 길이
    2. 4바이트: 매직 넘버 ("ARROW1")
  2. 매직 넘버를 검증하여 유효한 Arrow 파일인지 확인합니다.
  3. 푸터 길이를 사용하여 푸터의 시작 위치를 찾습니다.
  4. 푸터 데이터를 ByteBuffer로 읽습니다.
  5. Flatbuffers를 사용하여 푸터 데이터를 파싱합니다.

주요 검증:

  1. 푸터 길이가 양수인지 확인합니다.
  2. 푸터가 파일 크기를 초과하지 않는지 확인합니다.

BufferLoader에서의 사용:

  1. 데이터의 스키마를 제공합니다.
  2. 효율적인 로딩을 위한 레코드 배치 정보를 제공합니다.

반환값: ArrowFooter 객체

  1. 스키마 정보
  2. 레코드 배치 메타데이터를 포함

중요성:

  1. 전체 파일을 읽지 않고도 Arrow 파일의 구조와 내용을 이해할 수 있게 합니다.
  2. 파일의 다른 부분에 대한 랜덤 액세스를 가능하게 합니다.
  3. 효율적인 데이터 처리를 위한 필수 메타데이터를 제공합니다.

다시 돌아와서 Relation은 어떻게 만들어지는가 그래서?

(util/with-open [loader (Relation/loader buf)]
  (util/with-close-on-catch [rel (Relation. (.getAllocator (.getReferenceManager buf))
					    (.getSchema loader))]

(Relation/loader buf) 의 리턴값은 BufferLoader 이고 이 안에 schemafooter: ArrowFooter 에서 나온 schema 정보를 가진다.

이는 vectors 가 아니지만 아래 생성자를 사용한다.

@JvmOverloads
constructor(allocator: BufferAllocator, schema: Schema, rowCount: Int = 0)
	: this(allocator, schema.fields, rowCount)

@JvmOverloads
constructor(allocator: BufferAllocator, fields: List<Field>, rowCount: Int = 0)
	: this(fields.map { fromField(allocator, it) }, rowCount)

결국 footer에서 가져오는 schema 정보가 vectors 까지 이어진다.

그말을 처음 load할 때 ArrowBuf 형태로 저장된 파일이었을 것이고, 그 데이터의 footer를 읽어와서 schema를 읽고 그것이 Relation의 vectors 값으로까지 연결된다.

Date: 2024-08-22 Thu 00:00

Author: Younghwan Nam

Created: 2024-09-25 Wed 07:42

Emacs 27.2 (Org mode 9.4.4)

Validate