- Fluent chainable lazy operations
- Concurrent via threads/processes/async
- Fully typed: Stream[T] is an Iterable[T] (and an AsyncIterable[T])
- Battle-tested for prod, extensively tested with CPython 3.7 to 3.14 and compatible with PyPy.
pip install streamable
or conda install conda-forge::streamable
(No dependencies)
from streamable import Stream
Create a Stream[T] decorating an Iterable[T] (or an AsyncIterable[T]):
integers: Stream[int] = Stream(range(10))
Chain lazy operations (only evaluated during iteration), each returning a new immutable Stream:
inverses: Stream[float] = (
integers
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError)
)
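To make the laziness concrete, here is a minimal sketch that uses only the built-in map as a stand-in for a lazy operation. It does not use streamable itself, and traced_inverse and calls are hypothetical names introduced for illustration. Nothing is computed until the result is iterated:

```python
from typing import List

calls: List[int] = []  # records every input that was actually processed


def traced_inverse(n: int) -> float:
    """Hypothetical helper: same transformation as above, plus call tracing."""
    calls.append(n)
    return round(1 / n, 2)


# Building the lazy pipeline does not call the function yet.
lazy_inverses = map(traced_inverse, range(1, 4))
calls_before_iteration = list(calls)

# Iterating triggers the evaluation, element by element.
results = list(lazy_inverses)
```

Here calls_before_iteration stays empty, and calls only fills up once list(...) consumes the pipeline.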
Iterate over a Stream[T] just as you would over any other Iterable[T] (or AsyncIterable[T]); elements are processed on the fly:
- into data structure
>>> list(inverses)
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
>>> set(inverses)
{0.5, 1.0, 0.2, 0.33, 0.25, 0.17, 0.14, 0.12, 0.11}
- for
>>> [inverse for inverse in inverses]
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
- reduce
>>> sum(inverses)
2.82
>>> from functools import reduce
>>> reduce(..., inverses)
- iter/next
>>> next(iter(inverses))
1.0
- async for
>>> async def main() -> List[float]:
...     return [inverse async for inverse in inverses]
>>> asyncio.run(main())
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
- aiter/anext
>>> asyncio.run(anext(aiter(inverses))) # before 3.10: inverses.__aiter__().__anext__()
1.0
Let's take an example showcasing most of the Stream's operations:
This script extracts the 67 quadruped Pokémon from the first three generations using PokéAPI and loads them into a CSV:
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream

with open("./quadruped_pokemons.csv", mode="w") as file:
    fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
    writer = csv.DictWriter(file, fields, extrasaction='ignore')
    writer.writeheader()

    pipeline: Stream = (
        # Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
        Stream(itertools.count(1))
        # Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
        .throttle(16, per=timedelta(seconds=1))
        # GET pokemons concurrently using a pool of 8 threads
        .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
        .map(requests.get, concurrency=8)
        .foreach(requests.Response.raise_for_status)
        .map(requests.Response.json)
        # Stop the iteration when reaching the 1st pokemon of the 4th generation
        .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
        .observe("pokemons")
        # Keep only quadruped Pokemons
        .filter(lambda poke: poke["shape"]["name"] == "quadruped")
        .observe("quadruped pokemons")
        # Catch errors due to None "generation" or "shape"
        .catch(
            TypeError,
            when=lambda error: str(error) == "'NoneType' object is not subscriptable"
        )
        # Write a batch of pokemons every 5 seconds to the CSV file
        .group(interval=timedelta(seconds=5))
        .foreach(writer.writerows)
        .flatten()
        .observe("written pokemons")
        # Catch exceptions and raise the 1st one at the end of the iteration
        .catch(Exception, finally_raise=True)
    )

    # Calling the stream iterates over it until exhaustion
    pipeline()
Use the .amap operation and await the Stream:
import asyncio
import csv
from datetime import timedelta
import itertools
import httpx
from streamable import Stream

async def main() -> None:
    with open("./quadruped_pokemons.csv", mode="w") as file:
        fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
        writer = csv.DictWriter(file, fields, extrasaction='ignore')
        writer.writeheader()

        async with httpx.AsyncClient() as http_async_client:
            pipeline: Stream = (
                # Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
                Stream(itertools.count(1))
                # Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
                .throttle(16, per=timedelta(seconds=1))
                # GET pokemons via 8 concurrent coroutines
                .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
                .amap(http_async_client.get, concurrency=8)
                .foreach(httpx.Response.raise_for_status)
                .map(httpx.Response.json)
                # Stop the iteration when reaching the 1st pokemon of the 4th generation
                .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
                .observe("pokemons")
                # Keep only quadruped Pokemons
                .filter(lambda poke: poke["shape"]["name"] == "quadruped")
                .observe("quadruped pokemons")
                # Catch errors due to None "generation" or "shape"
                .catch(
                    TypeError,
                    when=lambda error: str(error) == "'NoneType' object is not subscriptable"
                )
                # Write a batch of pokemons every 5 seconds to the CSV file
                .group(interval=timedelta(seconds=5))
                .foreach(writer.writerows)
                .flatten()
                .observe("written pokemons")
                # Catch exceptions and raise the 1st one at the end of the iteration
                .catch(Exception, finally_raise=True)
            )

            # Awaiting the stream iterates over it as an AsyncIterable until exhaustion
            await pipeline

asyncio.run(main())
A dozen expressive lazy operations and that's it.
Note
async twin operations: Each operation that takes a function (e.g. .map) also has a version that accepts an async function (e.g. .amap). You can mix both types of operations on the same Stream, which can then be used as either an Iterable or an AsyncIterable.
Applies a transformation on elements:
integer_strings: Stream[str] = integers.map(str)
assert list(integer_strings) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
Applies the transformation via concurrency threads, yielding results in the upstream order (FIFO). Set the parameter ordered=False to yield results as they become available (First Done, First Out).
import requests
pokemon_names: Stream[str] = (
Stream(range(1, 4))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.map(requests.get, concurrency=3)
.map(requests.Response.json)
.map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
Note
Memory-efficient: Only concurrency upstream elements are pulled for processing; the next upstream element is pulled only when a result is yielded downstream.
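The bounded-buffer behaviour described in this note can be illustrated with a small standard-library sketch. This is an assumption about one way to get that behaviour, not the library's actual implementation; ordered_concurrent_map_sketch and traced_source are hypothetical names.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def ordered_concurrent_map_sketch(
    func: Callable[[T], U], iterable: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Hypothetical sketch: map via threads, FIFO order, bounded buffer."""
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        pending: Deque[Future] = deque()
        for elem in iterable:
            pending.append(executor.submit(func, elem))
            # Once `concurrency` elements are in flight, yield the oldest
            # result; the next upstream element is pulled only afterwards.
            if len(pending) == concurrency:
                yield pending.popleft().result()
        # Upstream is exhausted: drain the remaining results in order.
        while pending:
            yield pending.popleft().result()


pulled: List[int] = []  # records which upstream elements were pulled


def traced_source() -> Iterator[int]:
    for i in range(6):
        pulled.append(i)
        yield i


squares = ordered_concurrent_map_sketch(lambda n: n * n, traced_source(), concurrency=2)
first_square = next(squares)
pulled_after_first_yield = list(pulled)  # only 2 elements pulled so far
remaining_squares = list(squares)
```

With concurrency=2, only two upstream elements have been pulled by the time the first result is yielded.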
Set via="process" for process-based concurrency:
if __name__ == "__main__":
    state: List[int] = []
    # integers are mapped
    assert integers.map(state.append, concurrency=4, via="process").count() == 10
    # but the `state` of the main process is not mutated
    assert state == []
.amap can apply an async transformation concurrently.
- consumed as an Iterable[T]:
import httpx
pokemon_names: Stream[str] = (
Stream(range(1, 4))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.amap(httpx.AsyncClient().get, concurrency=3)
.map(httpx.Response.json)
.map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
- consumed as an AsyncIterable[T]:
import asyncio
import httpx
async def main() -> None:
    async with httpx.AsyncClient() as http_async_client:
        pokemon_names: Stream[str] = (
            Stream(range(1, 4))
            .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
            .amap(http_async_client.get, concurrency=3)
            .map(httpx.Response.json)
            .map(lambda poke: poke["name"])
        )
        assert [name async for name in pokemon_names] == ['bulbasaur', 'ivysaur', 'venusaur']

asyncio.run(main())
The star function decorator transforms a function that takes several positional arguments into a function that takes a tuple:
from streamable import star
zeros: Stream[int] = (
Stream(enumerate(integers))
.map(star(lambda index, integer: index - integer))
)
assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
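For intuition, the behaviour described above can be sketched in a few lines of plain Python. star_sketch is a hypothetical stand-in written for illustration; the real star may differ, for instance in how it handles async functions.

```python
from typing import Any, Callable, List, Tuple, TypeVar

R = TypeVar("R")


def star_sketch(func: Callable[..., R]) -> Callable[[Tuple[Any, ...]], R]:
    """Hypothetical sketch: adapt func(a, b, ...) into wrapper((a, b, ...))."""

    def wrapper(args: Tuple[Any, ...]) -> R:
        # Unpack the single tuple into positional arguments.
        return func(*args)

    return wrapper


# enumerate yields (index, integer) tuples, which the wrapper unpacks.
zeros_sketch: List[int] = list(
    map(star_sketch(lambda index, integer: index - integer), enumerate(range(10)))
)
```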
Applies a side effect on elements:
state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)
assert list(appending_integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Similar to .map:
- set the concurrency parameter for thread-based concurrency
- set via="process" for process-based concurrency
- set ordered=False for First Done First Out
- The .aforeach operation can apply an async effect concurrently.
Groups into Lists
... up to a given group size:
integers_by_5: Stream[List[int]] = integers.group(size=5)
assert list(integers_by_5) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
... and/or co-groups by a given key:
integers_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2)
assert list(integers_by_parity) == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
... and/or co-groups the elements yielded by the upstream within a given time interval:
from datetime import timedelta
integers_within_1_sec: Stream[List[int]] = (
integers
.throttle(2, per=timedelta(seconds=1))
.group(interval=timedelta(seconds=0.99))
)
assert list(integers_within_1_sec) == [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]
Tip
Combine the size/by/interval parameters:
integers_by_parity_by_2: Stream[List[int]] = (
integers
.group(by=lambda n: n % 2, size=2)
)
assert list(integers_by_parity_by_2) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]
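The order of the groups in the by/size outputs above can be reproduced with the following plain-Python sketch. It is one way to obtain those outputs; whether the library works this way internally is an assumption, and group_sketch is a hypothetical name.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def group_sketch(
    elements: Iterable[T],
    by: Callable[[T], Hashable],
    size: Optional[int] = None,
) -> Iterator[List[T]]:
    """Hypothetical sketch of co-grouping by key, with an optional max size."""
    groups: Dict[Hashable, List[T]] = {}
    for elem in elements:
        key = by(elem)
        group = groups.setdefault(key, [])
        group.append(elem)
        # A group is yielded as soon as it reaches the requested size.
        if size is not None and len(group) == size:
            yield groups.pop(key)
    # At exhaustion, the incomplete groups are yielded in insertion order.
    yield from groups.values()


by_parity = list(group_sketch(range(10), by=lambda n: n % 2))
by_parity_by_2 = list(group_sketch(range(10), by=lambda n: n % 2, size=2))
```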
Like .group, but groups into (key, elements) tuples:
integers_by_parity: Stream[Tuple[str, List[int]]] = (
integers
.groupby(lambda n: "odd" if n % 2 else "even")
)
assert list(integers_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]
Tip
Then "starmap" over the tuples:
from streamable import star
counts_by_parity: Stream[Tuple[str, int]] = (
integers_by_parity
.map(star(lambda parity, ints: (parity, len(ints))))
)
assert list(counts_by_parity) == [("even", 5), ("odd", 5)]
Ungroups elements assuming that they are Iterables (or AsyncIterables for .aflatten):
even_then_odd_integers: Stream[int] = integers_by_parity.flatten()
assert list(even_then_odd_integers) == [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]
Concurrently flattens concurrency iterables via threads (or via coroutines for .aflatten):
mixed_ones_and_zeros: Stream[int] = (
Stream([[0] * 4, [1] * 4])
.flatten(concurrency=2)
)
assert list(mixed_ones_and_zeros) == [0, 1, 0, 1, 0, 1, 0, 1]
Keeps only the elements that satisfy a condition:
even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)
assert list(even_integers) == [0, 2, 4, 6, 8]
Removes duplicates:
distinct_chars: Stream[str] = Stream("foobarfooo").distinct()
assert list(distinct_chars) == ["f", "o", "b", "a", "r"]
specifying a deduplication key:
strings_of_distinct_lengths: Stream[str] = (
Stream(["a", "foo", "bar", "z"])
.distinct(len)
)
assert list(strings_of_distinct_lengths) == ["a", "foo"]
Warning
During iteration, all distinct elements that are yielded are retained in memory to perform deduplication. However, you can remove only consecutive duplicates without a memory footprint by setting consecutive_only=True:
consecutively_distinct_chars: Stream[str] = (
Stream("foobarfooo")
.distinct(consecutive_only=True)
)
assert list(consecutively_distinct_chars) == ["f", "o", "b", "a", "r", "f", "o"]
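The memory trade-off in the warning above can be seen in this plain-Python sketch, which is an illustration rather than the library's code (distinct_sketch is a hypothetical name). The default mode needs a set that grows with the number of distinct elements, while the consecutive-only mode remembers just the previous element.

```python
from typing import Hashable, Iterable, Iterator, Set


def distinct_sketch(
    elements: Iterable[Hashable], consecutive_only: bool = False
) -> Iterator[Hashable]:
    """Hypothetical sketch of both deduplication modes."""
    if consecutive_only:
        previous: object = object()  # sentinel that equals no element
        for elem in elements:
            # Constant memory: only the previous element is remembered.
            if elem != previous:
                yield elem
            previous = elem
    else:
        seen: Set[Hashable] = set()  # grows with each distinct element
        for elem in elements:
            if elem not in seen:
                seen.add(elem)
                yield elem


all_distinct = list(distinct_sketch("foobarfooo"))
consecutively_distinct = list(distinct_sketch("foobarfooo", consecutive_only=True))
```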
Ends iteration once a given number of elements have been yielded:
five_first_integers: Stream[int] = integers.truncate(5)
assert list(five_first_integers) == [0, 1, 2, 3, 4]
or when a condition is satisfied:
five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)
assert list(five_first_integers) == [0, 1, 2, 3, 4]
If both count and when are set, truncation occurs as soon as either condition is met.
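The "either condition" rule can be sketched in plain Python as follows. truncate_sketch is a hypothetical function that mirrors the documented semantics, not the library's implementation.

```python
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def truncate_sketch(
    elements: Iterable[T],
    count: Optional[int] = None,
    when: Optional[Callable[[T], bool]] = None,
) -> Iterator[T]:
    """Hypothetical sketch: stop on `count` yields or when `when(elem)` holds."""
    iterator = iter(elements)
    yielded = 0
    # Stop before pulling a new element once `count` elements were yielded.
    while count is None or yielded < count:
        try:
            elem = next(iterator)
        except StopIteration:
            return
        # Stop as soon as the predicate is satisfied (elem is not yielded).
        if when is not None and when(elem):
            return
        yield elem
        yielded += 1


stopped_by_count = list(truncate_sketch(range(10), count=3, when=lambda n: n == 5))
stopped_by_when = list(truncate_sketch(range(10), count=8, when=lambda n: n == 5))
```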
Skips the first specified number of elements:
integers_after_five: Stream[int] = integers.skip(5)
assert list(integers_after_five) == [5, 6, 7, 8, 9]
or skips elements until a predicate is satisfied:
integers_after_five: Stream[int] = integers.skip(until=lambda n: n >= 5)
assert list(integers_after_five) == [5, 6, 7, 8, 9]
If both count and until are set, skipping stops as soon as either condition is met.
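As an illustration of that rule, here is a plain-Python sketch. skip_sketch is a hypothetical function written to mirror the documented semantics, not taken from the library.

```python
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def skip_sketch(
    elements: Iterable[T],
    count: Optional[int] = None,
    until: Optional[Callable[[T], bool]] = None,
) -> Iterator[T]:
    """Hypothetical sketch: skip until `count` skipped or `until(elem)` holds."""
    skipping = count is not None or until is not None
    skipped = 0
    for elem in elements:
        if skipping:
            count_reached = count is not None and skipped >= count
            predicate_met = until is not None and until(elem)
            if count_reached or predicate_met:
                skipping = False  # either condition ends the skipping phase
            else:
                skipped += 1
                continue
        yield elem


stopped_by_until = list(skip_sketch(range(10), count=5, until=lambda n: n >= 3))
stopped_by_count = list(skip_sketch(range(10), count=2, until=lambda n: n >= 5))
```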
Catches a given type of exception, and optionally yields a replacement value:
inverses: Stream[float] = (
integers
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError, replacement=float("inf"))
)
assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
You can specify an additional when condition for the catch:
import requests
from requests.exceptions import ConnectionError
status_codes_ignoring_resolution_errors: Stream[int] = (
Stream(["https://github.com", "https://foo.bar", "https://github.com/foo/bar"])
.map(requests.get, concurrency=2)
.catch(ConnectionError, when=lambda error: "Max retries exceeded with url" in str(error))
.map(lambda response: response.status_code)
)
assert list(status_codes_ignoring_resolution_errors) == [200, 404]
It has an optional finally_raise: bool parameter to raise the first exception caught (if any) when the iteration terminates.
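To show what raising the first caught exception at the end means in practice, here is a plain-Python sketch. catch_sketch is a hypothetical function, assumed to mirror the documented behaviour rather than the library's actual code.

```python
from typing import Iterator, List, Optional, Type, TypeVar

T = TypeVar("T")


def catch_sketch(
    iterator: Iterator[T],
    error_type: Type[Exception],
    finally_raise: bool = False,
) -> Iterator[T]:
    """Hypothetical sketch: swallow errors, optionally re-raise the first one."""
    first_error: Optional[Exception] = None
    while True:
        try:
            elem = next(iterator)
        except StopIteration:
            break
        except error_type as error:
            # Remember only the first error and keep iterating.
            if first_error is None:
                first_error = error
            continue
        yield elem
    # All elements have been yielded: surface the first error if requested.
    if finally_raise and first_error is not None:
        raise first_error


collected: List[float] = []
raised: Optional[Exception] = None
try:
    inverses_sketch = catch_sketch(
        map(lambda n: round(1 / n, 2), range(4)),
        ZeroDivisionError,
        finally_raise=True,
    )
    for inverse in inverses_sketch:
        collected.append(inverse)
except ZeroDivisionError as error:
    raised = error
```

All valid elements are still collected; the error only surfaces once the upstream is exhausted.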
Tip
Leverage when to apply side effects on catch:
errors: List[Exception] = []
def store_error(error: Exception) -> bool:
    errors.append(error)  # applies effect
    return True  # signals to catch the error
integers_in_string: Stream[int] = (
Stream("012345foo6789")
.map(int)
.catch(ValueError, when=store_error)
)
assert list(integers_in_string) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert len(errors) == len("foo")
Limits the number of yields per time interval:
from datetime import timedelta
three_integers_per_second: Stream[int] = integers.throttle(3, per=timedelta(seconds=1))
# takes 3s: ceil(10 integers / 3 per_second) - 1
assert list(three_integers_per_second) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
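The duration quoted in the comment above follows from simple arithmetic, sketched here with the standard library only. expected_duration_sketch is a hypothetical helper, and it assumes that the first batch of elements is yielded immediately.

```python
import math
from datetime import timedelta


def expected_duration_sketch(n_elements: int, count: int, per: timedelta) -> timedelta:
    """Hypothetical helper: minimal time to yield n_elements at `count` per `per`."""
    if n_elements == 0:
        return timedelta(0)
    # The first batch is yielded immediately; each later batch waits one `per`.
    return (math.ceil(n_elements / count) - 1) * per


three_per_second = expected_duration_sketch(10, 3, timedelta(seconds=1))
two_per_second = expected_duration_sketch(10, 2, timedelta(seconds=1))
```

For 10 elements this gives 3 seconds at 3 per second, and 4 seconds at 2 per second.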
Logs the progress of iterations:
>>> assert list(integers.throttle(2, per=timedelta(seconds=1)).observe("integers")) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
INFO: [duration=0:00:00.001793 errors=0] 1 integers yielded
INFO: [duration=0:00:00.004388 errors=0] 2 integers yielded
INFO: [duration=0:00:01.003655 errors=0] 4 integers yielded
INFO: [duration=0:00:03.003196 errors=0] 8 integers yielded
INFO: [duration=0:00:04.003852 errors=0] 10 integers yielded
Note
To avoid flooding, logs are emitted only when the number of yielded elements (or errors) reaches powers of 2.
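The counts at which a log line appears in the example above (1, 2, 4, 8, then 10) can be reproduced with this small sketch. logged_counts_sketch is a hypothetical helper; the final log at the end of the iteration is inferred from the example output, not from the library's code.

```python
from typing import List


def logged_counts_sketch(total: int) -> List[int]:
    """Hypothetical helper: yield counts at which a log line would appear."""
    counts: List[int] = []
    threshold = 1
    # One log each time the count reaches a power of 2.
    while threshold <= total:
        counts.append(threshold)
        threshold *= 2
    # Assumption: a last log reports the final count when iteration ends.
    if total > 0 and counts[-1] != total:
        counts.append(total)
    return counts


counts_for_10 = logged_counts_sketch(10)
```

The number of log lines therefore grows logarithmically with the number of elements.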
Tip
To mute these logs, set the logging level above INFO:
import logging
logging.getLogger("streamable").setLevel(logging.WARNING)
Concatenates streams:
assert list(integers + integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Use the standard zip function:
from streamable import star
cubes: Stream[int] = (
Stream(zip(integers, integers, integers)) # Stream[Tuple[int, int, int]]
.map(star(lambda a, b, c: a * b * c)) # Stream[int]
)
assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
Although consuming the stream is beyond the scope of this library, it provides two basic shorthands to trigger an iteration:
.count iterates over the stream until exhaustion and returns the number of elements yielded:
assert integers.count() == 10
The .acount (async method) iterates over the stream as an AsyncIterable until exhaustion and returns the number of elements yielded:
assert asyncio.run(integers.acount()) == 10
Calling the stream iterates over it until exhaustion, and returns it:
state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)
assert appending_integers() is appending_integers
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Awaiting the stream iterates over it as an AsyncIterable until exhaustion, and returns it:
async def test_await() -> None:
    state: List[int] = []
    appending_integers: Stream[int] = integers.foreach(state.append)
    assert appending_integers is await appending_integers
    assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

asyncio.run(test_await())
Calls a function, passing the stream as first argument, followed by *args/**kwargs if any (inspired by the .pipe from pandas or polars):
import pandas as pd
(
integers
.observe("ints")
.pipe(pd.DataFrame, columns=["integer"])
.to_csv("integers.csv", index=False)
)
Tip
If any of the operations raises an exception, you can resume the iteration after handling it:
from contextlib import suppress
casted_ints: Iterator[int] = iter(
Stream("0123_56789")
.map(int)
.group(3)
.flatten()
)
collected: List[int] = []
with suppress(ValueError):
    collected.extend(casted_ints)
assert collected == [0, 1, 2, 3]
collected.extend(casted_ints)
assert collected == [0, 1, 2, 3, 5, 6, 7, 8, 9]
Tip
A Stream can be visited via its .accept method: implement a custom visitor by extending the abstract class streamable.visitors.Visitor:
from streamable.visitors import Visitor
class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())
assert depth(Stream(range(10)).map(str).foreach(print)) == 3
Tip
The Stream's methods are also exposed as functions:
from streamable.functions import catch
inverse_integers: Iterator[float] = map(lambda n: 1 / n, range(10))
safe_inverse_integers: Iterator[float] = catch(inverse_integers, ZeroDivisionError)
Many thanks to our contributors!
Feel very welcome to help us improve streamable via issues and PRs; check CONTRIBUTING.md.