
Create Kafka-like things while enjoying low-level programming without external crates


mila411/pilgrimage


I will draw a picture of a crab someday


pilgrimage

Pilgrimage is a Rust implementation of a distributed messaging system inspired by Apache Kafka. It records messages to local files and supports at-least-once and exactly-once delivery semantics.
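The following generic sketch illustrates the two guarantees. It is not Pilgrimage's implementation, and the Log and Message types and the producer_id and seq fields are hypothetical. Under at-least-once delivery, the producer resends until it receives an ACK, so a lost ACK can leave a message stored twice. A common way to get an exactly-once effect on top of that is for the receiving side to remember the highest sequence number seen per producer and drop retries it has already stored.

```rust
use std::collections::HashMap;

/// Hypothetical message tagged with its producer and a per-producer sequence number.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct Message {
    producer_id: u32,
    seq: u64,
    payload: String,
}

/// Hypothetical append-only log that can optionally drop duplicate retries.
struct Log {
    entries: Vec<Message>,
    last_seq: HashMap<u32, u64>, // highest sequence number stored per producer
    dedup: bool,
}

impl Log {
    fn new(dedup: bool) -> Self {
        Log { entries: Vec::new(), last_seq: HashMap::new(), dedup }
    }

    /// Stores a message. With dedup enabled, a retry of an already-stored
    /// sequence number is ignored (the broker would still ACK it).
    fn append(&mut self, msg: Message) {
        if self.dedup {
            if let Some(&last) = self.last_seq.get(&msg.producer_id) {
                if msg.seq <= last {
                    return; // duplicate retry: do not store it again
                }
            }
            self.last_seq.insert(msg.producer_id, msg.seq);
        }
        self.entries.push(msg);
    }
}

/// At-least-once producer: one initial attempt plus one resend per lost ACK.
fn send_with_retry(log: &mut Log, msg: &Message, lost_acks: u32) {
    for _ in 0..=lost_acks {
        log.append(msg.clone());
    }
}

fn main() {
    let msg = Message { producer_id: 1, seq: 0, payload: "hello".to_string() };

    let mut at_least_once = Log::new(false);
    send_with_retry(&mut at_least_once, &msg, 1);
    println!("without dedup: {} copies", at_least_once.entries.len()); // 2 copies

    let mut exactly_once = Log::new(true);
    send_with_retry(&mut exactly_once, &msg, 1);
    println!("with dedup: {} copies", exactly_once.entries.len()); // 1 copy
}
```

When one ACK is lost, the log stores the message twice without deduplication and once with it.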




Installation

To use Pilgrimage, add the following to your Cargo.toml:

[dependencies]
pilgrimage = "0.14.0"

Security

When Pilgrimage is used as a crate, client authentication is available. However, authentication is not yet implemented for sending and receiving messages through the CLI or the web client. For crate-level authentication samples, see examples/auth-example.rs and examples/auth-send-recv.rs.


Features

  • Topic-based pub/sub model
  • Scalability through partitioning
  • Persistent messages (log-file based)
  • Leader/follower replication
  • Fault detection and automatic recovery
  • Delivery guaranteed by acknowledgement (ACK)
  • Fully implemented leader election mechanism
  • Partition replication
  • Schema registry for managing message schemas and ensuring compatibility
  • Automatic scaling
  • Broker clustering
  • Parallel message processing
  • Authentication and authorization mechanisms
  • Data encryption
  • CLI-based console
  • Web-based console
  • AMQP support
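Partitioning is what allows a topic to scale. The README does not say how Pilgrimage maps a message to a partition; the Basic Usage example sets it explicitly with with_partition(0). The following sketch therefore shows only a common scheme, under that assumption. It hashes the message key so that the same key always lands on the same partition, which preserves per-key ordering, and it assigns keyless messages round-robin. The choose_partition and fnv1a helpers are hypothetical.

```rust
/// 64-bit FNV-1a: small, dependency-free, and stable across runs and Rust versions.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Picks a partition for a message. Keyed messages hash to a fixed partition;
/// keyless messages are spread round-robin using `counter`.
fn choose_partition(key: Option<&str>, num_partitions: usize, counter: &mut usize) -> usize {
    assert!(num_partitions > 0, "a topic needs at least one partition");
    match key {
        Some(k) => (fnv1a(k.as_bytes()) % num_partitions as u64) as usize,
        None => {
            let p = *counter % num_partitions;
            *counter += 1;
            p
        }
    }
}

fn main() {
    let mut rr = 0;
    // The same key always maps to the same partition.
    for key in ["user-1", "user-2", "user-1"] {
        let p = choose_partition(Some(key), 2, &mut rr);
        println!("{key} -> partition {p}");
    }
    // Keyless messages alternate: 0 1 0 1
    for _ in 0..4 {
        print!("{} ", choose_partition(None, 2, &mut rr));
    }
    println!();
}
```

The sketch uses FNV-1a rather than the standard library's DefaultHasher because DefaultHasher's algorithm is not guaranteed to stay the same across Rust releases, and a change would silently move keys between partitions.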

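Basic Usage

The example below runs on the tokio async runtime, so tokio must also be added to Cargo.toml with at least the macros, rt-multi-thread, and time features enabled (features = ["full"] covers all three).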

use pilgrimage::broker::{Broker, MessageSchema, TopicConfig};
use pilgrimage::schema::registry::SchemaRegistry;
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Creating a schema registry
    let schema_registry = SchemaRegistry::new();
    let schema_def = r#"{"type":"record","name":"test","fields":[{"name":"id","type":"string"}]}"#;
    schema_registry.register_schema("test_topic", schema_def)?;

    // Creating a Broker
    let broker = Arc::new(Mutex::new(Broker::new("broker1", 3, 2, "logs")));

    // Creating topics (setting the number of partitions and replication factors)
    {
        let mut broker = broker.lock().unwrap();
        let config = TopicConfig {
            num_partitions: 2,
            replication_factor: 1,
            ..Default::default()
        };
        broker.create_topic("test_topic", Some(config))?;
    }

    // Receiving task: poll partition 0 five times, 100 ms apart
    let broker_clone = Arc::clone(&broker);
    let receiver = tokio::spawn(async move {
        for _ in 0..5 {
            if let Ok(broker) = broker_clone.lock() {
                if let Ok(Some(message)) = broker.receive_message("test_topic", 0) {
                    println!("reception: {}", message.content);
                }
            }
            sleep(Duration::from_millis(100)).await;
        }
    });

    // Send five messages to partition 0
    for i in 1..=5 {
        let message = MessageSchema::new()
            .with_content(format!("Message {}", i))
            .with_topic("test_topic".to_string())
            .with_partition(0);

        if let Ok(mut broker) = broker.lock() {
            broker.send_message(message)?;
            println!("Send: Message {}", i);
        }
        sleep(Duration::from_millis(100)).await;
    }

    // Wait for the receiving task to finish
    receiver.await?;
    Ok(())
}

Dependency

  • Rust 1.51.0 or later

Functionality Implemented

  • Message Queue: Efficient message queue implementation using Mutex and VecDeque.
  • Broker: Core broker functionality including message handling, node management, and leader election.
  • Consumer Groups: Support for consumer groups to allow multiple consumers to read from the same topic.
  • Leader Election: Mechanism for electing a leader among brokers to manage partitions and replication.
  • Storage: Persistent storage of messages using local files.
  • Replication: Replication of messages across multiple brokers for fault tolerance.
  • Schema Registry: Management of message schemas to ensure compatibility between producers and consumers.
  • Benchmarking: Comprehensive benchmarking tests to measure performance of various components.
  • Automatic Scaling: Automatically scales the number of instances based on load.
  • Log Compression: Compresses and optimizes logs.
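The Message Queue entry describes a queue built from a Mutex and a VecDeque. Below is a minimal sketch of that pattern, assuming a simple non-blocking pop. MessageQueue and its methods are hypothetical; Pilgrimage's own queue type may be named and structured differently.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// FIFO queue shared between threads: the Mutex serializes access, and the
/// VecDeque gives O(1) push at the back and pop at the front.
struct MessageQueue<T> {
    inner: Mutex<VecDeque<T>>,
}

impl<T> MessageQueue<T> {
    fn new() -> Self {
        MessageQueue { inner: Mutex::new(VecDeque::new()) }
    }

    fn push(&self, item: T) {
        self.inner.lock().unwrap().push_back(item);
    }

    /// Non-blocking: returns None when the queue is empty.
    fn pop(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }

    fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}

fn main() {
    let queue: Arc<MessageQueue<String>> = Arc::new(MessageQueue::new());

    // Four producer threads each enqueue 25 messages.
    let handles: Vec<_> = (0..4)
        .map(|t| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                for i in 0..25 {
                    q.push(format!("producer {t} message {i}"));
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    println!("queued: {}", queue.len()); // 100
    let mut drained = 0;
    while queue.pop().is_some() {
        drained += 1;
    }
    println!("drained: {drained}"); // 100
}
```

The lock is held only for a single push_back or pop_front, which keeps contention short. A consumer that must wait for data, rather than receive None, would typically pair the Mutex with a std::sync::Condvar.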

Examples

To run one of the bundled examples, use one of the following commands:

cargo run --example ack-transaction
cargo run --example amqp-send-recv
cargo run --example auth-send-recv
cargo run --example batch-transaction
cargo run --example broker-integration-test
cargo run --example idempotency-test
cargo run --example improved-transaction
cargo run --example persistent-ack

Benchmarks

Pilgrimage includes a comprehensive suite of benchmarks to measure performance in a variety of scenarios.

Run the benchmarks with:

cargo bench

Benchmark categories

  1. Message Sending - Transmission performance with different message sizes

    • Small messages (~12 bytes): ~6.0 µs
    • Medium messages (1KB): ~16.2 µs
    • Large messages (10KB): ~19.6 µs
  2. Message Receiving - Message reception performance

    • Average receive time: ~82.7 µs
  3. Topic Operations - Topic Management Operations

    • Topic creation: ~1.6 µs
    • Topic listing: ~652 ms (warning: slow operation)
  4. Partition Operations - Transmission performance by partition

    • 1 partition: ~7.2 µs
    • 2 partitions: ~13.9 µs
    • 4 partitions: ~28.5 µs
    • 8 partitions: ~54.7 µs
  5. Concurrent Operations - Parallel transmission and reception performance

    • Send + Receive: ~5.5 ms
  6. Throughput Testing - Batch Processing Performance

    • 10 messages: ~69.0 µs
    • 100 messages: ~693 µs
    • 1000 messages: ~6.7 ms
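Simple division turns the batch results into a per-message cost. The sketch below only does that arithmetic on the approximate figures listed above, which come from a single machine. All three batch sizes work out to roughly 6.7 to 6.9 µs per message, or about 145,000 to 149,000 messages per second. Cost therefore grows almost linearly with batch size. The partition results read the same way: they grow by roughly 7 µs per partition.

```rust
/// Approximate batch_send timings copied from the Throughput Testing list above
/// (message count, total time in microseconds).
const BATCHES: [(u32, f64); 3] = [
    (10, 69.0),     // 10 messages: ~69.0 µs
    (100, 693.0),   // 100 messages: ~693 µs
    (1000, 6700.0), // 1000 messages: ~6.7 ms
];

/// Average cost of one message in microseconds.
fn per_message_us(messages: u32, total_us: f64) -> f64 {
    total_us / messages as f64
}

/// Sustained rate implied by a per-message cost, in messages per second.
fn messages_per_second(per_msg_us: f64) -> f64 {
    1_000_000.0 / per_msg_us
}

fn main() {
    for (n, total) in BATCHES {
        let per = per_message_us(n, total);
        println!(
            "{:>5} messages: {:.2} µs/message, ~{:.0} messages/s",
            n,
            per,
            messages_per_second(per)
        );
    }
}
```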

Checking Reports

After the benchmark is run, a detailed HTML report is generated in target/criterion/report/index.html.

The benchmarks may fail on machines with little available memory. Sample output from one run:

Gnuplot not found, using plotters backend
message_sending/send_message/small
                        time:   [5.9308 µs 6.0175 µs 6.1085 µs]
Found 7 outliers among 100 measurements (7.00%)
  5 (5.00%) high mild
  2 (2.00%) high severe
message_sending/send_message/medium
                        time:   [15.097 µs 16.234 µs 17.513 µs]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild
message_sending/send_message/large
                        time:   [19.128 µs 19.556 µs 20.154 µs]
Found 13 outliers among 100 measurements (13.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild
  6 (6.00%) high severe

message_receiving/receive_message
                        time:   [82.541 µs 82.696 µs 82.857 µs]
Found 11 outliers among 100 measurements (11.00%)
  1 (1.00%) low severe
  1 (1.00%) low mild
  8 (8.00%) high mild
  1 (1.00%) high severe

topic_operations/create_topic
                        time:   [1.4150 µs 1.5777 µs 1.9184 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
Benchmarking topic_operations/list_topics: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 65.9s, or reduce sample count to 10.
topic_operations/list_topics
                        time:   [648.33 ms 652.26 ms 656.59 ms]
Found 13 outliers among 100 measurements (13.00%)
  10 (10.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe

partition_operations/send_to_partitions/1
                        time:   [6.8442 µs 7.1557 µs 7.5364 µs]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe
partition_operations/send_to_partitions/2
                        time:   [13.501 µs 13.921 µs 14.396 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
partition_operations/send_to_partitions/4
                        time:   [27.957 µs 28.450 µs 28.927 µs]
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe
partition_operations/send_to_partitions/8
                        time:   [53.335 µs 54.671 µs 55.994 µs]

Benchmarking concurrent_operations/concurrent_send_receive: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.0s, enable flat sampling, or reduce sample count to 50.
concurrent_operations/concurrent_send_receive
                        time:   [5.2259 ms 5.5166 ms 5.7806 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

throughput/batch_send/10
                        time:   [67.767 µs 68.967 µs 70.823 µs]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
throughput/batch_send/100
                        time:   [687.47 µs 693.34 µs 700.38 µs]
throughput/batch_send/1000
                        time:   [6.6636 ms 6.7133 ms 6.7860 ms]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low severe


CLI Features

Pilgrimage offers a comprehensive Command-Line Interface (CLI) to manage and interact with your message brokers efficiently. Below are the available commands along with their descriptions and usage examples.

start

Description: Starts the broker with the specified configuration.

Usage:

pilgrimage start --id <BROKER_ID> --partitions <NUMBER_OF_PARTITIONS> --replication <REPLICATION_FACTOR> --storage <STORAGE_PATH> [--test-mode]

Options:

  • --id, -i (required): Sets the broker ID.
  • --partitions, -p (required): Sets the number of partitions.
  • --replication, -r (required): Sets the replication factor.
  • --storage, -s (required): Sets the storage path.
  • --test-mode: Runs the broker in test mode, which breaks out of the main loop quickly for testing purposes.

Example:

pilgrimage start --id broker1 --partitions 3 --replication 2 --storage /data/broker1 --test-mode
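The start options combine required flags, each with a long and a short form, and the valueless --test-mode switch. A small parser built only on the standard library can show how these fit together. This is a hypothetical sketch: StartArgs, parse_start, and the error messages are illustrations, not Pilgrimage's actual CLI code, which may parse arguments differently.

```rust
/// Hypothetical container for the `start` options listed above.
#[derive(Debug, PartialEq)]
struct StartArgs {
    id: String,
    partitions: usize,
    replication: usize,
    storage: String,
    test_mode: bool,
}

/// Takes the next argument as the value of `flag`.
fn next_value(it: &mut impl Iterator<Item = String>, flag: &str) -> Result<String, String> {
    it.next().ok_or_else(|| format!("{flag} requires a value"))
}

/// Parses a numeric option value.
fn parse_count(s: String, flag: &str) -> Result<usize, String> {
    s.parse().map_err(|_| format!("{flag} expects a number, got {s:?}"))
}

fn parse_start(args: impl IntoIterator<Item = String>) -> Result<StartArgs, String> {
    let mut it = args.into_iter();
    let (mut id, mut partitions, mut replication, mut storage) = (None, None, None, None);
    let mut test_mode = false;
    while let Some(flag) = it.next() {
        match flag.as_str() {
            "--id" | "-i" => id = Some(next_value(&mut it, "--id")?),
            "--partitions" | "-p" => {
                partitions = Some(parse_count(next_value(&mut it, "--partitions")?, "--partitions")?)
            }
            "--replication" | "-r" => {
                replication = Some(parse_count(next_value(&mut it, "--replication")?, "--replication")?)
            }
            "--storage" | "-s" => storage = Some(next_value(&mut it, "--storage")?),
            "--test-mode" => test_mode = true, // switch: takes no value
            other => return Err(format!("unknown option {other}")),
        }
    }
    // All options except --test-mode are required.
    Ok(StartArgs {
        id: id.ok_or("--id is required")?,
        partitions: partitions.ok_or("--partitions is required")?,
        replication: replication.ok_or("--replication is required")?,
        storage: storage.ok_or("--storage is required")?,
        test_mode,
    })
}

fn main() {
    // Skip the program name and the `start` subcommand itself.
    match parse_start(std::env::args().skip(2)) {
        Ok(opts) => println!("{opts:?}"),
        Err(e) => eprintln!("error: {e}"),
    }
}
```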

stop

Description: Stops the specified broker.

Usage:

pilgrimage stop --id <BROKER_ID>

Options:

  • --id, -i (required): Sets the broker ID.

Example:

pilgrimage stop --id broker1

send

Description:

Sends a message to a topic.

Usage:

pilgrimage send --topic <TOPIC> --message <MESSAGE> [--schema <SCHEMA>] [--compatibility <COMPATIBILITY>]

Options:

  • --topic, -t (required): Specifies the topic name.
  • --message, -m (required): Specifies the message to send.
  • --schema, -s (optional): Specifies the path to a schema file. If not specified, an existing schema will be used.
  • --compatibility, -c (optional): Specifies the schema compatibility level (BACKWARD, FORWARD, FULL, NONE).

Example:

pilgrimage send --topic test_topic --message "Hello, World!"

consume

Description:

Consumes messages from a broker.

Usage:

pilgrimage consume --id <BROKER_ID> [--topic <TOPIC>] [--partition <PARTITION>] [--group <GROUP>]

Options:

  • --id, -i (required): Specifies the broker ID.
  • --topic, -t (optional): Specifies the topic name.
  • --partition, -p (optional): Specifies the partition number.
  • --group, -g (optional): Specifies the consumer group ID.

Example:

pilgrimage consume --id broker1 --topic test_topic --partition 0
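The --group option lets several consumers share one topic. The README does not describe how Pilgrimage divides partitions among group members. A common approach, similar to Kafka's round-robin assignor, gives each partition to exactly one member. The sketch below assumes that scheme; assign_round_robin is a hypothetical function.

```rust
use std::collections::BTreeMap;

/// Gives every partition to exactly one group member, cycling through the
/// sorted member IDs. Each member ends up with floor(n/m) or ceil(n/m) partitions.
fn assign_round_robin(members: &[&str], num_partitions: u32) -> BTreeMap<String, Vec<u32>> {
    let mut sorted: Vec<&str> = members.to_vec();
    sorted.sort(); // deterministic: every member computes the same plan
    let mut assignment: BTreeMap<String, Vec<u32>> =
        sorted.iter().map(|m| (m.to_string(), Vec::new())).collect();
    if sorted.is_empty() {
        return assignment; // no consumers, nothing to assign
    }
    for p in 0..num_partitions {
        let owner = sorted[p as usize % sorted.len()];
        assignment.get_mut(owner).unwrap().push(p);
    }
    assignment
}

fn main() {
    let plan = assign_round_robin(&["consumer-b", "consumer-a"], 5);
    for (member, partitions) in &plan {
        println!("{member}: {partitions:?}");
    }
    // consumer-a: [0, 2, 4]
    // consumer-b: [1, 3]
}
```

Because no two members in a group own the same partition, each message is processed once per group, while different groups can still read the same topic independently.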

status

Description:

Checks the status of the specified broker.

Usage:

pilgrimage status --id <BROKER_ID>

Options:

  • --id, -i (required): Sets the broker ID.

Example:

pilgrimage status --id broker1

schema

Description:

Manages message schemas for topics.

Subcommands:

  1. register - Register a new schema for a topic

    Usage:

    pilgrimage schema register --topic <TOPIC> --schema <SCHEMA_FILE> [--compatibility <COMPATIBILITY>]

    Options:

    • --topic, -t (required): Specifies the topic name to register the schema for.
    • --schema, -s (required): Specifies the path to the schema file.
    • --compatibility, -c (optional): Specifies the schema compatibility level (BACKWARD, FORWARD, FULL, NONE).

    Example:

    pilgrimage schema register --topic test_topic --schema ./schemas/test_schema.json --compatibility BACKWARD

  2. list - List all schemas for a topic

    Usage:

    pilgrimage schema list --topic <TOPIC>

    Options:

    • --topic, -t (required): Specifies the topic name.

    Example:

    pilgrimage schema list --topic test_topic
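The README lists the compatibility levels but not the exact rules Pilgrimage applies. The sketch below assumes the meanings used by Confluent Schema Registry, restricted to adding and removing record fields:

  • BACKWARD: consumers on the new schema can read data written with the old one.
  • FORWARD: consumers on the old schema can read data written with the new one.
  • FULL: requires both.
  • NONE: skips the check.

Field, Compatibility, can_read, and is_compatible are hypothetical, and field type changes are ignored.

```rust
/// Simplified record field: only its name and whether it has a default matter here.
struct Field {
    name: &'static str,
    has_default: bool,
}

#[allow(dead_code)]
#[derive(Clone, Copy)]
enum Compatibility {
    Backward,
    Forward,
    Full,
    None,
}

/// A reader schema can decode data from a writer schema if every reader field
/// is either present in the writer or has a default to fall back on.
/// Extra writer fields are simply ignored by the reader.
fn can_read(reader: &[Field], writer: &[Field]) -> bool {
    reader
        .iter()
        .all(|f| f.has_default || writer.iter().any(|w| w.name == f.name))
}

fn is_compatible(level: Compatibility, old: &[Field], new: &[Field]) -> bool {
    match level {
        Compatibility::Backward => can_read(new, old),
        Compatibility::Forward => can_read(old, new),
        Compatibility::Full => can_read(new, old) && can_read(old, new),
        Compatibility::None => true,
    }
}

fn main() {
    let v1 = [Field { name: "id", has_default: false }];
    // v2 adds an "email" field without a default.
    let v2 = [
        Field { name: "id", has_default: false },
        Field { name: "email", has_default: false },
    ];
    // New readers cannot fill "email" for old records, so BACKWARD fails.
    println!("BACKWARD: {}", is_compatible(Compatibility::Backward, &v1, &v2)); // false
    // Old readers ignore the extra field, so FORWARD passes.
    println!("FORWARD:  {}", is_compatible(Compatibility::Forward, &v1, &v2)); // true
}
```

Giving the new field a default value would make the change pass FULL compatibility under these rules.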

Additional Information

  • Help Command: To view all available commands and options, use the help command:

pilgrimage help

  • Version Information: To check the current version of Pilgrimage, use:

pilgrimage --version

Running the CLI

To run the CLI application:

cargo run --bin pilgrimage -- [COMMAND] [OPTIONS]

Examples:

# Start a broker
cargo run --bin pilgrimage -- start --id broker1 --partitions 3 --replication 2 --storage ./storage/broker1

# Send a message to a topic
cargo run --bin pilgrimage -- send --topic test_topic --message "Hello, World!"

# Consume messages from a broker
cargo run --bin pilgrimage -- consume --id broker1 --topic test_topic

Web Console API

Pilgrimage provides a REST API for managing brokers through HTTP requests. The server runs on http://localhost:8080 by default.

Available Endpoints

Start Broker

Starts a new broker instance.

Endpoint: POST /start

Request:

{
    "id": "broker1",
    "partitions": 3,
    "replication": 2,
    "storage": "/tmp/broker1"
}

Example:

curl -X POST http://localhost:8080/start \
  -H "Content-Type: application/json" \
  -d '{
    "id": "broker1",
    "partitions": 3,
    "replication": 2,
    "storage": "/tmp/broker1"
  }'

Stop Broker

Stops a running broker instance.

Endpoint: POST /stop

Request:

{
    "id": "broker1"
}

Example:

curl -X POST http://localhost:8080/stop \
  -H "Content-Type: application/json" \
  -d '{
    "id": "broker1"
  }'

Send Message

Sends a message to the broker. If the specified topic does not exist, it will be created automatically.

Endpoint: POST /send

Request:

{
    "id": "broker1",
    "topic": "custom-topic",
    "message": "Hello, World!"
}

Example:

curl -X POST http://localhost:8080/send \
  -H "Content-Type: application/json" \
  -d '{
    "id": "broker1",
    "topic": "custom-topic",
    "message": "Hello, World!"
  }'
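The same request can also be sent from Rust without an HTTP client crate. This sketch rests on several assumptions:

  • It speaks plain HTTP/1.1 over std::net::TcpStream.
  • It relies on the server closing the connection after responding.
  • It prints the raw response, because the README does not document the response body.

build_send_request, json_escape, and post_send are hypothetical helpers, not part of Pilgrimage.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

/// Escapes a string for use inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Builds a raw HTTP/1.1 POST /send request carrying the JSON body shown above.
fn build_send_request(host: &str, id: &str, topic: &str, message: &str) -> String {
    let body = format!(
        r#"{{"id":"{}","topic":"{}","message":"{}"}}"#,
        json_escape(id),
        json_escape(topic),
        json_escape(message)
    );
    format!(
        "POST /send HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
        host,
        body.len(), // byte length, as HTTP requires
        body
    )
}

/// Sends the request and returns the raw response (status line, headers, body).
fn post_send(addr: &str, id: &str, topic: &str, message: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    stream.write_all(build_send_request(addr, id, topic, message).as_bytes())?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?; // ends when the server closes the connection
    Ok(response)
}

fn main() {
    // Assumes the web server from `cargo run --bin web` is listening on localhost:8080.
    match post_send("localhost:8080", "broker1", "custom-topic", "Hello, World!") {
        Ok(resp) => println!("{resp}"),
        Err(e) => eprintln!("request failed: {e}"),
    }
}
```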

Consume Messages

Consumes messages from the broker. The topic and partition fields are optional; when topic is omitted, default_topic is used. If the specified topic does not exist, it is created automatically.

Endpoint: POST /consume

Request:

{
    "id": "broker1",
    "topic": "custom-topic",
    "partition": 0
}

Example:

# Default topic (default_topic)
curl -X POST http://localhost:8080/consume \
  -H "Content-Type: application/json" \
  -d '{
    "id": "broker1"
  }'

# Custom topic
curl -X POST http://localhost:8080/consume \
  -H "Content-Type: application/json" \
  -d '{
    "id": "broker1",
    "topic": "custom-topic"
  }'

Check Status

Checks the status of the broker.

Endpoint: POST /status

Request:

{
    "id": "broker1"
}

Example:

curl -X POST http://localhost:8080/status \
  -H "Content-Type: application/json" \
  -d '{
    "id": "broker1"
  }'

Running the Web Server

To start the web server:

cargo run --bin web

The server will be available at http://localhost:8080.

Version increment on release

  • The commit message is parsed and the version of either major, minor or patch is incremented.
  • The version of Cargo.toml is updated.
  • The updated Cargo.toml is committed and a new tag is created.
  • The changes and tag are pushed to the remote repository.

The version is automatically incremented based on the commit message. Here, we treat feat as minor, fix as patch, and BREAKING CHANGE as major.
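The bump rule can be sketched as a small function. The README does not say how the commit message is matched, so this version makes two assumptions. It checks the first line for a feat or fix prefix. It searches the whole message for BREAKING CHANGE, which takes precedence. Version and bump are hypothetical names.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
struct Version {
    major: u64,
    minor: u64,
    patch: u64,
}

/// BREAKING CHANGE -> major, feat -> minor, fix -> patch.
/// Lower components reset to 0; returns None if no rule applies.
fn bump(v: Version, commit_message: &str) -> Option<Version> {
    let subject = commit_message.lines().next().unwrap_or("");
    if commit_message.contains("BREAKING CHANGE") {
        Some(Version { major: v.major + 1, minor: 0, patch: 0 })
    } else if subject.starts_with("feat") {
        Some(Version { minor: v.minor + 1, patch: 0, ..v })
    } else if subject.starts_with("fix") {
        Some(Version { patch: v.patch + 1, ..v })
    } else {
        None
    }
}

fn main() {
    let v = Version { major: 0, minor: 14, patch: 0 }; // version from the Installation section
    let messages = [
        "fix: handle empty topic",
        "feat: add AMQP support",
        "refactor: drop old API\n\nBREAKING CHANGE: removed v1 endpoints",
    ];
    for msg in messages {
        match bump(v, msg) {
            Some(n) => println!(
                "{}.{}.{} -> {}.{}.{}  ({})",
                v.major, v.minor, v.patch, n.major, n.minor, n.patch,
                msg.lines().next().unwrap()
            ),
            None => println!("no release for: {msg}"),
        }
    }
    // 0.14.0 -> 0.14.1, 0.14.0 -> 0.15.0, 0.14.0 -> 1.0.0
}
```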

License

MIT
