Created
November 14, 2023 16:20
-
-
Save overing/cbebde8f062630ec6fc0d3ff46f518e4 to your computer and use it in GitHub Desktop.
展示用 ASP.NET Core 的 WebSocket 與 Orleans 製作簡易的聊天室功能, 當中包含使用 IGrainObserver 來達到由 Grain 主動呼叫前端的使用方式
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Net.WebSockets; | |
using System.Text; | |
using System.Text.Json; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.AspNetCore.Builder; | |
using Microsoft.AspNetCore.Http; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Hosting; | |
using Microsoft.Extensions.Logging; | |
using Orleans; | |
using Orleans.Hosting; | |
using Orleans.Utilities; | |
// Host bootstrap: console logging, a localhost-clustered Orleans silo, and a
// WebSocket-only endpoint that hands each accepted socket to a WebSocketSession.
var builder = WebApplication.CreateBuilder(args);

// Replace the default logging providers with the simple console, configured
// from the "Logging" configuration section.
var loggingSection = builder.Configuration.GetSection("Logging");
builder.Logging.ClearProviders();
builder.Logging.AddConfiguration(loggingSection);
builder.Logging.AddSimpleConsole();

builder.Host.UseOrleans(silo =>
{
    silo.UseLocalhostClustering();
});

var app = builder.Build();

var webSocketOptions = new WebSocketOptions
{
    KeepAliveInterval = TimeSpan.FromMinutes(3),
};
app.UseWebSockets(webSocketOptions);

// Terminal middleware: `next` is intentionally never invoked, so every request
// is either upgraded to a WebSocket session or answered with 400.
app.Use(async (HttpContext context, Func<Task> next) =>
{
    if (!context.WebSockets.IsWebSocketRequest)
    {
        context.Response.StatusCode = StatusCodes.Status400BadRequest;
        return;
    }

    var services = context.RequestServices;
    var clusterClient = services.GetRequiredService<IClusterClient>();
    var logger = services.GetRequiredService<ILogger<WebSocketSession>>();

    // The socket is disposed when the lambda exits, i.e. after the session's receive loop ends.
    using var webSocket = await context.WebSockets.AcceptWebSocketAsync();
    var session = new WebSocketSession(logger, webSocket, clusterClient);
    await session.HandleReceiveAsync(context.RequestAborted);
});

await app.RunAsync(url: "http://*:8763/");
// HOW TO PLAY | |
// open the browser js console | |
// var socket = new WebSocket("ws://localhost:8763/"); [ENTER] | |
// socket.addEventListener("message", (e) => console.log(e.data)); [ENTER] | |
// socket.send(JSON.stringify({ "Action": "join", "Parameters": [ "overing" ] })); [ENTER] | |
// socket.send(JSON.stringify({ "Action": "send", "Parameters": [ "hello!" ] })); [ENTER] | |
// socket.send(JSON.stringify({ "Action": "leave" })); [ENTER] | |
/// <summary>
/// A client request received over the WebSocket as JSON, e.g.
/// <c>{ "Action": "join", "Parameters": [ "overing" ] }</c>.
/// </summary>
/// <param name="Action">Command verb; the session in this file handles "join", "send" and "leave".</param>
/// <param name="Parameters">Arguments for the verb.</param>
public record Command(string Action, params string[] Parameters);
/// <summary>
/// Observer contract through which the chat room grain pushes messages to a subscriber.
/// </summary>
public interface IChatReceiver : IGrainObserver
{
    /// <summary>Delivers one chat-room message to this receiver.</summary>
    /// <param name="message">The text to deliver.</param>
    /// <returns>A task that completes when the receiver has handled the message.</returns>
    Task ReceiveMessageAsync(string message);
}
/// <summary>
/// Bridges one WebSocket connection to the chat room grain: JSON commands read
/// from the socket are forwarded to the grain, and messages pushed by the grain
/// (via <see cref="IChatReceiver"/>) are written back to the socket.
/// </summary>
sealed class WebSocketSession : IChatReceiver
{
    readonly ILogger _logger;
    readonly WebSocket _webSocket;
    readonly IClusterClient _clusterClient;

    // Both are null while the session is not subscribed to the room.
    string _name;
    IChatReceiver _reference;

