aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/lib/Fiber.scala185
-rw-r--r--core/lib/FiberConversions.scala46
-rw-r--r--core/lib/FiberTypes.scala19
-rw-r--r--core/t/FiberSpec.scala180
4 files changed, 430 insertions, 0 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala
new file mode 100644
index 0000000..0e2ef39
--- /dev/null
+++ b/core/lib/Fiber.scala
@@ -0,0 +1,185 @@
+package eu.mulk.fibers
+
+import monix.eval.Task
+import monix.execution.Scheduler
+import monix.execution.cancelables.SingleAssignmentCancelable
+import monix.reactive.{Observable, OverflowStrategy}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
+import scala.util.continuations.{reset, shift}
+import scala.util.control.NonFatal
+
+/**
+ * Fiber-related functionality.
+ */
+object Fiber extends FiberConversions with FiberTypes {
+
+ sealed trait Effect[-Out] { type Result }
+
+ private[this] object Effect {
+
+ case object Finish extends Effect[Any] {
+ override type Result = Nothing
+ }
+
+ case class AwaitTask[T](task: Task[T], scheduler: Scheduler)
+ extends Effect[Any] {
+ override type Result = Try[T]
+ }
+
+ case class AwaitFuture[T](future: Future[T],
+ executionContext: ExecutionContext)
+ extends Effect[Any] {
+ override type Result = Try[T]
+ }
+
+ case class Fail(throwable: Throwable) extends Effect[Any] {
+ override type Result = Nothing
+ }
+
+ case class Emit[-T](returnValue: T) extends Effect[T] {
+ override type Result = Unit
+ }
+
+ case object GetFiberVar extends Effect[Any] {
+ override type Result = Any
+ }
+
+ case class PutFiberVar(value: Any) extends Effect[Any] {
+ override type Result = Unit
+ }
+
+ }
+
+ private[this] final class FiberState {
+ var fiberVar: Any = _
+ }
+
+ private[this] def perform[Out](
+ effect: Effect[Out]): effect.Result @fiber[Out] = {
+ val result = shift { (k: Any ⇒ Any) ⇒
+ (effect, k)
+ }
+ result.asInstanceOf[effect.Result]
+ }
+
+ /**
+ * Runs and awaits the completion of a [[monix.eval.Task]].
+ *
+ * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
+ * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception
+ * instead, consider destructuring the return value:
+ *
+ * val Success(x) = await(...)
+ *
+ * The supplied [[Scheduler]] is used to run the continuation of the fiber.
+ */
+ def await[T, Out](task: Task[T])(
+ implicit scheduler: Scheduler): Try[T] @fiber[Out] =
+ perform(Effect.AwaitTask(task, scheduler))
+
+ /**
+ * Awaits the completion of a [[scala.concurrent.Future]].
+ *
+ * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
+ * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception
+ * instead, consider destructuring the return value:
+ *
+ * val Success(x) = await(...)
+ */
+ def await[T, Out](fut: Future[T])(
+ implicit ec: ExecutionContext): Try[T] @fiber[Out] =
+ perform(Effect.AwaitFuture(fut, ec))
+
+ /**
+ * Emits a value to the output [[monix.reactive.Observable]].
+ */
+ def emit[Out](x: Out): Unit @fiber[Out] =
+ perform(Effect.Emit(x))
+
+ /**
+ * Replaces the current fiber-local state variable.
+ *
+ * This can be used like a thread-local variable, but for fibers.
+ *
+ * @see [[getFiberVar]]
+ */
+ def putFiberVar[Out](x: Any): Unit @fiber[Out] =
+ perform(Effect.PutFiberVar(x))
+
+ /**
+ * Gets the current fiber-local state variable.
+ *
+ * The fiber-local state must have been set with [[putFiberVar]] before.
+ *
+ * @see [[putFiberVar]]
+ */
+ def getFiberVar[Out]: Any @fiber[Out] =
+ perform(Effect.GetFiberVar)
+
+ /**
+ * Runs a fiber.
+ *
+ * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber.
+ * @return a [[monix.reactive.Observable]] producing the objects
+ * [[Fiber.emit]]ted by the fiber.
+ */
+ def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = {
+ var state = new FiberState
+
+ monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒
+ val cancelable = SingleAssignmentCancelable()
+
+ def handle(more: (Effect[Out], Any ⇒ Any)): Unit = {
+ val (effect, continue) = more
+ try {
+ effect match {
+ case Effect.AwaitTask(task, scheduler) ⇒
+ task.asyncBoundary.runOnComplete({ result ⇒
+ val k = continue(result)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ })(scheduler)
+ case Effect.AwaitFuture(task, executionContext) ⇒
+ task.onComplete({ result ⇒
+ val k = continue(result)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ })(executionContext)
+ case Effect.GetFiberVar ⇒
+ val k = continue(state.fiberVar)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ case Effect.PutFiberVar(value) ⇒
+ state.fiberVar = value
+ val k = continue(())
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ case Effect.Emit(returnValue) ⇒
+ out.onNext(returnValue.asInstanceOf[Out])
+ val k = continue(())
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ case Effect.Fail(throwable) ⇒
+ out.onError(throwable)
+ case Effect.Finish ⇒
+ out.onComplete()
+ }
+ } catch {
+ case NonFatal(err) ⇒
+ out.onError(err)
+ }
+ }
+
+ try {
+ val (effect, continue): (Effect[Out], Any ⇒ Any) = reset {
+ thunk
+ (Effect.Finish, (_: Any) ⇒ ???)
+ }
+ handle(effect, continue)
+ } catch {
+ case NonFatal(err) ⇒
+ out.onError(err)
+ }
+
+ cancelable
+ }
+ }
+
+}
diff --git a/core/lib/FiberConversions.scala b/core/lib/FiberConversions.scala
new file mode 100644
index 0000000..fe79796
--- /dev/null
+++ b/core/lib/FiberConversions.scala
@@ -0,0 +1,46 @@
+package eu.mulk.fibers
+
+import scala.collection.generic.CanBuildFrom
+import scala.collection.{GenTraversableOnce, IterableLike}
+import scala.language.implicitConversions
+
+/**
+ * Implicit conversions for convenient use of containers from within
+ * fibers.
+ */
+private[fibers] trait FiberConversions extends FiberTypes {
+ //
+ // This is based on code by James Earl Dougles, taken from:
+ //
+ // https://earldouglas.com/posts/monadic-continuations.html
+ //
+
+ /**
+ * Defines the `cps` helper on [[IterableLike]] objects, which you can call to
+ * traverse them from fibers.
+ */
+ implicit def cpsIterable[A, Repr](xs: IterableLike[A, Repr]) = new {
+
+ /**
+ * Provides fiber-compatible iteration functions.
+ */
+ def cps = new {
+ def foreach[B, T](f: A => Any @fiber[T]): Unit @fiber[T] = {
+ val it = xs.iterator
+ while (it.hasNext) f(it.next)
+ }
+ def map[B, That, T](f: A => B @fiber[T])(
+ implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = {
+ val b = cbf(xs.repr)
+ foreach(b += f(_))
+ b.result
+ }
+ def flatMap[B, That, T](f: A => GenTraversableOnce[B] @fiber[T])(
+ implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = {
+ val b = cbf(xs.repr)
+ for (x <- this) b ++= f(x)
+ b.result
+ }
+ }
+ }
+}
diff --git a/core/lib/FiberTypes.scala b/core/lib/FiberTypes.scala
new file mode 100644
index 0000000..35f6a42
--- /dev/null
+++ b/core/lib/FiberTypes.scala
@@ -0,0 +1,19 @@
+package eu.mulk.fibers
+
+import scala.language.implicitConversions
+import scala.util.continuations.cpsParam
+
+private[fibers] trait FiberTypes {
+ type Effect[_]
+
+ /**
+ * Annotates a computation as a fiber.
+ *
+ * Annotate the return type of a fibrational function with this type
+ * annotator.
+ *
+ * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber.
+ * @see [[Fiber.run]]
+ */
+ type fiber[Out] = cpsParam[Any, (Effect[Out], Any ⇒ Any)]
+}
diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala
new file mode 100644
index 0000000..66d4fca
--- /dev/null
+++ b/core/t/FiberSpec.scala
@@ -0,0 +1,180 @@
+package eu.mulk.fibers
+
+import minitest._
+import minitest.laws.Checkers
+import monix.eval.Task
+import monix.execution.Scheduler.Implicits.global
+import monix.reactive.Observable
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success, Try}
+
+object FiberSpec extends SimpleTestSuite with Checkers {
+ import Fiber._
+
+ class FakeException extends RuntimeException
+
+ test("sanity") {
+ assert(0 != 1)
+ }
+
+ testAsync("can produce nothing") {
+ val o = run {}
+ for (empty ← o.isEmptyL) {
+ assert(empty)
+ }
+ }
+
+ testAsync("can produce a single value") {
+ val o = run[Int] {
+ emit(100)
+ }
+ for (l ← o.toListL) {
+ assert(l == List(100))
+ }
+ }
+
+ testAsync("can produce many values") {
+ val o = run[Int] {
+ for (x ← 1.to(10).cps)
+ emit(x)
+ }
+ for (l ← o.toListL) {
+ assertEquals(l, List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+ }
+ }
+
+ testAsync("can throw exceptions before emitting values") {
+ val o = run[Int] {
+ throw new FakeException
+ }
+ for (t ← o.toListL.failed) {
+ assert(t.isInstanceOf[FakeException])
+ }
+ }
+
+ testAsync("can throw exceptions after emitting values") {
+ val o = run[Int] {
+ emit(100)
+ throw new FakeException
+ }
+ for (t ← o.toListL.failed) {
+ assert(t.isInstanceOf[FakeException])
+ }
+ }
+
+ testAsync("can await standard futures") {
+ val err = new FakeException
+ val f1 = Future(1)
+ val f2 = Future.failed(err)
+ val f3 = Future(3)
+ val o = run[Try[Int]] {
+ val y1 = await(f1)
+ emit(y1)
+ val y2 = await(f2)
+ emit(y2)
+ val y3 = await(f3)
+ emit(y3)
+ }
+ for (t ← o.toListL) {
+ assertEquals(t, List(Success(1), Failure(err), Success(3)))
+ }
+ }
+
+ testAsync("can run Monix tasks") {
+ val err = new FakeException
+ val f1 = Task.delay(1)
+ val f2 = Task.raiseError(err)
+ val f3 = Task.delay(3)
+ val o = run[Try[Int]] {
+ val y1 = await(f1)
+ emit(y1)
+ val y2 = await(f2)
+ emit(y2)
+ val y3 = await(f3)
+ emit(y3)
+ }
+ for (t ← o.toListL) {
+ assertEquals(t, List(Success(1), Failure(err), Success(3)))
+ }
+ }
+
+ testAsync("can spawn two fibers, one waiting for the other") {
+ val o1: Observable[Int] = run[Int] {
+ emit(1)
+ emit(2)
+ emit(3)
+ }
+ lazy val o2: Observable[Int] = run[Int] {
+ val Success(a1) = await(o1.drop(0).firstL)
+ emit(a1)
+ val Success(a2) = await(o1.drop(1).firstL)
+ emit(a2)
+ val Success(a3) = await(o1.drop(2).firstL)
+ emit(a3)
+ }
+ for (t2 ← o2.toListL) {
+ assertEquals(t2, List(1, 2, 3))
+ }
+ }
+
+ testAsync("can spawn two mutually waiting fibers") {
+ lazy val o1: Observable[Int] = run[Int] {
+ emit(1)
+ val Success(b1) = await(o2.firstL)
+ emit(b1 + 1)
+ val Success(b2) = await(o2.tail.firstL)
+ emit(b2 + 1)
+ }.cache
+ lazy val o2: Observable[Int] = run[Int] {
+ val Success(a1) = await(o1.firstL)
+ emit(a1 + 1)
+ val Success(a2) = await(o1.tail.firstL)
+ emit(a2 + 1)
+ }.cache
+ for (t1 ← o1.toListL;
+ t2 ← o2.toListL) {
+ assertEquals(t1, List(1, 3, 5))
+ assertEquals(t2, List(2, 4))
+ }
+ }
+
+ testAsync("can store fiber-local state") {
+ lazy val o = run {
+ putFiberVar(100)
+ emit(1)
+ val state = getFiberVar.asInstanceOf[Int]
+ emit(state)
+ }
+ for (l ← o.toListL) {
+ assertEquals(l, List(1, 100))
+ }
+ }
+
+ testAsync("can properly maintain state between two alternating fibers") {
+ val o1: Observable[Int] = run {
+ putFiberVar(2)
+ emit(1)
+ val a2 = getFiberVar
+ putFiberVar(3)
+ emit(a2)
+ val a3 = getFiberVar
+ emit(a3)
+ }
+ lazy val o2: Observable[Int] = run {
+ putFiberVar(100)
+ val Success(a1) = await(o1.drop(0).firstL)
+ emit(a1)
+ emit(getFiberVar)
+ val Success(a2) = await(o1.drop(1).firstL)
+ emit(a2)
+ emit(getFiberVar)
+ val Success(a3) = await(o1.drop(2).firstL)
+ emit(a3)
+ emit(getFiberVar)
+ }
+ for (t2 ← o2.toListL) {
+ assertEquals(t2, List(1, 100, 2, 100, 3, 100))
+ }
+ }
+}