From 30e0a8c249a2ff2fc451b33f1837b16d0fa9d419 Mon Sep 17 00:00:00 2001 From: Matthias Andreas Benkard Date: Wed, 10 Jan 2018 22:00:02 +0100 Subject: React to monix.execution.Ack.Stop signals properly. Previously, we just ignored the return value of Observer.Sync.onNext in the implementation of Fiber.run. Now we stop when we receive a Stop response. --- core/lib/Fiber.scala | 15 ++++++++------ core/t/FiberSpec.scala | 53 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala index 42b416b..bf55cf4 100644 --- a/core/lib/Fiber.scala +++ b/core/lib/Fiber.scala @@ -1,6 +1,7 @@ package eu.mulk.fibers import monix.eval.Task +import monix.execution.Ack.{Continue, Stop} import monix.execution.cancelables.MultiAssignmentCancelable import monix.execution.{Cancelable, Scheduler} import monix.reactive.{Observable, OverflowStrategy} @@ -135,10 +136,8 @@ object Fiber extends FiberConversions with FiberTypes { var more = init var done = false cancelable := Cancelable { () ⇒ - if (!done) { - done = true - out.onComplete() - } + out.onComplete() + done = true } while (!done) { try { @@ -163,8 +162,12 @@ object Fiber extends FiberConversions with FiberTypes { state.fiberVar = value more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] case Effect.Emit(returnValue) ⇒ - out.onNext(returnValue.asInstanceOf[Out]) - more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + out.onNext(returnValue.asInstanceOf[Out]) match { + case _: Continue ⇒ + more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + case _: Stop ⇒ + cancelable.cancel() + } case Effect.Fail(throwable) ⇒ out.onError(throwable) done = true diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala index 66d4fca..22a2eb9 100644 --- a/core/t/FiberSpec.scala +++ b/core/t/FiberSpec.scala @@ -3,8 +3,11 @@ package eu.mulk.fibers import minitest._ import minitest.laws.Checkers import monix.eval.Task +import monix.execution.Ack.{Continue, Stop} import monix.execution.Scheduler.Implicits.global -import monix.reactive.Observable +import monix.execution.{Ack, Scheduler} +import monix.reactive.observers.Subscriber +import monix.reactive.{Consumer, Observable} import scala.concurrent.Future import scala.util.{Failure, Success, Try} @@ -177,4 +180,52 @@ object FiberSpec extends SimpleTestSuite with Checkers { assertEquals(t2, List(1, 100, 2, 100, 3, 100)) } } + + testAsync("can be stopped") { + var f3executed = false + + val f1 = Task.delay(1) + val f2 = Task.delay(2) + val f3 = Task.delay({ f3executed = true; 3 }) + + val o: Observable[Int] = run { + val Success(y1) = await(f1) + emit(y1) + val Success(y2) = await(f2) + emit(y2) + val Success(y3) = await(f3) + emit(y3) + } + + val consumeTask = o.consumeWith(Consumer.create[Int, Int] { + (scheduler, cancelable, cb) ⇒ + new Subscriber.Sync[Int] { + var triggered = 0 + + override implicit def scheduler: Scheduler = + monix.execution.Scheduler.Implicits.global + + override def onNext(elem: Int): Ack = { + triggered += 1 + if (triggered >= 2) { + cb.onSuccess(triggered) + Stop + } else { + Continue + } + } + + override def onError(ex: Throwable): Unit = + cb.onError(ex) + + override def onComplete(): Unit = + () + } + }) + + for (result ← consumeTask) { + assertEquals(result, 2) + assert(!f3executed) + } + } } -- cgit v1.2.3