GitHub - frostyplanet/crossfire-rs: A lockless mpmc/mpsc to support async base on crossbeam

frostyplanet/crossfire-rs

Crossfire

Rust 1.36+

High-performance spsc/mpsc/mpmc channels.

It supports async contexts and can also bridge async and blocking contexts.

It is designed to be lockless; the low-level implementation is based on crossbeam-channel. For the concepts, please refer to the wiki.

Stability and versions

Crossfire v1.0 has been released and used in production since 2022.12. It has been heavily tested on x86_64 and ARM.

V2.0 refactored the codebase and API in 2025.6. By removing the generic ChannelShared type parameters from the sender and receiver types, the API is easier to remember and to code against.

The v2.0.x branch will remain in maintenance mode. Further optimizations may land in v2.x_beta versions until long-running tests prove them stable.

Performance

We focus on optimizing the async logic, outperforming other async-capable channels (flume, tokio::mpsc, etc.) in most cases.

Because of context switching between sleep and wake, an async context carries a certain overhead compared with crossbeam-channel used in a blocking context.

The benchmarks are written with the criterion framework. You can run them with:

cargo bench

More benchmark data is on the wiki. Here are some of the results:

[Figure: mpmc bounded size 100 async context]

[Figure: mpsc unbounded async context]

APIs

modules and functions

There are 3 modules: [spsc], [mpsc], [mpmc], providing functions to allocate different types of channels.

The SP and SC variants are more memory efficient than the MP and MC implementations, and sometimes slightly faster.

The return types in these 3 modules are different:

  • [mpmc::bounded_async()]: (tx async, rx async)

  • [mpmc::bounded_tx_async_rx_blocking()]

  • [mpmc::bounded_tx_blocking_rx_async()]

  • [mpmc::unbounded_async()]

NOTE: For bounded channels, a size of 0 is not supported yet (it is temporarily rewritten to size 1).
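
The size-0 note matters because a zero-capacity (rendezvous) channel behaves differently from a capacity-1 channel. The sketch below illustrates that difference with the standard library's std::sync::mpsc::sync_channel as a stand-in; it does not use crossfire, and accepted_without_receiver is a hypothetical helper introduced only for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Counts how many messages `try_send` accepts before the channel reports
/// `Full`, while nobody is receiving. (Hypothetical helper, std only.)
fn accepted_without_receiver(capacity: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<i32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(0) {
            // The message was placed in the buffer.
            Ok(()) => accepted += 1,
            // The buffer is full (or, for capacity 0, no receiver is waiting).
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}
```

With the standard library, a capacity of 0 accepts nothing unless a receiver is already waiting, while a capacity of 1 buffers exactly one message. A crossfire bounded channel requested with size 0 is, per the note, currently treated as size 1, so code that relies on rendezvous semantics should not assume them here.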

Types

Context    Sender (Producer)          Receiver (Consumer)
           Single       Multiple      Single       Multiple
Blocking   BlockingTxTrait            BlockingRxTrait
           Tx           MTx           Rx           MRx
Async      AsyncTxTrait               AsyncRxTrait
           AsyncTx      MAsyncTx      AsyncRx      MAsyncRx

NOTE: The SP / SC versions [AsyncTx] and [AsyncRx] are designed to be non-cloneable, but send() and recv() take an immutable &self for convenience. Be careful not to use the SP/SC ends concurrently when they are put in an Arc.

Error types

Error types are re-exported from crossbeam-channel: [TrySendError], [SendError], [TryRecvError], [RecvError].
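
As a rough illustration of when each of these errors shows up, the sketch below uses the standard library's std::sync::mpsc, whose error types have the same names and the same variants (Full / Disconnected, Empty / Disconnected) as those in crossbeam-channel. It is a stand-in under that assumption, not crossfire code, and the three function names are hypothetical.

```rust
use std::sync::mpsc::{channel, RecvError, SendError, TryRecvError};

/// Drops the only sender, then receives until the channel reports closure.
fn drain_after_sender_drop() -> (Vec<i32>, RecvError) {
    let (tx, rx) = channel::<i32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // last sender gone: the channel is now closed
    let mut received = Vec::new();
    loop {
        match rx.recv() {
            // Messages buffered before the close are still delivered.
            Ok(v) => received.push(v),
            // Closed and empty: recv reports an error.
            Err(e) => return (received, e),
        }
    }
}

/// Sends into a channel whose receiver is gone; the error hands the message back.
fn send_after_receiver_drop(msg: i32) -> Result<(), SendError<i32>> {
    let (tx, rx) = channel::<i32>();
    drop(rx);
    tx.send(msg)
}

/// Polls an open but empty channel without blocking.
fn try_recv_on_empty() -> Result<i32, TryRecvError> {
    let (_tx, rx) = channel::<i32>();
    rx.try_recv()
}
```

The same pattern (match on the error, recover the unsent value from SendError) should carry over to the re-exported types, but check the crossfire documentation for the exact signatures.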

Async compatibility

Mainly tested on tokio-1.x.

In an async context, future-select! can be used, and cancellation is supported. You can combine a send() or recv() future with tokio::time::timeout.

When using MAsyncTx or MAsyncRx, there is some memory overhead from passing along small wakers for pending async producers or consumers. Because we aim to be lockless, cancelling a sending/receiving future (for example via tokio::time::timeout()) may trigger immediate cleanup if non-conflict conditions are met; otherwise cleanup is lazy (the waker will be consumed by an actual message send or recv).

Nevertheless, for close notification without sending anything, I suggest using tokio::sync::oneshot instead.

Usage

Cargo.toml:

[dependencies]
crossfire = "2.0"

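Example:

extern crate crossfire;
use crossfire::*;

#[tokio::main]
async fn main() {
    // Bounded channel with capacity 100; both ends are async.
    let (tx, rx) = mpsc::bounded_async::<i32>(100);
    // Producer task: takes ownership of tx, which is dropped when the task ends.
    tokio::spawn(async move {
        for i in 0i32..10000 {
            let _ = tx.send(i).await;
            println!("sent {}", i);
        }
    });
    // Consumer: recv() returns Err once the channel is closed.
    loop {
        if let Ok(i) = rx.recv().await {
            println!("recv {}", i);
        } else {
            println!("rx closed");
            break;
        }
    }
}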
