From aabb3b82757b715876fafe4d7ffaced183c04fd6 Mon Sep 17 00:00:00 2001 From: Matthias Andreas Benkard Date: Wed, 10 Jan 2018 20:46:29 +0100 Subject: Make Fiber.run stack-safe. Previously, Fiber.run was implemented with non-tail recursion, which would blow the stack after too many effects were invoked. It is now a stack-safe loop. --- core/lib/Fiber.scala | 71 +++++++++++++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 32 deletions(-) (limited to 'core') diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala index 0e2ef39..2d73f9e 100644 --- a/core/lib/Fiber.scala +++ b/core/lib/Fiber.scala @@ -131,39 +131,46 @@ object Fiber extends FiberConversions with FiberTypes { monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒ val cancelable = SingleAssignmentCancelable() - def handle(more: (Effect[Out], Any ⇒ Any)): Unit = { - val (effect, continue) = more - try { - effect match { - case Effect.AwaitTask(task, scheduler) ⇒ - task.asyncBoundary.runOnComplete({ result ⇒ - val k = continue(result) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - })(scheduler) - case Effect.AwaitFuture(task, executionContext) ⇒ - task.onComplete({ result ⇒ - val k = continue(result) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - })(executionContext) - case Effect.GetFiberVar ⇒ - val k = continue(state.fiberVar) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - case Effect.PutFiberVar(value) ⇒ - state.fiberVar = value - val k = continue(()) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - case Effect.Emit(returnValue) ⇒ - out.onNext(returnValue.asInstanceOf[Out]) - val k = continue(()) - handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) - case Effect.Fail(throwable) ⇒ - out.onError(throwable) - case Effect.Finish ⇒ - out.onComplete() + def handle(init: (Effect[Out], Any ⇒ Any)): Unit = { + var more = init + var done = false + while (!done) { + try { + val (effect, continue) = more + effect match { + case Effect.AwaitTask(task, scheduler) ⇒ + task.asyncBoundary.runOnComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(scheduler) + done = true + case Effect.AwaitFuture(task, executionContext) ⇒ + task.onComplete({ result ⇒ + val k = continue(result) + handle(k.asInstanceOf[(Effect[Out], Any ⇒ Any)]) + })(executionContext) + done = true + case Effect.GetFiberVar ⇒ + more = continue(state.fiberVar) + .asInstanceOf[(Effect[Out], Any ⇒ Any)] + case Effect.PutFiberVar(value) ⇒ + state.fiberVar = value + more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + case Effect.Emit(returnValue) ⇒ + out.onNext(returnValue.asInstanceOf[Out]) + more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)] + case Effect.Fail(throwable) ⇒ + out.onError(throwable) + done = true + case Effect.Finish ⇒ + out.onComplete() + done = true + } + } catch { + case NonFatal(err) ⇒ + out.onError(err) + done = true } - } catch { - case NonFatal(err) ⇒ - out.onError(err) } } -- cgit v1.2.3