8000 GitHub - Yshayy/hi-stream: A repository for hi-stream library
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Yshayy/hi-stream

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

18 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

hi-stream 🚀

Supercharge your Web Streams with familiar, powerful operators!

Welcome to hi-stream! If you're working with Web Streams (specifically ReadableStream), hi-stream makes your life easier. It provides a rich set of higher-order utility functions inspired by reactive programming libraries like RxJS and IxJS, allowing you to manipulate streams with operators like map, filter, flatMap, scan, and many more.

hi-stream embraces modern JavaScript features, leveraging native TransformStream for performance and integrating seamlessly with Promises and async/await iteration. It's designed with a functional approach, offering curried operators and utilities like pipe for elegant, point-free style programming.

✨ Why Choose hi-stream?

  • Simplified Stream Manipulation: Forget manual stream plumbing. Use intuitive operators you might already know from libraries like RxJS/IxJS.
  • Modern & Compatible: Built for today's JavaScript environments:
  • Async Native: Works perfectly with async function* and the for await...of loop, the standard way to consume Web Streams.
  • Performant: Utilizes the browser's (or runtime's) built-in TransformStream mechanism for efficient data processing.
  • Functional Programming Friendly: All operators are automatically curried, enabling easy composition using functions like pipe.
  • LLM & AI Ready: Web Streams are increasingly used by AI/LLM libraries (like Langchain, OpenAI SDK) to handle streaming responses. hi-stream makes processing these streams straightforward.
  • TypeScript First: Written in TypeScript for strong typing and improved developer experience.

📦 Installation

Choose your favorite package manager:

# npm
npm install hi-stream

# yarn
yarn add hi-stream

# pnpm
pnpm add hi-stream

📚 Usage Examples

Get started quickly with these examples:

Importing Operators

import {
  from, // Create streams from iterables, promises, etc.
  pipe, // Chain operators together
  map, filter, scan, flatMap, // Common operators
  take, skip, takeWhile, skipWhile, // Filtering/limiting operators
  zip, pairwise, // Combination operators
  toPromise, // Convert stream result back to a Promise
  curry, // Utility for currying functions (operators are pre-curried)
  // ... and more!
} from 'hi-stream';

Basic Transformation

Let's create a stream from an array, filter it, and map the values.

import { from, pipe, filter, map } from 'hi-stream';

const numbers = [1, 2, 3, 4, 5, 6];
const numberStream = from(numbers); // Create a ReadableStream

const processedStream = pipe(
  numberStream,
  filter(n => n % 2 === 0),    // Keep only even numbers
  map(n => n * 10)             // Multiply them by 10
);

for await (const value of processedStream) {
  console.log(value);
  // Output:
  // 20
  // 40
  // 60
}

Processing a Stream of Objects (e.g., Tweets)

This example demonstrates chaining multiple operators to process a stream resembling real-world data, like tweets.

import { from, pipe, filter, map, scan } from 'hi-stream';

// Simulate a stream of tweet objects
const tweets = [
  { id: 1, text: 'Hello world', likes: 10 },
  { id: 2, text: 'Hi there', likes: 5 },
  { id: 3, text: 'JavaScript is awesome', likes: 20 },
  { id: 4, text: 'TypeScript is great', likes: 15 },
];

const tweetStream = from(tweets); // Create stream from array

// Define the processing pipeline
const processedTweets = pipe(
  tweetStream,
  filter(tweet => tweet.likes > 10), // Only keep tweets with more than 10 likes
  map(tweet => ({ ...tweet, text: tweet.text.toUpperCase() })), // Uppercase the text
  // Accumulate results: scan emits the intermediate accumulated array for each input item
  scan((accumulator, currentTweet) => [...accumulator, currentTweet], [])
);
for await (const chunk of processedTweets) {
  // scan emits the accumulated array at each step
  console.log('Current accumulated popular tweets:', chunk);
}
console.log('Stream finished.');

/* Output:
Processing tweet stream...
Current accumulated popular tweets: [ { id: 3, text: 'JAVASCRIPT IS AWESOME', likes: 20 } ]
Current accumulated popular tweets: [ { id: 3, text: 'JAVASCRIPT IS AWESOME', likes: 20 }, { id: 4, text: 'TYPESCRIPT IS GREAT', likes: 15 } ]
Stream finished.
*/

Operators

buffer

Signature:

export function buffer<T>(predicate: (chunk: T) => boolean): (readableStream: ReadableStream<T>) => ReadableStream<T[]>;

Description: Buffers chunks in the readable stream based on a predicate function.

Example
const stream = from([1, 2, 3, 4, 5]);
const resultStream = pipe(stream, buffer(x => x % 2 === 0));
await toPromise(resultStream); // Output: [[1, 2], [3, 4], [5]]

bufferCount

Signature:

export function bufferCount<T>(count: number): (readableStream: ReadableStream<T>) => ReadableStream<T[]>;

Description: Buffers a specified number of items in the readable stream.

Example
const stream = from([1, 2, 3, 4, 5]);
const resultStream = pipe(stream, bufferCount(2));
await toPromise(resultStream); // Output: [[1, 2], [3, 4], [5]]

