Last active
April 10, 2020 16:17
-
-
Save Nk185/1ca82463bb1709728a23b4aee6a50180 to your computer and use it in GitHub Desktop.
Akka.Net actors freezing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AkkaNetRepro | |
{ | |
/// <summary>
/// Hand-written round-robin router used by the repro: it creates a fixed number of
/// <typeparamref name="TActor"/> children and forwards every incoming message to them in turn.
/// Child failures are escalated to this actor's own supervisor.
/// </summary>
/// <typeparam name="TActor">Actor type instantiated (via its default <c>Props</c>) for each routee.</typeparam>
public class CustomRouter<TActor> : ReceiveActor where TActor : ActorBase
{
    private readonly uint _nrOfInstance;
    private IActorRef[] _routees;
    private uint _invocationNr = 0;

    /// <summary>
    /// Registers the message handlers; the routees themselves are created in <see cref="PreStart"/>.
    /// </summary>
    /// <param name="nrOfInstance">Number of routee children to create on first start.</param>
    public CustomRouter(uint nrOfInstance)
    {
        _nrOfInstance = nrOfInstance;

        // Answer routee queries the same way the built-in routers do, so callers can
        // treat this actor and a pool router interchangeably.
        Receive<GetRoutees>(routees =>
        {
            var actorRefRoutees = _routees.Select(@ref => new ActorRefRoutee(@ref));
            var resp = new Routees(actorRefRoutees);
            Sender.Tell(resp);
        });

        ReceiveAny(HandleAny);
    }

    /// <summary>
    /// Creates the <c>_nrOfInstance</c> routee children.
    /// </summary>
    protected override void PreStart()
    {
        _routees = new IActorRef[_nrOfInstance];
        var props = Props.Create<TActor>();
        for (var i = 0; i < _nrOfInstance; i++)
        {
            _routees[i] = Context.ActorOf(props);
        }
    }

    /// <summary>
    /// Intentionally empty: the base implementation is not called, to avoid killing children.
    /// </summary>
    protected override void PreRestart(Exception reason, object message)
    {
        // avoid killing children
    }

    /// <summary>
    /// Re-reads the routee list from the current children instead of calling the base
    /// implementation, to avoid calling <see cref="PreStart"/> (which would create new children).
    /// </summary>
    protected override void PostRestart(Exception reason)
    {
        _routees = Context.GetChildren().ToArray();
        // avoid calling PreStart
    }

    /// <summary>
    /// Forwards <paramref name="msg"/> to the next routee in round-robin order.
    /// </summary>
    /// <param name="msg">Any message other than <c>GetRoutees</c>.</param>
    private void HandleAny(object msg)
    {
        var routees = _routees;

        // With no routees there is nobody to forward to; report the message as unhandled
        // instead of failing with a division by zero.
        if (routees == null || routees.Length == 0)
        {
            Unhandled(msg);
            return;
        }

        // Use the actual array length: after a restart the array is rebuilt from the
        // current children, so its size is not guaranteed to equal _nrOfInstance.
        routees[_invocationNr % (uint)routees.Length].Forward(msg);

        // Wrap-around of the counter is expected and harmless.
        unchecked
        {
            _invocationNr++;
        }
    }

    /// <summary>
    /// Escalates every child failure to this actor's parent.
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(Decider.From(Directive.Escalate));
    }
}
/// <summary>
/// Minimal routee for the repro: it fails on the integer 1 and logs its own
/// reference for every other integer it receives.
/// </summary>
public class SimpleActor : ReceiveActor
{
    public SimpleActor()
    {
        Receive<int>(HandleInt);
    }

    /// <summary>
    /// Handles an integer message.
    /// </summary>
    /// <param name="value">Received integer; the value 1 triggers a failure.</param>
    /// <exception cref="InvalidOperationException">Thrown when <paramref name="value"/> is 1.</exception>
    private void HandleInt(int value)
    {
        // Normal path first: anything but the failure trigger is just logged.
        if (value != 1)
        {
            Console.WriteLine($" Received msg: {Self}");
            return;
        }

        throw new InvalidOperationException($"I'm dead. {Self}");
    }
}
/// <summary>
/// Interactive console driver for the repro. It creates several router variants plus a
/// single actor, floods each with failing messages, then pings them and reports whether
/// each routee's mailbox is suspended.
/// </summary>
public class Program
{
    /// <summary>
    /// Builds the actor system and all test subjects, runs each scenario in sequence,
    /// then waits for Ctrl+C and shuts the actor system down.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        var actorSystem = ActorSystem.Create("Test");

        // Built-in round-robin pool with its default supervisor strategy.
        var poolProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10));
        var poolActorRef = actorSystem.ActorOf(poolProps, "pool-exceptions-freeze-test");

        // Built-in pools with an explicit directive each.
        var poolRestartProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10, null,
            new OneForOneStrategy(Decider.From(Directive.Restart)), Dispatchers.DefaultDispatcherId));
        var poolRestartActorRef = actorSystem.ActorOf(poolRestartProps, "pool-ss_restart-exceptions-freeze-test");

        var poolResumeProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10, null,
            new OneForOneStrategy(Decider.From(Directive.Resume)), Dispatchers.DefaultDispatcherId));
        var poolResumeActorRef = actorSystem.ActorOf(poolResumeProps, "pool-ss_resume-exceptions-freeze-test");

        var poolEscalateProps = Props.Create<SimpleActor>().WithRouter(new RoundRobinPool(10, null,
            new OneForOneStrategy(Decider.From(Directive.Escalate)), Dispatchers.DefaultDispatcherId));
        var poolEscalateActorRef = actorSystem.ActorOf(poolEscalateProps, "pool-ss_escalate-exceptions-freeze-test");

        // Hand-written router actor.
        var customPoolProps = Props.Create<CustomRouter<SimpleActor>>(10u);
        var customPoolActorRef = actorSystem.ActorOf(customPoolProps, "custom-pool-exceptions-freeze-test");

        // Plain actor without any router, as a baseline.
        var singleProps = Props.Create<SimpleActor>();
        var singleActorRef = actorSystem.ActorOf(singleProps, "exceptions-freeze-test");

        var petabridgeCmd = PetabridgeCmd.Get(actorSystem);
        petabridgeCmd.Start();

        await Task.Delay(100); // small delay to give time for setup

        // Author's observation: has errors when multiple escalations; some routees stop working.
        Console.WriteLine("Starting pool tests");
        await TestPool(poolActorRef);

        // Author's observation: works fine.
        Console.WriteLine("\nStarting pool with Restart directive tests");
        await TestPool(poolRestartActorRef);

        // Author's observation: works fine.
        Console.WriteLine("\nStarting pool with Resume directive tests");
        await TestPool(poolResumeActorRef);

        // Author's observation: has errors when multiple escalations; some routees stop working.
        Console.WriteLine("\nStarting pool with Escalate directive tests");
        await TestPool(poolEscalateActorRef);

        // Author's observation: has errors when multiple escalations; some routees stop working.
        Console.WriteLine("\nStarting custom pool tests");
        await TestPool(customPoolActorRef);

        // Author's observation: works fine.
        Console.WriteLine("\nStarting single tests");
        await TestSingle(singleActorRef);

        // Keep the process alive until Ctrl+C, then shut down cleanly.
        var taskCompletionSource = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        Console.CancelKeyPress += (sender, eventArgs) =>
        {
            // Without Cancel = true the runtime terminates the process as soon as the handler
            // returns, so the code after the await below would never run.
            eventArgs.Cancel = true;

            // TrySetResult: a second Ctrl+C must not throw from the handler.
            taskCompletionSource.TrySetResult(1);
        };
        await taskCompletionSource.Task;

        await actorSystem.Terminate();
    }

    /// <summary>
    /// Runs the scenario against a single, non-routed actor: one healthy message,
    /// 500 failing messages, then five healthy pings one second apart.
    /// </summary>
    /// <param name="singleActorRef">The actor under test.</param>
    private static async Task TestSingle(IActorRef singleActorRef)
    {
        Console.WriteLine("Ensuring actor is fine: sending one msg");
        singleActorRef.Tell(2);
        await Task.Delay(TimeSpan.FromSeconds(1)); // give the message time to be processed

        Console.WriteLine("Hit 'Enter' to start warm-up with exception msgs");
        Console.ReadLine();

        for (int i = 0; i < 500; i++)
        {
            singleActorRef.Tell(1);
        }

        await Task.Delay(TimeSpan.FromSeconds(10)); // wait for the failures to be processed
        Console.WriteLine("Finished sending exception warm-up msgs");

        Console.WriteLine("Hit 'Enter' to start pinging");
        Console.ReadLine();

        // Same type as the loop counter, so the comparison below is a plain int comparison.
        const int messagesToSend = 5;
        for (int i = 0; i < messagesToSend; i++)
        {
            Console.WriteLine($"Sending {i + 1} of {messagesToSend} msgs");
            singleActorRef.Tell(2);
            await Task.Delay(TimeSpan.FromSeconds(1));
        }

        Console.WriteLine("Finished pinging");
    }

    /// <summary>
    /// Runs the scenario against a router: lists its routees, sends 500 failing messages
    /// through the router, lists the routees again, then for each routee reports its mailbox
    /// suspension state and sends one message via the router and one directly to the routee.
    /// </summary>
    /// <param name="poolActorRef">The router (or router-like actor) under test.</param>
    /// <exception cref="MissingMethodException">
    /// Thrown when the non-public <c>Mailbox.IsSuspended</c> method cannot be found via reflection.
    /// </exception>
    private static async Task TestPool(IActorRef poolActorRef)
    {
        Console.WriteLine("Hit 'Enter' to start warm-up with exception msgs");
        Console.ReadLine();

        var routees = await QueryRoutees(poolActorRef);
        PrintRoutees(routees);

        for (int i = 0; i < 500; i++)
        {
            poolActorRef.Tell(1);
        }

        await Task.Delay(TimeSpan.FromSeconds(10)); // wait for the failures to be processed
        Console.WriteLine("Finished sending exception warm-up msgs");

        Console.WriteLine("Hit 'Enter' to start pinging");
        Console.ReadLine();

        // Query again: the set of routees may have changed during the warm-up.
        routees = await QueryRoutees(poolActorRef);
        PrintRoutees(routees);

        // IsSuspended is looked up as a non-public method, so reflection is the only way to call it.
        var isSuspendedMethodInfo = typeof(Mailbox).GetMethod("IsSuspended",
            BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.InvokeMethod);
        if (isSuspendedMethodInfo == null)
        {
            // Fail with a descriptive error instead of a NullReferenceException below.
            throw new MissingMethodException(nameof(Mailbox), "IsSuspended");
        }

        foreach (var actorRefRoutee in routees)
        {
            var actorCell = ((LocalActorRef)actorRefRoutee.Actor).Cell;
            var result = (bool) isSuspendedMethodInfo.Invoke(actorCell.Mailbox, null);
            if (result)
            {
                Console.WriteLine("Sending two msgs but not output expected: mailbox status had SuspendMask");
            }
            else
            {
                Console.WriteLine("Sending two msgs. Output expected");
            }

            // One message through the router, one straight to the routee being inspected.
            poolActorRef.Tell(2);
            actorRefRoutee.Send(2, ActorRefs.NoSender);
            await Task.Delay(TimeSpan.FromSeconds(1));
        }

        Console.WriteLine("Finished pinging");
    }

    /// <summary>
    /// Asks <paramref name="poolActorRef"/> for its routees and returns those that are
    /// <c>ActorRefRoutee</c> instances, materialized so they can be enumerated repeatedly.
    /// </summary>
    private static async Task<ActorRefRoutee[]> QueryRoutees(IActorRef poolActorRef)
    {
        var response = await poolActorRef.Ask<Routees>(GetRoutees.Instance);
        return response.Members.OfType<ActorRefRoutee>().ToArray();
    }

    /// <summary>
    /// Writes the actor reference of every routee to the console, one per line.
    /// </summary>
    /// <param name="routees">Routees to print.</param>
    public static void PrintRoutees(IEnumerable<ActorRefRoutee> routees)
    {
        foreach (var routeesMember in routees)
        {
            Console.WriteLine(routeesMember.Actor);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment