Skip to content

Instantly share code, notes, and snippets.

@overing
Created November 14, 2023 16:20
Show Gist options
  • Save overing/cbebde8f062630ec6fc0d3ff46f518e4 to your computer and use it in GitHub Desktop.
Save overing/cbebde8f062630ec6fc0d3ff46f518e4 to your computer and use it in GitHub Desktop.
展示用 ASP.NET Core 的 WebSocket 與 Orleans 製作簡易的聊天室功能, 當中包含使用 IGrainObserver 來達到由 Grain 主動呼叫前端的使用方式
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Hosting;
using Orleans.Utilities;
// Application entry point: configures console logging and an Orleans silo with
// localhost clustering, then serves every WebSocket request as a chat session.
var builder = WebApplication.CreateBuilder(args);

// Replace the default logging providers with a simple console logger that is
// configured from the "Logging" configuration section.
var loggingSection = builder.Configuration.GetSection("Logging");
builder.Logging.ClearProviders();
builder.Logging.AddConfiguration(loggingSection);
builder.Logging.AddSimpleConsole();

builder.Host.UseOrleans(silo => silo.UseLocalhostClustering());

var app = builder.Build();

var socketOptions = new WebSocketOptions();
socketOptions.KeepAliveInterval = TimeSpan.FromMinutes(3);
app.UseWebSockets(socketOptions);

// Terminal middleware: `next` is intentionally never invoked, so anything that
// is not a WebSocket upgrade request is answered with 400 Bad Request.
app.Use(async (HttpContext context, Func<Task> next) =>
{
    if (!context.WebSockets.IsWebSocketRequest)
    {
        context.Response.StatusCode = StatusCodes.Status400BadRequest;
        return;
    }

    var services = context.RequestServices;
    var clusterClient = services.GetRequiredService<IClusterClient>();
    var logger = services.GetRequiredService<ILogger<WebSocketSession>>();

    // The socket is disposed when this handler completes, i.e. once the
    // session's receive loop has returned.
    using var webSocket = await context.WebSockets.AcceptWebSocketAsync();
    var session = new WebSocketSession(logger, webSocket, clusterClient);
    await session.HandleReceiveAsync(context.RequestAborted);
});

await app.RunAsync(url: "http://*:8763/");
// HOW TO PLAY
// Open the browser's JavaScript console and run the following statements one at a time:
// var socket = new WebSocket("ws://localhost:8763/"); [ENTER]
// socket.addEventListener("message", (e) => console.log(e.data)); [ENTER]
// socket.send(JSON.stringify({ "Action": "join", "Parameters": [ "overing" ] })); [ENTER]
// socket.send(JSON.stringify({ "Action": "send", "Parameters": [ "hello!" ] })); [ENTER]
// socket.send(JSON.stringify({ "Action": "leave" })); [ENTER]
/// <summary>
/// A client request decoded from the JSON text of a WebSocket message.
/// </summary>
/// <param name="Action">
/// Name of the requested operation; WebSocketSession handles "join", "send" and "leave".
/// </param>
/// <param name="Parameters">
/// Arguments of the action. WebSocketSession expects exactly one non-blank entry for
/// "join" and "send", and matches "leave" only when this is null.
/// </param>
public record class Command(string Action, params string[] Parameters);
/// <summary>
/// Grain observer contract used to push chat messages to a subscriber.
/// </summary>
public interface IChatReceiver : IGrainObserver
{
/// <summary>Delivers one chat message to this receiver.</summary>
/// <param name="message">The text to deliver.</param>
/// <returns>A task representing the delivery.</returns>
Task ReceiveMessageAsync(string message);
}
/// <summary>
/// Bridges one accepted WebSocket connection to the chat room grain: JSON
/// commands received from the socket are translated into grain calls, and
/// messages pushed by the grain (via <see cref="IChatReceiver"/>) are written
/// back to the socket as text frames.
/// </summary>
sealed class WebSocketSession : IChatReceiver
{
    private readonly ILogger _logger;
    private readonly WebSocket _webSocket;
    private readonly IClusterClient _clusterClient;

