[240726] Postgres 를 이용한 Bitemporal Table 생성 및 CRUD 프로시저 기록

Table of Contents

1 동기

Bimteporal 한 데이터를 저장하고 다룰 때, 형식화하면 좋지 않을까 해서 만들어보았었다. 이것을 프로덕션에서 쓸 수 있을지는 잘 모르겠다. 단일 테이블에서는 잘 사용할 수 있을지 모르나, 다른 테이블과의 연결에서는 부족함이 있다. 물론 Temporal Table들간의 조인은 일반적인 Row-base RDB 로는 복잡한 쿼리가 만들어질 수 밖에 없다.

결국 이런 것들을 신경 써야 한다면, Datomic, XTDB 를 사용하는 것이 훨씬 옳다는 결론이 났다. 하지만 1인 프로젝트이므로, 인프라 구성은 아주 단순했으면 좋겠다.

즉, Database 하나에 서버 하나만 있으면 좋겠다고 생각했다.

그래서 MariaDB를 POC 해보기로 했고, 그전까지 내 프로젝트에서 사용하기위해 만든

Postgresql Procedure 모음집을 이곳에 기록해놓는다.

-- Bitemporal 테이블 생성 프로시저
CREATE OR REPLACE PROCEDURE create_bitemporal_table(
    p_table_name TEXT
) AS $$
DECLARE
    v_sql TEXT;
BEGIN
    RAISE NOTICE 'Starting creation of bitemporal table: %', p_table_name;

    -- 테이블 생성
    v_sql := format(
	'CREATE TABLE IF NOT EXISTS %I (
	    id BIGINT,
	    data JSONB,
	    valid_time tstzrange NOT NULL,
	    sys_time tstzrange NOT NULL
	)', 
	p_table_name
    );
    EXECUTE v_sql;
    RAISE NOTICE 'Table created: %', p_table_name;

    -- EXCLUDE 제약 조건 추가
    v_sql := format(
	'ALTER TABLE %I ADD CONSTRAINT %I_exclude 
	 EXCLUDE USING gist (
	    id WITH =,
	    valid_time WITH &&,
	    sys_time WITH &&
	 )',
	p_table_name, p_table_name
    );
    EXECUTE v_sql;
    RAISE NOTICE 'Exclude constraint added to %', p_table_name;

    -- 뷰 생성
    v_sql := format(
	'CREATE OR REPLACE VIEW current_%I AS
	 SELECT id, data, valid_time
	 FROM %I
	 WHERE CURRENT_TIMESTAMP <@ sys_time
	   AND CURRENT_TIMESTAMP <@ valid_time',
	p_table_name,
	p_table_name
    );
    EXECUTE v_sql;
    RAISE NOTICE 'View created: current_%', p_table_name;

    -- 유니크 인덱스 생성
    v_sql := format(
	'CREATE UNIQUE INDEX %I_id_valid_sys_unique ON %I (id, lower(valid_time), lower(sys_time))',
	p_table_name, p_table_name
    );
    EXECUTE v_sql;
    RAISE NOTICE 'Unique index created for %', p_table_name;

    -- 추가 인덱스 생성
    v_sql := format(
	'CREATE INDEX %I_id_valid_sys_idx ON %I (id, valid_time, sys_time)',
	p_table_name, p_table_name
    );
    EXECUTE v_sql;
    RAISE NOTICE 'Index created: %_id_valid_sys_idx', p_table_name;

    v_sql := format(
	'CREATE INDEX %I_valid_time_idx ON %I USING GIST (valid_time)',
	p_table_name, p_table_name
    );
    EXECUTE v_sql;
    RAISE NOTICE 'Index created: %_valid_time_idx', p_table_name;

    v_sql := format(
	'CREATE INDEX %I_sys_time_idx ON %I USING GIST (sys_time)',
	p_table_name, p_table_name
    );
    EXECUTE v_sql;
    RAISE NOTICE 'Index created: %_sys_time_idx', p_table_name;

    RAISE NOTICE 'Successfully created bitemporal table, view, and indexes: %', p_table_name;
EXCEPTION
    WHEN OTHERS THEN
	RAISE EXCEPTION 'Error creating bitemporal table %: %', p_table_name, SQLERRM;
END;
$$ LANGUAGE plpgsql;

-- 레코드 삽입 프로시저
CREATE OR REPLACE PROCEDURE insert_bitemporal_record(
    p_table_name TEXT,
    p_id BIGINT,
    p_data JSONB,
    p_valid_from TIMESTAMP WITH TIME ZONE,
    p_valid_to TIMESTAMP WITH TIME ZONE
) AS $$
DECLARE
    v_sql TEXT;
BEGIN
    RAISE NOTICE 'Inserting record into %: id = %, valid_from = %, valid_to = %', p_table_name, p_id, p_valid_from, p_valid_to;

    v_sql := format(
	'INSERT INTO %I (id, data, valid_time, sys_time) 
	 VALUES (%L, %L, tstzrange(%L, %L, ''[)''), tstzrange(CURRENT_TIMESTAMP, ''infinity'', ''[)''))',
	p_table_name, p_id, p_data, p_valid_from, p_valid_to
    );
    EXECUTE v_sql;

    RAISE NOTICE 'Record inserted successfully';
