Description
I'm sure there are concrete bugs related to what I'm going to describe. But there's a conceptual problem here, so.... here's a brain dump.
Cancellations in flows are a conceptual problem for Turbine right now, especially now that Turbine can report issues with multiple Turbines at once.
From the standpoint of a single Flow
, cancellations are not a problem and the existing Event
modeling of the underlying tool is accurate and good:
public sealed interface Event<out T> {
public object Complete : Event<Nothing>
public class Error(public val throwable: Throwable) : Event<Nothing>
public class Item<T>(public val value: T) : Event<T>
}
A CancellationException
is just another kind of Error
. We report it to the turbine, caller either handles it or they don't.
Great. So you can meaningfully write this:
@Test
fun cancellationsAreCaptured() = runTest {
flow<Nothing> {
currentCoroutineContext().cancel()
suspendCancellableCoroutine { }
}.test {
assertTrue(awaitError() is CancellationException)
}
}
This is good and nice because it is transparent: if you understand what a Flow
is (a suspend fun
that calls suspending emit
callbacks, and terminates either by falling out the end of the method or throwing an exception), you can see that every possible terminal state of the Flow
is reflected (it ends with an exception, it ends with a completion, or it never ends at all)
Things get more complicated when one of our testIn
invocations blows up:
@Test
fun whatHappensToOurOtherTurbines() = runTest {
val flow1 = flow {
emit(1)
emit(2)
suspendCancellableCoroutine<Nothing> { }
}
flow1.test {
val flow2 = flow<Nothing> {
throw RuntimeException("pow")
}
flow2.testIn(this@runTest)
}
}
When flow2
dies, it will tear down the Job
it is running in, which tears down its parent Job
, etc... that will have the effect of cancelling flow1
. Which will throw a CancellationException
.
So purely in terms of streams of events, the turbineScope
inside of test
receives this to work with:
flow1:
* Item(1)
* Item(2)
* Error(CancellationException)
flow2:
* Error(RuntimeException("pow"))
flow2
makes sense. But what about flow1
? How are we to deal with this?
Here are some options to consider:
Ignore all CancellationException
Let's talk about the big gun first: "Let's just ignore CancellationException
." This idea is actually pretty appealing to me, because so many cancellations happen automatically, either directly in response to Turbine's management work, or in response to thrown exceptions.
And that's all well and good, except for the fact that Flows can and do cancel themselves. This should be validatable behavior:
val selfCancellingFlow = flow {
emit(1)
currentCoroutineContext().cancel()
}
If we ignore all CancellationException
s, selfCancellingFlow
becomes impossible to validate. That alone makes me feel like this is not a valid approach.
Ignore CancellationException only when reporting crashes
We currently do this, but I'm having second thoughts:
When an exception bubbles up through turbineScope
, we just strip out any cancellation exceptions. I hacked this together to achieve a reporting outcome that felt good.
This seems to work pretty well when turbineScope
's block throws: in that case, we already know that, but unfortunately it doesn't say how to deal with CancellationException when there is no exception from the validation block. We have to check for unconsumed events in that scenario, and filtering out all cancellations in that scenario makes it impossible to validate selfCancellingFlow
.
Ignore CancellationException in testIn
With #248, we are now doing this just for testIn
(I think).
Again, this breaks the ability to validate selfCancellingFlow
.
Ignore CancellationException, but only when teardown starts
This seems more promising. ChannelTurbine
has ignoreTerminalEvents
, which is intended to do exactly this. So maybe our answer is to get rid of all the special casing for CancellationException
, and instead find a way to apply ignoreTerminalEvents
to all the turbines.
But even then, the current implementation of ignoreTerminalEvents
has the benefit of being driven by the complete control that .test
has over control flow driving its subject Flow
: the flow will either be cancelled implicitly when the validation block completes, or by the exception thrown from within that validation block.
This is not true of cancellation for flows run via testIn
: e.g. if we do something like this:
turbineScope {
coroutineScope {
launch {
throw RuntimeException()
}
flow {
suspendCancellableCoroutine<Nothing> { }
}.testIn(this)
}
}
....the RuntimeException
bubbling up will kill the flow, cancelling it before Turbine can even know what happened.
So what do we do?
Probably figure out a way to execute on the "only after teardown" approach. This will probably mean that we have to force a particular Job
on all Turbines — this is the only way to have control on what actually cancels them.