8000 [core] stable fiber identity by fwbrasil · Pull Request #1190 · getkyo/kyo · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[core] stable fiber identity #1190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? 8000 Sign in to your account

Merged
merged 1 commit into from
May 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 50 additions & 42 deletions kyo-core/shared/src/main/scala/kyo/scheduler/IOTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,71 +34,79 @@ sealed private[kyo] class IOTask[Ctx, E, A] private (

private inline def locally[A](inline f: A): A = f

final private def eval(startMillis: Long, clock: InternalClock, deadline: Long)(using Safepoint) =
final private def eval(startMillis: Long, clock: InternalClock, deadline: Long)(using Safepoint): A < (Ctx & Async & Abort[E]) =
try
curr = Isolate.restoring(trace, this) {
ArrowEffect.handlePartial(erasedAbortTag, Tag[Async.Join], curr, context)(
stop =
shouldPreempt() || (deadline != Long.MaxValue && clock.currentMillis() > deadline),
[C] =>
(input, cont) =>
locally {
completeDiscard(input.asInstanceOf[Result[E, A]])
nullResult
},
[C] =>
(input, cont) =>
locally {
input.poll() match
case null =>
cont(null)
case Present(r) =>
cont(r.asInstanceOf[Result[Nothing, C]])
case Absent =>
val runtime = (clock.currentMillis() - startMillis + this.runtime()).toInt
val finalizers = this.finalizers
this.finalizers = Finalizers.empty
val trace = this.trace
this.trace = null.asInstanceOf[Trace]
this.interrupts(input)
input.onComplete { r =>
val task =
IOTask(IO(cont(r.asInstanceOf[Result[Nothing, C]])), trace, context, finalizers, runtime)
this.removeInterrupt(input)
this.becomeDiscard(task)
}
nullResult
}
)
}
if !isNull(curr) then
curr.evalNow.foreach(a => completeDiscard(Result.succeed(a)))
val next: A < (Ctx & Async & Abort[E]) =
Isolate.restoring(trace, this) {
ArrowEffect.handlePartial(erasedAbortTag, Tag[Async.Join], curr, context)(
stop =
shouldPreempt() || (deadline != Long.MaxValue && clock.currentMillis() > deadline),
[C] =>
(input, cont) =>
locally {
completeDiscard(input.asInstanceOf[Result[E, A]])
nullResult
},
[C] =>
(input, cont) =>
locally {
input.poll() match
case null =>
cont(null)
case Present(r) =>
cont(r.asInstanceOf[Result[Nothing, C]])
case Absent =>
curr = nullResult
this.interrupts(input)
input.onComplete { r =>
this.removeInterrupt(input)
curr = IO(cont(r.asInstanceOf[Result[Nothing, C]]))
Scheduler.get.schedule(this)
}
nullResult
}
)
}
if !isNull(next) then
next.evalNow match
case Absent =>
next
case Present(a) =>
completeDiscard(Result.succeed(a))
nullResult
else
next
end if
catch
case ex =>
completeDiscard(Result.panic(ex))
nullResult
end try
end eval

final def run(startMillis: Long, clock: InternalClock, deadline: Long): Task.Result =
val safepoint = Safepoint.get
eval(startMillis, clock, deadline)(using safepoint)
if isNull(curr) || !isPending() then
val next = eval(startMillis, clock, deadline)(using safepoint)
if !isPending() then
finalizers.run()
finalizers = Finalizers.empty
if trace ne null then
safepoint.releaseTrace(trace)
trace = null.asInstanceOf[Trace]
curr = nullResult
Task.Done
else
else if !isNull(next) then
curr = next
Task.Preempted
else
Task.Done
end if
end run

private inline def nullResult = null.asInstanceOf[A < Ctx & Async & Abort[E]]

override def toString =
s"IOTask(state = ${stateString()}, preempt = ${{ shouldPreempt() }}, finalizers = ${finalizers.size()}, curr = ${curr})"
s"IOTask(id = ${hashCode()}, state = ${stateString()}, preempt = ${{ shouldPreempt() }}, finalizers = ${finalizers.size()}, curr = ${curr})"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should add an id method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered exposing it but I haven't seen a good use case yet to justify the API commitment


end IOTask

Expand Down
Loading
0