Last active
April 1, 2020 00:55
-
-
Save kalexmills/856181d7f9f090aa5e11952cbf20c895 to your computer and use it in GitHub Desktop.
Hand-over-hand locking for cats-effect, along with as many type-classes as I could reasonably cram in.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._ | |
import cats.data._ | |
import cats.implicits._ | |
import cats.syntax._ | |
import cats.effect._ | |
import cats.effect.concurrent._ | |
import cats.effect.implicits._ | |
import cats.effect.ExitCase._ | |
import cats.arrow.Compose | |
import cats.arrow.FunctionK | |
/**
 * A lock whose acquire and release steps are effects in `F`.
 */
trait Lock[F[_]] { self =>

  /** Effect that acquires the lock. */
  def lock: F[Unit]

  /** Effect that releases the lock. */
  def unlock: F[Unit]

  /**
   * Combines this lock with `other` into a single lock.
   *
   * The combined lock acquires this lock first and then `other`; it releases them in the
   * reverse order (`other` first, then this lock).
   *
   * @param other the lock acquired after, and released before, this one
   * @param F     supplies the sequencing (`>>`) used to chain the two effects
   */
  def andThen(other: Lock[F])(implicit F: FlatMap[F]): Lock[F] = new Lock[F] {
    // The outer members must be qualified with `self`: inside this anonymous class a bare
    // `lock` / `unlock` refers to the member being defined and would recurse without end.
    def lock: F[Unit] = self.lock >> other.lock
    def unlock: F[Unit] = other.unlock >> self.unlock
  }
}
object Lock {

  /** A lock whose `lock` and `unlock` are both the unit effect of `F`. */
  def noop[F[_]: Applicative]: Lock[F] = {
    val done: F[Unit] = Applicative[F].unit
    new Lock[F] {
      def lock: F[Unit] = done
      def unlock: F[Unit] = done
    }
  }

  /**
   * A lock backed by an `MVar[F, Unit]`.
   *
   * `lock` prints a trace line and then takes the value out of the MVar; `unlock` prints a
   * trace line and then attempts to put a value back with `tryPut`, discarding the result.
   *
   * @param mvar the MVar used as the lock cell
   */
  def mvar[F[_]: Sync](mvar: MVar[F, Unit]): Lock[F] = {
    // The message is by-name so the string is only built when the suspended effect runs.
    def trace(message: => String): F[Unit] = Sync[F].delay(println(message))

    new Lock[F] {
      def lock: F[Unit] = trace(s"locking $mvar") >> mvar.take
      def unlock: F[Unit] = trace(s"unlocking $mvar") >> mvar.tryPut(()).void
    }
  }
}
/**
 * Link is a suspended Bracket operation which allows composition via hand-over-hand locking, to prevent
 * threads from overtaking one another.
 *
 * @param lock     F[Unit] which `run` executes as the bracket's acquire step, before critical is called.
 * @param critical a critical section; it receives the argument passed to `run`.
 * @param unlock   F[Unit] which `run` executes as the bracket's release step after critical has finished.
 *                 For links built with `andThen`, unlock may be called more than once in case of error.
 */
