diff options
Diffstat (limited to 'core/t')
| -rw-r--r-- | core/t/FiberSpec.scala | 53 | 
1 files changed, 52 insertions, 1 deletions
| diff --git a/core/t/FiberSpec.scala b/core/t/FiberSpec.scala index 66d4fca..22a2eb9 100644 --- a/core/t/FiberSpec.scala +++ b/core/t/FiberSpec.scala @@ -3,8 +3,11 @@ package eu.mulk.fibers  import minitest._  import minitest.laws.Checkers  import monix.eval.Task +import monix.execution.Ack.{Continue, Stop}  import monix.execution.Scheduler.Implicits.global -import monix.reactive.Observable +import monix.execution.{Ack, Scheduler} +import monix.reactive.observers.Subscriber +import monix.reactive.{Consumer, Observable}  import scala.concurrent.Future  import scala.util.{Failure, Success, Try} @@ -177,4 +180,52 @@ object FiberSpec extends SimpleTestSuite with Checkers {        assertEquals(t2, List(1, 100, 2, 100, 3, 100))      }    } + +  testAsync("can be stopped") { +    var f3executed = false + +    val f1 = Task.delay(1) +    val f2 = Task.delay(2) +    val f3 = Task.delay({ f3executed = true; 3 }) + +    val o: Observable[Int] = run { +      val Success(y1) = await(f1) +      emit(y1) +      val Success(y2) = await(f2) +      emit(y2) +      val Success(y3) = await(f3) +      emit(y3) +    } + +    val consumeTask = o.consumeWith(Consumer.create[Int, Int] { +      (scheduler, cancelable, cb) ⇒ +        new Subscriber.Sync[Int] { +          var triggered = 0 + +          override implicit def scheduler: Scheduler = +            monix.execution.Scheduler.Implicits.global + +          override def onNext(elem: Int): Ack = { +            triggered += 1 +            if (triggered >= 2) { +              cb.onSuccess(triggered) +              Stop +            } else { +              Continue +            } +          } + +          override def onError(ex: Throwable): Unit = +            cb.onError(ex) + +          override def onComplete(): Unit = +            () +        } +    }) + +    for (result ← consumeTask) { +      assertEquals(result, 2) +      assert(!f3executed) +    } +  }  } | 
