// Test suite for the fiber DSL in package eu.mulk.fibers.
package eu.mulk.fibers

import minitest._
import minitest.laws.Checkers
import monix.eval.Task
import monix.execution.Ack.{Continue, Stop}
import monix.execution.Scheduler.Implicits.global
import monix.execution.schedulers.TestScheduler
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.observers.Subscriber
import monix.reactive.{Consumer, Observable, Observer}

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

/** Behavioural tests for the fiber DSL (`run`, `emit`, `await`,
  * `putFiberVar`/`getFiberVar`) brought into scope by `import Fiber._`.
  *
  * Most tests build an `Observable` with `run { ... }`, collect its output as
  * a Monix `Task`, and check the result inside a `for (x ← task) { ... }`
  * block. Without `yield` that form desugars to `task.foreach`, which runs
  * the task on the implicit global scheduler imported at the top of the file.
  */
object FiberSpec extends SimpleTestSuite with Checkers {
  import Fiber._

  /** Marker exception so tests can recognise the failure they raised themselves. */
  class FakeException extends RuntimeException

  test("sanity") {
    assert(0 != 1)
  }

  // A fiber whose body emits nothing must yield an empty observable.
  testAsync("can produce nothing") {
    val o = run {}
    for (empty ← o.isEmptyL) {
      assert(empty)
    }
  }

  testAsync("can produce a single value") {
    val o = run[Int] {
      emit(100)
    }
    for (l ← o.toListL) {
      // assertEquals (rather than a bare assert) reports both sides on failure
      // and matches the other tests in this suite.
      assertEquals(l, List(100))
    }
  }

  // Emission from inside a loop; `.cps` adapts the range for use in fiber code.
  testAsync("can produce many values") {
    val o = run[Int] {
      for (x ← 1.to(10).cps)
        emit(x)
    }
    for (l ← o.toListL) {
      assertEquals(l, List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    }
  }

  // An exception thrown by the fiber body must surface as a failure of the
  // consuming task (`.failed` turns that failure into the task's value).
  testAsync("can throw exceptions before emitting values") {
    val o = run[Int] {
      throw new FakeException
    }
    for (t ← o.toListL.failed) {
      assert(t.isInstanceOf[FakeException])
    }
  }

  testAsync("can throw exceptions after emitting values") {
    val o = run[Int] {
      emit(100)
      throw new FakeException
    }
    for (t ← o.toListL.failed) {
      assert(t.isInstanceOf[FakeException])
    }
  }

  // `await` on a Future is expected to hand back a `Try`, so a failed future
  // shows up as a `Failure` element instead of aborting the fiber.
  testAsync("can await standard futures") {
    val err = new FakeException
    val f1 = Future(1)
    val f2 = Future.failed(err)
    val f3 = Future(3)
    val o = run[Try[Int]] {
      val y1 = await(f1)
      emit(y1)
      val y2 = await(f2)
      emit(y2)
      val y3 = await(f3)
      emit(y3)
    }
    for (t ← o.toListL) {
      assertEquals(t, List(Success(1), Failure(err), Success(3)))
    }
  }

  // Same scenario as above, but awaiting Monix tasks instead of futures.
  testAsync("can run Monix tasks") {
    val err = new FakeException
    val f1 = Task.delay(1)
    val f2 = Task.raiseError(err)
    val f3 = Task.delay(3)
    val o = run[Try[Int]] {
      val y1 = await(f1)
      emit(y1)
      val y2 = await(f2)
      emit(y2)
      val y3 = await(f3)
      emit(y3)
    }
    for (t ← o.toListL) {
      assertEquals(t, List(Success(1), Failure(err), Success(3)))
    }
  }

  // The second fiber re-subscribes to the first one for each element it wants
  // (`drop(n).firstL`) and re-emits what it received.
  testAsync("can spawn two fibers, one waiting for the other") {
    val o1: Observable[Int] = run[Int] {
      emit(1)
      emit(2)
      emit(3)
    }
    // No forward reference here, so a plain `val` suffices.
    val o2: Observable[Int] = run[Int] {
      val Success(a1) = await(o1.drop(0).firstL)
      emit(a1)
      val Success(a2) = await(o1.drop(1).firstL)
      emit(a2)
      val Success(a3) = await(o1.drop(2).firstL)
      emit(a3)
    }
    for (t2 ← o2.toListL) {
      assertEquals(t2, List(1, 2, 3))
    }
  }

  // Two fibers that each await elements of the other. Both are `lazy val`s
  // because `o1`'s body refers to `o2` before `o2` is defined, and both are
  // `.cache`d so that the repeated subscriptions share one run of each fiber.
  testAsync("can spawn two mutually waiting fibers") {
    lazy val o1: Observable[Int] = run[Int] {
      emit(1)
      val Success(b1) = await(o2.firstL)
      emit(b1 + 1)
      val Success(b2) = await(o2.tail.firstL)
      emit(b2 + 1)
    }.cache
    lazy val o2: Observable[Int] = run[Int] {
      val Success(a1) = await(o1.firstL)
      emit(a1 + 1)
      val Success(a2) = await(o1.tail.firstL)
      emit(a2 + 1)
    }.cache
    for (t1 ← o1.toListL;
         t2 ← o2.toListL) {
      assertEquals(t1, List(1, 3, 5))
      assertEquals(t2, List(2, 4))
    }
  }

  // A value stored with `putFiberVar` must still be readable after an `emit`.
  testAsync("can store fiber-local state") {
    val o = run {
      putFiberVar(100)
      emit(1)
      val state = getFiberVar.asInstanceOf[Int]
      emit(state)
    }
    for (l ← o.toListL) {
      assertEquals(l, List(1, 100))
    }
  }

  // Each fiber keeps its own variable: `o2` must keep seeing its own 100 even
  // though `o1`, which it awaits in between, stores 2 and then 3.
  testAsync("can properly maintain state between two alternating fibers") {
    val o1: Observable[Int] = run {
      putFiberVar(2)
      emit(1)
      val a2 = getFiberVar
      putFiberVar(3)
      emit(a2)
      val a3 = getFiberVar
      emit(a3)
    }
    // No forward reference here, so a plain `val` suffices.
    val o2: Observable[Int] = run {
      putFiberVar(100)
      val Success(a1) = await(o1.drop(0).firstL)
      emit(a1)
      emit(getFiberVar)
      val Success(a2) = await(o1.drop(1).firstL)
      emit(a2)
      emit(getFiberVar)
      val Success(a3) = await(o1.drop(2).firstL)
      emit(a3)
      emit(getFiberVar)
    }
    for (t2 ← o2.toListL) {
      assertEquals(t2, List(1, 100, 2, 100, 3, 100))
    }
  }

  // The consumer answers `Stop` on its second element; the fiber must then not
  // go on to evaluate the third task.
  testAsync("can be stopped") {
    // Flipped only if the third task is ever evaluated.
    var f3executed = false

    val f1 = Task.delay(1)
    val f2 = Task.delay(2)
    val f3 = Task.delay({ f3executed = true; 3 })

    val o: Observable[Int] = run {
      val Success(y1) = await(f1)
      emit(y1)
      val Success(y2) = await(f2)
      emit(y2)
      val Success(y3) = await(f3)
      emit(y3)
    }

    val consumeTask = o.consumeWith(Consumer.create[Int, Int] {
      // The lambda parameter gets its own name so the subscriber's `scheduler`
      // member can return it; the cancelable is not needed here.
      (consumerScheduler, _, cb) ⇒
        new Subscriber.Sync[Int] {
          private var triggered = 0
          // Set once the element count has been handed to the callback, so it
          // is never reported twice.
          private var reported = false

          override implicit def scheduler: Scheduler =
            consumerScheduler

          override def onNext(elem: Int): Ack = {
            triggered += 1
            if (triggered >= 2) {
              reported = true
              cb.onSuccess(triggered)
              Stop
            } else {
              Continue
            }
          }

          override def onError(ex: Throwable): Unit =
            cb.onError(ex)

          // If the source ends before two elements arrived, report the count
          // anyway so the test fails on the assertion instead of hanging.
          override def onComplete(): Unit =
            if (!reported) {
              reported = true
              cb.onSuccess(triggered)
            }
        }
    })

    for (result ← consumeTask) {
      assertEquals(result, 2)
      assert(!f3executed)
    }
  }

  // Runs on a TestScheduler so execution only advances on `tick()`. The fiber
  // cancels its own subscription right after the first emission.
  test("can be cancelled") {
    implicit val sched: TestScheduler = TestScheduler()

    // Flipped only if the third task is ever evaluated.
    var f3executed = false

    val f1 = Task.delay(1)
    val f2 = Task.delay(2)
    val f3 = Task.delay({ f3executed = true; 3 })

    // Counts the elements that actually reach the observer.
    var triggered = 0
    val observer = new Observer[Int] {
      def onNext(elem: Int): Future[Ack] = {
        triggered += 1
        Continue
      }
      def onError(ex: Throwable): Unit = ()
      def onComplete(): Unit = ()
    }

    // Assigned below, after `o` is defined: the fiber body needs to refer to
    // the subscription that only exists once `o` has been subscribed to.
    var subscription: Cancelable = null

    val o: Observable[Int] = run {
      // The scheduler is passed explicitly to `await`; presumably because both
      // `sched` and the imported global scheduler are implicit candidates.
      val Success(y1) = await(f1)(sched)
      emit(y1)
      subscription.cancel()
      val Success(y2) = await(f2)(sched)
      emit(y2)
      val Success(y3) = await(f3)(sched)
      emit(y3)
    }

    subscription = o.subscribe(observer)(sched)

    // Nothing may be delivered before the test scheduler is advanced.
    assertEquals(triggered, 0)
    sched.tick()
    // Only the element emitted before the cancellation may arrive, and the
    // cancelled fiber must not have gone on to evaluate the third task.
    assertEquals(triggered, 1)
    assert(!f3executed)
  }
}