Last active
May 26, 2023 05:07
-
-
Save chanks/c2e7e0efbd3d038775208047abb68524 to your computer and use it in GitHub Desktop.
Lua script to delete/trim all processed messages from a Redis stream - an updated version is maintained here: https://gist.github.com/FSX/fb86595c64751201497e2050aeb722e2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- The goal of this script is to trim messages that have been processed by | |
-- all extant groups from the a given Redis stream. It returns the number | |
-- of messages that were deleted from the stream, if any. I make no | |
-- guarantees about its performance, particularly if the stream is large | |
-- and not fully processed (so a simple XTRIM isn't possible). | |
-- First off, bail out early if the stream doesn't exist. | |
if redis.call("EXISTS", KEYS[1]) == 0 then | |
return false | |
end | |
-- To figure out what messages are deletable, we fetch the "last- | |
-- delivered-id" for each consumer group of the stream, and set the lowest | |
-- one of those ids as our upper bound. Next, we scan the pending lists | |
-- for each group, because we also don't want to delete any events that | |
-- are delivered but not acknowledged. The lowest unacknowledged id (if | |
-- any exists) then becomes our new upper bound. | |
-- "last-delivered-id" isn't mentioned in the Redis docs, for some reason, | |
-- but it's in there, as of 5.0.3. | |
-- In the (common?) case where there are no pending messages, and the | |
-- lowest last-delivered-id equals the most recent id on the stream, we | |
-- can just do a simpler (and much more efficient) XTRIM stream MAXLEN 0. | |
-- If we can't do that, we'll have to pull in all the message ids before | |
-- the lowest unacknowledged id, and XDEL them all. | |
-- First, use XINFO GROUPS to get all group names and the most recently | |
-- distributed ids. | |
local xinfo_groups = redis.call("XINFO", "GROUPS", KEYS[1]) | |
local last_delivered_ids = {} | |
local groups = {} | |
if #xinfo_groups == 0 then | |
-- When there's no groups, there's nothing to delete. | |
return 0 | |
end | |
for _, group_info_array in ipairs(xinfo_groups) do | |
-- Redis passes us a flattened array of key, value pairs, so before | |
-- anything else, convert it into a proper hash-style table so that it's | |
-- easier to use. | |
local group_info = {} | |
for i = 1, #group_info_array, 2 do | |
group_info[group_info_array[i]] = group_info_array[i+1] | |
end | |
table.insert(groups, group_info["name"]) | |
table.insert(last_delivered_ids, group_info["last-delivered-id"]) | |
end | |
local lowest_pending_ids = {} | |
for _, group_name in ipairs(groups) do | |
local pending = redis.call("XPENDING", KEYS[1], group_name) | |
local lowest_id = pending[2] | |
if not lowest_id == false then | |
table.insert(lowest_pending_ids, lowest_id) | |
end | |
end | |
local function string_id_to_table(s) | |
local t = {} | |
for k, v in string.gmatch(s, "(%d+)-(%d+)") do | |
table.insert(t, tonumber(k)) | |
table.insert(t, tonumber(v)) | |
end | |
return t | |
end | |
-- Returns true if a < b, or if a == b (which is important later). | |
local function compare_ids(a, b) | |
local a_t = string_id_to_table(a) | |
local b_t = string_id_to_table(b) | |
return ((a_t[1] <= b_t[1]) and (a_t[2] <= b_t[2])) | |
end | |
table.sort(last_delivered_ids, compare_ids) | |
table.sort(lowest_pending_ids, compare_ids) | |
-- Here's our XTRIM optimization. | |
if #lowest_pending_ids == 0 then | |
local stream_info_array = redis.call("XINFO", "STREAM", KEYS[1]) | |
local stream_info = {} | |
for i = 1, #stream_info_array, 2 do | |
stream_info[stream_info_array[i]] = stream_info_array[i+1] | |
end | |
if last_delivered_ids[1] == stream_info["last-generated-id"] then | |
-- Yay! | |
return redis.call("XTRIM", KEYS[1], "MAXLEN", 0) | |
end | |
end | |
-- If we've gotten here, looks like we need to do a big XDEL, so find our | |
-- lower bound. | |
local lowest_id = last_delivered_ids[1] | |
-- We can include the lowest delivered id in the deletion, so long as it | |
-- isn't pending, which we'll check for next. | |
local protect_lowest_id = false | |
if #lowest_pending_ids > 0 then | |
-- We rely here on compare_ids returning true if the ids are equal. | |
if compare_ids(lowest_pending_ids[1], lowest_id) then | |
lowest_id = lowest_pending_ids[1] | |
protect_lowest_id = true | |
end | |
end | |
local messages = redis.call("XRANGE", KEYS[1], "-", lowest_id) | |
if #messages == 0 then | |
-- Nothing to delete. | |
return 0 | |
end | |
local delete_command = {"XDEL", KEYS[1]} | |
for _,t in pairs(messages) do | |
local id = t[1] | |
if (lowest_id ~= id) or (not protect_lowest_id) then | |
table.insert(delete_command, id) | |
end | |
end | |
return redis.call(unpack(delete_command)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For reference, since redis 6.2 XTRIM introduces new strategy
MIND
which could be used to optimize this script instead ofMAXLEN
used currentlyP.s. actually you can also replace
XDEL
with it