From ec2402dad0a96888f74ff6dec394c9d0aaa43bef Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 13 May 2025 20:08:38 -0700 Subject: [PATCH] [core] stable fiber identity --- .../src/main/scala/kyo/scheduler/IOTask.scala | 92 ++++++++++--------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/scheduler/IOTask.scala b/kyo-core/shared/src/main/scala/kyo/scheduler/IOTask.scala index 1316748db..9cae73f12 100644 --- a/kyo-core/shared/src/main/scala/kyo/scheduler/IOTask.scala +++ b/kyo-core/shared/src/main/scala/kyo/scheduler/IOTask.scala @@ -34,55 +34,60 @@ sealed private[kyo] class IOTask[Ctx, E, A] private ( private inline def locally[A](inline f: A): A = f - final private def eval(startMillis: Long, clock: InternalClock, deadline: Long)(using Safepoint) = + final private def eval(startMillis: Long, clock: InternalClock, deadline: Long)(using Safepoint): A < (Ctx & Async & Abort[E]) = try - curr = Isolate.restoring(trace, this) { - ArrowEffect.handlePartial(erasedAbortTag, Tag[Async.Join], curr, context)( - stop = - shouldPreempt() || (deadline != Long.MaxValue && clock.currentMillis() > deadline), - [C] => - (input, cont) => - locally { - completeDiscard(input.asInstanceOf[Result[E, A]]) - nullResult - }, - [C] => - (input, cont) => - locally { - input.poll() match - case null => - cont(null) - case Present(r) => - cont(r.asInstanceOf[Result[Nothing, C]]) - case Absent => - val runtime = (clock.currentMillis() - startMillis + this.runtime()).toInt - val finalizers = this.finalizers - this.finalizers = Finalizers.empty - val trace = this.trace - this.trace = null.asInstanceOf[Trace] - this.interrupts(input) - input.onComplete { r => - val task = - IOTask(IO(cont(r.asInstanceOf[Result[Nothing, C]])), trace, context, finalizers, runtime) - this.removeInterrupt(input) - this.becomeDiscard(task) - } - nullResult - } - ) - } - if !isNull(curr) then - curr.evalNow.foreach(a => completeDiscard(Result.succeed(a))) + val next: A < (Ctx & Async & Abort[E]) = + Isolate.restoring(trace, this) { + ArrowEffect.handlePartial(erasedAbortTag, Tag[Async.Join], curr, context)( + stop = + shouldPreempt() || (deadline != Long.MaxValue && clock.currentMillis() > deadline), + [C] => + (input, cont) => + locally { + completeDiscard(input.asInstanceOf[Result[E, A]]) + nullResult + }, + [C] => + (input, cont) => + locally { + input.poll() match + case null => + cont(null) + case Present(r) => + cont(r.asInstanceOf[Result[Nothing, C]]) + case Absent => + curr = nullResult + this.interrupts(input) + input.onComplete { r => + this.removeInterrupt(input) + curr = IO(cont(r.asInstanceOf[Result[Nothing, C]])) + Scheduler.get.schedule(this) + } + nullResult + } + ) + } + if !isNull(next) then + next.evalNow match + case Absent => + next + case Present(a) => + completeDiscard(Result.succeed(a)) + nullResult + else + next + end if catch case ex => completeDiscard(Result.panic(ex)) + nullResult end try end eval final def run(startMillis: Long, clock: InternalClock, deadline: Long): Task.Result = val safepoint = Safepoint.get - eval(startMillis, clock, deadline)(using safepoint) - if isNull(curr) || !isPending() then + val next = eval(startMillis, clock, deadline)(using safepoint) + if !isPending() then finalizers.run() finalizers = Finalizers.empty if trace ne null then @@ -90,15 +95,18 @@ sealed private[kyo] class IOTask[Ctx, E, A] private ( trace = null.asInstanceOf[Trace] curr = nullResult Task.Done - else + else if !isNull(next) then + curr = next Task.Preempted + else + Task.Done end if end run private inline def nullResult = null.asInstanceOf[A < Ctx & Async & Abort[E]] override def toString = - s"IOTask(state = ${stateString()}, preempt = ${{ shouldPreempt() }}, finalizers = ${finalizers.size()}, curr = ${curr})" + s"IOTask(id = ${hashCode()}, state = ${stateString()}, preempt = ${{ shouldPreempt() }}, finalizers = ${finalizers.size()}, curr = ${curr})" end IOTask