[20240822] XTDB Internals 4 - Relation
Table of Contents
1 Relation.kt
Relation은 vectors, rowCount 두개를 인자로 받는다.
class Relation(val vectors: SequencedMap<String, Vector>, override var rowCount: Int = 0) : RelationReader { override val schema get() = Schema(vectors.sequencedValues().map { it.field }) @JvmOverloads constructor(vectors: List<Vector>, rowCount: Int = 0) : this(vectors.associateByTo(linkedMapOf()) { it.name }, rowCount) @JvmOverloads constructor(allocator: BufferAllocator, schema: Schema, rowCount: Int = 0) : this(allocator, schema.fields, rowCount) @JvmOverloads constructor(allocator: BufferAllocator, fields: List<Field>, rowCount: Int = 0) : this(fields.map { fromField(allocator, it) }, rowCount)
이것은 누가 초기에 실행할까?
내 생각에는 metadata.clj 이다.
1.1 metadata.clj 는 Relation 을 초기화한다.
metadata 에 TableMetadata
는 Relation
을 인자로 받고 있다.
그리고 이것을 실행하는 ->table-metadata
함수가 relation 을 처음 만드는 것 같다. (metadata도 나중에 자세히 알아보자.)
(defn ->table-metadata ^xtdb.metadata.ITableMetadata [^IBufferPool buffer-pool ^Path file-path] (util/with-close-on-catch [buf (.getBuffer buffer-pool file-path)] (util/with-open [loader (Relation/loader buf)] (util/with-close-on-catch [rel (Relation. (.getAllocator (.getReferenceManager buf)) (.getSchema loader))] (let [nodes-vec (.get rel "nodes")] (.loadBatch loader 0 rel) (let [rdr (.getOldRelReader rel) ^IVectorReader metadata-reader (-> (.readerForName rdr "nodes") (.legReader "leaf")) {:keys [col-names page-idx-cache]} (->table-metadata-idxs metadata-reader) temporal-col-types-rdr (some-> (.structKeyReader metadata-reader "columns") (.listElementReader) (.structKeyReader "types") (.structKeyReader temporal-col-type-leg-name)) min-rdr (some-> temporal-col-types-rdr (.structKeyReader "min")) max-rdr (some-> temporal-col-types-rdr (.structKeyReader "max"))] (->TableMetadata (ArrowHashTrie. nodes-vec) rel buf metadata-reader col-names page-idx-cache (AtomicInteger. 1) min-rdr max-rdr)))))))
->table-metadata
는 버퍼풀과 파일패스를 받아서 TableMetadata
를 만든다.
- 버퍼 풀에서 파일에 대한 버퍼를 가져옵니다.
- Relation 로더를 생성하고 첫 번째 배치 데이터를 로드합니다.
- 메타데이터 정보(컬럼 이름, 페이지 인덱스 캐시 등)를 추출합니다.
- 시간 관련 컬럼 타입에 대한 리더를 설정합니다.
- 추출한 정보를 바탕으로 TableMetadata 인스턴스를 생성하여 반환합니다.
여기서 우리가 볼 것은 Relation 로더를 생성해서 데이터를 로드하는 부분이다.
@JvmStatic fun loader(buf: ArrowBuf): Loader { buf.referenceManager.retain() return BufferLoader(buf, readFooter(buf)) } private class BufferLoader( private val buf: ArrowBuf, footer: ArrowFooter ) : Loader() { override val schema: Schema = footer.schema inner class Batch(private val idx: Int, private val block: ArrowBlock): Loader.Batch { override fun load(rel: Relation) { val prefixSize = if (buf.getInt(block.offset) == MessageSerializer.IPC_CONTINUATION_TOKEN) 8L else 4L val metadataBuf = buf.nioBuffer(block.offset + prefixSize, block.metadataLength - prefixSize.toInt()) val bodyBuf = buf.slice(block.offset + block.metadataLength, block.bodyLength) .also { it.referenceManager.retain() } val msg = Message.getRootAsMessage(metadataBuf.asReadOnlyBuffer()) val recordBatchFB = RecordBatch().also { msg.header(it) } (MessageSerializer.deserializeRecordBatch(recordBatchFB, bodyBuf) ?: error("Failed to deserialize record batch $idx, offset ${block.offset}")) .use { batch -> rel.load(batch) } } } override val batches = footer.recordBatches.mapIndexed(::Batch) override fun close() = buf.close() }
위 load
함수로 몇가지 구조를 유추할 수 있다.
- 프리픽스 (Prefix)
크기: 4바이트 또는 8바이트 목적: 레코드 배치의 시작을 나타냄
- 메타데이터 섹션
위치: 프리픽스 직후 크기: block.metadataLength - prefixSize로 결정됨 내용: 레코드 배치에 대한 직렬화된 메타데이터 접근 방법: buf.nioBuffer() 사용
- 본문 섹션
위치: 메타데이터 섹션 이후 크기: block.bodyLength로 결정됨 내용: 실제 레코드 배치 데이터 접근 방법: buf.slice() 사용
- 전체 구조
[프리픽스][메타데이터][본문]
여기서 나의 눈에 띄는 것은 schema 가 footer에 존재한다는 것이다. readFooter 함수가 궁금하다.
1.1.1 readFooter
private fun readFooter(buf: ArrowBuf): ArrowFooter { val magicBytes = ByteArray(Int.SIZE_BYTES + MAGIC.size) val footerLengthOffset = buf.capacity() - magicBytes.size buf.getBytes(footerLengthOffset, magicBytes) require(MAGIC.contentEquals(magicBytes.copyOfRange(Int.SIZE_BYTES, magicBytes.size))) { "missing magic number at end of Arrow file" } val footerLength = MessageSerializer.bytesToInt(magicBytes) require(footerLength > 0) { "Footer length must be positive" } require(footerLength + MAGIC.size * 2 + Int.SIZE_BYTES <= buf.capacity()) { "Footer length exceeds file size" } val footerBuffer = ByteBuffer.allocate(footerLength) buf.getBytes(footerLengthOffset - footerLength, footerBuffer) footerBuffer.flip() return ArrowFooter(Footer.getRootAsFooter(footerBuffer)) }
주요 단계:
- 파일의 마지막 8바이트를 읽습니다:
- 4바이트: 푸터 길이
- 4바이트: 매직 넘버 ("ARROW1")
- 매직 넘버를 검증하여 유효한 Arrow 파일인지 확인합니다.
- 푸터 길이를 사용하여 푸터의 시작 위치를 찾습니다.
- 푸터 데이터를 ByteBuffer로 읽습니다.
- Flatbuffers를 사용하여 푸터 데이터를 파싱합니다.
주요 검증:
- 푸터 길이가 양수인지 확인합니다.
- 푸터가 파일 크기를 초과하지 않는지 확인합니다.
BufferLoader에서의 사용:
- 데이터의 스키마를 제공합니다.
- 효율적인 로딩을 위한 레코드 배치 정보를 제공합니다.
반환값: ArrowFooter 객체
- 스키마 정보
- 레코드 배치 메타데이터를 포함
중요성:
- 전체 파일을 읽지 않고도 Arrow 파일의 구조와 내용을 이해할 수 있게 합니다.
- 파일의 다른 부분에 대한 랜덤 액세스를 가능하게 합니다.
- 효율적인 데이터 처리를 위한 필수 메타데이터를 제공합니다.
다시 돌아와서 Relation은 어떻게 만들어지는가 그래서?
(util/with-open [loader (Relation/loader buf)] (util/with-close-on-catch [rel (Relation. (.getAllocator (.getReferenceManager buf)) (.getSchema loader))]
(Relation/loader buf)
의 리턴값은 BufferLoader
이고 이 안에 schema
는 footer: ArrowFooter
에서 나온 schema 정보를 가진다.
이는 vectors 가 아니지만 아래 생성자를 사용한다.
@JvmOverloads constructor(allocator: BufferAllocator, schema: Schema, rowCount: Int = 0) : this(allocator, schema.fields, rowCount) @JvmOverloads constructor(allocator: BufferAllocator, fields: List<Field>, rowCount: Int = 0) : this(fields.map { fromField(allocator, it) }, rowCount)
결국 footer에서 가져오는 schema 정보가 vectors 까지 이어진다.
그말을 처음 load할 때 ArrowBuf 형태로 저장된 파일이었을 것이고, 그 데이터의 footer를 읽어와서 schema를 읽고 그것이 Relation의 vectors 값으로까지 연결된다.