From 6b620da09b9f05699c1cf6080de229e63de47942 Mon Sep 17 00:00:00 2001 From: Matthias Andreas Benkard Date: Sun, 7 Jan 2018 21:51:22 +0100 Subject: Add initial code and project files. --- core/lib/Fiber.scala | 185 ++++++++++++++++++++++++++++++++++++++++ core/lib/FiberConversions.scala | 46 ++++++++++ core/lib/FiberTypes.scala | 19 +++++ 3 files changed, 250 insertions(+) create mode 100644 core/lib/Fiber.scala create mode 100644 core/lib/FiberConversions.scala create mode 100644 core/lib/FiberTypes.scala (limited to 'core/lib') diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala new file mode 100644 index 0000000..0e2ef39 --- /dev/null +++ b/core/lib/Fiber.scala @@ -0,0 +1,185 @@ +package eu.mulk.fibers + +import monix.eval.Task +import monix.execution.Scheduler +import monix.execution.cancelables.SingleAssignmentCancelable +import monix.reactive.{Observable, OverflowStrategy} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try +import scala.util.continuations.{reset, shift} +import scala.util.control.NonFatal + +/** + * Fiber-related functionality. + */ +object Fiber extends FiberConversions with FiberTypes { + + sealed trait Effect[-Out] { type Result } + + private[this] object Effect { + + case object Finish extends Effect[Any] { + override type Result = Nothing + } + + case class AwaitTask[T](task: Task[T], scheduler: Scheduler) + extends Effect[Any] { + override type Result = Try[T] + } + + case class AwaitFuture[T](future: Future[T], + executionContext: ExecutionContext) + extends Effect[Any] { + override type Result = Try[T] + } + + case class Fail(throwable: Throwable) extends Effect[Any] { + override type Result = Nothing + } + + case class Emit[-T](returnValue: T) extends Effect[T] { + override type Result = Unit + } + + case object GetFiberVar extends Effect[Any] { + override type Result = Any + } + + case class PutFiberVar(value: Any) extends Effect[Any] { + override type Result = Unit + } + + } + + private[this] final class FiberState { + var fiberVar: Any = _ + } + + private[this] def perform[Out]( + effect: Effect[Out]): effect.Result @fiber[Out] = { + val result = shift { (k: Any ⇒ Any) ⇒ + (effect, k) + } + result.asInstanceOf[effect.Result] + } + + /** + * Runs and awaits the completion of a [[monix.eval.Task]]. + * + * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on + * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception + * instead, consider destructuring the return value: + * + * val Success(x) = await(...) + * + * The supplied [[Scheduler]] is used to run the continuation of the fiber. + */ + def await[T, Out](task: Task[T])( + implicit scheduler: Scheduler): Try[T] @fiber[Out] = + perform(Effect.AwaitTask(task, scheduler)) + + /** + * Awaits the completion of a [[scala.concurrent.Future]]. + * + * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on + * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception + * instead, consider destructuring the return value: + * + * val Success(x) = await(...) + */ + def await[T, Out](fut: Future[T])( + implicit ec: ExecutionContext): Try[T] @fiber[Out] = + perform(Effect.AwaitFuture(fut, ec)) + + /** + * Emits a value to the output [[monix.reactive.Observable]]. + */ + def emit[Out](x: Out): Unit @fiber[Out] = + perform(Effect.Emit(x)) + + /** + * Replaces the current fiber-local state variable. + * + * This can be used like a thread-local variable, but for fibers. + * + * @see [[getFiberVar]] + */ + def putFiberVar[Out](x: Any): Unit @fiber[Out] = + perform(Effect.PutFiberVar(x)) + + /** + * Gets the current fiber-local state variable. + * + * The fiber-local state must have been set with [[putFiberVar]] before. + * + * @see [[putFiberVar]] + */ + def getFiberVar[Out]: Any @fiber[Out] = + perform(Effect.GetFiberVar) + + /** + * Runs a fiber. + * + * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber. + * @return a [[monix.reactive.Observable]] producing the objects + * [[Fiber.emit]]ted by the fiber. + */ + def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = { + var state = new FiberState + + monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ + val cancelable = SingleAssignmentCancelable() + + def handle(more: (Effect[Out], Any ⇒ Any)): Unit = { + val (effect, continue) = more + try { + effect match { + case Effect.AwaitTask(task, scheduler) ⇒ + task.asyncBoundary.runOnComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(scheduler) + case Effect.AwaitFuture(task, executionContext) ⇒ + task.onComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(executionContext) + case Effect.GetFiberVar ⇒ + val k = continue(state.fiberVar) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.PutFiberVar(value) ⇒ + state.fiberVar = value + val k = continue(()) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.Emit(returnValue) ⇒ + out.onNext(returnValue.asInstanceOf[Out]) + val k = continue(()) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.Fail(throwable) ⇒ + out.onError(throwable) + case Effect.Finish ⇒ + out.onComplete() + } + } catch { + case NonFatal(err) ⇒ + out.onError(err) + } + } + + try { + val (effect, continue): (Effect[Out], Any ⇒ Any) = reset { + thunk + (Effect.Finish, (_: Any) ⇒ ???) + } + handle(effect, continue) + } catch { + case NonFatal(err) ⇒ + out.onError(err) + } + + cancelable + } + } + +} diff --git a/core/lib/FiberConversions.scala b/core/lib/FiberConversions.scala new file mode 100644 index 0000000..fe79796 --- /dev/null +++ b/core/lib/FiberConversions.scala @@ -0,0 +1,46 @@ +package eu.mulk.fibers + +import scala.collection.generic.CanBuildFrom +import scala.collection.{GenTraversableOnce, IterableLike} +import scala.language.implicitConversions + +/** + * Implicit conversions for convenient use of containers from within + * fibers. + */ +private[fibers] trait FiberConversions extends FiberTypes { + // + // This is based on code by James Earl Dougles, taken from: + // + // https://earldouglas.com/posts/monadic-continuations.html + // + + /** + * Defines the `cps` helper on [[IterableLike]] objects, which you can call to + * traverse them from fibers. + */ + implicit def cpsIterable[A, Repr](xs: IterableLike[A, Repr]) = new { + + /** + * Provides fiber-compatible iteration functions. + */ + def cps = new { + def foreach[B, T](f: A => Any @fiber[T]): Unit @fiber[T] = { + val it = xs.iterator + while (it.hasNext) f(it.next) + } + def map[B, That, T](f: A => B @fiber[T])( + implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = { + val b = cbf(xs.repr) + foreach(b += f(_)) + b.result + } + def flatMap[B, That, T](f: A => GenTraversableOnce[B] @fiber[T])( + implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = { + val b = cbf(xs.repr) + for (x <- this) b ++= f(x) + b.result + } + } + } +} diff --git a/core/lib/FiberTypes.scala b/core/lib/FiberTypes.scala new file mode 100644 index 0000000..35f6a42 --- /dev/null +++ b/core/lib/FiberTypes.scala @@ -0,0 +1,19 @@ +package eu.mulk.fibers + +import scala.language.implicitConversions +import scala.util.continuations.cpsParam + +private[fibers] trait FiberTypes { + type Effect[_] + + /** + * Annotates a computation as a fiber. + * + * Annotate the return type of a fibrational function with this type + * annotator. + * + * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber. + * @see [[Fiber.run]] + */ + type fiber[Out] = cpsParam[Any, (Effect[Out], Any ⇒ Any)] +} -- cgit v1.2.3