diff options
author | Matthias Andreas Benkard <code@mail.matthias.benkard.de> | 2018-01-10 20:48:38 +0100 |
---|---|---|
committer | Matthias Andreas Benkard <code@mail.matthias.benkard.de> | 2018-01-10 20:48:38 +0100 |
commit | 7e034b1c19b2e0686a38e0562ed26ddb1fd912d0 (patch) | |
tree | 644b0b853de4b4eb242e7de9f518abce4a02f93c /core/lib | |
parent | aabb3b82757b715876fafe4d7ffaced183c04fd6 (diff) |
Make fibers cancellable.
The Observable returned by Fiber.run now implements
monix.execution.Cancelable properly and will finish execution of a
fiber on the next shift point when its corresponding Observable is
cancelled.
Diffstat (limited to 'core/lib')
-rw-r--r-- | core/lib/Fiber.scala | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala index 2d73f9e..42b416b 100644 --- a/core/lib/Fiber.scala +++ b/core/lib/Fiber.scala @@ -1,8 +1,8 @@ package eu.mulk.fibers import monix.eval.Task -import monix.execution.Scheduler -import monix.execution.cancelables.SingleAssignmentCancelable +import monix.execution.cancelables.MultiAssignmentCancelable +import monix.execution.{Cancelable, Scheduler} import monix.reactive.{Observable, OverflowStrategy} import scala.concurrent.{ExecutionContext, Future} @@ -129,11 +129,17 @@ object Fiber extends FiberConversions with FiberTypes { var state = new FiberState monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ - val cancelable = SingleAssignmentCancelable() + val cancelable = MultiAssignmentCancelable() def handle(init: (Effect[Out], Any ⇒ Any)): Unit = { var more = init var done = false + cancelable := Cancelable { () ⇒ + if (!done) { + done = true + out.onComplete() + } + } while (!done) { try { val (effect, continue) = more |