diff options
Diffstat (limited to 'core/lib')
-rw-r--r-- | core/lib/Fiber.scala | 71 |
1 files changed, 39 insertions, 32 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala index 0e2ef39..2d73f9e 100644 --- a/core/lib/Fiber.scala +++ b/core/lib/Fiber.scala @@ -131,39 +131,46 @@ object Fiber extends FiberConversions with FiberTypes { monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ val cancelable = SingleAssignmentCancelable() - def handle(more: (Effect[Out], Any ⇒ Any)): Unit = { - val (effect, continue) = more - try { - effect match { - case Effect.AwaitTask(task, scheduler) ⇒ - task.asyncBoundary.runOnComplete({ result ⇒ - val k = continue(result) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - })(scheduler) - case Effect.AwaitFuture(task, executionContext) ⇒ - task.onComplete({ result ⇒ - val k = continue(result) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - })(executionContext) - case Effect.GetFiberVar ⇒ - val k = continue(state.fiberVar) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - case Effect.PutFiberVar(value) ⇒ - state.fiberVar = value - val k = continue(()) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - case Effect.Emit(returnValue) ⇒ - out.onNext(returnValue.asInstanceOf[Out]) - val k = continue(()) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - case Effect.Fail(throwable) ⇒ - out.onError(throwable) - case Effect.Finish ⇒ - out.onComplete() + def handle(init: (Effect[Out], Any ⇒ Any)): Unit = { + var more = init + var done = false + while (!done) { + try { + val (effect, continue) = more + effect match { + case Effect.AwaitTask(task, scheduler) ⇒ + task.asyncBoundary.runOnComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(scheduler) + done = true + case Effect.AwaitFuture(task, executionContext) ⇒ + task.onComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(executionContext) + done = true + case Effect.GetFiberVar ⇒ + more = continue(state.fiberVar) + .asInstanceOf[(Effect[Out], Any ⇒ Any)] + case Effect.PutFiberVar(value) ⇒ + state.fiberVar = value + more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + case Effect.Emit(returnValue) ⇒ + out.onNext(returnValue.asInstanceOf[Out]) + more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + case Effect.Fail(throwable) ⇒ + out.onError(throwable) + done = true + case Effect.Finish ⇒ + out.onComplete() + done = true + } + } catch { + case NonFatal(err) ⇒ + out.onError(err) + done = true } - } catch { - case NonFatal(err) ⇒ - out.onError(err) } } |