Last active
December 12, 2024 11:55
-
-
Save jakzal/a54f0759e3adfef862a83bd279934a37 to your computer and use it in GitHub Desktop.
Learn how to use LISTEN/NOTIFY in PostgreSQL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using KaffeineLabs.MQ.Outbox; | |
using KaffeineLabs.Testcontainers.PostgreSql; | |
using FluentAssertions; | |
using Microsoft.EntityFrameworkCore; | |
using Npgsql; | |
using Xunit.Abstractions; | |
namespace KaffeineLabs.MQ.Tests.Outbox; | |
/// <summary>
/// Learning test for PostgreSQL LISTEN/NOTIFY: inserting a row into the
/// "OutboxMessages" table fires a trigger that publishes a notification on the
/// <c>outbox</c> channel, which a listening <see cref="NpgsqlConnection"/> receives.
/// </summary>
public class LearnPostgresListenNotifyTest(PostgreSqlFixture postgreSql, ITestOutputHelper testOutputHelper)
    : IClassFixture<PostgreSqlFixture>, IDisposable
{
    private readonly TestDbContext _dbContext = postgreSql.TestDbContext();

    /// <summary>
    /// Releases the per-test <see cref="TestDbContext"/> created for this test class instance.
    /// </summary>
    public void Dispose()
    {
        _dbContext.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Saves one outbox message and verifies that exactly one notification
    /// arrives on the <c>outbox</c> channel.
    /// </summary>
    [Fact]
    public async Task ItSendsOutboxNotification()
    {
        var repository = CreateDatabaseOutboxMessageRepository(_dbContext);
        var notifications = new List<string>();

        // A dedicated connection is used for listening; the DbContext keeps its own for writes.
        await using var connection = new NpgsqlConnection(_dbContext.Database.GetConnectionString());
        await connection.OpenAsync();
        connection.Notification += (_, e) =>
        {
            testOutputHelper.WriteLine($"Notification for {e.Channel}: {e.Payload}");
            notifications.Add(e.Channel);
            testOutputHelper.WriteLine($"Notifications for {e.Channel}: {notifications.Count}");
        };

        // LISTEN must have completed before the insert happens: otherwise the NOTIFY can be
        // emitted before the subscription exists, and WaitAsync could be invoked while the
        // LISTEN command is still in progress on the same connection.
        testOutputHelper.WriteLine($"Listening to notifications");
        await using (var listenCommand = new NpgsqlCommand("LISTEN outbox;", connection))
        {
            await listenCommand.ExecuteNonQueryAsync();
        }

        await repository.SaveAsync(new OutboxMessage
        {
            Message = "Hello World",
            Type = "String"
        });

        // Upper bound only: WaitAsync returns as soon as a notification has been read.
        await connection.WaitAsync(1000);

        notifications.Count.Should().Be(1);
        notifications.Should().BeEquivalentTo(["outbox"]);
    }

    /// <summary>
    /// Installs the notify trigger function and the AFTER INSERT trigger on "OutboxMessages",
    /// then returns a repository bound to <paramref name="dbContext"/>.
    /// </summary>
    /// <param name="dbContext">Context used both to run the setup SQL and to back the repository.</param>
    /// <returns>A repository that writes outbox messages through <paramref name="dbContext"/>.</returns>
    private static DatabaseOutboxMessageRepository<TestDbContext> CreateDatabaseOutboxMessageRepository(
        TestDbContext dbContext)
    {
        // Both statements use CREATE OR REPLACE, so running this once per test is safe.
        dbContext.Database.ExecuteSqlRaw("""
            CREATE OR REPLACE FUNCTION outbox_messages_notify()
            RETURNS TRIGGER AS $trigger$
            DECLARE
            payload TEXT;
            channel_name TEXT;
            BEGIN
            IF TG_ARGV[0] IS NULL THEN
            RAISE EXCEPTION 'A channel name is required as the first argument';
            END IF;
            channel_name := TG_ARGV[0];
            payload := json_build_object('timestamp', CURRENT_TIMESTAMP, 'payload', row_to_json(NEW));
            PERFORM pg_notify(channel_name, payload);
            RETURN NEW;
            END;
            $trigger$ LANGUAGE plpgsql;
            CREATE OR REPLACE TRIGGER outbox_messages_notify_trigger
            AFTER INSERT ON "OutboxMessages"
            FOR EACH ROW EXECUTE PROCEDURE outbox_messages_notify('outbox');
            """);
        return new DatabaseOutboxMessageRepository<TestDbContext>(dbContext);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Testcontainers.PostgreSql; | |
using Xunit; | |
namespace KaffeineLabs.Testcontainers.PostgreSql; | |
/// <summary>
/// xUnit fixture that owns a PostgreSQL container built with the default
/// <see cref="PostgreSqlBuilder"/> settings and exposes its connection string.
/// </summary>
public class PostgreSqlFixture : IAsyncLifetime
{
    private readonly PostgreSqlContainer _postgreSql = new PostgreSqlBuilder().Build();

    /// <summary>Connection string reported by the underlying container.</summary>
    public string ConnectionString
    {
        get { return _postgreSql.GetConnectionString(); }
    }

    /// <summary>Starts the container.</summary>
    public Task InitializeAsync() => _postgreSql.StartAsync();

    /// <summary>Disposes the container.</summary>
    public Task DisposeAsync() => _postgreSql.DisposeAsync().AsTask();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using KaffeineLabs.Testcontainers.PostgreSql; | |
using Microsoft.EntityFrameworkCore; | |
namespace KaffeineLabs.MQ.Tests.Outbox; | |
/// <summary>
/// Helpers that build EF Core contexts on top of a <see cref="PostgreSqlFixture"/>.
/// </summary>
internal static class PostgreSqlFixtureExtensions
{
    /// <summary>
    /// Creates a <see cref="TestDbContext"/> connected to the fixture's database and
    /// makes sure the schema exists.
    /// </summary>
    /// <param name="postgreSql">Fixture supplying the connection string.</param>
    /// <returns>A new context; the caller owns it.</returns>
    public static TestDbContext TestDbContext(this PostgreSqlFixture postgreSql)
    {
        var options = postgreSql.DbContextOptions<TestDbContext>();
        var dbContext = new TestDbContext(options);

        dbContext.Database.EnsureCreated();
        // NOTE(review): nothing is tracked at this point, so this looks like a no-op — confirm.
        dbContext.SaveChanges();

        return dbContext;
    }

    /// <summary>
    /// Builds Npgsql-backed options for <typeparamref name="TC"/> from the fixture's connection string.
    /// </summary>
    private static DbContextOptions<TC> DbContextOptions<TC>(this PostgreSqlFixture postgreSql) where TC : DbContext
    {
        var builder = new DbContextOptionsBuilder<TC>();
        builder.UseNpgsql(postgreSql.ConnectionString);
        return builder.Options;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using KaffeineLabs.EntityFrameworkCore.Timestamps; | |
using KaffeineLabs.MQ.Outbox; | |
using Microsoft.EntityFrameworkCore; | |
namespace KaffeineLabs.MQ.Tests.Outbox; | |
/// <summary>
/// EF Core context used by the outbox tests; exposes the outbox table and
/// registers an <see cref="EntityDateInterceptor"/>.
/// </summary>
internal class TestDbContext(DbContextOptions<TestDbContext> options) : DbContext(options), IOutboxMessageDbContext
{
    /// <summary>Set of persisted outbox messages.</summary>
    public DbSet<OutboxMessage> OutboxMessages { get; set; }

    /// <summary>Applies the outbox message model configuration.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder) =>
        modelBuilder.OnOutboxMessageCreating();

    /// <summary>Adds the entity date interceptor to the context configuration.</summary>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) =>
        optionsBuilder.AddInterceptors(new EntityDateInterceptor());
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.EntityFrameworkCore; | |
namespace KaffeineLabs.MQ.Outbox; | |
/// <summary>
/// <see cref="IOutboxMessageRepository"/> implementation that stores outbox
/// messages through an EF Core context.
/// </summary>
/// <typeparam name="TC">Context type exposing the outbox message set.</typeparam>
public class DatabaseOutboxMessageRepository<TC>(TC context) : IOutboxMessageRepository
    where TC : DbContext, IOutboxMessageDbContext
{
    /// <summary>
    /// Attaches <paramref name="message"/> to the context via <c>Update</c> and saves changes.
    /// </summary>
    /// <param name="message">Message to persist.</param>
    public async Task SaveAsync(OutboxMessage message)
    {
        context.Update(message);
        await context.SaveChangesAsync();
    }

    /// <summary>
    /// Returns up to <paramref name="count"/> unpublished messages, oldest first by
    /// <see cref="OutboxMessage.CreatedOn"/>.
    /// </summary>
    /// <param name="count">Maximum number of messages to return.</param>
    /// <returns>The matching messages, materialized into a list.</returns>
    public async Task<List<OutboxMessage>> FindOldest(int count = 10)
    {
        // Filter first, then sort: same result set, but the query reads in evaluation order.
        return await context.OutboxMessages
            .Where(o => !o.Published)
            .OrderBy(o => o.CreatedOn)
            .Take(count)
            .ToListAsync();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace KaffeineLabs.MQ.Outbox; | |
/// <summary>
/// A message stored in the outbox table.
/// </summary>
public class OutboxMessage
{
    /// <summary>Integer identifier of the message.</summary>
    public int Id { get; set; }

    /// <summary>Type label of the message; must be supplied at initialization.</summary>
    public required string Type { get; init; }

    /// <summary>Message content; must be supplied at initialization.</summary>
    public required string Message { get; init; }

    /// <summary>Whether the message has been published.</summary>
    public bool Published { get; set; }

    /// <summary>Creation timestamp.</summary>
    public DateTimeOffset CreatedOn { get; set; }

    /// <summary>Last-update timestamp.</summary>
    public DateTimeOffset UpdatedOn { get; set; }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using KaffeineLabs.EntityFrameworkCore.Timestamps; | |
using Microsoft.EntityFrameworkCore; | |
namespace KaffeineLabs.MQ.Outbox; | |
/// <summary>
/// Model configuration helpers for the outbox.
/// </summary>
public static class ModelBuilderExtensions
{
    /// <summary>
    /// Configures the <see cref="OutboxMessage"/> entity: a timestamp default on
    /// <c>CreatedOn</c> plus indexes on <c>CreatedOn</c> and <c>Published</c>.
    /// </summary>
    /// <param name="modelBuilder">Builder to configure.</param>
    /// <returns>The same <paramref name="modelBuilder"/>, to allow chaining.</returns>
    public static ModelBuilder OnOutboxMessageCreating(this ModelBuilder modelBuilder)
    {
        var outboxMessage = modelBuilder.Entity<OutboxMessage>();

        outboxMessage.Property(m => m.CreatedOn).HasTimestampDefault();
        outboxMessage.HasIndex(m => m.CreatedOn);
        outboxMessage.HasIndex(m => m.Published);

        return modelBuilder;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.EntityFrameworkCore; | |
namespace KaffeineLabs.MQ.Outbox; | |
/// <summary>
/// Contract for contexts that expose the outbox message set.
/// </summary>
public interface IOutboxMessageDbContext
{
    /// <summary>Set of persisted outbox messages.</summary>
    DbSet<OutboxMessage> OutboxMessages { get; set; }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment