aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 22:00:02 +0100
committerMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 22:08:01 +0100
commit30e0a8c249a2ff2fc451b33f1837b16d0fa9d419 (patch)
treec5f04616d78959f056ac998c8dec7c1c8efadbbc
parent7e034b1c19b2e0686a38e0562ed26ddb1fd912d0 (diff)
React to monix.execution.Ack.Stop signals properly.
Previously, we just ignored the return value of Observer.Sync.onNext in the implementation of Fiber.run. Now we stop when we receive a Stop response.
-rw-r--r--core/lib/Fiber.scala15
-rw-r--r--core/t/FiberSpec.scala53
2 files changed, 61 insertions, 7 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala
index 42b416b..bf55cf4 100644
--- a/core/lib/Fiber.scala
+++ b/core/lib/Fiber.scala
@@ -1,6 +1,7 @@
package eu.mulk.fibers
import monix.eval.Task
+import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.MultiAssignmentCancelable
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.{Observable, OverflowStrategy}
@@ -135,10 +136,8 @@ object Fiber extends FiberConversions with FiberTypes {
var more = init
var done = false
cancelable := Cancelable { () ⇒
- if (!done) {
- done = true
- out.onComplete()
- }
+ out.onComplete()
+ done = true
}
while (!done) {
try {
@@ -163,8 +162,12 @@ object Fiber extends FiberConversions with FiberTypes {
state.fiberVar = value
more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
case Effect.Emit(returnValue) ⇒
- out.onNext(returnValue.asInstanceOf[Out])
- more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ out.onNext(returnValue.asInstanceOf[Out]) match {
+ case _: Continue ⇒
+ more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ case _: Stop ⇒
+ cancelable.cancel()
+ }
case Effect.Fail(throwable) ⇒
out.onError(throwable)
done = true
diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala
index 66d4fca..22a2eb9 100644
--- a/core/t/FiberSpec.scala
+++ b/core/t/FiberSpec.scala
@@ -3,8 +3,11 @@ package eu.mulk.fibers
import minitest._
import minitest.laws.Checkers
import monix.eval.Task
+import monix.execution.Ack.{Continue, Stop}
import monix.execution.Scheduler.Implicits.global
-import monix.reactive.Observable
+import monix.execution.{Ack, Scheduler}
+import monix.reactive.observers.Subscriber
+import monix.reactive.{Consumer, Observable}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
@@ -177,4 +180,52 @@ object FiberSpec extends SimpleTestSuite with Checkers {
assertEquals(t2, List(1, 100, 2, 100, 3, 100))
}
}
+
+ testAsync("can be stopped") {
+ var f3executed = false
+
+ val f1 = Task.delay(1)
+ val f2 = Task.delay(2)
+ val f3 = Task.delay({ f3executed = true; 3 })
+
+ val o: Observable[Int] = run {
+ val Success(y1) = await(f1)
+ emit(y1)
+ val Success(y2) = await(f2)
+ emit(y2)
+ val Success(y3) = await(f3)
+ emit(y3)
+ }
+
+ val consumeTask = o.consumeWith(Consumer.create[Int, Int] {
+ (scheduler, cancelable, cb) ⇒
+ new Subscriber.Sync[Int] {
+ var triggered = 0
+
+ override implicit def scheduler: Scheduler =
+ monix.execution.Scheduler.Implicits.global
+
+ override def onNext(elem: Int): Ack = {
+ triggered += 1
+ if (triggered >= 2) {
+ cb.onSuccess(triggered)
+ Stop
+ } else {
+ Continue
+ }
+ }
+
+ override def onError(ex: Throwable): Unit =
+ cb.onError(ex)
+
+ override def onComplete(): Unit =
+ ()
+ }
+ })
+
+ for (result ← consumeTask) {
+ assertEquals(result, 2)
+ assert(!f3executed)
+ }
+ }
}