    /// <summary>Creates a session over an already accepted WebSocket.</summary>
    /// <param name="logger">Logger for session diagnostics.</param>
    /// <param name="webSocket">The accepted socket; the caller keeps ownership and disposes it.</param>
    /// <param name="clusterClient">Client used to reach the chat room grain.</param>
    public WebSocketSession(ILogger<WebSocketSession> logger, WebSocket webSocket, IClusterClient clusterClient)
    {
        _logger = logger;
        _webSocket = webSocket;
        _clusterClient = clusterClient;
    }

    /// <summary>Writes a message pushed by the chat room to the socket as a single UTF-8 text frame.</summary>
    /// <param name="message">The text to forward to the client.</param>
    /// <returns>The task of the underlying socket send.</returns>
    public Task ReceiveMessageAsync(string message)
    {
        // Pass user text as an argument, never as the logging template.
        _logger.LogInformation("{Message}", message);
        var data = Encoding.UTF8.GetBytes(message);
        return _webSocket.SendAsync(data, WebSocketMessageType.Text, endOfMessage: true, cancellationToken: default);
    }

    /// <summary>
    /// Runs the receive loop until the socket leaves the Open state, dispatching
    /// each complete message as a <see cref="Command"/>. The room subscription,
    /// if any, is always released on exit.
    /// </summary>
    /// <param name="cancellationToken">Cancels pending socket reads.</param>
    public async ValueTask HandleReceiveAsync(CancellationToken cancellationToken)
    {
        using var stream = new MemoryStream();
        var buffer = WebSocket.CreateServerBuffer(ushort.MaxValue);
        var room = _clusterClient.GetGrain<IChatRoom>(1);
        try
        {
            while (_webSocket.State == WebSocketState.Open)
            {
                // A message may arrive in several frames; accumulate until EndOfMessage.
                WebSocketReceiveResult receive;
                do
                {
                    receive = await _webSocket.ReceiveAsync(buffer, cancellationToken);
                    if (receive.Count > 0)
                        stream.Write(buffer.Array, buffer.Offset, receive.Count);
                }
                while (!receive.EndOfMessage);

                if (receive.MessageType == WebSocketMessageType.Close)
                {
                    // Acknowledge the client's close frame to complete the close handshake.
                    if (_webSocket.State == WebSocketState.CloseReceived)
                        await _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
                    break;
                }

                if (stream.Length == 0)
                    continue;

                // Decode straight from the stream's buffer, then reset it for the next message.
                var json = Encoding.UTF8.GetString(stream.GetBuffer(), 0, (int)stream.Length);
                stream.SetLength(0);

                var command = ParseCommand(json);
                if (command is null)
                    continue;

                await DispatchAsync(room, command);
            }
        }
        catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
        {
            // The client vanished without a close handshake; treat it as a normal disconnect.
            _logger.LogInformation("WebSocket connection closed prematurely");
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The read was cancelled by the caller's token; fall through to cleanup.
        }
        finally
        {
            // Runs on every exit path so the room never keeps a dead observer.
            await LeaveAsync(room);
        }
    }

    // Parses one JSON payload; returns null (after logging) when it is malformed or a JSON null.
    Command ParseCommand(string json)
    {
        try
        {
            return JsonSerializer.Deserialize<Command>(json);
        }
        catch (JsonException ex)
        {
            _logger.LogWarning(ex, "Ignoring malformed command");
            return null;
        }
    }

    // Executes one command against the room; unknown or ill-formed commands are ignored.
    async ValueTask DispatchAsync(IChatRoom room, Command command)
    {
        var parameters = command.Parameters;
        var hasSingleText = parameters is { Length: 1 } && !string.IsNullOrWhiteSpace(parameters[0]);

        switch (command.Action)
        {
            case "join" when hasSingleText:
                // A repeated join replaces the previous subscription instead of stacking a second one.
                await LeaveAsync(room);
                _reference = _clusterClient.CreateObjectReference<IChatReceiver>(this);
                _name = parameters[0];
                await room.SubscribeAsync(_reference, _name);
                break;

            case "send" when hasSingleText:
                if (_reference is null)
                {
                    _logger.LogWarning("Ignoring 'send' received before 'join'");
                    break;
                }
                await room.SendAsync(_name, parameters[0]);
                break;

            case "leave" when parameters is null or { Length: 0 }:
                await LeaveAsync(room);
                break;
        }
    }

