aboutsummaryrefslogtreecommitdiff
path: root/core/lib/Fiber.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/lib/Fiber.scala')
-rw-r--r--core/lib/Fiber.scala15
1 files changed, 9 insertions, 6 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala
index 42b416b..bf55cf4 100644
--- a/core/lib/Fiber.scala
+++ b/core/lib/Fiber.scala
@@ -1,6 +1,7 @@
package eu.mulk.fibers
import monix.eval.Task
+import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.MultiAssignmentCancelable
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.{Observable, OverflowStrategy}
@@ -135,10 +136,8 @@ object Fiber extends FiberConversions with FiberTypes {
var more = init
var done = false
cancelable := Cancelable { () ⇒
- if (!done) {
- done = true
- out.onComplete()
- }
+ out.onComplete()
+ done = true
}
while (!done) {
try {
@@ -163,8 +162,12 @@ object Fiber extends FiberConversions with FiberTypes {
state.fiberVar = value
more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
case Effect.Emit(returnValue) ⇒
- out.onNext(returnValue.asInstanceOf[Out])
- more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ out.onNext(returnValue.asInstanceOf[Out]) match {
+ case _: Continue ⇒
+ more = continue(()).asInstanceOf[(Effect[Out], Any ⇒ Any)]
+ case _: Stop ⇒
+ cancelable.cancel()
+ }
case Effect.Fail(throwable) ⇒
out.onError(throwable)
done = true