diff options
author | Matthias Andreas Benkard <code@mail.matthias.benkard.de> | 2018-01-10 22:00:02 +0100 |
---|---|---|
committer | Matthias Andreas Benkard <code@mail.matthias.benkard.de> | 2018-01-10 22:08:01 +0100 |
commit | 30e0a8c249a2ff2fc451b33f1837b16d0fa9d419 (patch) | |
tree | c5f04616d78959f056ac998c8dec7c1c8efadbbc /core/lib | |
parent | 7e034b1c19b2e0686a38e0562ed26ddb1fd912d0 (diff) |
React to monix.execution.Ack.Stop signals properly.
Previously, we just ignored the return value of Observer.Sync.onNext
in the implementation of Fiber.run. Now we stop when we receive a
Stop response.
Diffstat (limited to 'core/lib')
-rw-r--r-- | core/lib/Fiber.scala | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala index 42b416b..bf55cf4 100644 --- a/core/lib/Fiber.scala +++ b/core/lib/Fiber.scala @@ -1,6 +1,7 @@ package eu.mulk.fibers import monix.eval.Task +import monix.execution.Ack.{Continue, Stop} import monix.execution.cancelables.MultiAssignmentCancelable import monix.execution.{Cancelable, Scheduler} import monix.reactive.{Observable, OverflowStrategy} @@ -135,10 +136,8 @@ object Fiber extends FiberConversions with FiberTypes { var more = init var done = false cancelable := Cancelable { () ⇒ - if (!done) { - done = true - out.onComplete() - } + out.onComplete() + done = true } while (!done) { try { @@ -163,8 +162,12 @@ object Fiber extends FiberConversions with FiberTypes { state.fiberVar = value more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] case Effect.Emit(returnValue) ⇒ - out.onNext(returnValue.asInstanceOf[Out]) - more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + out.onNext(returnValue.asInstanceOf[Out]) match { + case _: Continue ⇒ + more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + case _: Stop ⇒ + cancelable.cancel() + } case Effect.Fail(throwable) ⇒ out.onError(throwable) done = true |