    // Display name supplied by the most recent "join"; null before the first join.
    private string _name;

    // Observer reference handed to the room; non-null only while subscribed.
    private IChatReceiver _reference;

    /// <summary>Creates a session for an already accepted WebSocket.</summary>
    /// <param name="logger">Logger used for delivered messages and rejected payloads.</param>
    /// <param name="webSocket">The accepted socket; its lifetime is owned by the caller.</param>
    /// <param name="clusterClient">Client used to reach the chat room grain.</param>
    public WebSocketSession(ILogger<WebSocketSession> logger, WebSocket webSocket, IClusterClient clusterClient)
    {
        _logger = logger;
        _webSocket = webSocket;
        _clusterClient = clusterClient;
    }

    /// <summary>
    /// Invoked by the chat room: logs the message and forwards it to the
    /// WebSocket peer as a single UTF-8 text frame.
    /// </summary>
    /// <param name="message">The text to deliver.</param>
    /// <returns>A task that completes when the frame has been sent.</returns>
    public Task ReceiveMessageAsync(string message)
    {
        // Log through a constant template so user-supplied text is never
        // interpreted as a message template.
        _logger.LogInformation("{Message}", message);
        var data = Encoding.UTF8.GetBytes(message);
        return _webSocket.SendAsync(data, WebSocketMessageType.Text, endOfMessage: true, cancellationToken: default);
    }

    /// <summary>
    /// Runs the receive loop until the socket leaves the Open state, the peer
    /// sends a Close frame, or an exception occurs. The session is always
    /// unsubscribed from the room on exit if it is still subscribed.
    /// </summary>
    /// <param name="cancellationToken">Cancels the pending receive operation.</param>
    public async ValueTask HandleReceiveAsync(CancellationToken cancellationToken)
    {
        using var stream = new MemoryStream();
        var buffer = WebSocket.CreateServerBuffer(ushort.MaxValue);
        var room = _clusterClient.GetGrain<IChatRoom>(1);
        try
        {
            while (_webSocket.State == WebSocketState.Open)
            {
                // A message may arrive in several frames; accumulate until EndOfMessage.
                WebSocketReceiveResult receive;
                do
                {
                    receive = await _webSocket.ReceiveAsync(buffer, cancellationToken);
                    if (receive.Count > 0)
                    {
                        stream.Write(buffer.Array, buffer.Offset, receive.Count);
                    }
                }
                while (!receive.EndOfMessage);

                if (receive.MessageType == WebSocketMessageType.Close)
                {
                    // Acknowledge the peer's Close frame so the close handshake completes.
                    if (_webSocket.State == WebSocketState.CloseReceived)
                    {
                        await _webSocket.CloseAsync(
                            receive.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
                            receive.CloseStatusDescription,
                            CancellationToken.None);
                    }
                    break;
                }

                if (stream.Length == 0)
                {
                    continue;
                }

                var json = Encoding.UTF8.GetString(stream.GetBuffer(), 0, (int)stream.Length);
                // Reset before parsing so a rejected payload cannot leak into the next message.
                stream.SetLength(0);

                var command = ParseCommand(json);
                if (command is null)
                {
                    continue;
                }

                await DispatchAsync(room, command);
            }
        }
        finally
        {
            // Runs on normal close as well as on abort/cancellation, so the room
            // does not keep a subscription for a connection that is gone.
            await LeaveAsync(room);
        }
    }

    // Decodes one JSON payload; returns null (after logging) when the payload is
    // malformed or is the JSON literal null, so one bad message does not end the session.
    private Command ParseCommand(string json)
    {
        try
        {
            return JsonSerializer.Deserialize<Command>(json);
        }
        catch (JsonException ex)
        {
            _logger.LogWarning(ex, "Ignoring malformed command payload");
            return null;
        }
    }

