aboutsummaryrefslogtreecommitdiff
path: root/core/lib/Fiber.scala
blob: f4edffa230c39037fc97242e9c94aed9e112f862 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package eu.mulk.fibers

import monix.eval.Task
import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.MultiAssignmentCancelable
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.{Observable, OverflowStrategy}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.continuations.{reset, shift}
import scala.util.control.NonFatal

/**
  * Fiber-related functionality.
  *
  * A fiber is a block of code written in continuation-passing style (via
  * `scala.util.continuations`) that may suspend itself by performing
  * effects such as [[Fiber.await]] or [[Fiber.emit]].  [[Fiber.run]]
  * interprets those effects and exposes the emitted values as a
  * [[monix.reactive.Observable]].
  */
object Fiber extends FiberConversions with FiberTypes {

  /**
    * A request made by a fiber to its interpreter (see [[Fiber.run]]).
    *
    * `Result` is the type of the value the fiber is resumed with once the
    * effect has been handled.
    *
    * @tparam Out the type of values the effect may emit.
    */
  sealed trait Effect[-Out] { type Result }

  private[this] object Effect {

    /** Signals that the fiber body has run to completion. */
    case object Finish extends Effect[Any] {
      override type Result = Nothing
    }

    /** Suspends the fiber until `task`, run on `scheduler`, has completed. */
    case class AwaitTask[T](task: Task[T], scheduler: Scheduler)
        extends Effect[Any] {
      override type Result = Try[T]
    }

    /**
      * Suspends the fiber until `future` has completed; the fiber is
      * resumed from a callback registered on `executionContext`.
      */
    case class AwaitFuture[T](future: Future[T],
                              executionContext: ExecutionContext)
        extends Effect[Any] {
      override type Result = Try[T]
    }

    /** Terminates the fiber by signalling `throwable` to the output. */
    case class Fail(throwable: Throwable) extends Effect[Any] {
      override type Result = Nothing
    }

    /** Pushes `returnValue` to the output of the fiber. */
    case class Emit[-T](returnValue: T) extends Effect[T] {
      override type Result = Unit
    }

    /** Reads the fiber-local state variable. */
    case object GetFiberVar extends Effect[Any] {
      override type Result = Any
    }

    /** Overwrites the fiber-local state variable with `value`. */
    case class PutFiberVar(value: Any) extends Effect[Any] {
      override type Result = Unit
    }

  }

  /** Mutable holder for the fiber-local state variable. */
  private[this] final class FiberState {
    var fiberVar: Any = _
  }

  /**
    * Suspends the current fiber and hands `effect` to the interpreter.
    *
    * The current continuation `k` is captured with `shift` and the pair
    * `(effect, k)` becomes the value of the enclosing `reset`.  Whatever
    * value the interpreter later passes to `k` is returned from this call,
    * cast (unchecked) to `effect.Result`.
    */
  private[this] def perform[Out](
      effect: Effect[Out]): effect.Result @fiber[Out] = {
    val result = shift { (k: Any ⇒ Any) ⇒
      (effect, k)
    }
    result.asInstanceOf[effect.Result]
  }

  /**
    * Runs and awaits the completion of a [[monix.eval.Task]].
    *
    * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
    * the result of the [[monix.eval.Task]].  If you prefer raising an exception
    * instead, consider destructuring the return value:
    *
    *     val Success(x) = await(...)
    *
    * The supplied [[Scheduler]] is used to run the continuation of the fiber.
    */
  def await[T, Out](task: Task[T])(
      implicit scheduler: Scheduler): Try[T] @fiber[Out] =
    perform(Effect.AwaitTask(task, scheduler))

  /**
    * Awaits the completion of a [[scala.concurrent.Future]].
    *
    * Returns either a [[scala.util.Success]] or [[scala.util.Failure]] depending on
    * the result of the [[scala.concurrent.Future]].  If you prefer raising an exception
    * instead, consider destructuring the return value:
    *
    *     val Success(x) = await(...)
    *
    * The supplied [[scala.concurrent.ExecutionContext]] is used to run the
    * continuation of the fiber.
    */
  def await[T, Out](fut: Future[T])(
      implicit ec: ExecutionContext): Try[T] @fiber[Out] =
    perform(Effect.AwaitFuture(fut, ec))

  /**
    * Emits a value to the output [[monix.reactive.Observable]].
    */
  def emit[Out](x: Out): Unit @fiber[Out] =
    perform(Effect.Emit(x))

  /**
    * Replaces the current fiber-local state variable.
    *
    * This can be used like a thread-local variable, but for fibers.
    *
    * @see [[getFiberVar]]
    */
  def putFiberVar[Out](x: Any): Unit @fiber[Out] =
    perform(Effect.PutFiberVar(x))

  /**
    * Gets the current fiber-local state variable.
    *
    * The fiber-local state must have been set with [[putFiberVar]] before.
    *
    * @see [[putFiberVar]]
    */
  def getFiberVar[Out]: Any @fiber[Out] =
    perform(Effect.GetFiberVar)

  /**
    * Runs a fiber.
    *
    * The fiber body is evaluated anew for every subscription to the
    * returned [[monix.reactive.Observable]], and every subscription gets
    * its own fiber-local state variable.  Non-fatal exceptions thrown by
    * the fiber body, whether before or after an [[Fiber.await]], are
    * signalled through the observable's `onError`.
    *
    * @param thunk the body of the fiber.
    * @tparam Out  the type of objects [[Fiber.emit]]ted by the fiber.
    * @return      a [[monix.reactive.Observable]] producing the objects
    *              [[Fiber.emit]]ted by the fiber.
    */
  def run[Out](thunk: ⇒ Unit @fiber[Out]): Observable[Out] = {
    monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒
      // Created per subscription so that independent runs of the same
      // Observable do not observe each other's fiber-local variable.
      val state = new FiberState
      val cancelable = MultiAssignmentCancelable()

      // Resumes the fiber with the result of an asynchronous effect.  This
      // runs on whatever thread completes the Task/Future, i.e. outside of
      // the try/catch of the `handle` invocation that registered it, so
      // exceptions thrown by the fiber body must be caught here as well.
      def resume(continue: Any ⇒ Any, result: Any): Unit = {
        val next =
          try {
            Some(continue(result).asInstanceOf[(Effect[Out], Any ⇒ Any)])
          } catch {
            case NonFatal(err) ⇒
              out.onError(err)
              None
          }
        next.foreach(handle)
      }

      // Interprets effects starting with `init` until the fiber finishes,
      // fails, is cancelled, or suspends on an asynchronous effect.
      def handle(init: (Effect[Out], Any ⇒ Any)): Unit = {
        var more = init
        var done = false
        // Point the shared cancelable at this invocation's `done` flag so
        // that cancelling stops this loop and completes the output.
        cancelable := Cancelable { () ⇒
          out.onComplete()
          done = true
        }
        while (!done) {
          try {
            val (effect, continue) = more
            effect match {
              case Effect.AwaitTask(task, scheduler) ⇒
                task.asyncBoundary.runOnComplete({ result ⇒
                  resume(continue, result)
                })(scheduler)
                // Leave the loop; `resume` starts a new one on completion.
                done = true
              case Effect.AwaitFuture(task, executionContext) ⇒
                task.onComplete({ result ⇒
                  resume(continue, result)
                })(executionContext)
                // Leave the loop; `resume` starts a new one on completion.
                done = true
              case Effect.GetFiberVar ⇒
                more = continue(state.fiberVar)
                  .asInstanceOf[(Effect[Out], Any ⇒ Any)]
              case Effect.PutFiberVar(value) ⇒
                state.fiberVar = value
                more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
              case Effect.Emit(returnValue) ⇒
                out.onNext(returnValue.asInstanceOf[Out]) match {
                  case _: Continue ⇒
                    more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
                  case _: Stop ⇒
                    // Downstream is not interested anymore: cancelling
                    // sets `done` and thereby ends the loop.
                    cancelable.cancel()
                }
              case Effect.Fail(throwable) ⇒
                out.onError(throwable)
                done = true
              case Effect.Finish ⇒
                out.onComplete()
                done = true
            }
          } catch {
            case NonFatal(err) ⇒
              out.onError(err)
              done = true
          }
        }
      }

      try {
        // Run the fiber up to its first effect.  If the body never
        // suspends, the reset block yields `Finish` directly; its dummy
        // continuation is never invoked.
        val (effect, continue): (Effect[Out], Any ⇒ Any) = reset {
          thunk
          (Effect.Finish, (_: Any) ⇒ ???)
        }
        handle(effect, continue)
      } catch {
        case NonFatal(err) ⇒
          out.onError(err)
      }

      cancelable
    }
  }

}