Skip to content

Instantly share code, notes, and snippets.

@jakzal
Last active December 12, 2024 11:55
Show Gist options
  • Save jakzal/a54f0759e3adfef862a83bd279934a37 to your computer and use it in GitHub Desktop.
Save jakzal/a54f0759e3adfef862a83bd279934a37 to your computer and use it in GitHub Desktop.
Learn how to use LISTEN/NOTIFY in PostgreSQL
using KaffeineLabs.MQ.Outbox;
using KaffeineLabs.Testcontainers.PostgreSql;
using FluentAssertions;
using Microsoft.EntityFrameworkCore;
using Npgsql;
using Xunit.Abstractions;
namespace KaffeineLabs.MQ.Tests.Outbox;
/// <summary>
/// Learning test for PostgreSQL LISTEN/NOTIFY: installs an AFTER INSERT trigger on
/// the "OutboxMessages" table that calls <c>pg_notify</c>, then verifies that a
/// listening <see cref="NpgsqlConnection"/> receives exactly one notification on
/// the "outbox" channel when an outbox message is saved.
/// </summary>
public class LearnPostgresListenNotifyTest(PostgreSqlFixture postgreSql, ITestOutputHelper testOutputHelper)
    : IClassFixture<PostgreSqlFixture>
{
    // Upper bound for waiting on the notification. WaitAsync returns as soon as a
    // message arrives, so a generous value only costs time when the test fails.
    private const int NotificationTimeoutMilliseconds = 5000;

    private readonly TestDbContext _dbContext = postgreSql.TestDbContext();

    [Fact]
    public async Task ItSendsOutboxNotification()
    {
        var repository = CreateDatabaseOutboxMessageRepository(_dbContext);
        var notifications = new List<string>();

        // A dedicated connection is used for listening; the repository writes
        // through the DbContext's own connection.
        await using var connection = new NpgsqlConnection(_dbContext.Database.GetConnectionString());
        await connection.OpenAsync();
        connection.Notification += (_, e) =>
        {
            testOutputHelper.WriteLine($"Notification for {e.Channel}: {e.Payload}");
            notifications.Add(e.Channel);
            testOutputHelper.WriteLine($"Notifications for {e.Channel}: {notifications.Count}");
        };

        // LISTEN must have completed before the insert: otherwise the subscription
        // may not be active when the trigger fires, and the connection would still
        // have a command in progress when WaitAsync is called below.
        testOutputHelper.WriteLine($"Listening to notifications");
        await using (var listenCommand = new NpgsqlCommand("LISTEN outbox;", connection))
        {
            await listenCommand.ExecuteNonQueryAsync();
        }

        await repository.SaveAsync(new OutboxMessage
        {
            Message = "Hello World",
            Type = "String"
        });

        // Pumps the connection until an asynchronous message arrives or the timeout elapses.
        var received = await connection.WaitAsync(NotificationTimeoutMilliseconds);

        received.Should().BeTrue("a notification should arrive before the timeout elapses");
        notifications.Count.Should().Be(1);
        notifications.Should().BeEquivalentTo(["outbox"]);
    }

    /// <summary>
    /// Installs the notify function and trigger in the database behind
    /// <paramref name="dbContext"/> and returns a repository bound to that context.
    /// </summary>
    /// <param name="dbContext">Context used both for the DDL and by the returned repository.</param>
    /// <returns>A repository that saves outbox messages through <paramref name="dbContext"/>.</returns>
    private DatabaseOutboxMessageRepository<TestDbContext> CreateDatabaseOutboxMessageRepository(
        TestDbContext dbContext)
    {
        // Use the supplied context (not the field) so the trigger is created in the
        // same database the returned repository writes to.
        dbContext.Database.ExecuteSqlRaw("""
        CREATE OR REPLACE FUNCTION outbox_messages_notify()
        RETURNS TRIGGER AS $trigger$
        DECLARE
        payload TEXT;
        channel_name TEXT;
        BEGIN
        IF TG_ARGV[0] IS NULL THEN
        RAISE EXCEPTION 'A channel name is required as the first argument';
        END IF;
        channel_name := TG_ARGV[0];
        payload := json_build_object('timestamp', CURRENT_TIMESTAMP, 'payload', row_to_json(NEW));
        PERFORM pg_notify(channel_name, payload);
        RETURN NEW;
        END;
        $trigger$ LANGUAGE plpgsql;
        CREATE OR REPLACE TRIGGER outbox_messages_notify_trigger
        AFTER INSERT ON "OutboxMessages"
        FOR EACH ROW EXECUTE PROCEDURE outbox_messages_notify('outbox');
        """);

        return new DatabaseOutboxMessageRepository<TestDbContext>(dbContext);
    }
}
using Testcontainers.PostgreSql;
using Xunit;
namespace KaffeineLabs.Testcontainers.PostgreSql;
/// <summary>
/// xUnit fixture that owns a PostgreSQL test container: the container is started
/// in <see cref="InitializeAsync"/> and disposed in <see cref="DisposeAsync"/>.
/// </summary>
public class PostgreSqlFixture : IAsyncLifetime
{
    private readonly PostgreSqlContainer _postgreSql = new PostgreSqlBuilder().Build();

