-
Notifications
You must be signed in to change notification settings - Fork 68
introduce Scope + ensureMap #1315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
import kyo.Tag | ||
import kyo.kernel.ContextEffect | ||
|
||
opaque type Scope = Sync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is planned to be the replacement for Resource
. Instead of using ContextEffect
, which has more strict type-level tracking, I'm using an opaque type + Local
. The effect tracking is meant to ensure the Local
is set in a scope to allow runFinalizer
to dynamically check if there's an outer Scope
finalizer. It's similar to how STM
handles transactions.
def acquireRelease[A, S](acquire: => A < S)(release: A => Any < (Async & Abort[Throwable]))(using Frame): A < (Scope & S) = | ||
withFinalizerUnsafe { finalizer => | ||
acquire.ensureMap { resource => | ||
val result = finalizer.ensure(_ => release(resource)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ensureMap
avoids potential interruptions after acquire
finishes. Paired with an unsafe finalizer.ensure
, there's no possibility of interruption between the acquisition finishing and the finalizer callback being registered.
import AllowUnsafe.embrace.danger | ||
val child = new Finalizer.Unsafe | ||
parent.foreach(p => discard(p.ensure(child.close(_, closeParallelism)))) | ||
run(child, closeParallelism, awaitClose)(v(child)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hearnadam does this cover the scenario you were exploring? The finalizer will be registered with the parent to ensure closing if the parent is closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it shouldn't set the local so the resources are isolated. Updating
run(child, closeParallelism, awaitClose)(v(child)) | ||
} | ||
|
||
private def run[A, S](finalizer: Finalizer, closeParallelism: Int, awaitClose: Boolean)(v: A < (Scope & S))( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
awaitClose
is a new feature and indicates if the execution should wait for the finalizer to close. If false
, the closing happens in the background without backpressure. We could have separate apis returning a pending Sync
instead of Async
when it's false
but it doesn't seem worth it.
} | ||
} | ||
|
||
"can't be interrupted without registering the finalizer" in run { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't supported by the current Resource
|
||
type Callback = Maybe[Error[Any]] => Any < (Async & Abort[Throwable]) | ||
|
||
final class Unsafe(using frame: Frame, allow: AllowUnsafe): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: I would make the AllowUnsafe an inline given in the body.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added an import since it's simpler and I think it's equivalent to an inline given
ensure(_ => v) | ||
|
||
def ensure(callback: Maybe[Error[Any]] => Any < (Async & Abort[Throwable]))(using Frame): Unit < Scope = | ||
withFinalizerUnsafe(f => Abort.get(f.ensure(callback))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this not selecting the extension method on the safe variant? Can you use Finalizer.ensure(f)(callback)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, I've added f.unsafe.ensure
to select the unsafe version since there's no need for an additional suspension
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually it isn't necessary. Unsafe
is a class, not an opaque type
so it's not ambiguous
Result.panic(new Closed( | ||
"Finalizer", | ||
frame, | ||
"This finalizer is already closed. This may happen if a background fiber escapes the scope of a 'Resource.run' call." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"This finalizer is already closed. This may happen if a background fiber escapes the scope of a 'Resource.run' call." | |
"This finalizer is already closed. This may happen if a background fiber escapes the scope of a 'Scope.run' call." 8000 |
Sync.Unsafe.evalOrThrow( | ||
Async.foreachDiscard(tasks, parallelism) { task => | ||
Abort.run[Throwable](task(ex)) | ||
.map(_.foldError(_ => (), ex => Log.error("Resource finalizer failed", ex.exception))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.map(_.foldError(_ => (), ex => Log.error("Resource finalizer failed", ex.exception))) | |
.map(_.foldError(_ => (), ex => Log.error("Scope finalizer failed", ex.exception))) |
def runIsolated[A, S](f: Finalizer => A < (Scope & S))(using Frame): A < (Async & S) = | ||
runIsolated(1)(f) | ||
|
||
def runIsolated[A, S](closeParallelism: Int, awaitClose: Boolean = true)(f: Finalizer => A < (Scope & S))( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the name Isolated is helpful to me.
Perhaps runLocally
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
* @return | ||
* The acquired resource wrapped in Scope and S effects. | ||
*/ | ||
def acquireRelease[A, S](acquire: => A < S)(release: A => Any < (Async & Abort[Throwable]))(using Frame): A < (Scope & S) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have used A < (Scope & S & Sync)
instead of A < (Scope & S)
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, only for Kyo. acquireRelease
!
* [[kyo.Scope.run]] For executing resource-managed computations | ||
*/ | ||
opaque type Scope <: Sync = Sync | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@deprecated("use `Scope`", "1.0-RC") | |
type Resource = Scope | |
@deprecated("use `Scope`", "1.0-RC") | |
val Resource = Scope | |
I think we could remove them after the 1.0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine, but can you note that in the message?
} | ||
} | ||
|
||
"hierarchical behavior" - { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new tests from here
* @see | ||
* [[kyo.Scope.run]] For executing resource-managed computations | ||
*/ | ||
opaque type Scope <: Sync = Sync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this result in Scope & Sync
becoming Sync
? I am not sure that's desirable for users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, just that Scope can subsume Sync. Sync can't subsume Scope
* [[kyo.Scope.run]] For executing resource-managed computations | ||
*/ | ||
opaque type Scope <: Sync = Sync | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine, but can you note that in the message?
private def withFinalizerUnsafe[A, S](f: AllowUnsafe ?=> Finalizer => A < S)(using Frame): A < (Scope & S) = | ||
local.use { | ||
case Present(finalizer) => f(using AllowUnsafe.embrace.danger)(finalizer) | ||
case Absent => bug("Missing finalizer from context") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do wonder if this is the correct approach, but I suppose type level tracking is all that's needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should never happen given the type-level tracking. STM does something similar
.map(promise.safe.becomeDiscard) | ||
) | ||
|
||
def await()(using AllowUnsafe): Fiber.Unsafe[Nothing, Unit] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should be fiber
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the unsafe apis normally return unsafe types?
escapedFiber <- Scope.run { | ||
for | ||
r <- Scope.acquire(resource) | ||
fiber <- Fiber.init { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a bit awkward (and pre-existing). I am not sure we should have closed as a result here, but see if we can just fire the finalizer immediately. Or forking should by default create a child scope that's closed when the parent is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's actually something I implemented (see Scope.internal.tryEnsure
). If the outer scope is already closed, I don't see a reason to fail. For example some daemon fiber that forks others. Note that the scope always propagates
* @return | ||
* The result of the effect wrapped in Async and S effects. | ||
*/ | ||
def runLocally[A, S](closeParallelism: Int, awaitClose: Boolean = true)(f: Finalizer => A < (Scope & S))( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should handle Scope
in it's current state - it doesn't set the Finalizer to Present
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch
} | ||
} | ||
|
||
"runLocally" - { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test that stacks runLocally
instances? I also think we need to use finalizer.ensure
in some tests.
Fixes #1224
This PR reworks resource-handling abstraction and improves safety of finalizer registration.
Resource
toScope
opaque type Scope <: Sync
) replacingResource
;ensureMap
avoids races by registering finalizers synchronously after acquisition.awaitClose
flagrunLocally
methodrunIsolated
.Scope
.Impact: