From 7e034b1c19b2e0686a38e0562ed26ddb1fd912d0 Mon Sep 17 00:00:00 2001 From: Matthias Andreas Benkard Date: Wed, 10 Jan 2018 20:48:38 +0100 Subject: Make fibers cancellable. The Observable returned by Fiber.run now implements monix.execution.Cancelable properly and will finish execution of a fiber on the next shift point when its corresponding Observable is cancelled. --- core/lib/Fiber.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala index 2d73f9e..42b416b 100644 --- a/core/lib/Fiber.scala +++ b/core/lib/Fiber.scala @@ -1,8 +1,8 @@ package eu.mulk.fibers import monix.eval.Task -import monix.execution.Scheduler -import monix.execution.cancelables.SingleAssignmentCancelable +import monix.execution.cancelables.MultiAssignmentCancelable +import monix.execution.{Cancelable, Scheduler} import monix.reactive.{Observable, OverflowStrategy} import scala.concurrent.{ExecutionContext, Future} @@ -129,11 +129,17 @@ object Fiber extends FiberConversions with FiberTypes { var state = new FiberState monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ - val cancelable = SingleAssignmentCancelable() + val cancelable = MultiAssignmentCancelable() def handle(init: (Effect[Out], Any ⇒ Any)): Unit = { var more = init var done = false + cancelable := Cancelable { () ⇒ + if (!done) { + done = true + out.onComplete() + } + } while (!done) { try { val (effect, continue) = more -- cgit v1.2.3