aboutsummaryrefslogtreecommitdiff
path: root/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'core/lib')
-rw-r--r--core/lib/Fiber.scala71
1 files changed, 39 insertions, 32 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala
index 0e2ef39..2d73f9e 100644
--- a/core/lib/Fiber.scala
+++ b/core/lib/Fiber.scala
@@ -131,39 +131,46 @@ object Fiber extends FiberConversions with FiberTypes {
monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒
val cancelable = SingleAssignmentCancelable()
- def handle(more: (Effect[Out], Any ⇒ Any)): Unit = {
- val (effect, continue) = more
- try {
- effect match {
- case Effect.AwaitTask(task, scheduler) ⇒
- task.asyncBoundary.runOnComplete({ result ⇒
- val k = continue(result)
- handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
- })(scheduler)
- case Effect.AwaitFuture(task, executionContext) ⇒
- task.onComplete({ result ⇒
- val k = continue(result)
- handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
- })(executionContext)
- case Effect.GetFiberVar ⇒
- val k = continue(state.fiberVar)
- handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
- case Effect.PutFiberVar(value) ⇒
- state.fiberVar = value
- val k = continue(())
- handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
- case Effect.Emit(returnValue) ⇒
- out.onNext(returnValue.asInstanceOf[Out])
- val k = continue(())
- handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
- case Effect.Fail(throwable) ⇒
- out.onError(throwable)
- case Effect.Finish ⇒
- out.onComplete()
+ def handle(init: (Effect[Out], Any ⇒ Any)): Unit = {
+ var more = init
+ var done = false
+ while (!done) {
+ try {
+ val (effect, continue) = more
+ effect match {
+ case Effect.AwaitTask(task, scheduler) ⇒
+ task.asyncBoundary.runOnComplete({ result ⇒
+ val k = continue(result)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ })(scheduler)
+ done = true
+ case Effect.AwaitFuture(task, executionContext) ⇒
+ task.onComplete({ result ⇒
+ val k = continue(result)
+ handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)])
+ })(executionContext)
+ done = true
+ case Effect.GetFiberVar ⇒
+ more = continue(state.fiberVar)
+ .asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ case Effect.PutFiberVar(value) ⇒
+ state.fiberVar = value
+ more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ case Effect.Emit(returnValue) ⇒
+ out.onNext(returnValue.asInstanceOf[Out])
+ more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ case Effect.Fail(throwable) ⇒
+ out.onError(throwable)
+ done = true
+ case Effect.Finish ⇒
+ out.onComplete()
+ done = true
+ }
+ } catch {
+ case NonFatal(err) ⇒
+ out.onError(err)
+ done = true
}
- } catch {
- case NonFatal(err) ⇒
- out.onError(err)
}
}