8000 GitHub - zip01/FastMiniMQ: 轻量,高效。FastMiniMQ 是一个消息队列。非常轻,源代码大约200Kb,运行程序包大约 5M。非常快,8核16G内存单机消息生产和消费均可稳定维持 TPS 100w/s 左右。
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

轻量,高效。FastMiniMQ 是一个消息队列。非常轻,源代码大约200Kb,运行程序包大约 5M。非常快,8核16G内存单机消息生产和消费均可稳定维持 TPS 100w/s 左右。

License

Notifications You must be signed in to change notification settings

zip01/FastMiniMQ

 
 

Repository files navigation

FastMiniMQ

Build Status img Coverage Status License

轻量,高效。FastMiniMQ 是一个消息队列。非常轻,源代码大约200Kb,运行程序包大约 5M。非常快,8核16G内存单机消息生产和消费均可稳定维持 TPS 100w/s 左右。看看我们是怎么做的

API Example

  • MQProducer Send Message Example
// 创建 Broker 集群元数据对象(集群名称::消息服务节点名称@节点地址;消息服务节点名称@节点地址;...)
MQClusterMetaData clusterMetaData = MQRegistry.loadClusterMetaData("cluster-test::broker-test@127.0.0.1:6001;");
// 创建 Producer 配置对象(生产者名称,连接集群名称)
MQProducerCfg producerCfg = new MQProducerCfg("producer-test", "cluster-test");
MQProducer producer = new MQProducer(producerCfg, clusterMetaData);
// 启动 Producer
producer.start();

String topic = "testTopic";
String body = "hello world!";
// 发送主题消息
MQFuture<MQRecord> future = producer.sendMsg(topic, body.getBytes());
// 设置 30 秒等待
MQRecord record = future.get(30, TimeUnit.SECONDS);
if (record.getStatus() == Status.OK) {
    System.out.printf("[%s] send message success.%n", body);
} else {
    System.out.printf("[%s] send message failed.%n", body);
}

// 关闭 Producer
producer.shutdown();
  • MQConsumer Fetch Message Example
// 创建 Broker 集群元数据对象(集群名称::消息服务节点名称@节点地址;消息服务节点名称@节点地址;...)
MQClusterMetaData clusterMetaData = MQRegistry.loadClusterMetaData("cluster-test::broker-test@127.0.0.1:6001;");
// 创建 Producer 配置对象(消费者名称,连接集群名称,连接消息服务节点名称)
MQConsumerCfg consumerCfg = new MQConsumerCfg("consumer-test", "cluster-test", "broker-test");
MQConsumer consumer = new MQConsumer(consumerCfg, clusterMetaData);
// 启动 consumer
consumer.start();

// 创建本地消费者分组队列
MQQueue queue = new MQQueue("testTopic", "testConsumeGroup");
// 更新最新队列信息
consumer.fetchUpdate(queue);

// 请求获取消息记录
MQFuture<MQResult<List<MQRecord>>> future = consumer.fetchMsg(queue);
MQResult<List<MQRecord>> result = future.get();
List<MQRecord> recordList = result.getResult();
for (MQRecord record: recordList) {
    System.out.printf("%s,%s%n", record.getTopic(), new String(record.getBody()));
}

// 确认消费
consumer.waitAck(queue);

// 关闭 consumer
consumer.shutdown();

更多示例看:fastminimq-examples

Requirements

  • Java 8+
  • slf4j library
  • netty library

Benchmarks

一个 producer 节点(4核8G内存),一个 broker 节点(8核16G内存),一个 consumer 节点(4核8G内存),256 个消息 topic,64 字节消息 body。

  • 结构图

  • 构建
mvn clean install
  • 运行 broker(打印 gc 详细日志)
nohup java -Ddata.dir=/data/fastminimq \
-Xmx12G -Xms12G -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:broker.gc.log \
-cp FastMiniMQBroker.jar org.nopasserby.fastminimq.FastMiniMQBroker > broker.log &
  • 运行 consumer(打印 gc 详细日志)
java -Dip=172.31.0.128 \
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:consumer.gc.log \
-cp ./FastMiniMQBroker.jar:./FastMiniMQ-0.13.1-SNAPSHOT-tests.jar org.nopasserby.fastminimq.benchmark.ConsumerThroughputTest
  • 运行 producer(打印 gc 详细日志)
java -Dip=172.31.0.128 -DmessageSize=64 \
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:producer.gc.log \
-cp ./FastMiniMQBroker.jar:./FastMiniMQ-0.13.1-SNAPSHOT-tests.jar org.nopasserby.fastminimq.benchmark.ProducerThroughputTest
  • 在阿里云上的测试结果

服务器参数:

规格 vCPU 处理器型号 内存(GiB) 本地存储(GiB) 网络基础带宽能力(出/入)(Gbit/s) 网络突发带宽能力(出/入)(Gbit/s) 网络收发包能力(出+入)(万PPS) 连接数(万) 多队列 云盘最大IOPS 云盘最大吞吐量(MB/s) 云盘带宽(Gbit/s)
ecs.c6.xlarge 4 Intel Xeon(Cascade Lake) Platinum 8269CY 8 高效云盘 40 G 1.5 5.0 50 最高25 4 5000 140 1.5
ecs.c6.2xlarge 8 Intel Xeon(Cascade Lake) Platinum 8269CY 16 高效云盘 40 G + 1.5T 2.5 8.0 80 最高25 8 5000 140 2

操作系统:CentOS 7.6 64bit

Maven 版本:Apache Maven 3.2.5

编译 JDK 版本:Oracle jdk1.8.0_22164

运行环境 JDK 版本:java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64

FastMiniMQ 基准测试报告:

磁盘IO读写 [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

网络IO进出 [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

CPU消耗 [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

内存占用 [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

磁盘 IOPS [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

GC Duration Time [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

GC Causes [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

GC Statistics [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

Producer Statistics TPS [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

Consumer Statistics TPS [时段 2020-08-02 12:45:00 - 2020-08-02 15:50:00]

更多测试报告看:fastminimq-testcase-list

Features

  1. 支持从指定时间开始消费
  2. 支持定时延迟消息
  3. 支持事务消息

Todo

  1. 集群管理
  2. Raft 多副本
  3. 自动扩容和负载均衡

License

Apache License, Version 2.0 Copyright (C) Guo Chaosheng

About

轻量,高效。FastMiniMQ 是一个消息队列。非常轻,源代码大约200Kb,运行程序包大约 5M。非常快,8核16G内存单机消息生产和消费均可稳定维持 TPS 100w/s 左右。

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Java 100.0%
0