From 6b620da09b9f05699c1cf6080de229e63de47942 Mon Sep 17 00:00:00 2001 From: Matthias Andreas Benkard Date: Sun, 7 Jan 2018 21:51:22 +0100 Subject: Add initial code and project files. --- core/lib/Fiber.scala | 185 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 core/lib/Fiber.scala (limited to 'core/lib/Fiber.scala') diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala new file mode 100644 index 0000000..0e2ef39 --- /dev/null +++ b/core/lib/Fiber.scala @@ -0,0 +1,185 @@ +package eu.mulk.fibers + +import monix.eval.Task +import monix.execution.Scheduler +import monix.execution.cancelables.SingleAssignmentCancelable +import monix.reactive.{Observable, OverflowStrategy} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try +import scala.util.continuations.{reset, shift} +import scala.util.control.NonFatal + +/** + * Fiber-related functionality. + */ +object Fiber extends FiberConversions with FiberTypes { + + sealed trait Effect[-Out] { type Result } + + private[this] object Effect { + + case object Finish extends Effect[Any] { + override type Result = Nothing + } + + case class AwaitTask[T](task: Task[T], scheduler: Scheduler) + extends Effect[Any] { + override type Result = Try[T] + } + + case class AwaitFuture[T](future: Future[T], + executionContext: ExecutionContext) + extends Effect[Any] { + override type Result = Try[T] + } + + case class Fail(throwable: Throwable) extends Effect[Any] { + override type Result = Nothing + } + + case class Emit[-T](returnValue: T) extends Effect[T] { + override type Result = Unit + } + + case object GetFiberVar extends Effect[Any] { + override type Result = Any + } + + case class PutFiberVar(value: Any) extends Effect[Any] { + override type Result = Unit + } + + } + + private[this] final class FiberState { + var fiberVar: Any = _ + } + + private[this] def perform[Out]( + effect: Effect[Out]): effect.Result @fiber[Out] = { + val result = shift { (k: Any ⇒ Any) ⇒ + (effect, k) + } + result.asInstanceOf[effect.Result] + } + + /** + * Runs and awaits the completion of a [[monix.eval.Task]]. + * + * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on + * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception + * instead, consider destructuring the return value: + * + * val Success(x) = await(...) + * + * The supplied [[Scheduler]] is used to run the continuation of the fiber. + */ + def await[T, Out](task: Task[T])( + implicit scheduler: Scheduler): Try[T] @fiber[Out] = + perform(Effect.AwaitTask(task, scheduler)) + + /** + * Awaits the completion of a [[scala.concurrent.Future]]. + * + * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on + * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception + * instead, consider destructuring the return value: + * + * val Success(x) = await(...) + */ + def await[T, Out](fut: Future[T])( + implicit ec: ExecutionContext): Try[T] @fiber[Out] = + perform(Effect.AwaitFuture(fut, ec)) + + /** + * Emits a value to the output [[monix.reactive.Observable]]. + */ + def emit[Out](x: Out): Unit @fiber[Out] = + perform(Effect.Emit(x)) + + /** + * Replaces the current fiber-local state variable. + * + * This can be used like a thread-local variable, but for fibers. + * + * @see [[getFiberVar]] + */ + def putFiberVar[Out](x: Any): Unit @fiber[Out] = + perform(Effect.PutFiberVar(x)) + + /** + * Gets the current fiber-local state variable. + * + * The fiber-local state must have been set with [[putFiberVar]] before. + * + * @see [[putFiberVar]] + */ + def getFiberVar[Out]: Any @fiber[Out] = + perform(Effect.GetFiberVar) + + /** + * Runs a fiber. + * + * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber. + * @return a [[monix.reactive.Observable]] producing the objects + * [[Fiber.emit]]ted by the fiber. + */ + def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = { + var state = new FiberState + + monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ + val cancelable = SingleAssignmentCancelable() + + def handle(more: (Effect[Out], Any ⇒ Any)): Unit = { + val (effect, continue) = more + try { + effect match { + case Effect.AwaitTask(task, scheduler) ⇒ + task.asyncBoundary.runOnComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(scheduler) + case Effect.AwaitFuture(task, executionContext) ⇒ + task.onComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(executionContext) + case Effect.GetFiberVar ⇒ + val k = continue(state.fiberVar) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.PutFiberVar(value) ⇒ + state.fiberVar = value + val k = continue(()) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.Emit(returnValue) ⇒ + out.onNext(returnValue.asInstanceOf[Out]) + val k = continue(()) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.Fail(throwable) ⇒ + out.onError(throwable) + case Effect.Finish ⇒ + out.onComplete() + } + } catch { + case NonFatal(err) ⇒ + out.onError(err) + } + } + + try { + val (effect, continue): (Effect[Out], Any ⇒ Any) = reset { + thunk + (Effect.Finish, (_: Any) ⇒ ???) + } + handle(effect, continue) + } catch { + case NonFatal(err) ⇒ + out.onError(err) + } + + cancelable + } + } + +} -- cgit v1.2.3