From 6b620da09b9f05699c1cf6080de229e63de47942 Mon Sep 17 00:00:00 2001 From: Matthias Andreas Benkard Date: Sun, 7 Jan 2018 21:51:22 +0100 Subject: Add initial code and project files. --- .gitignore | 4 + README.md | 2 + build.sbt | 49 +++++++++++ core/lib/Fiber.scala | 185 ++++++++++++++++++++++++++++++++++++++++ core/lib/FiberConversions.scala | 46 ++++++++++ core/lib/FiberTypes.scala | 19 +++++ core/t/FiberSpec.scala | 180 ++++++++++++++++++++++++++++++++++++++ project/build.properties | 1 + project/revolver.sbt | 1 + project/scalajs.sbt | 2 + 10 files changed, 489 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 build.sbt create mode 100644 core/lib/Fiber.scala create mode 100644 core/lib/FiberConversions.scala create mode 100644 core/lib/FiberTypes.scala create mode 100644 core/t/FiberSpec.scala create mode 100644 project/build.properties create mode 100644 project/revolver.sbt create mode 100644 project/scalajs.sbt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..af8d8fe --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea +target +*~ + diff --git a/README.md b/README.md new file mode 100644 index 0000000..bc1b258 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# Fibers for Scala + diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..43fd17a --- /dev/null +++ b/build.sbt @@ -0,0 +1,49 @@ +name := "fibers" +mainClass in Compile := None + +inThisBuild(Seq( + scalaOrganization := "org.typelevel", + scalaVersion := "2.12.4-bin-typelevel-4", + + version := "0.1.0-SNAPSHOT", + organization := "eu.mulk", + + run / fork := true, + cancelable := true, + + scalacOptions ++= Seq( + "-deprecation", + ), +)) + +lazy val core = (project in file("core")) + .settings( + name := "fibers-core", + + scalaSource in Compile := baseDirectory.value / "lib", + scalaSource in Test := baseDirectory.value / "t", + + scalacOptions += "-Yliteral-types", + + scalacOptions ++= Seq( + "-Ypartial-unification", + ), + + // Continuations + addCompilerPlugin("org.scala-lang.plugins" % "scala-continuations-plugin_2.12.2" % "1.0.3"), + libraryDependencies += "org.scala-lang.plugins" %% "scala-continuations-library" % "1.0.3", + scalacOptions += "-P:continuations:enable", + + // Monix + libraryDependencies ++= Seq( + "io.monix" %% "monix" % "2.3.0", + "io.monix" %% "monix-cats" % "2.3.0", + ), + + // Minitest + libraryDependencies ++= Seq( + "io.monix" %% "minitest" % "2.0.0" % "test", + "io.monix" %% "minitest-laws" % "2.0.0" % "test", + ), + testFrameworks += new TestFramework("minitest.runner.Framework"), + ) diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala new file mode 100644 index 0000000..0e2ef39 --- /dev/null +++ b/core/lib/Fiber.scala @@ -0,0 +1,185 @@ +package eu.mulk.fibers + +import monix.eval.Task +import monix.execution.Scheduler +import monix.execution.cancelables.SingleAssignmentCancelable +import monix.reactive.{Observable, OverflowStrategy} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try +import scala.util.continuations.{reset, shift} +import scala.util.control.NonFatal + +/** + * Fiber-related functionality. + */ +object Fiber extends FiberConversions with FiberTypes { + + sealed trait Effect[-Out] { type Result } + + private[this] object Effect { + + case object Finish extends Effect[Any] { + override type Result = Nothing + } + + case class AwaitTask[T](task: Task[T], scheduler: Scheduler) + extends Effect[Any] { + override type Result = Try[T] + } + + case class AwaitFuture[T](future: Future[T], + executionContext: ExecutionContext) + extends Effect[Any] { + override type Result = Try[T] + } + + case class Fail(throwable: Throwable) extends Effect[Any] { + override type Result = Nothing + } + + case class Emit[-T](returnValue: T) extends Effect[T] { + override type Result = Unit + } + + case object GetFiberVar extends Effect[Any] { + override type Result = Any + } + + case class PutFiberVar(value: Any) extends Effect[Any] { + override type Result = Unit + } + + } + + private[this] final class FiberState { + var fiberVar: Any = _ + } + + private[this] def perform[Out]( + effect: Effect[Out]): effect.Result @fiber[Out] = { + val result = shift { (k: Any ⇒ Any) ⇒ + (effect, k) + } + result.asInstanceOf[effect.Result] + } + + /** + * Runs and awaits the completion of a [[monix.eval.Task]]. + * + * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on + * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception + * instead, consider destructuring the return value: + * + * val Success(x) = await(...) + * + * The supplied [[Scheduler]] is used to run the continuation of the fiber. + */ + def await[T, Out](task: Task[T])( + implicit scheduler: Scheduler): Try[T] @fiber[Out] = + perform(Effect.AwaitTask(task, scheduler)) + + /** + * Awaits the completion of a [[scala.concurrent.Future]]. + * + * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on + * the result of the [[scala.concurrent.Future]]. If you prefer raising an exception + * instead, consider destructuring the return value: + * + * val Success(x) = await(...) + */ + def await[T, Out](fut: Future[T])( + implicit ec: ExecutionContext): Try[T] @fiber[Out] = + perform(Effect.AwaitFuture(fut, ec)) + + /** + * Emits a value to the output [[monix.reactive.Observable]]. + */ + def emit[Out](x: Out): Unit @fiber[Out] = + perform(Effect.Emit(x)) + + /** + * Replaces the current fiber-local state variable. + * + * This can be used like a thread-local variable, but for fibers. + * + * @see [[getFiberVar]] + */ + def putFiberVar[Out](x: Any): Unit @fiber[Out] = + perform(Effect.PutFiberVar(x)) + + /** + * Gets the current fiber-local state variable. + * + * The fiber-local state must have been set with [[putFiberVar]] before. + * + * @see [[putFiberVar]] + */ + def getFiberVar[Out]: Any @fiber[Out] = + perform(Effect.GetFiberVar) + + /** + * Runs a fiber. + * + * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber. + * @return a [[monix.reactive.Observable]] producing the objects + * [[Fiber.emit]]ted by the fiber. + */ + def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = { + var state = new FiberState + + monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ + val cancelable = SingleAssignmentCancelable() + + def handle(more: (Effect[Out], Any ⇒ Any)): Unit = { + val (effect, continue) = more + try { + effect match { + case Effect.AwaitTask(task, scheduler) ⇒ + task.asyncBoundary.runOnComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(scheduler) + case Effect.AwaitFuture(task, executionContext) ⇒ + task.onComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(executionContext) + case Effect.GetFiberVar ⇒ + val k = continue(state.fiberVar) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.PutFiberVar(value) ⇒ + state.fiberVar = value + val k = continue(()) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.Emit(returnValue) ⇒ + out.onNext(returnValue.asInstanceOf[Out]) + val k = continue(()) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + case Effect.Fail(throwable) ⇒ + out.onError(throwable) + case Effect.Finish ⇒ + out.onComplete() + } + } catch { + case NonFatal(err) ⇒ + out.onError(err) + } + } + + try { + val (effect, continue): (Effect[Out], Any ⇒ Any) = reset { + thunk + (Effect.Finish, (_: Any) ⇒ ???) + } + handle(effect, continue) + } catch { + case NonFatal(err) ⇒ + out.onError(err) + } + + cancelable + } + } + +} diff --git a/core/lib/FiberConversions.scala b/core/lib/FiberConversions.scala new file mode 100644 index 0000000..fe79796 --- /dev/null +++ b/core/lib/FiberConversions.scala @@ -0,0 +1,46 @@ +package eu.mulk.fibers + +import scala.collection.generic.CanBuildFrom +import scala.collection.{GenTraversableOnce, IterableLike} +import scala.language.implicitConversions + +/** + * Implicit conversions for convenient use of containers from within + * fibers. + */ +private[fibers] trait FiberConversions extends FiberTypes { + // + // This is based on code by James Earl Dougles, taken from: + // + // https://earldouglas.com/posts/monadic-continuations.html + // + + /** + * Defines the `cps` helper on [[IterableLike]] objects, which you can call to + * traverse them from fibers. + */ + implicit def cpsIterable[A, Repr](xs: IterableLike[A, Repr]) = new { + + /** + * Provides fiber-compatible iteration functions. + */ + def cps = new { + def foreach[B, T](f: A => Any @fiber[T]): Unit @fiber[T] = { + val it = xs.iterator + while (it.hasNext) f(it.next) + } + def map[B, That, T](f: A => B @fiber[T])( + implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = { + val b = cbf(xs.repr) + foreach(b += f(_)) + b.result + } + def flatMap[B, That, T](f: A => GenTraversableOnce[B] @fiber[T])( + implicit cbf: CanBuildFrom[Repr, B, That]): That @fiber[T] = { + val b = cbf(xs.repr) + for (x <- this) b ++= f(x) + b.result + } + } + } +} diff --git a/core/lib/FiberTypes.scala b/core/lib/FiberTypes.scala new file mode 100644 index 0000000..35f6a42 --- /dev/null +++ b/core/lib/FiberTypes.scala @@ -0,0 +1,19 @@ +package eu.mulk.fibers + +import scala.language.implicitConversions +import scala.util.continuations.cpsParam + +private[fibers] trait FiberTypes { + type Effect[_] + + /** + * Annotates a computation as a fiber. + * + * Annotate the return type of a fibrational function with this type + * annotator. + * + * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber. + * @see [[Fiber.run]] + */ + type fiber[Out] = cpsParam[Any, (Effect[Out], Any ⇒ Any)] +} diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala new file mode 100644 index 0000000..66d4fca --- /dev/null +++ b/core/t/FiberSpec.scala @@ -0,0 +1,180 @@ +package eu.mulk.fibers + +import minitest._ +import minitest.laws.Checkers +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import monix.reactive.Observable + +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} + +object FiberSpec extends SimpleTestSuite with Checkers { + import Fiber._ + + class FakeException extends RuntimeException + + test("sanity") { + assert(0 != 1) + } + + testAsync("can produce nothing") { + val o = run {} + for (empty ← o.isEmptyL) { + assert(empty) + } + } + + testAsync("can produce a single value") { + val o = run[Int] { + emit(100) + } + for (l ← o.toListL) { + assert(l == List(100)) + } + } + + testAsync("can produce many values") { + val o = run[Int] { + for (x ← 1.to(10).cps) + emit(x) + } + for (l ← o.toListL) { + assertEquals(l, List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + } + } + + testAsync("can throw exceptions before emitting values") { + val o = run[Int] { + throw new FakeException + } + for (t ← o.toListL.failed) { + assert(t.isInstanceOf[FakeException]) + } + } + + testAsync("can throw exceptions after emitting values") { + val o = run[Int] { + emit(100) + throw new FakeException + } + for (t ← o.toListL.failed) { + assert(t.isInstanceOf[FakeException]) + } + } + + testAsync("can await standard futures") { + val err = new FakeException + val f1 = Future(1) + val f2 = Future.failed(err) + val f3 = Future(3) + val o = run[Try[Int]] { + val y1 = await(f1) + emit(y1) + val y2 = await(f2) + emit(y2) + val y3 = await(f3) + emit(y3) + } + for (t ← o.toListL) { + assertEquals(t, List(Success(1), Failure(err), Success(3))) + } + } + + testAsync("can run Monix tasks") { + val err = new FakeException + val f1 = Task.delay(1) + val f2 = Task.raiseError(err) + val f3 = Task.delay(3) + val o = run[Try[Int]] { + val y1 = await(f1) + emit(y1) + val y2 = await(f2) + emit(y2) + val y3 = await(f3) + emit(y3) + } + for (t ← o.toListL) { + assertEquals(t, List(Success(1), Failure(err), Success(3))) + } + } + + testAsync("can spawn two fibers, one waiting for the other") { + val o1: Observable[Int] = run[Int] { + emit(1) + emit(2) + emit(3) + } + lazy val o2: Observable[Int] = run[Int] { + val Success(a1) = await(o1.drop(0).firstL) + emit(a1) + val Success(a2) = await(o1.drop(1).firstL) + emit(a2) + val Success(a3) = await(o1.drop(2).firstL) + emit(a3) + } + for (t2 ← o2.toListL) { + assertEquals(t2, List(1, 2, 3)) + } + } + + testAsync("can spawn two mutually waiting fibers") { + lazy val o1: Observable[Int] = run[Int] { + emit(1) + val Success(b1) = await(o2.firstL) + emit(b1 + 1) + val Success(b2) = await(o2.tail.firstL) + emit(b2 + 1) + }.cache + lazy val o2: Observable[Int] = run[Int] { + val Success(a1) = await(o1.firstL) + emit(a1 + 1) + val Success(a2) = await(o1.tail.firstL) + emit(a2 + 1) + }.cache + for (t1 ← o1.toListL; + t2 ← o2.toListL) { + assertEquals(t1, List(1, 3, 5)) + assertEquals(t2, List(2, 4)) + } + } + + testAsync("can store fiber-local state") { + lazy val o = run { + putFiberVar(100) + emit(1) + val state = getFiberVar.asInstanceOf[Int] + emit(state) + } + for (l ← o.toListL) { + assertEquals(l, List(1, 100)) + } + } + + testAsync("can properly maintain state between two alternating fibers") { + val o1: Observable[Int] = run { + putFiberVar(2) + emit(1) + val a2 = getFiberVar + putFiberVar(3) + emit(a2) + val a3 = getFiberVar + emit(a3) + } + lazy val o2: Observable[Int] = run { + putFiberVar(100) + val Success(a1) = await(o1.drop(0).firstL) + emit(a1) + emit(getFiberVar) + val Success(a2) = await(o1.drop(1).firstL) + emit(a2) + emit(getFiberVar) + val Success(a3) = await(o1.drop(2).firstL) + emit(a3) + emit(getFiberVar) + } + for (t2 ← o2.toListL) { + assertEquals(t2, List(1, 100, 2, 100, 3, 100)) + } + } +} diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..670b792 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.1.0-RC4 diff --git a/project/revolver.sbt b/project/revolver.sbt new file mode 100644 index 0000000..d4312f7 --- /dev/null +++ b/project/revolver.sbt @@ -0,0 +1 @@ +addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1") diff --git a/project/scalajs.sbt b/project/scalajs.sbt new file mode 100644 index 0000000..c3db196 --- /dev/null +++ b/project/scalajs.sbt @@ -0,0 +1,2 @@ +addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.21") + -- cgit v1.2.3