Last active
March 16, 2023 11:40
-
-
Save trbngr/a31ada5cc961d4c940dc5a647a0f9571 to your computer and use it in GitHub Desktop.
Delete events of a certain type from a stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
create or replace function delete_stream_events_of_type(target_stream_id bigint, target_event_type text) returns integer | |
language sql | |
as | |
$$ | |
-- disable triggers | |
alter table stream_events disable trigger no_update_stream_events; | |
alter table stream_events disable trigger no_delete_stream_events; | |
alter table events disable trigger no_delete_events; | |
alter table events disable trigger no_update_events; | |
-- delete events | |
with target_events as (select e.event_id | |
from events e | |
inner join stream_events se on e.event_id = se.event_id | |
inner join streams s on s.stream_id = se.original_stream_id | |
where s.stream_id = target_stream_id | |
and e.event_type = target_event_type), | |
deleted_stream_events as ( | |
delete from stream_events | |
where event_id in (select event_id from target_events) | |
returning event_id), | |
linked_events as ( | |
delete from stream_events | |
where event_id in (select event_id from deleted_stream_events)) | |
delete | |
from events | |
where event_id in (select event_id from deleted_stream_events); | |
-- drop the unique index so we can reset stream_version columns | |
select 'drop index ix_stream_events'; | |
update stream_events se | |
set stream_version = row_num, | |
original_stream_version = row_num | |
from (select event_id, row_number() over (order by stream_version) as row_num | |
from stream_events | |
where stream_id = target_stream_id) x | |
where x.event_id = se.event_id | |
and se.stream_id = target_stream_id; | |
-- re-create the unique index now that we're done | |
select 'create unique index if not exists ix_stream_events on stream_events (stream_id, stream_version)'; | |
-- update the stream version | |
update streams | |
set stream_version = (select max(stream_version) from stream_events where stream_id = target_stream_id) | |
where stream_id = target_stream_id; | |
-- re-enable triggers | |
alter table stream_events enable trigger no_update_stream_events; | |
alter table stream_events enable trigger no_delete_stream_events; | |
alter table events enable trigger no_delete_events; | |
alter table events enable trigger no_update_events; | |
select 1; | |
$$; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
create or replace procedure delete_stream_events_of_type(in target_stream_id bigint, in target_event_type text) | |
language sql | |
as $$ | |
-- disable triggers | |
alter table stream_events disable trigger no_update_stream_events; | |
alter table stream_events disable trigger no_delete_stream_events; | |
alter table events disable trigger no_delete_events; | |
alter table events disable trigger no_update_events; | |
-- delete events | |
with target_events as (select e.event_id | |
from events e | |
inner join stream_events se on e.event_id = se.event_id | |
inner join streams s on s.stream_id = se.original_stream_id | |
where s.stream_id = target_stream_id | |
and e.event_type = target_event_type), | |
deleted_stream_events as ( | |
delete from stream_events | |
where event_id in (select event_id from target_events) | |
returning event_id), | |
linked_events as ( | |
delete from stream_events | |
where event_id in (select event_id from deleted_stream_events)) | |
delete | |
from events | |
where event_id in (select event_id from deleted_stream_events); | |
-- drop the unique index so we can reset stream_version columns | |
select 'drop index ix_stream_events'; | |
update stream_events se | |
set stream_version = row_num, | |
original_stream_version = row_num | |
from (select event_id, row_number() over (order by stream_version) as row_num | |
from stream_events | |
where stream_id = target_stream_id) x | |
where x.event_id = se.event_id | |
and se.stream_id = target_stream_id; | |
-- re-create the unique index now that we're done | |
select 'create unique index if not exists ix_stream_events on stream_events (stream_id, stream_version)'; | |
-- update the stream version | |
update streams | |
set stream_version = (select max(stream_version) from stream_events where stream_id = target_stream_id) | |
where stream_id = target_stream_id; | |
-- re-enable triggers | |
alter table stream_events enable trigger no_update_stream_events; | |
alter table stream_events enable trigger no_delete_stream_events; | |
alter table events enable trigger no_delete_events; | |
alter table events enable trigger no_update_events; | |
$$; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
procedure usage
function usage