case class Link[F[_]: Bracket[*[_], Throwable], -A, B]( // TODO: use E
    private val lock: F[Unit],
    private val critical: A => F[B],
    private val unlock: F[Unit]) {

  /**
   * Runs this link: `lock` is the bracket acquire, `critical(a)` the use, and `unlock` the release.
   */
  def run(a: A): F[B] =
    (lock >> a.pure[F]).bracket(critical)(_ => unlock)

  /**
   * andThen combines this link with its argument, performing hand-over-hand locking: other.lock is
   * acquired before this.unlock is run, and only then is other.critical executed.
   *
   * The resulting link uses this.lock as its lock and other.unlock as its unlock. If the inner
   * bracket does not complete normally, other.unlock is run there as well, so it can run twice.
   *
   * NOTE(review): if this link's `critical` fails, the combined link's release only runs
   * other.unlock; this link's `unlock` is not run on that path. Confirm whether that is intended.
   */
  def andThen[C, S](other: Link[F, B, C]): Link[F, A, C] =
    Link(
      lock,
      (a) =>
        critical(a)
          .flatMap(b => {
            other.lock.bracketCase(_ => unlock >> other.critical(b))((_, exit) =>
              exit match {
                // On normal completion the enclosing `run` bracket performs other.unlock.
                case Completed => Applicative[F].unit
                case _ => other.unlock
              })
          }),
      other.unlock
    )

  /**
   * Applies the function produced by `f` to the value produced by this link, under both locks.
   *
   * The combined link acquires this.lock then f.lock, evaluates f's critical section followed by
   * this link's critical section, and releases f.unlock then this.unlock.
   */
  def ap[C, AA <: A, S](f: Link[F, AA, B => C]): Link[F, AA, C] =
    Link(
      lock >> f.lock,
      // Call the raw critical sections: both locks are already held by the combined `lock`,
      // so going through `run` here would try to acquire each of them a second time.
      (a) => Apply[F].ap(f.critical(a))(critical(a)),
      // Release in reverse acquisition order; the last step must be `unlock`, not `lock`.
      f.unlock >> unlock
    )

  /** Runs `other` first and then this link; defined as `other.andThen(this)`. */
  def compose[C, S, AA <: A](other: Link[F, C, AA]): Link[F, C, B] =
    other.andThen(this)

  /** Transforms the result of the critical section with `f`, keeping lock and unlock. */
  def map[C](f: B => C): Link[F, A, C] =
    Link(lock, critical(_).map(f), unlock)

  /** Pre-processes the input with `f` before it reaches the critical section. */
  def contramap[C](f: C => A): Link[F, C, B] =
    Link(lock, c => critical(f(c)), unlock)

  /** Extends the critical section with the effectful continuation `f`. */
  def semiflatMap[C](f: B => F[C]): Link[F, A, C] =
    Link(lock, critical(_).flatMap(f), unlock)

  /**
   * Hands the critical section's effect value `F[B]` to `f` and lifts the result with `pure`.
   * The effect is passed to `f` as a value; this method itself does not sequence it.
   */
  def semicoflatMap[C](f: (F[B]) => C): Link[F, A, C] =
    Link(lock, a => f(critical(a)).pure[F], unlock)
}
object Link {

  /** Builds a link that guards `critical` with the given lock's `lock` and `unlock` effects. */
  def fromLock[F[_]: Bracket[*[_], Throwable], A, B](lock: Lock[F], critical: A => F[B]): Link[F, A, B] =
    Link[F, A, B](lock.lock, critical, lock.unlock)

  /** A link that ignores its input and yields `a`, using the no-op lock. */
  def pure[F[_]: Bracket[*[_], Throwable], A, C](a: A): Link[F, C, A] = {
    val noLock = Lock.noop[F]
    Link[F, C, A](noLock.lock, (_: C) => Applicative[F].pure(a), noLock.unlock)
  }

  /** Joins `head` and every link in `tail`, left to right, with `andThen`. */
  def chain[F[_]: Bracket[*[_], E], E](head: Link[F, Unit, Unit], tail: Link[F, Unit, Unit]*): Link[F, Unit, Unit] =
    tail.foldLeft(head)((joined, next) => joined.andThen(next))

  /** `Compose` instance: `compose(f, g)` runs `g` and then `f`. */
  implicit def composeForLink[F[_]: Bracket[*[_], E], E]: Compose[Link[F, *, *]] =
    new Compose[Link[F, *, *]] {
      def compose[A, B, C](f: Link[F, B, C], g: Link[F, A, B]): Link[F, A, C] =
        g.andThen(f)
    }

