aboutsummaryrefslogtreecommitdiff
path: root/core/lib/Fiber.scala
diff options
context:
space:
mode:
authorMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-07 21:51:22 +0100
committerMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-07 21:51:22 +0100
commit6b620da09b9f05699c1cf6080de229e63de47942 (patch)
treed3061b37b98c228f0c2be18c171f9d911b0dca7a /core/lib/Fiber.scala
Add initial code and project files.
Diffstat (limited to 'core/lib/Fiber.scala')
-rw-r--r--core/lib/Fiber.scala185
1 files changed, 185 insertions, 0 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala
new file mode 100644
index 0000000..0e2ef39
--- /dev/null
+++ b/core/lib/Fiber.scala
@@ -0,0 +1,185 @@
+package eu.mulk.fibers
+
+import monix.eval.Task
+import monix.execution.Scheduler
+import monix.execution.cancelables.SingleAssignmentCancelable
+import monix.reactive.{Observable, OverflowStrategy}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
+import scala.util.continuations.{reset, shift}
+import scala.util.control.NonFatal
+
+/**
+ * Fiber-related functionality.
+ */
+object Fiber extends FiberConversions with FiberTypes {
+
+ sealed trait Effect[-Out] { type Result }
+
+ private[this] object Effect {
+
+ case object Finish extends Effect[Any] {
+ override type Result = Nothing
+ }
+
+ case class AwaitTask[T](task: Task[T], scheduler: Scheduler)
+ extends Effect[Any] {
+ override type Result = Try[T]
+ }
+
+ case class AwaitFuture[T](future: Future[T],
+ executionContext: ExecutionContext)
+ extends Effect[Any] {
+ override type Result = Try[T]
+ }
+
+ case class Fail(throwable: Throwable) extends Effect[Any] {
+ override type Result = Nothing
+ }
+
+ case class Emit[-T](returnValue: T) extends Effect[T] {
+ override type Result = Unit
+ }
+
+ case object GetFiberVar extends Effect[Any] {
+ override type Result = Any
+ }
+
+ case class PutFiberVar(value: Any) extends Effect[Any] {
+ override type Result = Unit
+ }
+
+ }
+
+ private[this] final class FiberState {
+ var fiberVar: Any = _
+ }
+
+ private[this] def perform[Out](
+ effect: Effect[Out]): effect.Result @fiber[Out] = {
+ val result = shift { (k: Any ⇒ Any) ⇒
+ (effect, k)
+ }
+ result.asInstanceOf[effect.Result]
+ }
+
+ /**
+ * Runs and awaits the completion of a [[monix.eval.Task]].
+ *
+ * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
+ * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception
+ * instead, consider destructuring the return value:
+ *
+ * val Success(x) = await(...)
+ *
+ * The supplied [[Scheduler]] is used to run the continuation of the fiber.
+ */
+ def await[T, Out](task: Task[T])(
+ implicit scheduler: Scheduler): Try[T] @fiber[Out] =
+ perform(Effect.AwaitTask(task, scheduler))
+
+ /**
+ * Awaits the completion of a [[scala.concurrent.Future]].
+ *
+ * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
+ * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception
+ * instead, consider destructuring the return value:
+ *
+ * val Success(x) = await(...)
+ */
+ def await[T, Out](fut: Future[T])(
+ implicit ec: ExecutionContext): Try[T] @fiber[Out] =
+ perform(Effect.AwaitFuture(fut, ec))
+
+ /**
+ * Emits a value to the output [[monix.reactive.Observable]].
+ */
+ def emit[Out](x: Out): Unit @fiber[Out] =
+ perform(Effect.Emit(x))
+
+ /**
+ * Replaces the current fiber-local state variable.
+ *
+ * This can be used like a thread-local variable, but for fibers.
+ *
+ * @see [[getFiberVar]]
+ */
+ def putFiberVar[Out](x: Any): Unit @fiber[Out] =
+ perform(Effect.PutFiberVar(x))
+
+ /**
+ * Gets the current fiber-local state variable.
+ *
+ * The fiber-local state must have been set with [[putFiberVar]] before.
+ *
+ * @see [[putFiberVar]]
+ */
+ def getFiberVar[Out]: Any @fiber[Out] =
+ perform(Effect.GetFiberVar)
+
+ /**
+ * Runs a fiber.
+ *
+ * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber.
+ * @return a [[monix.reactive.Observable]] producing the objects
+ * [[Fiber.emit]]ted by the fiber.
+ */
+ def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = {
+ var state = new FiberState
+
+ monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒
+ val cancelable = SingleAssignmentCancelable()
+
+ def handle(more: (Effect[Out], Any ⇒ Any)): Unit = {
+ val (effect, continue) = more
+ try {
+ effect match {
+ case Effect.AwaitTask(task, scheduler) ⇒
+ task.asyncBoundary.runOnComplete({ result ⇒
+ val k = continue(result)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ })(scheduler)
+ case Effect.AwaitFuture(task, executionContext) ⇒
+ task.onComplete({ result ⇒
+ val k = continue(result)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ })(executionContext)
+ case Effect.GetFiberVar ⇒
+ val k = continue(state.fiberVar)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ case Effect.PutFiberVar(value) ⇒
+ state.fiberVar = value
+ val k = continue(())
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ case Effect.Emit(returnValue) ⇒
+ out.onNext(returnValue.asInstanceOf[Out])
+ val k = continue(())
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ case Effect.Fail(throwable) ⇒
+ out.onError(throwable)
+ case Effect.Finish ⇒
+ out.onComplete()
+ }
+ } catch {
+ case NonFatal(err) ⇒
+ out.onError(err)
+ }
+ }
+
+ try {
+ val (effect, continue): (Effect[Out], Any ⇒ Any) = reset {
+ thunk
+ (Effect.Finish, (_: Any) ⇒ ???)
+ }
+ handle(effect, continue)
+ } catch {
+ case NonFatal(err) ⇒
+ out.onError(err)
+ }
+
+ cancelable
+ }
+ }
+
+}