GitHub - ebonnal/streamable: Pythonic Stream-like manipulation of (async) iterables

༄ streamable

Pythonic Stream-like manipulation of (async) iterables

  • 🔗 Fluent chainable lazy operations
  • 🔀 Concurrent via threads/processes/async
  • 🇹 Fully Typed, Stream[T] is an Iterable[T] (and an AsyncIterable[T])
  • 🛡️ Battle-tested for prod, extensively tested with CPython 3.7 to 3.14 and compatible with PyPy.

1. install

pip install streamable or conda install conda-forge::streamable

(No dependencies)

2. import

from streamable import Stream

3. init

Create a Stream[T] decorating an Iterable[T] (or an AsyncIterable[T]):

integers: Stream[int] = Stream(range(10))

4. operate

Chain lazy operations (only evaluated during iteration), each returning a new immutable Stream:

inverses: Stream[float] = (
    integers
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

5. iterate

Iterate over a Stream[T] just as you would over any other Iterable[T] (or AsyncIterable[T]); elements are processed on the fly:

as an Iterable[T]

  • into data structure
>>> list(inverses)
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
>>> set(inverses)
{0.5, 1.0, 0.2, 0.33, 0.25, 0.17, 0.14, 0.12, 0.11}
  • for
>>> [inverse for inverse in inverses]
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
  • reduce
>>> sum(inverses)
2.82
>>> from functools import reduce
>>> reduce(..., inverses)
  • iter/next
>>> next(iter(inverses))
1.0

as an AsyncIterable[T]

  • async for
>>> import asyncio
>>> from typing import List
>>> async def main() -> List[float]:
...     return [inverse async for inverse in inverses]
...
>>> asyncio.run(main())
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
  • aiter/anext
>>> asyncio.run(anext(aiter(inverses)))  # before 3.10: inverses.__aiter__().__anext__()
1.0

↔ Showcase: Extract-Transform-Load

Let's take an example showcasing most of the Stream's operations:

This script extracts the 67 quadruped Pokémon from the first three generations using PokéAPI and loads them into a CSV:

import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream

with open("./quadruped_pokemons.csv", mode="w", newline="") as file:
    fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
    writer = csv.DictWriter(file, fields, extrasaction='ignore')
    writer.writeheader()

    pipeline: Stream = (
        # Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
        Stream(itertools.count(1))
        # Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
        .throttle(16, per=timedelta(seconds=1))
        # GET pokemons concurrently using a pool of 8 threads
        .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
        .map(requests.get, concurrency=8)
        .foreach(requests.Response.raise_for_status)
        .map(requests.Response.json)
        # Stop the iteration when reaching the 1st pokemon of the 4th generation
        .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
        .observe("pokemons")
        # Keep only quadruped Pokemons
        .filter(lambda poke: poke["shape"]["name"] == "quadruped")
        .observe("quadruped pokemons")
        # Catch errors due to None "generation" or "shape"
        .catch(
            TypeError,
            when=lambda error: str(error) == "'NoneType' object is not subscriptable"
        )
        # Write a batch of pokemons every 5 seconds to the CSV file
        .group(interval=timedelta(seconds=5))
        .foreach(writer.writerows)
        .flatten()
        .observe("written pokemons")
        # Catch exceptions and raise the 1st one at the end of the iteration
        .catch(Exception, finally_raise=True)
    )

    pipeline()

or the async way

Use the .amap operation and await the Stream:

import asyncio
import csv
from datetime import timedelta
import itertools
import httpx
from streamable import Stream

async def main() -> None:
    with open("./quadruped_pokemons.csv", mode="w", newline="") as file:
        fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
        writer = csv.DictWriter(file, fields, extrasaction='ignore')
        writer.writeheader()

        async with httpx.AsyncClient() as http_async_client:
            pipeline: Stream = (
                # Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
                Stream(itertools.count(1))
                # Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
                .throttle(16, per=timedelta(seconds=1))
                # GET pokemons via 8 concurrent coroutines
                .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
                .amap(http_async_client.get, concurrency=8)
                .foreach(httpx.Response.raise_for_status)
                .map(httpx.Response.json)
                # Stop the iteration when reaching the 1st pokemon of the 4th generation
                .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
                .observe("pokemons")
                # Keep only quadruped Pokemons
                .filter(lambda poke: poke["shape"]["name"] == "quadruped")
                .observe("quadruped pokemons")
                # Catch errors due to None "generation" or "shape"
                .catch(
                    TypeError,
                    when=lambda error: str(error) == "'NoneType' object is not subscriptable"
                )
                # Write a batch of pokemons every 5 seconds to the CSV file
                .group(interval=timedelta(seconds=5))
                .foreach(writer.writerows)
                .flatten()
                .observe("written pokemons")
                # Catch exceptions and raise the 1st one at the end of the iteration
                .catch(Exception, finally_raise=True)
            )

            await pipeline

asyncio.run(main())

📒 Operations

A dozen expressive lazy operations and that's it.

Note

async twin operations: Each operation that takes a function (e.g. .map) also has a version that accepts an async function (e.g. .amap). You can mix both types of operations on the same Stream, which can then be used as either an Iterable or an AsyncIterable.

.map/.amap

Applies a transformation on elements:

integer_strings: Stream[str] = integers.map(str)

assert list(integer_strings) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

thread-based concurrency

Applies the transformation using a pool of threads whose size is set by the concurrency parameter, yielding results in upstream order (FIFO). Set ordered=False to yield results as soon as they become available (First Done, First Out).

import requests

pokemon_names: Stream[str] = (
    Stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(requests.get, concurrency=3)
    .map(requests.Response.json)
    .map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']

Note

Memory-efficient: only as many upstream elements as the concurrency parameter are pulled for processing; the next upstream element is pulled only when a result is yielded downstream.
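The ordered (FIFO), bounded behavior described above can be illustrated with the standard library alone. This is a minimal sketch, not streamable's implementation: the name concurrent_map is hypothetical, and it assumes a ThreadPoolExecutor plus a deque holding at most concurrency pending futures.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import islice
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def concurrent_map(func: Callable[[T], U], iterable: Iterable[T], concurrency: int) -> Iterator[U]:
    """Hypothetical sketch: apply func via `concurrency` threads, yielding in upstream order."""
    upstream = iter(iterable)
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        # Pull at most `concurrency` upstream elements to start with.
        pending: Deque["Future[U]"] = deque(
            executor.submit(func, elem) for elem in islice(upstream, concurrency)
        )
        while pending:
            # Waiting on the oldest future keeps results in upstream order (FIFO).
            result = pending.popleft().result()
            # Pull exactly one more upstream element (if any) to refill the buffer.
            for elem in islice(upstream, 1):
                pending.append(executor.submit(func, elem))
            yield result
```

Fetching the first result from an upstream of 100 elements with concurrency=4 pulls only 5 of them: 4 to fill the buffer, then 1 to replace the consumed one.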

process-based concurrency

Set via="process":

if __name__ == "__main__":
    state: List[int] = []
    # integers are mapped
    assert integers.map(state.append, concurrency=4, via="process").count() == 10
    # but the `state` of the main process is not mutated
    assert state == []

async-based concurrency: .amap

.amap can apply an async transformation concurrently.

  • consumed as an Iterable[T]:
import httpx

pokemon_names: Stream[str] = (
    Stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .amap(httpx.AsyncClient().get, concurrency=3)
    .map(httpx.Response.json)
    .map(lambda poke: poke["name"])
)

assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
  • consumed as an AsyncIterable[T]:
import asyncio
import httpx

async def main() -> None:
    async with httpx.AsyncClient() as http_async_client:
        pokemon_names: Stream[str] = (
            Stream(range(1, 4))
            .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
            .amap(http_async_client.get, concurrency=3)
            .map(httpx.Response.json)
            .map(lambda poke: poke["name"])
        )
        assert [name async for name in pokemon_names] == ['bulbasaur', 'ivysaur', 'venusaur']

asyncio.run(main())
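For the async flavor, the same ordered, bounded pattern can be sketched with asyncio tasks instead of threads. amap_sketch is a hypothetical async generator, not streamable's implementation. It assumes the transformation is a coroutine function and keeps at most concurrency tasks in flight.

```python
import asyncio
from collections import deque
from itertools import islice
from typing import AsyncIterator, Awaitable, Callable, Deque, Iterable, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def amap_sketch(
    afunc: Callable[[T], Awaitable[U]], iterable: Iterable[T], concurrency: int
) -> AsyncIterator[U]:
    """Hypothetical sketch: ordered async map with at most `concurrency` pending tasks."""
    upstream = iter(iterable)
    # Schedule at most `concurrency` tasks up front.
    pending: Deque["asyncio.Future[U]"] = deque(
        asyncio.ensure_future(afunc(elem)) for elem in islice(upstream, concurrency)
    )
    try:
        while pending:
            # Awaiting the oldest task first preserves upstream order (FIFO).
            result = await pending.popleft()
            # Schedule one more task (if any upstream element is left).
            for elem in islice(upstream, 1):
                pending.append(asyncio.ensure_future(afunc(elem)))
            yield result
    finally:
        # Cancel leftover tasks if the consumer stops early.
        for task in pending:
            task.cancel()
```

Even when later tasks finish first (shorter sleeps), the results still come out in input order.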

"starmap"

The star function decorator transforms a function that takes several positional arguments into a function that takes a tuple:

from streamable import star

zeros: Stream[int] = (
    Stream(enumerate(integers))
    .map(star(lambda index, integer: index - integer))
)

assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
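One plausible way to write such a decorator in plain Python is sketched below. star_sketch is a hypothetical name, and this is not necessarily how streamable's star is implemented. For plain iterators, the standard itertools.starmap gives the same result.

```python
from itertools import starmap
from typing import Any, Callable, Tuple, TypeVar

R = TypeVar("R")


def star_sketch(func: Callable[..., R]) -> Callable[[Tuple[Any, ...]], R]:
    """Hypothetical sketch: turn f(a, b, ...) into g((a, b, ...))."""
    def unpacking_wrapper(args: Tuple[Any, ...]) -> R:
        # Unpack the tuple into positional arguments.
        return func(*args)
    return unpacking_wrapper


zeros = list(map(star_sketch(lambda index, integer: index - integer), enumerate(range(10))))
# Standard library equivalent for plain iterators:
zeros_via_starmap = list(starmap(lambda index, integer: index - integer, enumerate(range(10))))
```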

.foreach / .aforeach

Applies a side effect on elements:

state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)

assert list(appending_integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

concurrency

Similar to .map:

  • set the concurrency parameter for thread-based concurrency
  • set via="process" for process-based concurrency
  • set ordered=False for First Done First Out
  • The .aforeach operation can apply an async effect concurrently.

.group / .agroup

Groups into Lists

... up to a given group size:

integers_by_5: Stream[List[int]] = integers.group(size=5)

assert list(integers_by_5) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

... and/or co-groups by a given key:

integers_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2)

assert list(integers_by_parity) == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]

... and/or co-groups the elements yielded by the upstream within a given time interval:

from datetime import timedelta

integers_within_1_sec: Stream[List[int]] = (
    integers
    .throttle(2, per=timedelta(seconds=1))
    .group(interval=timedelta(seconds=0.99))
)

assert list(integers_within_1_sec) == [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]

Tip

Combine the size/by/interval parameters:

integers_by_parity_by_2: Stream[List[int]] = (
    integers
    .group(by=lambda n: n % 2, size=2)
)

assert list(integers_by_parity_by_2) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]
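The interplay of size and by can be sketched in plain Python. This is a hypothetical illustration: group_sketch is not part of streamable, and the interval parameter is left out. A key's group is emitted as soon as it reaches size, and leftover groups are flushed at the end. This reproduces the three outputs shown above.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def group_sketch(
    iterable: Iterable[T],
    size: Optional[int] = None,
    by: Optional[Callable[[T], Hashable]] = None,
) -> Iterator[List[T]]:
    """Hypothetical sketch of grouping by size and/or key (no time interval)."""
    groups: Dict[Hashable, List[T]] = {}
    for elem in iterable:
        # Without `by`, every element shares a single group (key None).
        key = by(elem) if by is not None else None
        group = groups.setdefault(key, [])
        group.append(elem)
        if size is not None and len(group) >= size:
            # This key's group is full: emit it; a fresh one starts on the next element.
            yield groups.pop(key)
    # Flush the remaining, partially filled groups.
    yield from groups.values()
```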

.groupby / .agroupby

Like .group, but groups into (key, elements) tuples:

integers_by_parity: Stream[Tuple[str, List[int]]] = (
    integers
    .groupby(lambda n: "odd" if n % 2 else "even")
)

assert list(integers_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]

Tip

Then "starmap" over the tuples:

from streamable import star

counts_by_parity: Stream[Tuple[str, int]] = (
    integers_by_parity
    .map(star(lambda parity, ints: (parity, len(ints))))
)

assert list(counts_by_parity) == [("even", 5), ("odd", 5)]
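These are co-groups: unlike the standard itertools.groupby, which only groups consecutive runs, elements sharing a key end up together wherever they appear. The following plain-Python sketch shows that difference. groupby_sketch is a hypothetical name, and it buffers every group until the end (no size limit).

```python
import itertools
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")
K = TypeVar("K", bound=Hashable)


def groupby_sketch(iterable: Iterable[T], key: Callable[[T], K]) -> Iterator[Tuple[K, List[T]]]:
    """Hypothetical sketch: co-group all elements by key, as (key, elements) tuples."""
    groups: Dict[K, List[T]] = {}
    for elem in iterable:
        groups.setdefault(key(elem), []).append(elem)
    # Keys come out in order of first appearance.
    yield from groups.items()


def parity(n: int) -> str:
    return "odd" if n % 2 else "even"


co_groups = list(groupby_sketch(range(10), parity))
# itertools.groupby splits on every key change, so alternating parities give 10 runs of 1.
runs = [(k, list(g)) for k, g in itertools.groupby(range(10), key=parity)]
```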

.flatten / .aflatten

Ungroups elements assuming that they are Iterables (or AsyncIterables for .aflatten):

even_then_odd_integers: Stream[int] = integers.group(by=lambda n: n % 2).flatten()

assert list(even_then_odd_integers) == [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]

thread-based concurrency

Flattens several iterables at once, up to the number set by the concurrency parameter, via threads (or via coroutines for .aflatten):

mixed_ones_and_zeros: Stream[int] = (
    Stream([[0] * 4, [1] * 4])
    .flatten(concurrency=2)
)
assert list(mixed_ones_and_zeros) == [0, 1, 0, 1, 0, 1, 0, 1]
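The interleaved order above comes from several iterables being consumed at once. A round-robin over at most concurrency active iterables yields the same order for this input, as the sketch below shows. It is a deterministic, single-threaded approximation, not the thread-based mechanism itself and not streamable code. interleave_flatten is a hypothetical name.

```python
from collections import deque
from itertools import islice
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel returned by next() once an iterator is exhausted


def interleave_flatten(iterables: Iterable[Iterable[T]], concurrency: int) -> Iterator[T]:
    """Hypothetical single-threaded sketch: round-robin over at most `concurrency` iterables."""
    upstream = iter(iterables)
    active: Deque[Iterator[T]] = deque(iter(it) for it in islice(upstream, concurrency))
    while active:
        current = active.popleft()
        elem = next(current, _DONE)
        if elem is _DONE:
            # Replace the exhausted iterable with the next upstream one, if any.
            replacement = next(upstream, _DONE)
            if replacement is not _DONE:
                active.append(iter(replacement))
            continue
        yield elem
        active.append(current)  # back of the queue: the others get a turn first
```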

.filter / .afilter

Keeps only the elements that satisfy a condition:

even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)

assert list(even_integers) == [0, 2, 4, 6, 8]

.distinct / .adistinct

Removes duplicates:

distinct_chars: Stream[str] = Stream("foobarfooo").distinct()

assert list(distinct_chars) == ["f", "o", "b", "a", "r"]

... or according to a given deduplication key:

strings_of_distinct_lengths: Stream[str] = (
    Stream(["a", "foo", "bar", "z"])
    .distinct(len)
)

assert list(strings_of_distinct_lengths) == ["a", "foo"]

Warning

During iteration, all distinct elements that are yielded are retained in memory to perform deduplication. However, you can remove only consecutive duplicates without a memory footprint by setting consecutive_only=True:

consecutively_distinct_chars: Stream[str] = (
    Stream("foobarfooo")
    .distinct(consecutive_only=True)
)

assert list(consecutively_distinct_chars) == ["f", "o", "b", "a", "r", "f", "o"]
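The memory trade-off described in the warning can be seen in a plain-Python sketch (distinct_sketch is a hypothetical name, not streamable's code). The default mode keeps a set of every key seen so far, whereas consecutive_only=True only remembers the previous key.

```python
from typing import Callable, Hashable, Iterable, Iterator, Optional, Set, TypeVar

T = TypeVar("T")
_NOTHING = object()  # sentinel: no previous key yet


def distinct_sketch(
    iterable: Iterable[T],
    key: Optional[Callable[[T], Hashable]] = None,
    consecutive_only: bool = False,
) -> Iterator[T]:
    """Hypothetical sketch of deduplication, optionally by key and/or consecutive-only."""
    seen: Set[Hashable] = set()  # grows with every distinct key (default mode)
    previous: object = _NOTHING  # a single remembered key (consecutive_only mode)
    for elem in iterable:
        k = key(elem) if key is not None else elem
        if consecutive_only:
            if previous is _NOTHING or k != previous:
                yield elem
            previous = k
        elif k not in seen:
            seen.add(k)
            yield elem
```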

.truncate / .atruncate

Ends iteration once a given number of elements have been yielded:

five_first_integers: Stream[int] = integers.truncate(5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]

or when a condition is satisfied:

five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]

If both count and when are set, truncation occurs as soon as either condition is met.
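A plain-Python sketch of these semantics follows (truncate_sketch is hypothetical, not streamable's code). It checks the count right after each yield, so it never pulls an extra upstream element once the count is reached, which matters for infinite or costly upstreams.

```python
import itertools
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def truncate_sketch(
    iterable: Iterable[T],
    count: Optional[int] = None,
    when: Optional[Callable[[T], bool]] = None,
) -> Iterator[T]:
    """Hypothetical sketch: stop after `count` yields or at the first element satisfying `when`."""
    if count is not None and count <= 0:
        return
    yielded = 0
    for elem in iterable:
        if when is not None and when(elem):
            return  # the element satisfying `when` is not yielded
        yield elem
        yielded += 1
        if count is not None and yielded >= count:
            return  # stop before pulling another upstream element
```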

.skip / .askip

Skips the first specified number of elements:

integers_after_five: Stream[int] = integers.skip(5)

assert list(integers_after_five) == [5, 6, 7, 8, 9]

or skips elements until a predicate is satisfied:

integers_after_five: Stream[int] = integers.skip(until=lambda n: n >= 5)

assert list(integers_after_five) == [5, 6, 7, 8, 9]

If both count and until are set, skipping stops as soon as either condition is met.
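The "either condition" rule can be sketched in plain Python (skip_sketch is a hypothetical name, not streamable's code). The element that satisfies until is kept, as in the snippet above where 5 is the first yielded element.

```python
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def skip_sketch(
    iterable: Iterable[T],
    count: Optional[int] = None,
    until: Optional[Callable[[T], bool]] = None,
) -> Iterator[T]:
    """Hypothetical sketch: skip elements until `count` are skipped or `until` is satisfied."""
    skipping = count is not None or until is not None
    skipped = 0
    for elem in iterable:
        if skipping:
            if count is not None and skipped >= count:
                skipping = False  # enough elements skipped
            elif until is not None and until(elem):
                skipping = False  # this element satisfies the predicate and is kept
        if skipping:
            skipped += 1
            continue
        yield elem
```

For example, with count=2 and until=lambda n: n >= 5, the count is reached first, so skipping stops at 2.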

.catch / .acatch

Catches a given type of exception, and optionally yields a replacement value:

inverses: Stream[float] = (
    integers
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replacement=float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

You can specify an additional when condition for the catch:

import requests
from requests.exceptions import ConnectionError

status_codes_ignoring_resolution_errors: Stream[int] = (
    Stream(["https://github.com", "https://foo.bar", "https://github.com/foo/bar"])
    .map(requests.get, concurrency=2)
    .catch(ConnectionError, when=lambda error: "Max retries exceeded with url" in str(error))
    .map(lambda response: response.status_code)
)

assert list(status_codes_ignoring_resolution_errors) == [200, 404]

It has an optional finally_raise: bool parameter to raise the first exception caught (if any) when the iteration terminates.
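The replacement, when and finally_raise options can be sketched together in plain Python. catch_sketch is a hypothetical name, not streamable's implementation. It assumes the upstream iterator stays usable after raising, as the builtin map does (a generator function would be finished after its first exception).

```python
from typing import Any, Callable, Iterable, Iterator, Optional, Type

_NO_REPLACEMENT: Any = object()  # sentinel: no replacement value given


def catch_sketch(
    iterable: Iterable[Any],
    error_type: Type[Exception],
    *,
    when: Optional[Callable[[Exception], bool]] = None,
    replacement: Any = _NO_REPLACEMENT,
    finally_raise: bool = False,
) -> Iterator[Any]:
    """Hypothetical sketch: skip (or replace) caught errors, optionally re-raising the first at the end."""
    iterator = iter(iterable)
    first_error: Optional[Exception] = None
    while True:
        try:
            elem = next(iterator)
        except StopIteration:
            break
        except error_type as error:
            if when is not None and not when(error):
                raise  # not matched by `when`: propagate it
            if first_error is None:
                first_error = error
            if replacement is not _NO_REPLACEMENT:
                yield replacement
            continue
        yield elem
    if finally_raise and first_error is not None:
        raise first_error
```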

Tip

Leverage the when parameter to apply side effects on catch:

errors: List[Exception] = []

def store_error(error: Exception) -> bool:
    errors.append(error)  # applies effect
    return True  # signals to catch the error

integers_in_string: Stream[int] = (
    Stream("012345foo6789")
    .map(int)
    .catch(ValueError, when=store_error)
)

assert list(integers_in_string) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert len(errors) == len("foo")

.throttle

Limits the number of yields per time interval:

from datetime import timedelta

three_integers_per_second: Stream[int] = integers.throttle(3, per=timedelta(seconds=1))

# takes 3s: ceil(10 integers / 3 per_second) - 1
assert list(three_integers_per_second) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
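One way to obtain the ceil(10 / 3) - 1 = 3 seconds mentioned in the comment is a sliding-window limiter that remembers the timestamps of the last count yields. The following is a hypothetical sketch under that assumption (throttle_sketch is not streamable's code, and the library may use a different strategy).

```python
import time
from collections import deque
from datetime import timedelta
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttle_sketch(iterable: Iterable[T], count: int, per: timedelta) -> Iterator[T]:
    """Hypothetical sliding-window sketch: at most `count` yields within any `per` window."""
    period = per.total_seconds()
    # Monotonic timestamps of the last `count` yields (older ones fall off automatically).
    recent: Deque[float] = deque(maxlen=count)
    for elem in iterable:
        if len(recent) == count:
            # Wait until the oldest of the last `count` yields is `per` old.
            wait = recent[0] + period - time.monotonic()
            if wait > 0:
                time.sleep(wait)
        recent.append(time.monotonic())
        yield elem
```

With count=3 and per=1 second, elements 0 to 2 are yielded immediately and the 10th element waits for three full windows.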

.observe

Logs the progress of iterations:

>>> assert list(integers.throttle(2, per=timedelta(seconds=1)).observe("integers")) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
INFO: [duration=0:00:00.001793 errors=0] 1 integers yielded
INFO: [duration=0:00:00.004388 errors=0] 2 integers yielded
INFO: [duration=0:00:01.003655 errors=0] 4 integers yielded
INFO: [duration=0:00:03.003196 errors=0] 8 integers yielded
INFO: [duration=0:00:04.003852 errors=0] 10 integers yielded

Note

To avoid flooding, logs are emitted only when the number of yielded elements (or errors) reaches a power of 2.
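The power-of-2 rule can be sketched as follows (observe_sketch is hypothetical and ignores errors and durations). Since the log above also ends with a line at 10, this sketch assumes one last log when the iteration ends if the final count was not already logged.

```python
import logging
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def is_power_of_2(n: int) -> bool:
    # A positive power of 2 has a single bit set, so n & (n - 1) clears it to 0.
    return n > 0 and n & (n - 1) == 0


def observe_sketch(
    iterable: Iterable[T],
    what: str,
    log: Callable[[str], None] = logging.getLogger("observe_sketch").info,
) -> Iterator[T]:
    """Hypothetical sketch: log the yield count at 1, 2, 4, 8, ... and at the end."""
    yielded = 0
    last_logged = 0
    for elem in iterable:
        yielded += 1
        if is_power_of_2(yielded):
            log(f"{yielded} {what} yielded")
            last_logged = yielded
        yield elem
    if yielded != last_logged:
        log(f"{yielded} {what} yielded")  # final log at the end of the iteration
```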

Tip

To mute these logs, set the logging level above INFO:

import logging
logging.getLogger("streamable").setLevel(logging.WARNING)

+

Concatenates streams:

assert list(integers + integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

zip

Use the standard zip function:

from streamable import star

cubes: Stream[int] = (
    Stream(zip(integers, integers, integers))  # Stream[Tuple[int, int, int]]
    .map(star(lambda a, b, c: a * b * c))  # Stream[int]
)

assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

Shorthands for consuming the stream

Although consuming the stream is beyond the scope of this library, it provides two basic shorthands to trigger an iteration:

.count / .acount

.count iterates over the stream until exhaustion and returns the number of elements yielded:

assert integers.count() == 10

The .acount (async method) iterates over the stream as an AsyncIterable until exhaustion and returns the number of elements yielded:

assert asyncio.run(integers.acount()) == 10

() / await

Calling the stream iterates over it until exhaustion, and returns it:

state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)
assert appending_integers() is appending_integers
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Awaiting the stream iterates over it as an AsyncIterable until exhaustion, and returns it:

async def test_await() -> None:
    state: List[int] = []
    appending_integers: Stream[int] = integers.foreach(state.append)
    assert appending_integers is await appending_integers
    assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncio.run(test_await())

.pipe

Calls a function, passing the stream as first argument, followed by *args/**kwargs if any (inspired by the .pipe from pandas or polars):

import pandas as pd

(
    integers
    .observe("ints")
    .pipe(pd.DataFrame, columns=["integer"])
    .to_csv("integers.csv", index=False)
)
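The consuming shorthands (.count and calling the stream) and .pipe are thin conveniences over iteration. A hypothetical MiniStream class (not streamable's Stream) sketches the idea. count sums over an iteration, calling exhausts the iteration and returns the object itself, and pipe forwards the object plus any *args/**kwargs to a function.

```python
from collections import deque
from typing import Any, Callable, Generic, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class MiniStream(Generic[T]):
    """Hypothetical minimal wrapper illustrating the shorthands, NOT streamable's Stream."""

    def __init__(self, source: Iterable[T]) -> None:
        self._source = source

    def __iter__(self) -> Iterator[T]:
        return iter(self._source)

    def count(self) -> int:
        # Iterates until exhaustion and counts the elements.
        return sum(1 for _ in self)

    def __call__(self) -> "MiniStream[T]":
        # Iterates until exhaustion, discarding elements (side effects still happen).
        deque(self, maxlen=0)
        return self

    def pipe(self, func: Callable[..., R], *args: Any, **kwargs: Any) -> R:
        # Passes the stream as first argument, followed by *args/**kwargs.
        return func(self, *args, **kwargs)
```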

💡 Notes

Exceptions do not terminate the iteration

Tip

If any of the operations raises an exception, you can resume the iteration after handling it:

from contextlib import suppress

casted_ints: Iterator[int] = iter(
    Stream("0123_56789")
    .map(int)
    .group(3)
    .flatten()
)
collected: List[int] = []

with suppress(ValueError):
    collected.extend(casted_ints)
assert collected == [0, 1, 2, 3]

collected.extend(casted_ints)
assert collected == [0, 1, 2, 3, 5, 6, 7, 8, 9]
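Whether an iteration can resume after an exception depends on the underlying iterator. The sketch below uses only builtins and contrasts two cases. The builtin map stays usable after its function raises, the same pattern as the snippet above. A generator function is finished for good once an exception escapes it. parse_ints is a hypothetical helper.

```python
from contextlib import suppress
from typing import Iterable, Iterator, List


def parse_ints(chars: Iterable[str]) -> Iterator[int]:
    # A generator function: once int() raises inside it, the generator is finished.
    for char in chars:
        yield int(char)


# The builtin map survives a raising call: the next call to next() moves on to "5".
via_map: Iterator[int] = map(int, "0123_56789")
collected_via_map: List[int] = []
with suppress(ValueError):
    collected_via_map.extend(via_map)
collected_via_map.extend(via_map)

# The generator cannot resume: the second extend finds it exhausted.
via_generator: Iterator[int] = parse_ints("0123_56789")
collected_via_generator: List[int] = []
with suppress(ValueError):
    collected_via_generator.extend(via_generator)
collected_via_generator.extend(via_generator)
```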

Visitor Pattern

Tip

A Stream can be visited via its .accept method: implement a custom visitor by extending the abstract class streamable.visitors.Visitor:

from streamable.visitors import Visitor

class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())

assert depth(Stream(range(10)).map(str).foreach(print)) == 3

Functions

Tip

The Stream's methods are also exposed as functions:

from typing import Iterator
from streamable.functions import catch

inverse_integers: Iterator[float] = map(lambda n: 1 / n, range(10))
safe_inverse_integers: Iterator[float] = catch(inverse_integers, ZeroDivisionError)

Contributing

Many thanks to our contributors!

Feel very welcome to help us improve streamable via issues and PRs; check out CONTRIBUTING.md.

πŸ™ Community Highlights – Thank You!
