本篇文章介绍一下 Rust 使用消息中间件 kafka。
Rust 接入 kafka 的库中,使用最多的是 rdkafka。本篇文章就来介绍一下 rdkafka,以及封装一个开箱即用的 simple-kafka 库。
rdkafka
rdkafka 是 Rust 的 Kafka 客户端库,提供了与 Kafka 的低级别交互。它是对 librdkafka C 库的绑定,提供了高性能和可靠性。rdkafka 提供了广泛的功能,包括生产者和消费者 API、支持各种配置选项、消息压缩、事务等。
rdkafka 提供了高阶的 API:StreamConsumer、FutureProducer。
- StreamConsumer:负责自动轮询消费者的消息流。
- FutureProducer:为生成的每条消息返回一个 Future,Future 中可以得到消息的发送结果。
rdkafka = "0.36.0"
创建生产者
fn create_producer(brokers: &str) -> FutureProducer {
info!("create kafka producer,brokers={}", brokers);
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.set("acks", "1")
.create()
.expect("Failed to create producer");
producer
}
发送消息
有两个方法可以使用:send、send_result
-
send方法:send方法用于发送消息到 Kafka,并返回一个DeliveryFuture对象。它会尝试将消息发送到 Kafka,如果发送失败,将会在DeliveryFuture中返回一个错误。send方法会在发送过程中进行重试,如果 Kafka 的生产者队列已满,可以使用queue_timeout参数来控制重试的时间。你可以将queue_timeout设置为Timeout::Never来永远重试,或者设置为Timeout::After(0)来立即返回错误。如果重试超时并且队列仍然满,DeliveryFuture中将报告一个RDKafkaErrorCode::QueueFull错误。 -
send_result方法:send_result方法与send方法类似,同样用于发送消息到 Kafka,并返回一个DeliveryResult对象。但是,与send方法不同的是,如果消息无法入队,send_result方法会立即返回一个错误,同时返回提供的FutureRecord对象。它不会进行重试操作。
发送消息的时候需要构建一个 FutureRecord对象
let message_record = FutureRecord::to(TOPIC).key("1").payload(message.as_bytes());
let result = producer.send(message_record, Timeout::After(Duration::from_secs(3))).await;
2023-11-27 18:06:09.596 INFO ThreadId(25) rust_web::middleware::kafka: 54: create kafka producer,brokers=localhost:9092
2023-11-27 18:06:09.597 INFO ThreadId(25) rust_web::middleware::kafka: 48: send message: FutureRecord { topic: "my_topic", partition: None, payload: Some([104, 101, 108, 108, 111]), key: Some("1"), timestamp: None, headers: None }
2023-11-27 18:06:09.648 INFO ThreadId(25) rust_web::middleware::kafka: 50: send result: Ok((0, 7))
2023-11-27 18:06:09.648 INFO ThreadId(05) rust_web::middleware::kafka: 97: kafka consumer message. message = [Message { ptr: 0x10c6043b8, event_ptr: 0x10c604340 }]
2023-11-27 18:06:09.648 INFO ThreadId(05) rust_web::middleware::kafka: 111: partition = 0, offset = 7 message : "hello"
2023-11-27 18:06:09.648 INFO ThreadId(05) rust_web::middleware::kafka: 93: recv...
简单的使用 Rust 发送和接收 kafka 消息。以上代码在 github。
simple-kafka
基于以上代码,做了一个简化使用 rdkafka 组件的小工具 simple-kafka。
使用这个小工具之后不用再写一堆的创建生产者、消费者代码,极大的减少了 kafka 的接入工作。
在 main 中,通过 tokio::spawn 线程初始化 kafka 生产者及消费者就能够在 Rust 中使用 kafka 了。
let _init_task = tokio::spawn(async {
let simple_kafka_config:simple_kafka::KafkaConfig = kafka_config.to_owned().into();
simple_kafka::kafka_init::init_producers(&simple_kafka_config).await;
simple_kafka::kafka_init::init_consumers(&simple_kafka_config,"my_topic", message_handler).await;
});
simple-kafka 在 github。
小结
以上就是 Rust 使用 kafka 的简单示例,并写了一个小工具减少配置相关的代码。