EXCEPTION
    WHEN OTHERS THEN
	RAISE EXCEPTION 'Error inserting record into %: %', p_table_name, SQLERRM;
END;
$$ LANGUAGE plpgsql;

-- 레코드 업데이트 프로시저

CREATE OR REPLACE PROCEDURE update_bitemporal_record(
    p_table_name TEXT,
    p_id BIGINT,
    p_data JSONB,
    p_valid_from TIMESTAMP WITH TIME ZONE,
    p_valid_to TIMESTAMP WITH TIME ZONE
) AS $$
DECLARE
    v_sys_end TIMESTAMP WITH TIME ZONE := CURRENT_TIMESTAMP;
    v_new_valid_range tstzrange;
    v_affected_rows INT;
    v_sql TEXT;
BEGIN
    -- 입력 유효성 검사
    IF p_table_name IS NULL OR p_id IS NULL OR p_data IS NULL OR p_valid_from IS NULL THEN
	RAISE EXCEPTION 'Invalid input parameters';
    END IF;

    v_new_valid_range := tstzrange(p_valid_from, COALESCE(p_valid_to, 'infinity'::timestamptz), '[)');
    RAISE NOTICE 'Starting update_bitemporal_record for table: %, id: %, valid range: %', p_table_name, p_id, v_new_valid_range;

    -- 트랜잭션 시작
    BEGIN
	-- 1. 기존 레코드의 sys_time 종료
	v_sql := '
	    UPDATE ' || quote_ident(p_table_name) || '
	    SET sys_time = tstzrange(lower(sys_time), $1, ''[)'')
	    WHERE id = $2 AND upper(sys_time) = ''infinity''
	      AND valid_time && $3';

	EXECUTE v_sql USING v_sys_end, p_id, v_new_valid_range;
	GET DIAGNOSTICS v_affected_rows = ROW_COUNT;
	RAISE NOTICE 'Step 1: Updated % existing records', v_affected_rows;

	-- 2. 새로운 레코드 삽입
	v_sql := '
	    INSERT INTO ' || quote_ident(p_table_name) || '
	    (id, data, valid_time, sys_time)
	    VALUES ($1, $2, $3, tstzrange($4, ''infinity'', ''[)''))';

	EXECUTE v_sql USING p_id, p_data, v_new_valid_range, v_sys_end;
	RAISE NOTICE 'Step 2: Inserted new record';

	RAISE NOTICE 'Successfully updated bitemporal record for table: %, id: %', p_table_name, p_id;
    EXCEPTION
	WHEN OTHERS THEN
	    RAISE EXCEPTION 'Error in update_bitemporal_record: %', SQLERRM;
	    -- 트랜잭션 롤백은 자동으로 수행됨
    END;
END;
$$ LANGUAGE plpgsql;


-- 레코드 논리적 삭제 프로시저
CREATE OR REPLACE PROCEDURE delete_bitemporal_record(
    p_table_name TEXT,
    p_id BIGINT
) AS $$
DECLARE
    v_sql TEXT;
BEGIN
    RAISE NOTICE 'Logically deleting record from %: id = %', p_table_name, p_id;

    v_sql := format(
	'UPDATE %I SET sys_time = tstzrange(lower(sys_time), CURRENT_TIMESTAMP, ''[)'')
	 WHERE id = %L AND upper(sys_time) = ''infinity''',
	p_table_name, p_id
    );
    EXECUTE v_sql;

    RAISE NOTICE 'Record logically deleted successfully';
EXCEPTION
    WHEN OTHERS THEN
	RAISE EXCEPTION 'Error deleting record from %: %', p_table_name, SQLERRM;
END;
$$ LANGUAGE plpgsql;

-- 시간 여행 쿼리 함수
CREATE OR REPLACE FUNCTION get_records_at_time(
    p_table_name TEXT,
    p_valid_time TIMESTAMP WITH TIME ZONE,
    p_sys_time TIMESTAMP WITH TIME ZONE
) RETURNS TABLE (
    id BIGINT,
    data JSONB,
    valid_time tstzrange,
    sys_time tstzrange
) AS $$
DECLARE
    v_sql TEXT;
BEGIN
    RAISE NOTICE 'Querying records from % at valid_time: % and sys_time: %', p_table_name, p_valid_time, p_sys_time;

    v_sql := format(
	'SELECT id, data, valid_time, sys_time
	 FROM %I
	 WHERE %L::timestamptz <@ valid_time
	   AND %L::timestamptz <@ sys_time',
	p_table_name, p_valid_time, p_sys_time
    );
    RETURN QUERY EXECUTE v_sql;

    RAISE NOTICE 'Query executed successfully';
END;
$$ LANGUAGE plpgsql;

Date: 2024-07-26 Fri 00:00

Author: Younghwan Nam

Created: 2024-09-25 Wed 07:42

Emacs 27.2 (Org mode 9.4.4)

Validate