    /// <summary>Connection string reported by the underlying container.</summary>
    public string ConnectionString
    {
        get { return _postgreSql.GetConnectionString(); }
    }

    /// <summary>Starts the container.</summary>
    public Task InitializeAsync() => _postgreSql.StartAsync();

    /// <summary>Disposes the container.</summary>
    public Task DisposeAsync() => _postgreSql.DisposeAsync().AsTask();
}
using KaffeineLabs.Testcontainers.PostgreSql;
using Microsoft.EntityFrameworkCore;
namespace KaffeineLabs.MQ.Tests.Outbox;
/// <summary>
/// Helpers for building EF Core contexts against the database exposed by a
/// <see cref="PostgreSqlFixture"/>.
/// </summary>
internal static class PostgreSqlFixtureExtensions
{
    /// <summary>
    /// Creates a <see cref="TestDbContext"/> connected to the fixture's database,
    /// calling <c>EnsureCreated</c> and <c>SaveChanges</c> on it before returning it.
    /// </summary>
    /// <param name="postgreSql">Fixture supplying the connection string.</param>
    /// <returns>The newly created context.</returns>
    public static TestDbContext TestDbContext(this PostgreSqlFixture postgreSql)
    {
        var options = postgreSql.DbContextOptions<TestDbContext>();
        var dbContext = new TestDbContext(options);

        dbContext.Database.EnsureCreated();
        dbContext.SaveChanges();

        return dbContext;
    }

    /// <summary>
    /// Builds Npgsql-backed context options from the fixture's connection string.
    /// </summary>
    private static DbContextOptions<TC> DbContextOptions<TC>(this PostgreSqlFixture postgreSql) where TC : DbContext
    {
        var builder = new DbContextOptionsBuilder<TC>();
        builder.UseNpgsql(postgreSql.ConnectionString);
        return builder.Options;
    }
}
using KaffeineLabs.EntityFrameworkCore.Timestamps;
using KaffeineLabs.MQ.Outbox;
using Microsoft.EntityFrameworkCore;
namespace KaffeineLabs.MQ.Tests.Outbox;
/// <summary>
/// EF Core context used by the outbox tests; exposes the outbox message set and
/// registers an <see cref="EntityDateInterceptor"/>.
/// </summary>
internal class TestDbContext(DbContextOptions<TestDbContext> options) : DbContext(options), IOutboxMessageDbContext
{
    /// <summary>Outbox messages stored in this context.</summary>
    public DbSet<OutboxMessage> OutboxMessages { get; set; }

    /// <summary>Applies the outbox message model configuration.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder) =>
        modelBuilder.OnOutboxMessageCreating();

