[240726] Postgres 를 이용한 Bitemporal Table 생성 및 CRUD 프로시저
Table of Contents
1 동기
Bimteporal 한 데이터를 저장하고 다룰 때, 형식화하면 좋지 않을까 해서 만들어보았었다. 이것을 프로덕션에서 쓸 수 있을지는 잘 모르겠다. 단일 테이블에서는 잘 사용할 수 있을지 모르나, 다른 테이블과의 연결에서는 부족함이 있다. 물론 Temporal Table들간의 조인은 일반적인 Row-base RDB 로는 복잡한 쿼리가 만들어질 수 밖에 없다.
결국 이런 것들을 신경 써야 한다면, Datomic, XTDB 를 사용하는 것이 훨씬 옳다는 결론이 났다. 하지만 1인 프로젝트이므로, 인프라 구성은 아주 단순했으면 좋겠다.
즉, Database 하나에 서버 하나만 있으면 좋겠다고 생각했다.
그래서 MariaDB를 POC 해보기로 했고, 그전까지 내 프로젝트에서 사용하기위해 만든
Postgresql Procedure 모음집을 이곳에 기록해놓는다.
-- Bitemporal 테이블 생성 프로시저
CREATE OR REPLACE PROCEDURE create_bitemporal_table(
p_table_name TEXT
) AS $$
DECLARE
v_sql TEXT;
BEGIN
RAISE NOTICE 'Starting creation of bitemporal table: %', p_table_name;
-- 테이블 생성
v_sql := format(
'CREATE TABLE IF NOT EXISTS %I (
id BIGINT,
data JSONB,
valid_time tstzrange NOT NULL,
sys_time tstzrange NOT NULL
)',
p_table_name
);
EXECUTE v_sql;
RAISE NOTICE 'Table created: %', p_table_name;
-- EXCLUDE 제약 조건 추가
v_sql := format(
'ALTER TABLE %I ADD CONSTRAINT %I_exclude
EXCLUDE USING gist (
id WITH =,
valid_time WITH &&,
sys_time WITH &&
)',
p_table_name, p_table_name
);
EXECUTE v_sql;
RAISE NOTICE 'Exclude constraint added to %', p_table_name;
-- 뷰 생성
v_sql := format(
'CREATE OR REPLACE VIEW current_%I AS
SELECT id, data, valid_time
FROM %I
WHERE CURRENT_TIMESTAMP <@ sys_time
AND CURRENT_TIMESTAMP <@ valid_time',
p_table_name,
p_table_name
);
EXECUTE v_sql;
RAISE NOTICE 'View created: current_%', p_table_name;
-- 유니크 인덱스 생성
v_sql := format(
'CREATE UNIQUE INDEX %I_id_valid_sys_unique ON %I (id, lower(valid_time), lower(sys_time))',
p_table_name, p_table_name
);
EXECUTE v_sql;
RAISE NOTICE 'Unique index created for %', p_table_name;
-- 추가 인덱스 생성
v_sql := format(
'CREATE INDEX %I_id_valid_sys_idx ON %I (id, valid_time, sys_time)',
p_table_name, p_table_name
);
EXECUTE v_sql;
RAISE NOTICE 'Index created: %_id_valid_sys_idx', p_table_name;
v_sql := format(
'CREATE INDEX %I_valid_time_idx ON %I USING GIST (valid_time)',
p_table_name, p_table_name
);
EXECUTE v_sql;
RAISE NOTICE 'Index created: %_valid_time_idx', p_table_name;
v_sql := format(
'CREATE INDEX %I_sys_time_idx ON %I USING GIST (sys_time)',
p_table_name, p_table_name
);
EXECUTE v_sql;
RAISE NOTICE 'Index created: %_sys_time_idx', p_table_name;
RAISE NOTICE 'Successfully created bitemporal table, view, and indexes: %', p_table_name;
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'Error creating bitemporal table %: %', p_table_name, SQLERRM;
END;
$$ LANGUAGE plpgsql;
-- 레코드 삽입 프로시저
CREATE OR REPLACE PROCEDURE insert_bitemporal_record(
p_table_name TEXT,
p_id BIGINT,
p_data JSONB,
p_valid_from TIMESTAMP WITH TIME ZONE,
p_valid_to TIMESTAMP WITH TIME ZONE
) AS $$
DECLARE
v_sql TEXT;
BEGIN
RAISE NOTICE 'Inserting record into %: id = %, valid_from = %, valid_to = %', p_table_name, p_id, p_valid_from, p_valid_to;
v_sql := format(
'INSERT INTO %I (id, data, valid_time, sys_time)
VALUES (%L, %L, tstzrange(%L, %L, ''[)''), tstzrange(CURRENT_TIMESTAMP, ''infinity'', ''[)''))',
p_table_name, p_id, p_data, p_valid_from, p_valid_to
);
EXECUTE v_sql;
RAISE NOTICE 'Record inserted successfully';
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'Error inserting record into %: %', p_table_name, SQLERRM;
END;
$$ LANGUAGE plpgsql;
-- 레코드 업데이트 프로시저
CREATE OR REPLACE PROCEDURE update_bitemporal_record(
p_table_name TEXT,
p_id BIGINT,
p_data JSONB,
p_valid_from TIMESTAMP WITH TIME ZONE,
p_valid_to TIMESTAMP WITH TIME ZONE
) AS $$
DECLARE
v_sys_end TIMESTAMP WITH TIME ZONE := CURRENT_TIMESTAMP;
v_new_valid_range tstzrange;
v_affected_rows INT;
v_sql TEXT;
BEGIN
-- 입력 유효성 검사
IF p_table_name IS NULL OR p_id IS NULL OR p_data IS NULL OR p_valid_from IS NULL THEN
RAISE EXCEPTION 'Invalid input parameters';
END IF;
v_new_valid_range := tstzrange(p_valid_from, COALESCE(p_valid_to, 'infinity'::timestamptz), '[)');
RAISE NOTICE 'Starting update_bitemporal_record for table: %, id: %, valid range: %', p_table_name, p_id, v_new_valid_range;
-- 트랜잭션 시작
BEGIN
-- 1. 기존 레코드의 sys_time 종료
v_sql := '
UPDATE ' || quote_ident(p_table_name) || '
SET sys_time = tstzrange(lower(sys_time), $1, ''[)'')
WHERE id = $2 AND upper(sys_time) = ''infinity''
AND valid_time && $3';
EXECUTE v_sql USING v_sys_end, p_id, v_new_valid_range;
GET DIAGNOSTICS v_affected_rows = ROW_COUNT;
RAISE NOTICE 'Step 1: Updated % existing records', v_affected_rows;
-- 2. 새로운 레코드 삽입
v_sql := '
INSERT INTO ' || quote_ident(p_table_name) || '
(id, data, valid_time, sys_time)
VALUES ($1, $2, $3, tstzrange($4, ''infinity'', ''[)''))';
EXECUTE v_sql USING p_id, p_data, v_new_valid_range, v_sys_end;
RAISE NOTICE 'Step 2: Inserted new record';
RAISE NOTICE 'Successfully updated bitemporal record for table: %, id: %', p_table_name, p_id;
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'Error in update_bitemporal_record: %', SQLERRM;
-- 트랜잭션 롤백은 자동으로 수행됨
END;
END;
$$ LANGUAGE plpgsql;
-- 레코드 논리적 삭제 프로시저
CREATE OR REPLACE PROCEDURE delete_bitemporal_record(
p_table_name TEXT,
p_id BIGINT
) AS $$
DECLARE
v_sql TEXT;
BEGIN
RAISE NOTICE 'Logically deleting record from %: id = %', p_table_name, p_id;
v_sql := format(
'UPDATE %I SET sys_time = tstzrange(lower(sys_time), CURRENT_TIMESTAMP, ''[)'')
WHERE id = %L AND upper(sys_time) = ''infinity''',
p_table_name, p_id
);
EXECUTE v_sql;
RAISE NOTICE 'Record logically deleted successfully';
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'Error deleting record from %: %', p_table_name, SQLERRM;
END;
$$ LANGUAGE plpgsql;
-- 시간 여행 쿼리 함수
CREATE OR REPLACE FUNCTION get_records_at_time(
p_table_name TEXT,
p_valid_time TIMESTAMP WITH TIME ZONE,
p_sys_time TIMESTAMP WITH TIME ZONE
) RETURNS TABLE (
id BIGINT,
data JSONB,
valid_time tstzrange,
sys_time tstzrange
) AS $$
DECLARE
v_sql TEXT;
BEGIN
RAISE NOTICE 'Querying records from % at valid_time: % and sys_time: %', p_table_name, p_valid_time, p_sys_time;
v_sql := format(
'SELECT id, data, valid_time, sys_time
FROM %I
WHERE %L::timestamptz <@ valid_time
AND %L::timestamptz <@ sys_time',
p_table_name, p_valid_time, p_sys_time
);
RETURN QUERY EXECUTE v_sql;
RAISE NOTICE 'Query executed successfully';
END;
$$ LANGUAGE plpgsql;