parallelMerge() does not propagate return() back to the merged iterables · Issue #46 · reconbot/streaming-iterables · GitHub

Open
dko-slapdash opened this issue Aug 29, 2020 · 4 comments

Comments

@dko-slapdash

Example code:

import delay from "delay";
import { parallelMerge } from "streaming-iterables";

async function* iterable(name: string, dt: number) {
  try {
    for (let i = 0; ; i++) {
      console.log(`${name}: ${i}`);
      yield `${name}: ${i}`;
      await delay(dt);
    }
  } finally {
    console.log(`Exited ${name}`);
  }
}

async function* caller() {
  //yield* iterable("A", 900);
  yield* parallelMerge(iterable("A", 900));
}

async function main() {
  for await (const message of caller()) {
    if (message.includes("4")) {
      break;
    }

    console.log(`Received ${message}`);
  }

  console.log("Finishing");
  await delay(3000);
}

main().catch((e) => console.log(e));

In this example I do a "dummy" merge of a single iterable for simplicity (merging multiple iterables shows the same effect). The output is:

A: 0
Received A: 0
A: 1
Received A: 1
A: 2
Received A: 2
A: 3
Received A: 3
A: 4
Finishing

Notice that the finally {} block in the iterable() function was never executed, but it should have been: replace the call to parallelMerge() with yield* iterable("A", 900); to see the correct output (which includes "Exited A"):

A: 0
Received A: 0
A: 1
Received A: 1
A: 2
Received A: 2
A: 3
Received A: 3
A: 4
Exited A
Finishing

How it works: both the for-await and yield* constructs call the source iterator's return() method when the loop terminates early, and they propagate that signal further up the stack. I think parallelMerge() just doesn't do this.
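To make the mechanism concrete, here's a minimal standalone sketch (plain TypeScript, no library involved, names made up): breaking out of a for-await loop invokes the iterator's return(), which runs the generator's finally block before the loop statement completes.

```typescript
// A never-ending async generator with cleanup in a finally block.
async function* counter(events: string[]): AsyncGenerator<number> {
  try {
    for (let i = 0; ; i++) {
      yield i;
    }
  } finally {
    events.push("finally ran"); // executes when the consumer breaks
  }
}

async function demoBreak(): Promise<string[]> {
  const events: string[] = [];
  for await (const v of counter(events)) {
    events.push(`received ${v}`);
    if (v === 1) break; // break calls counter's return()
  }
  return events; // ["received 0", "received 1", "finally ran"]
}
```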

@dko-slapdash
Author

A few more notes:

  1. BTW, I think this library does call return(): https://github.com/fraxken/combine-async-iterators/blob/master/index.js
  2. Besides .return(), each iterator may also have a .throw() method, which allows "injecting" an exception that is thrown at the point where the generator function is suspended on a yield. That could be covered too (but I think it's trickier, and propagating .return() would be enough).
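To illustrate the throw() path from note 2, a tiny standalone sketch (names are made up): the injected error surfaces at the suspended yield inside the generator, where an ordinary try/catch can observe it.

```typescript
// An async generator that catches whatever is injected via throw().
async function* resilient(log: string[]): AsyncGenerator<number> {
  try {
    yield 1;
    yield 2;
  } catch (e) {
    log.push(`caught ${e}`); // the injected error lands at the yield
  }
}

async function demoThrow(): Promise<string[]> {
  const log: string[] = [];
  const it = resilient(log);
  await it.next(); // advance to the first yield
  const r = await it.throw(new Error("boom")); // inject at the yield
  log.push(`done=${r.done}`);
  return log; // ["caught Error: boom", "done=true"]
}
```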

@reconbot
Owner
reconbot commented Sep 8, 2020

Sounds like something we should fix. I haven't had time to look into it but I'm happy to merge it in.

@reconbot
Owner

So I did some research and this is part of the generator spec, not the iterator spec. Iterators only need a next() function and require no other methods. parallelMerge happens to be a generator, but not all of our functions are generators. It might be worth supporting this for everything; I'm wondering, however, if doing so would break any use cases. 🤔
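For reference, the distinction looks roughly like this in TypeScript's type shapes (paraphrased, not authoritative): only next() is required on an async iterator, while return() and throw() are optional, so any generic propagation has to check before calling.

```typescript
// Only next() is mandatory; return()/throw() are optional per the spec.
interface MinimalAsyncIterator<T> {
  next(value?: any): Promise<IteratorResult<T>>;
  return?(value?: any): Promise<IteratorResult<T>>; // optional
  throw?(e?: any): Promise<IteratorResult<T>>;      // optional
}

// A spec-respecting close helper must tolerate iterators without return().
async function closeIfPossible<T>(it: AsyncIterator<T>): Promise<void> {
  if (typeof it.return === "function") {
    await it.return(); // generators (and cooperative iterators) have this
  }
}
```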

@dko-slapdash
Author
dko-slapdash commented Oct 4, 2020

Yeah. The entire topic of AsyncIterators (and especially AsyncGenerators) is extremely complicated. I've spent several days so far trying to match all the corner cases in my own implementation of merge(), where return() is properly propagated to all merged iterables (and unblocks at the proper moment), and I still haven't succeeded. The idea is to guarantee that iterable =~= merge(iterable), where "=~=" means "equivalent in all aspects": merge() of one iterable acts exactly the same way as the iterable itself (whether or not the iterable comes from a generator doesn't matter).

For instance, here is one corner case for that (see inline comments):

// NB: `expect` is jest's assertion API; `sleep` is a promise-based delay,
// e.g. const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
async function standardThrow(wrap: <T>(inner: T) => T = (inner) => inner) {
  const log: any[] = [];

  async function* gen() {
    try {
      for (let i = 0; i < 3; i++) {
        await sleep(500);
        yield i;
      }
    } catch (e) {
      log.push(["gen caught", e]);
      throw e;
    }
  }

  const iterator = wrap(gen())[Symbol.asyncIterator]();
  iterator
    .next()
    .then((v) => {
      log.push(["next 1", v]);
      iterator
        .next()
        .then((v) => log.push(["next 2", v]))
        .catch(() => {});
    })
    .catch((e) => log.push(["next 1 e", e]));
  const promise = sleep(100)
    .then(async () => {
      log.push("calling throw()");
      return iterator.throw("test");
    })
    .catch((e) => log.push(["throw e", e]));
  log.push("start");
  await promise;
  log.push("finish");

  // Calling iterator.throw() in a random moment of time does not cause next()
  // to throw or return done=true. Instead, next() successfully yields the value
  // and returns it, and only the following call to next() returns done=true.
  // Also, iterator.throw() itself unfreezes only after next() returns done=true
  // and not earlier.
  expect(log).toEqual([
    "start",
    "calling throw()",
    ["gen caught", "test"],
    ["next 1", { done: false, value: 0 }],
    ["next 2", { done: true, value: undefined }],
    ["throw e", "test"],
    "finish",
  ]);
}

I have 6 test cases in total like that, and all of this seems to be covered in the https://tc39.es/proposal-async-iteration/ spec (which is, BTW, nearly unusable in practice):

  • standardReturn
  • standardThrow
  • standardNextCalledTwiceImmediately
  • standardReturnIsNotCalledWhenIterableIsIteratedAndNextThrows
  • standardReturnIsCalledWhenIterationBroke
  • standardNextCalledAfterThrow
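For what it's worth, the return()-propagation part of a fix could look something like the sketch below. This is a hypothetical illustration, not streaming-iterables code: mergeWithCleanup is a made-up name, and it pulls from sources in a naive sequential round-robin rather than racing them in parallel like the real parallelMerge. The relevant idea is the finally block, which forwards the close signal to every source iterator that supports it.

```typescript
async function* mergeWithCleanup<T>(
  ...sources: AsyncIterable<T>[]
): AsyncGenerator<T> {
  const iterators = sources.map((s) => s[Symbol.asyncIterator]());
  try {
    // Naive sequential round-robin to keep the sketch short; a real
    // merge would race the iterators concurrently.
    const done = iterators.map(() => false);
    let open = iterators.length;
    while (open > 0) {
      for (let i = 0; i < iterators.length; i++) {
        if (done[i]) continue;
        const r = await iterators[i].next();
        if (r.done) {
          done[i] = true;
          open--;
        } else {
          yield r.value;
        }
      }
    }
  } finally {
    // Runs when the consumer breaks or calls return(): propagate the
    // close signal to every source that exposes return().
    await Promise.all(
      iterators.map(async (it) => {
        try {
          await it.return?.();
        } catch {
          // ignore cleanup errors from individual sources
        }
      })
    );
  }
}
```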
