[240726] Postgres 를 이용한 Bitemporal Table 생성 및 CRUD 프로시저 기록
Table of Contents
1 동기
Bimteporal 한 데이터를 저장하고 다룰 때, 형식화하면 좋지 않을까 해서 만들어보았었다. 이것을 프로덕션에서 쓸 수 있을지는 잘 모르겠다. 단일 테이블에서는 잘 사용할 수 있을지 모르나, 다른 테이블과의 연결에서는 부족함이 있다. 물론 Temporal Table들간의 조인은 일반적인 Row-base RDB 로는 복잡한 쿼리가 만들어질 수 밖에 없다.
결국 이런 것들을 신경 써야 한다면, Datomic, XTDB 를 사용하는 것이 훨씬 옳다는 결론이 났다. 하지만 1인 프로젝트이므로, 인프라 구성은 아주 단순했으면 좋겠다.
즉, Database 하나에 서버 하나만 있으면 좋겠다고 생각했다.
그래서 MariaDB를 POC 해보기로 했고, 그전까지 내 프로젝트에서 사용하기위해 만든
Postgresql Procedure 모음집을 이곳에 기록해놓는다.
-- Bitemporal 테이블 생성 프로시저 CREATE OR REPLACE PROCEDURE create_bitemporal_table( p_table_name TEXT ) AS $$ DECLARE v_sql TEXT; BEGIN RAISE NOTICE 'Starting creation of bitemporal table: %', p_table_name; -- 테이블 생성 v_sql := format( 'CREATE TABLE IF NOT EXISTS %I ( id BIGINT, data JSONB, valid_time tstzrange NOT NULL, sys_time tstzrange NOT NULL )', p_table_name ); EXECUTE v_sql; RAISE NOTICE 'Table created: %', p_table_name; -- EXCLUDE 제약 조건 추가 v_sql := format( 'ALTER TABLE %I ADD CONSTRAINT %I_exclude EXCLUDE USING gist ( id WITH =, valid_time WITH &&, sys_time WITH && )', p_table_name, p_table_name ); EXECUTE v_sql; RAISE NOTICE 'Exclude constraint added to %', p_table_name; -- 뷰 생성 v_sql := format( 'CREATE OR REPLACE VIEW current_%I AS SELECT id, data, valid_time FROM %I WHERE CURRENT_TIMESTAMP <@ sys_time AND CURRENT_TIMESTAMP <@ valid_time', p_table_name, p_table_name ); EXECUTE v_sql; RAISE NOTICE 'View created: current_%', p_table_name; -- 유니크 인덱스 생성 v_sql := format( 'CREATE UNIQUE INDEX %I_id_valid_sys_unique ON %I (id, lower(valid_time), lower(sys_time))', p_table_name, p_table_name ); EXECUTE v_sql; RAISE NOTICE 'Unique index created for %', p_table_name; -- 추가 인덱스 생성 v_sql := format( 'CREATE INDEX %I_id_valid_sys_idx ON %I (id, valid_time, sys_time)', p_table_name, p_table_name ); EXECUTE v_sql; RAISE NOTICE 'Index created: %_id_valid_sys_idx', p_table_name; v_sql := format( 'CREATE INDEX %I_valid_time_idx ON %I USING GIST (valid_time)', p_table_name, p_table_name ); EXECUTE v_sql; RAISE NOTICE 'Index created: %_valid_time_idx', p_table_name; v_sql := format( 'CREATE INDEX %I_sys_time_idx ON %I USING GIST (sys_time)', p_table_name, p_table_name ); EXECUTE v_sql; RAISE NOTICE 'Index created: %_sys_time_idx', p_table_name; RAISE NOTICE 'Successfully created bitemporal table, view, and indexes: %', p_table_name; EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION 'Error creating bitemporal table %: %', p_table_name, SQLERRM; END; $$ LANGUAGE plpgsql; -- 레코드 삽입 프로시저 CREATE OR REPLACE PROCEDURE insert_bitemporal_record( p_table_name TEXT, p_id BIGINT, p_data JSONB, p_valid_from TIMESTAMP WITH TIME ZONE, p_valid_to TIMESTAMP WITH TIME ZONE ) AS $$ DECLARE v_sql TEXT; BEGIN RAISE NOTICE 'Inserting record into %: id = %, valid_from = %, valid_to = %', p_table_name, p_id, p_valid_from, p_valid_to; v_sql := format( 'INSERT INTO %I (id, data, valid_time, sys_time) VALUES (%L, %L, tstzrange(%L, %L, ''[)''), tstzrange(CURRENT_TIMESTAMP, ''infinity'', ''[)''))', p_table_name, p_id, p_data, p_valid_from, p_valid_to ); EXECUTE v_sql; RAISE NOTICE 'Record inserted successfully'; EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION 'Error inserting record into %: %', p_table_name, SQLERRM; END; $$ LANGUAGE plpgsql; -- 레코드 업데이트 프로시저 CREATE OR REPLACE PROCEDURE update_bitemporal_record( p_table_name TEXT, p_id BIGINT, p_data JSONB, p_valid_from TIMESTAMP WITH TIME ZONE, p_valid_to TIMESTAMP WITH TIME ZONE ) AS $$ DECLARE v_sys_end TIMESTAMP WITH TIME ZONE := CURRENT_TIMESTAMP; v_new_valid_range tstzrange; v_affected_rows INT; v_sql TEXT; BEGIN -- 입력 유효성 검사 IF p_table_name IS NULL OR p_id IS NULL OR p_data IS NULL OR p_valid_from IS NULL THEN RAISE EXCEPTION 'Invalid input parameters'; END IF; v_new_valid_range := tstzrange(p_valid_from, COALESCE(p_valid_to, 'infinity'::timestamptz), '[)'); RAISE NOTICE 'Starting update_bitemporal_record for table: %, id: %, valid range: %', p_table_name, p_id, v_new_valid_range; -- 트랜잭션 시작 BEGIN -- 1. 기존 레코드의 sys_time 종료 v_sql := ' UPDATE ' || quote_ident(p_table_name) || ' SET sys_time = tstzrange(lower(sys_time), $1, ''[)'') WHERE id = $2 AND upper(sys_time) = ''infinity'' AND valid_time && $3'; EXECUTE v_sql USING v_sys_end, p_id, v_new_valid_range; GET DIAGNOSTICS v_affected_rows = ROW_COUNT; RAISE NOTICE 'Step 1: Updated % existing records', v_affected_rows; -- 2. 새로운 레코드 삽입 v_sql := ' INSERT INTO ' || quote_ident(p_table_name) || ' (id, data, valid_time, sys_time) VALUES ($1, $2, $3, tstzrange($4, ''infinity'', ''[)''))'; EXECUTE v_sql USING p_id, p_data, v_new_valid_range, v_sys_end; RAISE NOTICE 'Step 2: Inserted new record'; RAISE NOTICE 'Successfully updated bitemporal record for table: %, id: %', p_table_name, p_id; EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION 'Error in update_bitemporal_record: %', SQLERRM; -- 트랜잭션 롤백은 자동으로 수행됨 END; END; $$ LANGUAGE plpgsql; -- 레코드 논리적 삭제 프로시저 CREATE OR REPLACE PROCEDURE delete_bitemporal_record( p_table_name TEXT, p_id BIGINT ) AS $$ DECLARE v_sql TEXT; BEGIN RAISE NOTICE 'Logically deleting record from %: id = %', p_table_name, p_id; v_sql := format( 'UPDATE %I SET sys_time = tstzrange(lower(sys_time), CURRENT_TIMESTAMP, ''[)'') WHERE id = %L AND upper(sys_time) = ''infinity''', p_table_name, p_id ); EXECUTE v_sql; RAISE NOTICE 'Record logically deleted successfully'; EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION 'Error deleting record from %: %', p_table_name, SQLERRM; END; $$ LANGUAGE plpgsql; -- 시간 여행 쿼리 함수 CREATE OR REPLACE FUNCTION get_records_at_time( p_table_name TEXT, p_valid_time TIMESTAMP WITH TIME ZONE, p_sys_time TIMESTAMP WITH TIME ZONE ) RETURNS TABLE ( id BIGINT, data JSONB, valid_time tstzrange, sys_time tstzrange ) AS $$ DECLARE v_sql TEXT; BEGIN RAISE NOTICE 'Querying records from % at valid_time: % and sys_time: %', p_table_name, p_valid_time, p_sys_time; v_sql := format( 'SELECT id, data, valid_time, sys_time FROM %I WHERE %L::timestamptz <@ valid_time AND %L::timestamptz <@ sys_time', p_table_name, p_valid_time, p_sys_time ); RETURN QUERY EXECUTE v_sql; RAISE NOTICE 'Query executed successfully'; END; $$ LANGUAGE plpgsql;