aboutsummaryrefslogtreecommitdiff
path: root/core/lib
diff options
context:
space:
mode:
authorMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 22:00:02 +0100
committerMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 22:08:01 +0100
commit30e0a8c249a2ff2fc451b33f1837b16d0fa9d419 (patch)
treec5f04616d78959f056ac998c8dec7c1c8efadbbc /core/lib
parent7e034b1c19b2e0686a38e0562ed26ddb1fd912d0 (diff)
React to monix.execution.Ack.Stop signals properly.
Previously, we just ignored the return value of Observer.Sync.onNext in the implementation of Fiber.run. Now we stop when we receive a Stop response.
Diffstat (limited to 'core/lib')
-rw-r--r--core/lib/Fiber.scala15
1 files changed, 9 insertions, 6 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala
index 42b416b..bf55cf4 100644
--- a/core/lib/Fiber.scala
+++ b/core/lib/Fiber.scala
@@ -1,6 +1,7 @@
package eu.mulk.fibers
import monix.eval.Task
+import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.MultiAssignmentCancelable
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.{Observable, OverflowStrategy}
@@ -135,10 +136,8 @@ object Fiber extends FiberConversions with FiberTypes {
var more = init
var done = false
cancelable := Cancelable { () ⇒
- if (!done) {
- done = true
- out.onComplete()
- }
+ out.onComplete()
+ done = true
}
while (!done) {
try {
@@ -163,8 +162,12 @@ object Fiber extends FiberConversions with FiberTypes {
state.fiberVar = value
more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
case Effect.Emit(returnValue) ⇒
- out.onNext(returnValue.asInstanceOf[Out])
- more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ out.onNext(returnValue.asInstanceOf[Out]) match {
+ case _: Continue ⇒
+ more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ case _: Stop ⇒
+ cancelable.cancel()
+ }
case Effect.Fail(throwable) ⇒
out.onError(throwable)
done = true