@chanks
Last active May 26, 2023 05:07
Lua script to delete/trim all processed messages from a Redis stream - an updated version is maintained here: https://gist.github.com/FSX/fb86595c64751201497e2050aeb722e2
-- The goal of this script is to trim messages that have been processed by
-- all extant groups from a given Redis stream. It returns the number
-- of messages that were deleted from the stream, if any. I make no
-- guarantees about its performance, particularly if the stream is large
-- and not fully processed (so a simple XTRIM isn't possible).
-- First off, bail out early if the stream doesn't exist.
if redis.call("EXISTS", KEYS[1]) == 0 then
  return false
end
-- To figure out what messages are deletable, we fetch the "last-
-- delivered-id" for each consumer group of the stream, and set the lowest
-- one of those ids as our upper bound. Next, we scan the pending lists
-- for each group, because we also don't want to delete any events that
-- are delivered but not acknowledged. The lowest unacknowledged id (if
-- any exists) then becomes our new upper bound.
-- "last-delivered-id" isn't mentioned in the Redis docs, for some reason,
-- but it's in there, as of 5.0.3.
-- In the (common?) case where there are no pending messages, and the
-- lowest last-delivered-id equals the most recent id on the stream, we
-- can just do a simpler (and much more efficient) XTRIM stream MAXLEN 0.
-- If we can't do that, we'll have to pull in all the message ids before
-- the lowest unacknowledged id, and XDEL them all.
-- First, use XINFO GROUPS to get all group names and the most recently
-- distributed ids.
local xinfo_groups = redis.call("XINFO", "GROUPS", KEYS[1])
local last_delivered_ids = {}
local groups = {}
if #xinfo_groups == 0 then
  -- When there are no groups, there's nothing to delete.
  return 0
end
for _, group_info_array in ipairs(xinfo_groups) do
  -- Redis passes us a flattened array of key, value pairs, so before
  -- anything else, convert it into a proper hash-style table so that it's
  -- easier to use.
  local group_info = {}
  for i = 1, #group_info_array, 2 do
    group_info[group_info_array[i]] = group_info_array[i+1]
  end
  table.insert(groups, group_info["name"])
  table.insert(last_delivered_ids, group_info["last-delivered-id"])
end
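local lowest_pending_ids = {}
for _, group_name in ipairs(groups) do
  -- The summary form of XPENDING returns the lowest pending id as its
  -- second element, or nil (false in Lua) when nothing is pending.
  local pending = redis.call("XPENDING", KEYS[1], group_name)
  local lowest_id = pending[2]
  if lowest_id then
    table.insert(lowest_pending_ids, lowest_id)
  end
end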
-- Splits an id of the form "<ms>-<seq>" into the table {ms, seq}.
local function string_id_to_table(s)
  local t = {}
  for k, v in string.gmatch(s, "(%d+)-(%d+)") do
    table.insert(t, tonumber(k))
    table.insert(t, tonumber(v))
  end
  return t
end
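-- Returns true if a < b, or if a == b (which is important later). Stream
-- ids order by their millisecond part first, then by their sequence part.
local function compare_ids(a, b)
  local a_t = string_id_to_table(a)
  local b_t = string_id_to_table(b)
  if a_t[1] ~= b_t[1] then
    return a_t[1] < b_t[1]
  end
  return a_t[2] <= b_t[2]
end
-- table.sort needs a strict "less than", so negate the reversed comparison.
local function id_less_than(a, b)
  return not compare_ids(b, a)
end
table.sort(last_delivered_ids, id_less_than)
table.sort(lowest_pending_ids, id_less_than)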
-- Here's our XTRIM optimization.
if #lowest_pending_ids == 0 then
  local stream_info_array = redis.call("XINFO", "STREAM", KEYS[1])
  local stream_info = {}
  for i = 1, #stream_info_array, 2 do
    stream_info[stream_info_array[i]] = stream_info_array[i+1]
  end
  if last_delivered_ids[1] == stream_info["last-generated-id"] then
    -- Every message has been delivered and acknowledged, so trim everything.
    return redis.call("XTRIM", KEYS[1], "MAXLEN", 0)
  end
end
-- If we've gotten here, looks like we need to do a big XDEL, so find our
-- upper bound.
local lowest_id = last_delivered_ids[1]
-- We can include the lowest delivered id in the deletion, so long as it
-- isn't pending, which we'll check for next.
local protect_lowest_id = false
if #lowest_pending_ids > 0 then
  -- We rely here on compare_ids returning true if the ids are equal.
  if compare_ids(lowest_pending_ids[1], lowest_id) then
    lowest_id = lowest_pending_ids[1]
    protect_lowest_id = true
  end
end
local messages = redis.call("XRANGE", KEYS[1], "-", lowest_id)
if #messages == 0 then
  -- Nothing to delete.
  return 0
end
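local delete_command = {"XDEL", KEYS[1]}
for _, t in ipairs(messages) do
  local id = t[1]
  if (lowest_id ~= id) or (not protect_lowest_id) then
    table.insert(delete_command, id)
  end
end
-- XDEL needs at least one id; bail out if only the protected id was in range.
if #delete_command == 2 then
  return 0
end
return redis.call(unpack(delete_command))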
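
The script converts a flat key/value reply into a table in two places (for XINFO GROUPS and for XINFO STREAM). As a minimal sketch, that step could be factored into one helper. The name `flat_to_table` and the sample reply are hypothetical and are not part of the original script.

```lua
-- Hypothetical helper, not in the original script: turns a flat reply
-- such as {"name", "workers", "pending", 2} into a keyed table.
local function flat_to_table(flat)
  local t = {}
  for i = 1, #flat, 2 do
    t[flat[i]] = flat[i + 1]
  end
  return t
end

-- Made-up sample data shaped like one XINFO GROUPS entry.
local info = flat_to_table({
  "name", "workers",
  "pending", 2,
  "last-delivered-id", "1526569495631-0",
})
```

With a helper like this, both conversion loops in the script would reduce to a single call each.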

DoumanAsh commented May 26, 2023

For reference, since Redis 6.2 XTRIM supports a new MINID strategy, which could be used to optimize this script instead of the MAXLEN strategy it uses currently.

P.S. Actually, you can also replace the XDEL with it.
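
A rough sketch of what that might look like, assuming Redis 6.2 or later. XTRIM with MINID evicts entries whose ids are lower than the threshold and keeps the threshold itself, so the threshold is either the lowest pending id (which must survive) or the id just after the lowest last-delivered id. The helper names below are hypothetical, and the sketch assumes sequence numbers small enough to be exact as Lua numbers.

```lua
-- Splits "<ms>-<seq>" into two numbers.
local function parse_id(id)
  local ms, seq = string.match(id, "^(%d+)-(%d+)$")
  return tonumber(ms), tonumber(seq)
end

-- True if id a sorts strictly before id b.
local function id_less(a, b)
  local a_ms, a_seq = parse_id(a)
  local b_ms, b_seq = parse_id(b)
  if a_ms ~= b_ms then
    return a_ms < b_ms
  end
  return a_seq < b_seq
end

-- Hypothetical helper: computes the MINID threshold. lowest_pending may be
-- nil when no group has pending messages.
local function minid_threshold(lowest_delivered, lowest_pending)
  if lowest_pending and not id_less(lowest_delivered, lowest_pending) then
    -- The pending id is at or below the delivered id, so keep it.
    return lowest_pending
  end
  -- Nothing pending at or below the delivered id: trim through it by
  -- using the next possible id as the threshold.
  local ms, seq = parse_id(lowest_delivered)
  return string.format("%d-%d", ms, seq + 1)
end

-- Inside the script, this would replace the XRANGE/XDEL section:
-- return redis.call("XTRIM", KEYS[1], "MINID",
--   minid_threshold(last_delivered_ids[1], lowest_pending_ids[1]))
```

Like XDEL, XTRIM returns the number of entries it removed, so the script's return value would keep its meaning.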
