aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/t/FiberSpec.scala43
1 files changed, 41 insertions, 2 deletions
diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala
index 22a2eb9..63e9df0 100644
--- a/core/t/FiberSpec.scala
+++ b/core/t/FiberSpec.scala
@@ -5,9 +5,10 @@ import minitest.laws.Checkers
import monix.eval.Task
import monix.execution.Ack.{Continue, Stop}
import monix.execution.Scheduler.Implicits.global
-import monix.execution.{Ack, Scheduler}
+import monix.execution.schedulers.TestScheduler
+import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.observers.Subscriber
-import monix.reactive.{Consumer, Observable}
+import monix.reactive.{Consumer, Observable, Observer}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
@@ -228,4 +229,42 @@ object FiberSpec extends SimpleTestSuite with Checkers {
assert(!f3executed)
}
}
+
+ test("can be cancelled") {
+ implicit val sched: TestScheduler = TestScheduler()
+
+ var f3executed = false
+
+ val f1 = Task.delay(1)
+ val f2 = Task.delay(2)
+ val f3 = Task.delay({ f3executed = true; 3 })
+
+ var triggered = 0
+ val observer = new Observer[Int] {
+ def onNext(elem: Int): Future[Ack] = {
+ triggered += 1
+ Continue
+ }
+ def onError(ex: Throwable): Unit = ()
+ def onComplete(): Unit = ()
+ }
+
+ var subscription: Cancelable = null
+
+ val o: Observable[Int] = run {
+ val Success(y1) = await(f1)(sched)
+ emit(y1)
+ subscription.cancel()
+ val Success(y2) = await(f2)(sched)
+ emit(y2)
+ val Success(y3) = await(f3)(sched)
+ emit(y3)
+ }
+
+ subscription = o.subscribe(observer)(sched)
+
+ assertEquals(triggered, 0)
+ sched.tick()
+ assertEquals(triggered, 1)
+ }
}