    // Unsubscribes from the room if subscribed. State is cleared first so the
    // room is told about the departure exactly once, even when called again.
    async ValueTask LeaveAsync(IChatRoom room)
    {
        if (_reference is not IChatReceiver reference)
            return;

        var name = _name;
        _reference = null;
        _name = null;
        await room.UnsubscribeAsync(reference, name);
    }
}
/// <summary>
/// Grain contract for a chat room, addressed by integer key.
/// </summary>
public interface IChatRoom : IGrainWithIntegerKey
{
    /// <summary>Registers <paramref name="observer"/> to receive room messages.</summary>
    /// <param name="observer">The receiver to register.</param>
    /// <param name="name">Display name of the participant joining.</param>
    ValueTask SubscribeAsync(IChatReceiver observer, string name);

    /// <summary>Posts <paramref name="message"/> to the room on behalf of <paramref name="name"/>.</summary>
    /// <param name="name">Display name of the sender.</param>
    /// <param name="message">The text to post.</param>
    ValueTask SendAsync(string name, string message);

    /// <summary>Removes <paramref name="observer"/> from the room.</summary>
    /// <param name="observer">The receiver to remove.</param>
    /// <param name="name">Display name of the participant leaving.</param>
    ValueTask UnsubscribeAsync(IChatReceiver observer, string name);
}
/// <summary>
/// Chat room grain: keeps the set of subscribed receivers and broadcasts every
/// join, leave and chat message to them.
/// </summary>
sealed class ChatRoom : Grain, IChatRoom
{
    readonly ObserverManager<IChatReceiver> _subsManager;

    /// <summary>Creates the room with an observer manager using a 5-minute expiration.</summary>
    /// <param name="logger">Logger handed to the observer manager.</param>
    // NOTE(review): ObserverManager presumably drops observers that have not
    // re-subscribed within the expiration window, and nothing in this file
    // re-subscribes periodically — confirm whether long-lived sessions stop
    // receiving messages after 5 minutes.
    public ChatRoom(ILogger<ChatRoom> logger)
        => _subsManager = new ObserverManager<IChatReceiver>(TimeSpan.FromMinutes(5), logger);

    /// <summary>Broadcasts "<paramref name="name"/>: <paramref name="message"/>" to all subscribers.</summary>
    /// <param name="name">Display name of the sender.</param>
    /// <param name="message">The text to broadcast.</param>
    public async ValueTask SendAsync(string name, string message)
        => await BroadcastAsync($"{name}: {message}");

    /// <summary>Adds <paramref name="observer"/> and then announces the arrival to everyone, the newcomer included.</summary>
    /// <param name="observer">The receiver to add.</param>
    /// <param name="name">Display name of the participant joining.</param>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public async ValueTask SubscribeAsync(IChatReceiver observer, string name)
    {
        ArgumentNullException.ThrowIfNull(observer);
        _subsManager.Subscribe(observer, observer);
        await BroadcastAsync($"Welcome '{name}' to join the discussion!");
    }

    /// <summary>Removes <paramref name="observer"/> and then announces the departure to the remaining subscribers.</summary>
    /// <param name="observer">The receiver to remove.</param>
    /// <param name="name">Display name of the participant leaving.</param>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public async ValueTask UnsubscribeAsync(IChatReceiver observer, string name)
    {
        ArgumentNullException.ThrowIfNull(observer);
        _subsManager.Unsubscribe(observer);
        await BroadcastAsync($"'{name}' to leave the discussion...");
    }

    // Sends `text` to every subscriber and completes only when the notification
    // pass has finished. Awaiting (instead of dropping the returned Task) keeps
    // the pass inside the current grain call, so a later Subscribe/Unsubscribe
    // cannot mutate the observer set while it is still being enumerated.
    Task BroadcastAsync(string text)
    {
        // The explicit delegate type pins the Task-returning Notify overload.
        Func<IChatReceiver, Task> notification = receiver => receiver.ReceiveMessageAsync(text);
        return _subsManager.Notify(notification);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment