Skip to content

Instantly share code, notes, and snippets.

@trbngr
Last active March 16, 2023 11:40
Show Gist options
  • Save trbngr/a31ada5cc961d4c940dc5a647a0f9571 to your computer and use it in GitHub Desktop.
Save trbngr/a31ada5cc961d4c940dc5a647a0f9571 to your computer and use it in GitHub Desktop.
Delete events of a certain type from a stream
create or replace function delete_stream_events_of_type(target_stream_id bigint, target_event_type text) returns integer
language sql
as
$$
-- disable triggers
alter table stream_events disable trigger no_update_stream_events;
alter table stream_events disable trigger no_delete_stream_events;
alter table events disable trigger no_delete_events;
alter table events disable trigger no_update_events;
-- delete events
with target_events as (select e.event_id
from events e
inner join stream_events se on e.event_id = se.event_id
inner join streams s on s.stream_id = se.original_stream_id
where s.stream_id = target_stream_id
and e.event_type = target_event_type),
deleted_stream_events as (
delete from stream_events
where event_id in (select event_id from target_events)
returning event_id),
linked_events as (
delete from stream_events
where event_id in (select event_id from deleted_stream_events))
delete
from events
where event_id in (select event_id from deleted_stream_events);
-- drop the unique index so we can reset stream_version columns
select 'drop index ix_stream_events';
update stream_events se
set stream_version = row_num,
original_stream_version = row_num
from (select event_id, row_number() over (order by stream_version) as row_num
from stream_events
where stream_id = target_stream_id) x
where x.event_id = se.event_id
and se.stream_id = target_stream_id;
-- re-create the unique index now that we're done
select 'create unique index if not exists ix_stream_events on stream_events (stream_id, stream_version)';
-- update the stream version
update streams
set stream_version = (select max(stream_version) from stream_events where stream_id = target_stream_id)
where stream_id = target_stream_id;
-- re-enable triggers
alter table stream_events enable trigger no_update_stream_events;
alter table stream_events enable trigger no_delete_stream_events;
alter table events enable trigger no_delete_events;
alter table events enable trigger no_update_events;
select 1;
$$;
create or replace procedure delete_stream_events_of_type(in target_stream_id bigint, in target_event_type text)
language sql
as $$
-- disable triggers
alter table stream_events disable trigger no_update_stream_events;
alter table stream_events disable trigger no_delete_stream_events;
alter table events disable trigger no_delete_events;
alter table events disable trigger no_update_events;
-- delete events
with target_events as (select e.event_id
from events e
inner join stream_events se on e.event_id = se.event_id
inner join streams s on s.stream_id = se.original_stream_id
where s.stream_id = target_stream_id
and e.event_type = target_event_type),
deleted_stream_events as (
delete from stream_events
where event_id in (select event_id from target_events)
returning event_id),
linked_events as (
delete from stream_events
where event_id in (select event_id from deleted_stream_events))
delete
from events
where event_id in (select event_id from deleted_stream_events);
-- drop the unique index so we can reset stream_version columns
select 'drop index ix_stream_events';
update stream_events se
set stream_version = row_num,
original_stream_version = row_num
from (select event_id, row_number() over (order by stream_version) as row_num
from stream_events
where stream_id = target_stream_id) x
where x.event_id = se.event_id
and se.stream_id = target_stream_id;
-- re-create the unique index now that we're done
select 'create unique index if not exists ix_stream_events on stream_events (stream_id, stream_version)';
-- update the stream version
update streams
set stream_version = (select max(stream_version) from stream_events where stream_id = target_stream_id)
where stream_id = target_stream_id;
-- re-enable triggers
alter table stream_events enable trigger no_update_stream_events;
alter table stream_events enable trigger no_delete_stream_events;
alter table events enable trigger no_delete_events;
alter table events enable trigger no_update_events;
$$;
@trbngr
Copy link
Author

trbngr commented Nov 30, 2022

procedure usage

 call delete_stream_events_of_type(target_stream_id := 41070, target_event_type := 'Elixir.MyApp.MyEvent');

function usage

 select delete_stream_events_of_type(target_stream_id := 41070, target_event_type := 'Elixir.MyApp.MyEvent');

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment