diff options
-rw-r--r-- | core/t/FiberSpec.scala | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala index 22a2eb9..63e9df0 100644 --- a/core/t/FiberSpec.scala +++ b/core/t/FiberSpec.scala @@ -5,9 +5,10 @@ import minitest.laws.Checkers import monix.eval.Task import monix.execution.Ack.{Continue, Stop} import monix.execution.Scheduler.Implicits.global -import monix.execution.{Ack, Scheduler} +import monix.execution.schedulers.TestScheduler +import monix.execution.{Ack, Cancelable, Scheduler} import monix.reactive.observers.Subscriber -import monix.reactive.{Consumer, Observable} +import monix.reactive.{Consumer, Observable, Observer} import scala.concurrent.Future import scala.util.{Failure, Success, Try} @@ -228,4 +229,42 @@ object FiberSpec extends SimpleTestSuite with Checkers { assert(!f3executed) } } + + test("can be cancelled") { + implicit val sched: TestScheduler = TestScheduler() + + var f3executed = false + + val f1 = Task.delay(1) + val f2 = Task.delay(2) + val f3 = Task.delay({ f3executed = true; 3 }) + + var triggered = 0 + val observer = new Observer[Int] { + def onNext(elem: Int): Future[Ack] = { + triggered += 1 + Continue + } + def onError(ex: Throwable): Unit = () + def onComplete(): Unit = () + } + + var subscription: Cancelable = null + + val o: Observable[Int] = run { + val Success(y1) = await(f1)(sched) + emit(y1) + subscription.cancel() + val Success(y2) = await(f2)(sched) + emit(y2) + val Success(y3) = await(f3)(sched) + emit(y3) + } + + subscription = o.subscribe(observer)(sched) + + assertEquals(triggered, 0) + sched.tick() + assertEquals(triggered, 1) + } } |