Skip to content

Instantly share code, notes, and snippets.

@Nk185
Last active April 10, 2020 16:17
Show Gist options
  • Save Nk185/1ca82463bb1709728a23b4aee6a50180 to your computer and use it in GitHub Desktop.
Save Nk185/1ca82463bb1709728a23b4aee6a50180 to your computer and use it in GitHub Desktop.
Akka.Net actors freezing
namespace AkkaNetRepro
{
public class CustomRouter<TActor> : ReceiveActor where TActor : ActorBase
{
private readonly uint _nrOfInstance;
private IActorRef[] _routees;
private uint _invocationNr = 0;
public CustomRouter(uint nrOfInstance)
{
_nrOfInstance = nrOfInstance;
Receive<GetRoutees>(routees =>
{
var actorRefRoutees = _routees.Select(@ref => new ActorRefRoutee(@ref));
var resp = new Routees(actorRefRoutees);
Sender.Tell(resp);
});
ReceiveAny(HandleAny);
}
protected override void PreStart()
{
_routees = new IActorRef[_nrOfInstance];
var props = Props.Create<TActor>();
for (var i = 0; i < _nrOfInstance; i++)
{
var actorRef = Context.ActorOf(props);
_routees[i] = actorRef;
}
}
protected override void PreRestart(Exception reason, object message)
{
// avoid killing children
}
protected override void PostRestart(Exception reason)
{
_routees = Context.GetChildren().ToArray();
// avoid calling PreStart
}
private void HandleAny(object msg)
{
_routees[_invocationNr % _nrOfInstance].Forward(msg);
unchecked
{
_invocationNr++;
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(Decider.From(Directive.Escalate));
}
}
public class SimpleActor : ReceiveActor
{
public SimpleActor()
{
Receive<int>(HandleInt);
}
private void HandleInt(int obj)
{
if (obj == 1)
{
throw new InvalidOperationException($"I'm dead. {Self}");
}
Console.WriteLine($" Received msg: {Self}");
}
}
public class Program
{
static async Task Main(string[] args)
{
var actorSystem = ActorSystem.Create("Test");
var poolProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10));
var poolActorRef = actorSystem.ActorOf(poolProps, "pool-exceptions-freeze-test");
var poolRestartProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10, null,
new OneForOneStrategy(Decider.From(Directive.Restart)), Dispatchers.DefaultDispatcherId));
var poolRestartActorRef = actorSystem.ActorOf(poolRestartProps, "pool-ss_restart-exceptions-freeze-test");
var poolResumeProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10, null,
new OneForOneStrategy(Decider.From(Directive.Resume)), Dispatchers.DefaultDispatcherId));
var poolResumeActorRef = actorSystem.ActorOf(poolResumeProps, "pool-ss_resume-exceptions-freeze-test");
var poolEscalateProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10, null,
new OneForOneStrategy(Decider.From(Directive.Escalate)), Dispatchers.DefaultDispatcherId));
var poolEscalateActorRef = actorSystem.ActorOf(poolEscalateProps, "pool-ss_escalate-exceptions-freeze-test");
var customPoolProps = Props.Create<CustomRouter<SimpleActor>>(10u);
var customPoolActorRef = actorSystem.ActorOf(customPoolProps, "custom-pool-exceptions-freeze-test");
var singleProps = Props.Create<SimpleActor>();
var singleActorRef = actorSystem.ActorOf(singleProps, "exceptions-freeze-test");
var petabridgeCmd = PetabridgeCmd.Get(actorSystem);
petabridgeCmd.Start();
await Task.Delay(100); // small delay to give time for setup
// has errors when multiple escalations. some of routees stop working
Console.WriteLine("Starting pool tests");
await TestPool(poolActorRef);
// works fine
Console.WriteLine("\nStarting pool with Restart directive tests");
await TestPool(poolRestartActorRef);
// works fine
Console.WriteLine("\nStarting pool with Resume directive tests");
await TestPool(poolResumeActorRef);
// has errors when multiple escalations. some of routees stop working
Console.WriteLine("\nStarting pool with Escalate directive tests");
await TestPool(poolEscalateActorRef);
// has errors when multiple escalations. some of routees stop working
Console.WriteLine("\nStarting custom pool tests");
await TestPool(customPoolActorRef);
// works fine
Console.WriteLine("\nStarting single tests");
await TestSingle(singleActorRef);
var taskCompletionSource = new TaskCompletionSource<int>();
Console.CancelKeyPress += (sender, eventArgs) => taskCompletionSource.SetResult(1);
await taskCompletionSource.Task;
}
private static async Task TestSingle(IActorRef singleActorRef)
{
Console.WriteLine("Ensuring actor is fine: sending one msg");
singleActorRef.Tell(2);
await Task.Delay(TimeSpan.FromSeconds(1)); // small delay to give time to reach convergence
Console.WriteLine("Hit 'Enter' to start warm-up with exception msgs");
Console.ReadLine();
for (int i = 0; i < 500; i++)
{
singleActorRef.Tell(1);
}
await Task.Delay(TimeSpan.FromSeconds(10)); // small delay to give time to reach convergence
Console.WriteLine("Finished sending exception warm-up msgs");
Console.WriteLine("Hit 'Enter' to start pinging");
Console.ReadLine();
var messagesToSend = 5u;
for (int i = 0; i < messagesToSend; i++)
{
Console.WriteLine($"Sending {i + 1} of {messagesToSend} msgs");
singleActorRef.Tell(2);
await Task.Delay(TimeSpan.FromSeconds(1));
}
Console.WriteLine("Finished pinging");
}
private static async Task TestPool(IActorRef poolActorRef)
{
Console.WriteLine("Hit 'Enter' to start warm-up with exception msgs");
Console.ReadLine();
var routees = (await poolActorRef.Ask<Routees>(GetRoutees.Instance))
.Members
.OfType<ActorRefRoutee>();
PrintRoutees(routees);
for (int i = 0; i < 500; i++)
{
poolActorRef.Tell(1);
}
await Task.Delay(TimeSpan.FromSeconds(10)); // small delay to give time to reach convergence
Console.WriteLine("Finished sending exception warm-up msgs");
Console.WriteLine("Hit 'Enter' to start pinging");
Console.ReadLine();
routees = (await poolActorRef.Ask<Routees>(GetRoutees.Instance))
.Members
.OfType<ActorRefRoutee>();
PrintRoutees(routees);
var isSuspendedMethodInfo = typeof(Mailbox).GetMethod("IsSuspended",
BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.InvokeMethod);
foreach (var actorRefRoutee in routees)
{
var actorCell = ((LocalActorRef)actorRefRoutee.Actor).Cell;
var result = (bool) isSuspendedMethodInfo.Invoke(actorCell.Mailbox, null);
if (result)
{
Console.WriteLine("Sending two msgs but not output expected: mailbox status had SuspendMask");
}
else
{
Console.WriteLine("Sending two msgs. Output expected");
}
poolActorRef.Tell(2);
actorRefRoutee.Send(2, ActorRefs.NoSender);
await Task.Delay(TimeSpan.FromSeconds(1));
}
Console.WriteLine("Finished pinging");
}
public static void PrintRoutees(IEnumerable<ActorRefRoutee> routees)
{
foreach (var routeesMember in routees)
{
Console.WriteLine(routeesMember.Actor);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment