|
package docs.akka.persistence.typed |
|
|
|
import scala.concurrent.duration.FiniteDuration |
|
|
|
import akka.actor.typed.ActorRef |
|
import akka.actor.typed.Behavior |
|
import akka.actor.typed.scaladsl.Behaviors |
|
import akka.actor.typed.scaladsl.LoggerOps |
|
import akka.persistence.typed.PersistenceId |
|
import akka.persistence.typed.RecoveryCompleted |
|
import akka.persistence.typed.scaladsl.Effect |
|
import akka.persistence.typed.scaladsl.EventSourcedBehavior |
|
import akka.persistence.typed.scaladsl.RetentionCriteria |
|
|
|
object AtLeastOnceExample { |
|
|
|
object Sender { |
|
sealed trait Command |
|
final case class DeliverMsg(payload: Destination.MsgPayload) extends Command with CborSerializable |
|
private final case class WrappedConfirm(confirm: Destination.Confirm) extends Command |
|
private case object DestinationTerminated extends Command |
|
private case class Redeliver(deliveryId: Long, attempt: Int) extends Command |
|
|
|
sealed trait Event extends CborSerializable |
|
final case class MsgSent(payload: Destination.MsgPayload) extends Event |
|
final case class MsgConfirmed(deliveryId: Long) extends Event |
|
|
|
// key in the `pending` Map is the deliveryId |
|
// using Map[String, _] due to serialization problem of Map[Long, _] |
|
final case class State(deliveryId: Long, pending: Map[String, Destination.MsgPayload]) extends CborSerializable |
|
|
|
def apply( |
|
persistenceId: PersistenceId, |
|
destination: ActorRef[Destination.Command], |
|
redeliverAfter: FiniteDuration): Behavior[Command] = { |
|
|
|
Behaviors.setup { context => |
|
Behaviors.withTimers { timers => |
|
context.watchWith(destination, DestinationTerminated) |
|
|
|
val confirmAdapter = context.messageAdapter[Destination.Confirm](WrappedConfirm.apply) |
|
|
|
EventSourcedBehavior[Command, Event, State]( |
|
persistenceId, |
|
emptyState = State(0L, Map.empty), |
|
commandHandler = (state, command) => |
|
command match { |
|
case DeliverMsg(payload) => |
|
Effect.persist(MsgSent(payload)).thenRun { newState => |
|
val deliveryId = newState.deliveryId |
|
context.log.info("Deliver #{} to {}", deliveryId, destination) |
|
destination ! Destination.Msg(deliveryId, payload, confirmAdapter) |
|
timers.startSingleTimer(deliveryId, Redeliver(deliveryId, 2), redeliverAfter) |
|
} |
|
case WrappedConfirm(Destination.Confirm(deliveryId)) => |
|
context.log.info("Confirmed #{} from {}", deliveryId, destination) |
|
timers.cancel(deliveryId) |
|
Effect.persist(MsgConfirmed(deliveryId)) |
|
case Redeliver(deliveryId, attempt) => |
|
context.log.infoN("Redeliver #{}, attempt {}, to {}", deliveryId, attempt, destination) |
|
val payload = state.pending(deliveryId.toString) |
|
destination ! Destination.Msg(deliveryId, payload, confirmAdapter) |
|
timers.startSingleTimer(deliveryId, Redeliver(deliveryId, attempt + 1), redeliverAfter) |
|
Effect.none |
|
case DestinationTerminated => |
|
context.log.warn("Destination {} terminated", destination) |
|
Effect.stop() |
|
}, |
|
eventHandler = (state, event) => |
|
event match { |
|
case MsgSent(payload) => |
|
val nextDeliveryId = state.deliveryId + 1 |
|
state.copy( |
|
deliveryId = nextDeliveryId, |
|
pending = state.pending + (nextDeliveryId.toString -> payload)) |
|
case MsgConfirmed(deliveryId) => |
|
state.copy(pending = state.pending - deliveryId.toString) |
|
}) |
|
.receiveSignal { |
|
case (state, RecoveryCompleted) => |
|
state.pending.toList.sortBy { case (deliveryId, _) => deliveryId }.foreach { |
|
case (id, payload) => |
|
val deliveryId = id.toLong // workaround for CborSerializaton issue of Map[Long, _] |
|
context.log.info("Deliver #{} to {} after recovery", deliveryId, destination) |
|
destination ! Destination.Msg(deliveryId, payload, confirmAdapter) |
|
timers.startSingleTimer(deliveryId, Redeliver(deliveryId, 2), redeliverAfter) |
|
} |
|
} |
|
.withRetention(RetentionCriteria.snapshotEvery(100, 3).withDeleteEventsOnSnapshot) |
|
} |
|
} |
|
} |
|
} |
|
|
|
object Destination { |
|
sealed trait Command |
|
final case class Msg(deliveryId: Long, payload: MsgPayload, replyTo: ActorRef[Confirm]) |
|
extends Command |
|
with CborSerializable |
|
final case class MsgPayload(s: String) |
|
final case class Confirm(deliveryId: Long) extends CborSerializable |
|
|
|
def apply(): Behavior[Command] = |
|
Behaviors.receive { (context, message) => |
|
message match { |
|
case Msg(deliveryId, payload, replyTo) => |
|
context.log.info2("Received #{}: {}", deliveryId, payload.s) |
|
replyTo ! Confirm(deliveryId) |
|
Behaviors.same |
|
|
|
} |
|
} |
|
|
|
} |
|
|
|
/** |
|
* Marker trait for serialization with Jackson CBOR |
|
*/ |
|
trait CborSerializable |
|
} |