    // Executes a decoded command against the room; unrecognized commands are ignored.
    private async ValueTask DispatchAsync(IChatRoom room, Command command)
    {
        switch ((command.Action, command.Parameters))
        {
            case ("join", { Length: 1 }) when !string.IsNullOrWhiteSpace(command.Parameters[0]):
                // Reuse the existing reference on a repeated join so the session
                // is never subscribed to the room twice.
                _reference ??= _clusterClient.CreateObjectReference<IChatReceiver>(this);
                _name = command.Parameters[0];
                await room.SubscribeAsync(_reference, _name);
                break;

            // Sending requires a prior join; otherwise there is no name to send under.
            case ("send", { Length: 1 }) when _reference is not null && !string.IsNullOrWhiteSpace(command.Parameters[0]):
                await room.SendAsync(_name, command.Parameters[0]);
                break;

            case ("leave", null or { Length: 0 }):
                await LeaveAsync(room);
                break;
        }
    }

    // Unsubscribes from the room if currently subscribed; safe to call repeatedly.
    private async ValueTask LeaveAsync(IChatRoom room)
    {
        if (_reference is not IChatReceiver reference)
        {
            return;
        }

        // Clear first so a later call (e.g. the cleanup after an explicit
        // "leave") does not unsubscribe and announce the departure a second time.
        _reference = null;
        await room.UnsubscribeAsync(reference, _name);
    }
}
/// <summary>
/// Grain contract of a chat room, addressed by an integer key.
/// </summary>
public interface IChatRoom : IGrainWithIntegerKey
{
/// <summary>Sends a chat message to the room on behalf of a participant.</summary>
/// <param name="name">Display name of the sender.</param>
/// <param name="message">The message text.</param>
ValueTask SendAsync(string name, string message);
/// <summary>Registers an observer so that it receives the room's messages.</summary>
/// <param name="observer">The receiver to register.</param>
/// <param name="name">Display name of the participant joining.</param>
ValueTask SubscribeAsync(IChatReceiver observer, string name);
/// <summary>Removes a previously registered observer from the room.</summary>
/// <param name="observer">The receiver to remove.</param>
/// <param name="name">Display name of the participant leaving.</param>
ValueTask UnsubscribeAsync(IChatReceiver observer, string name);
}
/// <summary>
/// Chat room grain: keeps the set of subscribed receivers and relays every
/// chat line and join/leave announcement to all of them.
/// </summary>
sealed class ChatRoom : Grain, IChatRoom
{
    readonly ObserverManager<IChatReceiver> _receivers;

    /// <summary>Creates the room with an observer manager using a five-minute expiration.</summary>
    /// <param name="logger">Logger handed to the observer manager.</param>
    public ChatRoom(ILogger<ChatRoom> logger)
    {
        // NOTE(review): presumably a receiver that does not subscribe again within
        // this window stops getting notifications - confirm against ObserverManager docs.
        var expiration = TimeSpan.FromMinutes(5);
        _receivers = new ObserverManager<IChatReceiver>(expiration, logger);
    }

    /// <summary>Relays a participant's message to every subscribed receiver.</summary>
    public ValueTask SendAsync(string name, string message)
        => Broadcast($"{name}: {message}");

    /// <summary>Registers the receiver, then announces the newcomer (the newcomer is notified too).</summary>
    public ValueTask SubscribeAsync(IChatReceiver observer, string name)
    {
        // The observer reference doubles as its own subscription key.
        _receivers.Subscribe(observer, observer);
        return Broadcast($"Welcome '{name}' to join the discussion!");
    }

    /// <summary>Removes the receiver, then announces the departure to the remaining receivers.</summary>
    public ValueTask UnsubscribeAsync(IChatReceiver observer, string name)
    {
        _receivers.Unsubscribe(observer);
        return Broadcast($"'{name}' to leave the discussion...");
    }

    // Hands the text to every subscribed receiver and returns an already
    // completed task.
    // NOTE(review): whatever Notify returns is discarded, so delivery looks
    // fire-and-forget - confirm this is intended.
    ValueTask Broadcast(string text)
    {
        _receivers.Notify(receiver => receiver.ReceiveMessageAsync(text));
        return ValueTask.CompletedTask;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment