package eu.mulk.fibers

import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.cancelables.SingleAssignmentCancelable
import monix.reactive.{Observable, OverflowStrategy}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.continuations.{reset, shift}
import scala.util.control.NonFatal

/**
  * Fiber-related functionality.
  */
object Fiber extends FiberConversions with FiberTypes {

  sealed trait Effect[-Out] { type Result }

  private[this] object Effect {

    case object Finish extends Effect[Any] {
      override type Result = Nothing
    }

    case class AwaitTask[T](task: Task[T], scheduler: Scheduler)
        extends Effect[Any] {
      override type Result = Try[T]
    }

    case class AwaitFuture[T](future: Future[T],
                              executionContext: ExecutionContext)
        extends Effect[Any] {
      override type Result = Try[T]
    }

    case class Fail(throwable: Throwable) extends Effect[Any] {
      override type Result = Nothing
    }

    case class Emit[-T](returnValue: T) extends Effect[T] {
      override type Result = Unit
    }

    case object GetFiberVar extends Effect[Any] {
      override type Result = Any
    }

    case class PutFiberVar(value: Any) extends Effect[Any] {
      override type Result = Unit
    }

  }

  private[this] final class FiberState {
    var fiberVar: Any = _
  }

  private[this] def perform[Out](
      effect: Effect[Out]): effect.Result @fiber[Out] = {
    val result = shift { (k: Any ⇒ Any) ⇒
      (effect, k)
    }
    result.asInstanceOf[effect.Result]
  }

  /**
    * Runs and awaits the completion of a [[monix.eval.Task]].
    *
    * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
    * the result of the [[scala.concurrent.Future]].  If you prefer raising an exception
    * instead, consider destructuring the return value:
    *
    *     val Success(x) = await(...)
    *
    * The supplied [[Scheduler]] is used to run the continuation of the fiber.
    */
  def await[T, Out](task: Task[T])(
      implicit scheduler: Scheduler): Try[T] @fiber[Out] =
    perform(Effect.AwaitTask(task, scheduler))

  /**
    * Awaits the completion of a [[scala.concurrent.Future]].
    *
    * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
    * the result of the [[scala.concurrent.Future]].  If you prefer raising an exception
    * instead, consider destructuring the return value:
    *
    *     val Success(x) = await(...)
    */
  def await[T, Out](fut: Future[T])(
      implicit ec: ExecutionContext): Try[T] @fiber[Out] =
    perform(Effect.AwaitFuture(fut, ec))

  /**
    * Emits a value to the output [[monix.reactive.Observable]].
    */
  def emit[Out](x: Out): Unit @fiber[Out] =
    perform(Effect.Emit(x))

  /**
    * Replaces the current fiber-local state variable.
    *
    * This can be used like a thread-local variable, but for fibers.
    *
    * @see [[getFiberVar]]
    */
  def putFiberVar[Out](x: Any): Unit @fiber[Out] =
    perform(Effect.PutFiberVar(x))

  /**
    * Gets the current fiber-local state variable.
    *
    * The fiber-local state must have been set with [[putFiberVar]] before.
    *
    * @see [[putFiberVar]]
    */
  def getFiberVar[Out]: Any @fiber[Out] =
    perform(Effect.GetFiberVar)

  /**
    * Runs a fiber.
    *
    * @tparam Out the type of objects [[Fiber.emit]]ted by the fiber.
    * @return     a [[monix.reactive.Observable]] producing the objects
    *             [[Fiber.emit]]ted by the fiber.
    */
  def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = {
    var state = new FiberState

    monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒
      val cancelable = SingleAssignmentCancelable()

      def handle(more: (Effect[Out], Any ⇒ Any)): Unit = {
        val (effect, continue) = more
        try {
          effect match {
            case Effect.AwaitTask(task, scheduler) ⇒
              task.asyncBoundary.runOnComplete({ result ⇒
                val k = continue(result)
                handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
              })(scheduler)
            case Effect.AwaitFuture(task, executionContext) ⇒
              task.onComplete({ result ⇒
                val k = continue(result)
                handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
              })(executionContext)
            case Effect.GetFiberVar ⇒
              val k = continue(state.fiberVar)
              handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
            case Effect.PutFiberVar(value) ⇒
              state.fiberVar = value
              val k = continue(())
              handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
            case Effect.Emit(returnValue) ⇒
              out.onNext(returnValue.asInstanceOf[Out])
              val k = continue(())
              handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
            case Effect.Fail(throwable) ⇒
              out.onError(throwable)
            case Effect.Finish ⇒
              out.onComplete()
          }
        } catch {
          case NonFatal(err) ⇒
            out.onError(err)
        }
      }

      try {
        val (effect, continue): (Effect[Out], Any ⇒ Any) = reset {
          thunk
          (Effect.Finish, (_: Any) ⇒ ???)
        }
        handle(effect, continue)
      } catch {
        case NonFatal(err) ⇒
          out.onError(err)
      }

      cancelable
    }
  }

}