aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 20:48:38 +0100
committerMatthias Andreas Benkard <code@mail.matthias.benkard.de>2018-01-10 20:48:38 +0100
commit7e034b1c19b2e0686a38e0562ed26ddb1fd912d0 (patch)
tree644b0b853de4b4eb242e7de9f518abce4a02f93c /core
parentaabb3b82757b715876fafe4d7ffaced183c04fd6 (diff)
Make fibers cancellable.
The Observable returned by Fiber.run now implements monix.execution.Cancelable properly and will finish execution of a fiber on the next shift point when its corresponding Observable is cancelled.
Diffstat (limited to 'core')
-rw-r--r--core/lib/Fiber.scala12
1 files changed, 9 insertions, 3 deletions
diff --git a/core/lib/Fiber.scala b/core/lib/Fiber.scala
index 2d73f9e..42b416b 100644
--- a/core/lib/Fiber.scala
+++ b/core/lib/Fiber.scala
@@ -1,8 +1,8 @@
package eu.mulk.fibers
import monix.eval.Task
-import monix.execution.Scheduler
-import monix.execution.cancelables.SingleAssignmentCancelable
+import monix.execution.cancelables.MultiAssignmentCancelable
+import monix.execution.{Cancelable, Scheduler}
import monix.reactive.{Observable, OverflowStrategy}
import scala.concurrent.{ExecutionContext, Future}
@@ -129,11 +129,17 @@ object Fiber extends FiberConversions with FiberTypes {
var state = new FiberState
monix.reactive.Observable.create[Out](OverflowStrategy.Unbounded) { out ⇒
- val cancelable = SingleAssignmentCancelable()
+ val cancelable = MultiAssignmentCancelable()
def handle(init: (Effect[Out], Any ⇒ Any)): Unit = {
var more = init
var done = false
+ cancelable := Cancelable { () ⇒
+ if (!done) {
+ done = true
+ out.onComplete()
+ }
+ }
while (!done) {
try {
val (effect, continue) = more