filter

Signature:

export function filter<T>(predicate: (chunk: T) => boolean | Promise<boolean>): (readableStream: ReadableStream<T>) => ReadableStream<T>;

Description: Filters chunks in the readable stream based on a predicate function.

Example
const stream = from([1,2,3])
await pipe(stream, filter(x=>x%2===0), toPromise) // Output: [2]

flatMap

Signature:

export function flatMap<T, R>(fn: (chunk: T) => R[]): (readableStream: ReadableStream<T>) => ReadableStream<R>;

Description: Applies a given function to each chunk in the readable stream and flattens the result.

Example
const stream = from([1,2,3])
await pipe(stream, flatMap(x=>[x,x*2]), toPromise) // Output: [1,2,2,4,3,6]

map

Signature:

export function map<T, R>(fn: (chunk: T) => R | Promise<R>): (readableStream: ReadableStream<T>) => ReadableStream<R>;

Description: Applies a given function to each chunk in the readable stream.

Example
const stream = from([1,2,3])
await pipe(stream, map(x=>x*2), toPromise) // Output: [2,4,6]

merge

Signature:

export function merge<T>(streams: ReadableStream<T>[]);

Description: Merges multiple readable streams into a single readable stream.

Example
const stream1 = from([1, 2, 3]);
const stream2 = from(['a', 'b', 'c']);
const mergedStream = merge([stream1, stream2]);
for await (const chunk of mergedStream) {
  console.log(chunk);
  // Output:
  // 1
  // 'a'
  // 2
  // 'b'
  // 3
  // 'c'
}

pairwise

Signature:

export function pairwise<T>(): (readableStream: ReadableStream<T>) => ReadableStream<[T, T]>;

Description: Emits pairs of consecutive chunks from the readable stream.

Example
const stream = from([1,2,3])
await pipe(stream, pairwise(), toPromise) // Output: [[1,2],[2,3]]

scan

Signature:

export function scan<T, R>(fn: (acc: R, chunk: T) => R, initialValue: R): (readableStream: ReadableStream<T>) => ReadableStream<R>;

Description: Applies a given function to each chunk in the readable stream, accumulating the result.

Example
const stream = from([1,2,3])
await pipe(stream, scan((acc,x)=>acc+x,0), toPromise) // Output: [1,3,6]

skip

Signature:

export function skip<T>(count: number): (readableStream: ReadableStream<T>) => ReadableStream<T>;

Description: Skips a specified number of chunks in the readable stream.

Example
const stream = from([1,2,3,4])
await pipe(stream, skip(2), toPromise) // Output: [3,4]

skipUntil

Signature:

export function skipUntil<T>(predicate: (chunk: T)
9454
 => boolean): (readableStream: ReadableStream<T>) => ReadableStream<T>;

Description: Skips chunks in the readable stream until a predicate function is true.

Example
const stream = from([1,2,3,4])
await pipe(stream, skipUntil(x=>x>=3), toPromise) // Output: [3,4]

skipWhile

Signature:

export function skipWhile<T>(predicate: (chunk: T) => boolean): (readableStream: ReadableStream<T>) => ReadableStream<T>;

Description: Skips chunks in the readable stream while a predicate function is true.

Example
const stream = from([1,2,3,4])
await pipe(stream, skipWhile(x=>x<3), toPromise) // Output: [3,4]

take

Signature:

export function take<T>(count: number): (readableStream: ReadableStream<T>) => ReadableStream<T>;

Description: Takes a specified number of chunks from the readable stream.

Example
const stream = from([1,2,3,4])
await pipe(stream, take(2), toPromise) // Output: [1,2]

takeUntil

Signature:

export function takeUntil<T>(predicate: (chunk: T) => boolean): (readableStream: ReadableStream<T>) => ReadableStream<T>;

Description: Takes chunks from the readable stream until a predicate function is true.

Example
const stream = from([1,2,3,4])
await pipe(stream, takeUntil(x=>x>=3), toPromise) // Output: [1,2]

takeWhile

Signature:

export function takeWhile<T>(predicate: (chunk: T) => boolean): (readableStream: ReadableStream<T>) => ReadableStream<T>;

Description: Takes chunks from the readable stream while a predicate function is true.

Example
```ts
const stream = from([1,2,3,4])
await pipe(stream, takeWhile(x=>x<3), toPromise) // Output: [1,2]

</details>

### zipStreams

**Signature:**
```ts
export function zipStreams<T>(streams: ReadableStream<T>[], readableStream: ReadableStream<T>): ReadableStream<T[]>;

Description: Combines chunks from multiple streams into a single stream.

Example
const stream1 = from([1,2,3])
const stream2 = from(['a','b','c'])
await pipe(stream1, zip(stream2), toPromise) // Output: [[1,'a'],[2,'b'],[3,'c']]

Project Status and Contributions 🚧

This project is in its early stages but is functional and working. It has been generated completely by the Copilot workspace AI agent. Contributions are welcome, and you can open issues for any bugs or feature requests. The implementation is driven and maintained by AI with human reviews.

About

A repository for hi-stream library

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published
0