Description
I haven't found an existing issue on this topic, so I'm opening one.
I'm a heavy user of Observable, but I've never found a satisfying answer to this question: how should "async aborting" be handled?
It comes up again and again, especially when converting streams to Observables:
function readableStreamToObservable<GValue>(stream: ReadableStream<GValue>): Observable<GValue> {
  return new Observable((subscriber) => {
    const reader = stream.getReader();

    subscriber.addTeardown(() => {
      // `cancel` is async, so what should we do with the result?
      // What should we do in case of error?
      reader.cancel(subscriber.signal.reason);
    });

    // quick implementation
    const loop = async () => {
      while (subscriber.active) {
        let result: ReadableStreamReadResult<GValue>;
        try {
          result = await reader.read();
        } catch (error: unknown) {
          subscriber.error(error);
          break;
        }
        if (result.done) {
          // `complete()` takes no argument
          subscriber.complete();
          break;
        } else {
          subscriber.next(result.value);
        }
      }
    };

    loop();
  });
}
I explored a few avenues but was never completely satisfied:
- an alternative to AbortController/Signal that is able to listen to "async" functions and returns a promise when calling abort() (something like emittery):
interface AsyncAbortController {
  readonly signal: AsyncAbortSignal;
  abort(reason?: unknown): Promise<void>;
}

interface AsyncAbortSignal {
  listen(callback: () => (void | PromiseLike<void>)): () => void;
}
But adding an alternative to AbortController may add confusion.
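A minimal sketch of how such a controller could behave, assuming the two interfaces above. Everything here is hypothetical: neither class exists in the platform or in the Observable proposal, and the `dispatch` method and `reason` property are additions made only for this illustration. In this sketch, `abort()` resolves once every listener has settled and rejects if any listener fails.

```typescript
// Hypothetical sketch: these classes are NOT part of any platform API.
type AsyncAbortListener = () => void | PromiseLike<void>;

class AsyncAbortSignal {
  reason: unknown = undefined;
  private listeners: AsyncAbortListener[] = [];

  // Registers a callback and returns a function that unregisters it.
  // In this sketch, listeners added after the abort are never called.
  listen(callback: AsyncAbortListener): () => void {
    this.listeners.push(callback);
    return () => {
      const index = this.listeners.indexOf(callback);
      if (index !== -1) this.listeners.splice(index, 1);
    };
  }

  // Assumed helper (not in the interface above): runs every listener once
  // and waits for all of them. Rejects if any listener throws or rejects.
  async dispatch(reason: unknown): Promise<void> {
    this.reason = reason;
    const pending = this.listeners.splice(0).map(async (listener) => {
      await listener();
    });
    await Promise.all(pending);
  }
}

class AsyncAbortController {
  readonly signal = new AsyncAbortSignal();
  private pending: Promise<void> | undefined = undefined;

  // Idempotent: repeated calls return the promise of the first abort.
  abort(reason?: unknown): Promise<void> {
    if (this.pending === undefined) {
      this.pending = this.signal.dispatch(reason);
    }
    return this.pending;
  }
}
```

With this shape, a teardown could register `() => reader.cancel()` through `signal.listen(...)`, and the caller of `abort()` would get a promise that reflects both the completion and the failure of the cancellation.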
- having a fourth property on SubscriptionObserver, named abort, and replicating it on Subscriber:
interface SubscriptionObserver {
  // ... other props
  abort?(reason?: unknown): void;
}

interface Subscriber {
  // ... other props
  abort(reason?: unknown): void;
  addTeardown(callback: () => (void | PromiseLike<void>)): void;
}
This solution adds complexity, as we have to handle another "terminal" state of Observable.
- simply ignoring the async result when aborting an Observable?
Why not, but this may result in "hidden" concurrency issues.
For example, imagine an Observable that acquires a lock on a file, emits its content in chunks, and finally releases the lock (an async operation) when the Observable is aborted.
However, if the consumer of such an Observable aborts it and then immediately re-subscribes while the file is still unlocking, they will get an error, with no way of knowing when the file is "ready" to be consumed again.
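The race in the file lock scenario above can be reproduced without any Observable at all. The sketch below is only an illustration under stated assumptions: `FakeFileLock`, `subscribeToFile`, and `demo` are hypothetical names, the lock is an in-memory stand-in, and the 10 ms delay simulates an asynchronous unlock. It contrasts ignoring the teardown result with awaiting it.

```typescript
// Hypothetical in-memory stand-in for a file lock with an async release.
class FakeFileLock {
  private locked = false;

  acquire(): void {
    if (this.locked) throw new Error("file is still locked");
    this.locked = true;
  }

  // Releasing takes time, like closing a real file handle would.
  release(): Promise<void> {
    return new Promise<void>((resolve) => {
      setTimeout(() => {
        this.locked = false;
        resolve();
      }, 10);
    });
  }
}

// Hypothetical subscribe function: it returns a teardown whose promise
// settles once the lock has actually been released.
function subscribeToFile(lock: FakeFileLock): () => Promise<void> {
  lock.acquire();
  return () => lock.release();
}

async function demo(): Promise<string[]> {
  const lock = new FakeFileLock();
  const log: string[] = [];

  // Teardown result ignored: an immediate re-subscription hits the lock.
  const stopFirst = subscribeToFile(lock);
  const releasing = stopFirst();
  try {
    subscribeToFile(lock);
    log.push("resubscribed immediately");
  } catch {
    log.push("immediate resubscribe failed");
  }

  // Teardown result awaited: the consumer knows when the file is ready.
  await releasing;
  const stopSecond = subscribeToFile(lock);
  log.push("resubscribed after awaiting teardown");
  await stopSecond();
  return log;
}
```

The second half only works because the consumer has access to the teardown promise, which is exactly what is lost when the async result of an abort is ignored.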
So, any ideas about this particular use case?
Thanks in advance for your time.