diff options
| author | Matthias Andreas Benkard <code@mail.matthias.benkard.de> | 2018-01-07 21:51:22 +0100 | 
|---|---|---|
| committer | Matthias Andreas Benkard <code@mail.matthias.benkard.de> | 2018-01-07 21:51:22 +0100 | 
| commit | 6b620da09b9f05699c1cf6080de229e63de47942 (patch) | |
| tree | d3061b37b98c228f0c2be18c171f9d911b0dca7a /core/lib | |
Add initial code and project files.
Diffstat (limited to 'core/lib')
| -rw-r--r-- | core/lib/Fiber.scala | 185 | ||||
| -rw-r--r-- | core/lib/FiberConversions.scala | 46 | ||||
| -rw-r--r-- | core/lib/FiberTypes.scala | 19 | 
3 files changed, 250 insertions, 0 deletions
| diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala new file mode 100644 index 0000000..0e2ef39 --- /dev/null +++ b/core/lib/Fiber.scala @@ -0,0 +1,185 @@ +package eu.mulk.fibers + +import monix.eval.Task +import monix.execution.Scheduler +import monix.execution.cancelables.SingleAssignmentCancelable +import monix.reactive.{Observable, OverflowStrategy} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try +import scala.util.continuations.{reset, shift} +import scala.util.control.NonFatal + +/** +  * Fiber-related functionality. +  */ +object Fiber extends FiberConversions with FiberTypes { + +  sealed trait Effect[-Out] { type Result } + +  private[this] object Effect { + +    case object Finish extends Effect[Any] { +      override type Result = Nothing +    } + +    case class AwaitTask[T](task: Task[T], scheduler: Scheduler) +        extends Effect[Any] { +      override type Result = Try[T] +    } + +    case class AwaitFuture[T](future: Future[T], +                              executionContext: ExecutionContext) +        extends Effect[Any] { +      override type Result = Try[T] +    } + +    case class Fail(throwable: Throwable) extends Effect[Any] { +      override type Result = Nothing +    } + +    case class Emit[-T](returnValue: T) extends Effect[T] { +      override type Result = Unit +    } + +    case object GetFiberVar extends Effect[Any] { +      override type Result = Any +    } + +    case class PutFiberVar(value: Any) extends Effect[Any] { +      override type Result = Unit +    } + +  } + +  private[this] final class FiberState { +    var fiberVar: Any = _ +  } + +  private[this] def perform[Out]( +      effect: Effect[Out]): effect.Result @fiber[Out] = { +    val result = shift { (k: Any ⇒ Any) ⇒ +      (effect, k) +    } +    result.asInstanceOf[effect.Result] +  } + +  /** +    * Runs and awaits the completion of a [[monix.eval.Task]]. +    * +    * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on +    * the result of the [[scala.concurrent.Future]].  If you prefer raising an exception +    * instead, consider destructuring the return value: +    * +    *     val Success(x) = await(...) +    * +    * The supplied [[Scheduler]] is used to run the continuation of the fiber. +    */ +  def await[T, Out](task: Task[T])( +      implicit scheduler: Scheduler): Try[T] @fiber[Out] = +    perform(Effect.AwaitTask(task, scheduler)) + +  /** +    * Awaits the completion of a [[scala.concurrent.Future]]. +    * +    * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on +    * the result of the [[scala.concurrent.Future]].  If you prefer raising an exception +    * instead, consider destructuring the return value: +    * +    *     val Success(x) = await(...) +    */ +  def await[T, Out](fut: Future[T])( +      implicit ec: ExecutionContext): Try[T] @fiber[Out] = +    perform(Effect.AwaitFuture(fut, ec)) + +  /** +    * Emits a value to the output [[monix.reactive.Observable]]. +    */ +  def emit[Out](x: Out): Unit @fiber[Out] = +    perform(Effect.Emit(x)) + +  /** +    * Replaces the current fiber-local state variable. +    * +    * This can be used like a thread-local variable, but for fibers. +    * +    * @see [[getFiberVar]] +    */ +  def putFiberVar[Out](x: Any): Unit @fiber[Out] = +    perform(Effect.PutFiberVar(x)) + +  /** +    * Gets the current fiber-local state variable. +    * +    * The fiber-local state must have been set with [[putFiberVar]] before. +    * +    * @see [[putFiberVar]] +    */ +  def getFiberVar[Out]: Any @fiber[Out] = +    perform(Effect.GetFiberVar) + +  /** +    * Runs a fiber. +    * +    * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber. +    * @return     a [[monix.reactive.Observable]] producing the objects +    *             [[Fiber.emit]]ted by the fiber. +    */ +  def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = { +    var state = new FiberState + +    monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ +      val cancelable = SingleAssignmentCancelable() + +      def handle(more: (Effect[Out], Any ⇒ Any)): Unit = { +        val (effect, continue) = more +        try { +          effect match { +            case Effect.AwaitTask(task, scheduler) ⇒ +              task.asyncBoundary.runOnComplete({ result ⇒ +                val k = continue(result) +                handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) +              })(scheduler) +            case Effect.AwaitFuture(task, executionContext) ⇒ +              task.onComplete({ result ⇒ +                val k = continue(result) +                handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) +              })(executionContext) +            case Effect.GetFiberVar ⇒ +              val k = continue(state.fiberVar) +              handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) +            case Effect.PutFiberVar(value) ⇒ +              state.fiberVar = value +              val k = continue(()) +              handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) +            case Effect.Emit(returnValue) ⇒ +              out.onNext(returnValue.asInstanceOf[Out]) +              val k = continue(()) +              handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) +            case Effect.Fail(throwable) ⇒ +              out.onError(throwable) +            case Effect.Finish ⇒ +              out.onComplete() +          } +        } catch { +          case NonFatal(err) ⇒ +            out.onError(err) +        } +      } + +      try { +        val (effect, continue): (Effect[Out], Any ⇒ Any) = reset { +          thunk +          (Effect.Finish, (_: Any) ⇒ ???) +        } +        handle(effect, continue) +      } catch { +        case NonFatal(err) ⇒ +          out.onError(err) +      } + +      cancelable +    } +  } + +} diff --git a/core/lib/FiberConversions.scala b/core/lib/FiberConversions.scala new file mode 100644 index 0000000..fe79796 --- /dev/null +++ b/core/lib/FiberConversions.scala @@ -0,0 +1,46 @@ +package eu.mulk.fibers + +import scala.collection.generic.CanBuildFrom +import scala.collection.{GenTraversableOnce, IterableLike} +import scala.language.implicitConversions + +/** +  * Implicit conversions for convenient use of containers from within +  * fibers. +  */ +private[fibers] trait FiberConversions extends FiberTypes { +  // +  // This is based on code by James Earl Dougles, taken from: +  // +  //     https://earldouglas.com/posts/monadic-continuations.html +  // + +  /** +    * Defines the `cps` helper on [[IterableLike]] objects, which you can call to +    * traverse them from fibers. +    */ +  implicit def cpsIterable[A, Repr](xs: IterableLike[A, Repr]) = new { + +    /** +      * Provides fiber-compatible iteration functions. +      */ +    def cps = new { +      def foreach[B, T](f: A => Any @fiber[T]): Unit @fiber[T] = { +        val it = xs.iterator +        while (it.hasNext) f(it.next) +      } +      def map[B, That, T](f: A => B @fiber[T])( +          implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = { +        val b = cbf(xs.repr) +        foreach(b += f(_)) +        b.result +      } +      def flatMap[B, That, T](f: A => GenTraversableOnce[B] @fiber[T])( +          implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = { +        val b = cbf(xs.repr) +        for (x <- this) b ++= f(x) +        b.result +      } +    } +  } +} diff --git a/core/lib/FiberTypes.scala b/core/lib/FiberTypes.scala new file mode 100644 index 0000000..35f6a42 --- /dev/null +++ b/core/lib/FiberTypes.scala @@ -0,0 +1,19 @@ +package eu.mulk.fibers + +import scala.language.implicitConversions +import scala.util.continuations.cpsParam + +private[fibers] trait FiberTypes { +  type Effect[_] + +  /** +    * Annotates a computation as a fiber. +    * +    * Annotate the return type of a fibrational function with this type +    * annotator. +    * +    * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber. +    * @see [[Fiber.run]] +    */ +  type fiber[Out] = cpsParam[Any, (Effect[Out], Any ⇒ Any)] +} | 