    /// <summary>Adds the entity date interceptor to the context options.</summary>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) =>
        optionsBuilder.AddInterceptors(new EntityDateInterceptor());
}
using Microsoft.EntityFrameworkCore;
namespace KaffeineLabs.MQ.Outbox;
/// <summary>
/// <see cref="IOutboxMessageRepository"/> implementation that persists outbox
/// messages through an EF Core context.
/// </summary>
/// <typeparam name="TC">Context type exposing the outbox message set.</typeparam>
public class DatabaseOutboxMessageRepository<TC>(TC context) : IOutboxMessageRepository
    where TC : DbContext, IOutboxMessageDbContext
{
    /// <summary>
    /// Passes <paramref name="message"/> to the context's <c>Update</c> and saves changes.
    /// </summary>
    /// <param name="message">The outbox message to persist.</param>
    public async Task SaveAsync(OutboxMessage message)
    {
        context.Update(message);
        await context.SaveChangesAsync();
    }

    /// <summary>
    /// Returns up to <paramref name="count"/> unpublished messages, ordered by
    /// <see cref="OutboxMessage.CreatedOn"/> ascending.
    /// </summary>
    /// <param name="count">Maximum number of messages to return.</param>
    /// <returns>The matching messages, oldest first.</returns>
    public async Task<List<OutboxMessage>> FindOldest(int count = 10)
    {
        var oldestUnpublished = context.OutboxMessages
            .Where(message => !message.Published)
            .OrderBy(message => message.CreatedOn)
            .Take(count);

        return await oldestUnpublished.ToListAsync();
    }
}
namespace KaffeineLabs.MQ.Outbox;
/// <summary>
/// A message stored in the outbox table.
/// </summary>
public class OutboxMessage
{
/// <summary>Integer identifier (presumably the primary key by EF Core convention — confirm).</summary>
public int Id { get; set; }
/// <summary>Required message type label; can only be set during initialization.</summary>
public required string Type { get; init; }
/// <summary>Required message content; can only be set during initialization.</summary>
public required string Message { get; init; }
/// <summary>Published flag; the repository's FindOldest query selects rows where this is false.</summary>
public bool Published { get; set; }
/// <summary>Creation timestamp; configured with a timestamp default and an index in the model builder extension.</summary>
public DateTimeOffset CreatedOn { get; set; }
/// <summary>Last-update timestamp. NOTE(review): presumably maintained by EntityDateInterceptor — confirm.</summary>
public DateTimeOffset UpdatedOn { get; set; }
}
using KaffeineLabs.EntityFrameworkCore.Timestamps;
using Microsoft.EntityFrameworkCore;
namespace KaffeineLabs.MQ.Outbox;
/// <summary>
/// Model configuration for the <see cref="OutboxMessage"/> entity.
/// </summary>
public static class ModelBuilderExtensions
{
    /// <summary>
    /// Configures <see cref="OutboxMessage"/>: a timestamp default on
    /// <c>CreatedOn</c>, plus indexes on <c>CreatedOn</c> and <c>Published</c>.
    /// </summary>
    /// <param name="modelBuilder">The model builder to configure.</param>
    /// <returns>The same <paramref name="modelBuilder"/>, to allow chaining.</returns>
    public static ModelBuilder OnOutboxMessageCreating(this ModelBuilder modelBuilder)
    {
        var outboxMessage = modelBuilder.Entity<OutboxMessage>();

        outboxMessage.Property(message => message.CreatedOn).HasTimestampDefault();
        outboxMessage.HasIndex(message => message.CreatedOn);
        outboxMessage.HasIndex(message => message.Published);

        return modelBuilder;
    }
}
using Microsoft.EntityFrameworkCore;
namespace KaffeineLabs.MQ.Outbox;
/// <summary>
/// Contract for contexts that expose an <see cref="OutboxMessage"/> set; used as
/// a generic constraint by DatabaseOutboxMessageRepository.
/// </summary>
public interface IOutboxMessageDbContext
{
/// <summary>The set of outbox messages.</summary>
DbSet<OutboxMessage> OutboxMessages { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment