diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/lib/Fiber.scala | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala index 2d73f9e..42b416b 100644 --- a/core/lib/Fiber.scala +++ b/core/lib/Fiber.scala @@ -1,8 +1,8 @@ package eu.mulk.fibers import monix.eval.Task -import monix.execution.Scheduler -import monix.execution.cancelables.SingleAssignmentCancelable +import monix.execution.cancelables.MultiAssignmentCancelable +import monix.execution.{Cancelable, Scheduler} import monix.reactive.{Observable, OverflowStrategy} import scala.concurrent.{ExecutionContext, Future} @@ -129,11 +129,17 @@ object Fiber extends FiberConversions with FiberTypes { var state = new FiberState monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ - val cancelable = SingleAssignmentCancelable() + val cancelable = MultiAssignmentCancelable() def handle(init: (Effect[Out], Any ⇒ Any)): Unit = { var more = init var done = false + cancelable := Cancelable { () ⇒ + if (!done) { + done = true + out.onComplete() + } + } while (!done) { try { val (effect, continue) = more |