aboutsummaryrefslogtreecommitdiff
path: root/core/t
diff options
context:
space:
mode:
authorMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 22:00:02 +0100
committerMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 22:08:01 +0100
commit30e0a8c249a2ff2fc451b33f1837b16d0fa9d419 (patch)
treec5f04616d78959f056ac998c8dec7c1c8efadbbc /core/t
parent7e034b1c19b2e0686a38e0562ed26ddb1fd912d0 (diff)
React to monix.execution.Ack.Stop signals properly.
Previously, we just ignored the return value of Observer.Sync.onNext in the implementation of Fiber.run. Now we stop when we receive a Stop response.
Diffstat (limited to 'core/t')
-rw-r--r--core/t/FiberSpec.scala53
1 files changed, 52 insertions, 1 deletions
diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala
index 66d4fca..22a2eb9 100644
--- a/core/t/FiberSpec.scala
+++ b/core/t/FiberSpec.scala
@@ -3,8 +3,11 @@ package eu.mulk.fibers
import minitest._
import minitest.laws.Checkers
import monix.eval.Task
+import monix.execution.Ack.{Continue, Stop}
import monix.execution.Scheduler.Implicits.global
-import monix.reactive.Observable
+import monix.execution.{Ack, Scheduler}
+import monix.reactive.observers.Subscriber
+import monix.reactive.{Consumer, Observable}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
@@ -177,4 +180,52 @@ object FiberSpec extends SimpleTestSuite with Checkers {
assertEquals(t2, List(1, 100, 2, 100, 3, 100))
}
}
+
+ testAsync("can be stopped") {
+ var f3executed = false
+
+ val f1 = Task.delay(1)
+ val f2 = Task.delay(2)
+ val f3 = Task.delay({ f3executed = true; 3 })
+
+ val o: Observable[Int] = run {
+ val Success(y1) = await(f1)
+ emit(y1)
+ val Success(y2) = await(f2)
+ emit(y2)
+ val Success(y3) = await(f3)
+ emit(y3)
+ }
+
+ val consumeTask = o.consumeWith(Consumer.create[Int, Int] {
+ (scheduler, cancelable, cb) ⇒
+ new Subscriber.Sync[Int] {
+ var triggered = 0
+
+ override implicit def scheduler: Scheduler =
+ monix.execution.Scheduler.Implicits.global
+
+ override def onNext(elem: Int): Ack = {
+ triggered += 1
+ if (triggered >= 2) {
+ cb.onSuccess(triggered)
+ Stop
+ } else {
+ Continue
+ }
+ }
+
+ override def onError(ex: Throwable): Unit =
+ cb.onError(ex)
+
+ override def onComplete(): Unit =
+ ()
+ }
+ })
+
+ for (result ← consumeTask) {
+ assertEquals(result, 2)
+ assert(!f3executed)
+ }
+ }
}