  /** `Contravariant` instance over the input type, delegating to `Link#contramap`. */
  implicit def contravariantForLink[F[_]: Bracket[*[_], E], C, E]: Contravariant[Link[F, *, C]] =
    new Contravariant[Link[F, *, C]] {
      def contramap[A, B](fa: Link[F, A, C])(f: B => A): Link[F, B, C] =
        fa contramap f
    }

  /** `Applicative` instance over the output type, delegating to `Link#ap` and `Link.pure`. */
  implicit def applicativeForLink[F[_]: Bracket[*[_], Throwable], C, E]: Applicative[Link[F, C, *]] =
    new Applicative[Link[F, C, *]] {
      def ap[A, B](ff: Link[F, C, A => B])(fa: Link[F, C, A]): Link[F, C, B] =
        fa ap ff

      def pure[A](x: A): Link[F, C, A] =
        Link.pure[F, A, C](x)
    }

  /** Adds an argument-less `run` to links whose input type is `Unit`. */
  class UnitLinkOps[F[_], B](link: Link[F, Unit, B]) {
    def run: F[B] = link.run(())
  }

  /** Implicit conversion that makes the `UnitLinkOps` syntax available. */
  implicit def toUnitLinkOps[F[_], B, R](link: Link[F, Unit, B]): UnitLinkOps[F, B] =
    new UnitLinkOps[F, B](link)

  /** Natural transformation turning a link into the `Kleisli` that runs it. */
  implicit def functionKToKleisli[F[_]: Bracket[*[_], E], A, B, E]: cats.arrow.FunctionK[Link[F, A, *], Kleisli[F, A, *]] =
    new FunctionK[Link[F, A, *], Kleisli[F, A, *]] {
      def apply[X](link: Link[F, A, X]): Kleisli[F, A, X] =
        Kleisli((a: A) => link.run(a))
    }
}
/**
 * Demo application: composes two links, then runs three chains of four links each. The last
 * chain contains a step that raises an error, which is handled by printing a message.
 */
object Test extends IOApp {
  import Link._

  def run(arg: List[String]) = {
    // Lets an MVar be passed wherever a Lock is expected.
    implicit def mvarToLock[F[_]: Sync](mvar: MVar[F, Unit]): Lock[F] = Lock.mvar[F](mvar)

    // Suspended console output.
    def say(text: String): IO[Unit] = IO(println(text))

    // A Unit => Unit link that prints `text` while holding `lock`.
    def step(lock: MVar[IO, Unit], text: String): Link[IO, Unit, Unit] =
      Link.fromLock[IO, Unit, Unit](lock, _ => say(text))

    for {
      lock1 <- MVar[IO].of(())
      lock2 <- MVar[IO].of(())
      first = Link.fromLock[IO, Unit, Int](lock1, _ => say("inside first").map(_ => 17))
      second = Link.fromLock[IO, Int, String](lock2, n => say("inside second").map(_ => s"$n!"))
      x <- first.andThen(second).run(())
      _ <- say(x)
      // Alternating between two locks.
      _ <- Link.chain(
        step(lock1, "performing operation 1"),
        step(lock2, "performing operation 2"),
        step(lock1, "performing operation 3"),
        step(lock2, "performing operation 4")
      ).run(())
      lock3 <- MVar[IO].of(())
      lock4 <- MVar[IO].of(())
      // Four distinct locks.
      _ <- Link.chain(
        step(lock1, "performing operation 1"),
        step(lock2, "performing operation 2"),
        step(lock3, "performing operation 3"),
        step(lock4, "performing operation 4")
      ).run(())
      // Third step fails; the error is handled after the chain.
      _ <- Link.chain(
        step(lock1, "performing operation 1"),
        step(lock2, "performing operation 2"),
        Link.fromLock[IO, Unit, Unit](lock3, _ => IO.raiseError(new RuntimeException("aaahhhhhh!"))),
        step(lock4, "performing operation 4")
      ).run(())
        .handleErrorWith(_ => say("caught it!"))
    } yield ExitCode.Success
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Notes to myself: the `SemigroupK` from `Compose` can be used in the implementation of `chain`.