Stream Protocol Support
rabbitmq-backup provides native support for RabbitMQ Streams (queues declared with x-queue-type: stream) via the RabbitMQ Stream Protocol. This page explains how stream backup differs from AMQP backup, the offset-based consumption model, and integration details.
What Are RabbitMQ Streams?
RabbitMQ Streams, introduced in RabbitMQ 3.9, are a queue type designed for high-throughput, append-only, log-like workloads. Unlike classic and quorum queues, streams have fundamentally different characteristics:
| Property | Classic/Quorum Queue | Stream Queue |
|---|---|---|
| Model | Competing consumers (messages deleted on ack) | Append-only log (messages retained) |
| Consumption | Destructive (ack removes message) | Non-destructive (offset-based read) |
| Concurrent consumers | Competing (each message delivered to one consumer) | Independent (each consumer reads from any offset) |
| Ordering | Per-consumer FIFO | Global FIFO |
| Retention | Until acknowledged | Time-based or size-based policy |
| Protocol | AMQP 0-9-1 | RabbitMQ Stream Protocol (binary, port 5552) |
| TTL | Per-message | Per-stream policy |
| Priority | Supported | Not supported |
Streams are ideal for event sourcing, audit logs, CDC (Change Data Capture), and any workload where messages need to be replayed or read by multiple independent consumers.
Why Use the Stream Protocol for Backup?
While streams can be consumed via AMQP 0-9-1 (using basic.consume with x-stream-offset argument), the native Stream Protocol offers significant advantages for backup:
- Inherently non-destructive: Stream reads never modify or remove messages. No `redelivered` flag, no `x-delivery-count`, no consumer impact.
- Offset-based positioning: Start reading from any offset (first, last, specific offset, or timestamp). Enables resumable backups.
- Higher throughput: The binary stream protocol is optimized for bulk message transfer, with less overhead than AMQP frame-level encoding.
- No consumer coordination: Stream consumers are fully independent. Running a backup does not affect production consumers.
- Batch delivery: Messages are delivered in batches rather than individual frames, reducing per-message overhead.
Enabling Stream Backup
To back up stream queues, add these settings to your configuration:
```yaml
source:
  amqp_url: "amqp://guest:guest@localhost:5672/%2f"
  management_url: "http://localhost:15672"
  management_username: guest
  management_password: guest
  stream_port: 5552        # Stream protocol port
  queues:
    include:
      - "events-*"
    types:
      - stream             # Only stream queues
backup:
  stream_enabled: true     # Enable stream protocol
  compression: zstd
  max_concurrent_queues: 4
```
Required Configuration
| Field | Description |
|---|---|
| `source.stream_port` | The RabbitMQ Stream Protocol port (default: 5552). |
| `backup.stream_enabled` | Must be `true` to use the stream protocol for stream-type queues. |
| `source.queues.types` | Include `stream` to select stream queues. If omitted, all queue types are selected. |
Prerequisites
- RabbitMQ 3.9 or later
- The `rabbitmq_stream` plugin must be enabled: `rabbitmq-plugins enable rabbitmq_stream`
- The stream protocol port (default 5552) must be accessible
How Stream Backup Works
Queue Discovery
During the queue discovery phase, rabbitmq-backup queries the Management API for all queues matching the selection criteria. Queues are partitioned by type:
```rust
let (stream_queues, amqp_queues): (Vec<_>, Vec<_>) =
    queues.into_iter().partition(|q| q.queue_type == "stream");
```
- AMQP queues (classic, quorum): Backed up using the AMQP 0-9-1 cancel strategy.
- Stream queues: Backed up using the native Stream Protocol (if `stream_enabled: true`).
Connection
The StreamClient connects to the RabbitMQ stream endpoint using the rabbitmq-stream-client crate:
```rust
let client = StreamClient::connect(host, port, username, password).await?;
```
The host is extracted from the AMQP URL. The username and password are taken from the Management API credentials. The stream port defaults to 5552.
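The host-extraction step can be sketched as follows. This is an illustrative, std-only sketch, not the tool's actual parser (a real implementation would more likely use a URL-parsing library); `host_from_amqp_url` is a hypothetical helper:

```rust
// Pull the host out of an AMQP URL so it can be reused for the stream
// endpoint on port 5552. Illustrative only; does not handle IPv6 literals.
fn host_from_amqp_url(url: &str) -> Option<&str> {
    // Strip the scheme, e.g. "amqp://".
    let rest = url.split("://").nth(1)?;
    // Keep only the authority part (drop the vhost path).
    let authority = rest.split('/').next()?;
    // Drop userinfo ("user:pass@") if present.
    let host_port = authority.rsplit('@').next()?;
    // Drop an explicit port.
    host_port.split(':').next()
}

fn main() {
    let url = "amqp://guest:guest@localhost:5672/%2f";
    assert_eq!(host_from_amqp_url(url), Some("localhost"));
}
```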
Consumer Creation
A consumer is created for each stream, starting from the specified offset:
```rust
let consumer = client
    .create_consumer(stream_name, OffsetSpecification::First)
    .await?;
```
Offset Specifications
The OffsetSpecification enum controls where the consumer starts reading:
| Specification | Description |
|---|---|
| `First` | Start from the first available message in the stream. |
| `Last` | Start from the last chunk of messages in the stream. |
| `Next` | Start from the next offset to be written (only new messages). |
| `Offset(u64)` | Start from a specific numeric offset. |
| `Timestamp(i64)` | Start from the first message stored at or after the given timestamp. |
For initial backups, First is used to capture all available messages. For incremental backups, a stored offset can be used to resume from the last backed-up position.
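The initial-versus-incremental choice boils down to a small decision. The sketch below uses a local, simplified mirror of the client's offset enum and a hypothetical `starting_offset` helper (not names from the tool itself):

```rust
// Simplified stand-in for the stream client's offset specification.
#[derive(Debug, PartialEq)]
enum OffsetSpec {
    First,
    Offset(u64),
}

// Full backup when no offset has been stored yet; otherwise resume
// from the message after the last backed-up offset.
fn starting_offset(stored: Option<u64>) -> OffsetSpec {
    match stored {
        None => OffsetSpec::First,
        Some(last) => OffsetSpec::Offset(last + 1),
    }
}

fn main() {
    assert_eq!(starting_offset(None), OffsetSpec::First);
    assert_eq!(starting_offset(Some(41)), OffsetSpec::Offset(42));
}
```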
Message Reading Loop
The StreamReader reads messages in a loop, converting each delivery to a BackupRecord:
```
Stream Consumer
      ↓
Delivery (message + offset)
      ↓
Convert to BackupRecord
      ↓
Add to SegmentWriter
      ↓ (when threshold reached)
Finalize segment → Upload to storage → Update manifest
```
The loop exits when:
- A shutdown signal is received (Ctrl+C)
- A read timeout occurs (no new messages for 10 seconds)
- The consumer stream ends
- A delivery error occurs
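The loop structure can be modeled with a std-only sketch. The real reader is async and driven by the stream consumer; here an `mpsc` channel stands in for the delivery source, and `read_loop` is a hypothetical simplification showing the idle-timeout and end-of-stream exits:

```rust
use std::sync::mpsc;
use std::time::Duration;

// Simplified model of the read loop: count deliveries until either no new
// message arrives within the idle window or the sender side disconnects
// (standing in for the consumer stream ending).
fn read_loop(rx: mpsc::Receiver<Vec<u8>>, idle: Duration) -> usize {
    let mut count = 0;
    loop {
        match rx.recv_timeout(idle) {
            Ok(_msg) => count += 1, // convert to BackupRecord + write segment here
            Err(mpsc::RecvTimeoutError::Timeout) => break,      // read timeout
            Err(mpsc::RecvTimeoutError::Disconnected) => break, // stream ended
        }
    }
    count
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(b"m1".to_vec()).unwrap();
    tx.send(b"m2".to_vec()).unwrap();
    drop(tx); // simulate the consumer stream ending
    assert_eq!(read_loop(rx, Duration::from_millis(50)), 2);
}
```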
Message Conversion
Stream messages use AMQP 1.0 message encoding (different from AMQP 0-9-1). The StreamReader converts stream deliveries to BackupRecord:
```rust
fn delivery_to_record(delivery: &Delivery, stream_name: &str, vhost: &str) -> BackupRecord {
    let msg = delivery.message();
    BackupRecord {
        body: msg.data().map(|data| data.to_vec()),
        properties: extract_properties(msg.properties()),
        headers: extract_application_properties(msg.application_properties()),
        exchange: String::new(),         // Streams don't have exchange routing
        routing_key: String::new(),      // Streams don't have routing keys
        delivery_tag: delivery.offset(), // Use stream offset as delivery tag
        redelivered: false,              // Streams never mark redelivered
        backed_up_at: chrono::Utc::now().timestamp_millis(),
        source_queue: stream_name.to_string(),
        source_vhost: vhost.to_string(),
    }
}
```
Key differences from AMQP 0-9-1 records:
| Field | AMQP 0-9-1 | Stream Protocol |
|---|---|---|
| `exchange` | Original exchange | Empty string (streams don't route via exchanges) |
| `routing_key` | Original routing key | Empty string |
| `delivery_tag` | Broker-assigned delivery tag | Stream offset (monotonically increasing u64) |
| `redelivered` | May be true after backup | Always false |
| `headers` | AMQP 0-9-1 headers (FieldTable) | AMQP 1.0 application properties |
| `properties` | AMQP 0-9-1 BasicProperties | AMQP 1.0 Properties (subset mapped) |
Property Mapping
AMQP 1.0 properties (used by streams) are mapped to the BackupProperties format:
| AMQP 1.0 Property | BackupProperties Field |
|---|---|
| `content_type` | `content_type` |
| `content_encoding` | `content_encoding` |
| `message_id` | `message_id` |
| `correlation_id` | `correlation_id` |
| `reply_to` | `reply_to` |
Properties that exist in AMQP 0-9-1 but not in AMQP 1.0 (e.g., delivery_mode, priority, expiration) are null for stream records.
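The mapping above can be sketched with simplified types. Both structs below are illustrative stand-ins (the tool's actual `BackupProperties` and the AMQP 1.0 property types differ in detail); the point is that only the shared subset is copied and the 0-9-1-only fields stay unset:

```rust
// Simplified backup-side property record. The AMQP 0-9-1-only fields
// (delivery_mode, priority, expiration) are never populated for streams.
#[derive(Default, Debug)]
struct BackupProperties {
    content_type: Option<String>,
    content_encoding: Option<String>,
    message_id: Option<String>,
    correlation_id: Option<String>,
    reply_to: Option<String>,
    delivery_mode: Option<u8>,
    priority: Option<u8>,
    expiration: Option<String>,
}

// Simplified stand-in for AMQP 1.0 message properties.
struct Amqp10Properties {
    content_type: Option<String>,
    content_encoding: Option<String>,
    message_id: Option<String>,
    correlation_id: Option<String>,
    reply_to: Option<String>,
}

fn map_properties(p: &Amqp10Properties) -> BackupProperties {
    BackupProperties {
        content_type: p.content_type.clone(),
        content_encoding: p.content_encoding.clone(),
        message_id: p.message_id.clone(),
        correlation_id: p.correlation_id.clone(),
        reply_to: p.reply_to.clone(),
        ..Default::default() // 0-9-1-only fields remain None
    }
}

fn main() {
    let p = Amqp10Properties {
        content_type: Some("application/json".into()),
        content_encoding: None,
        message_id: Some("m-1".into()),
        correlation_id: None,
        reply_to: None,
    };
    let b = map_properties(&p);
    assert_eq!(b.content_type.as_deref(), Some("application/json"));
    assert!(b.delivery_mode.is_none());
}
```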
Storage Layout
Stream queue backups use the same storage layout as AMQP queues:
```
{backup_id}/queues/{vhost}/{stream_name}/
├── segment-0001.zst
├── segment-0002.zst
└── segment-NNNN.zst
```
The manifest records the queue type as stream:
```json
{
  "vhost": "/",
  "name": "events-stream",
  "queue_type": "stream",
  "segments": [...],
  "message_count": 10000
}
```
The same RBAK segment format is used for both AMQP and stream messages.
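A path builder matching the layout above might look like the following. This is a hypothetical helper, not the tool's actual code, and it assumes the vhost has already been percent-encoded for use in a path (e.g. `/` as `%2f`):

```rust
// Build a segment object key following the documented layout:
// {backup_id}/queues/{vhost}/{stream_name}/segment-NNNN.zst
fn segment_path(backup_id: &str, vhost: &str, queue: &str, n: u32) -> String {
    // {n:04} zero-pads the segment number to four digits.
    format!("{backup_id}/queues/{vhost}/{queue}/segment-{n:04}.zst")
}

fn main() {
    assert_eq!(
        segment_path("b1", "%2f", "events-stream", 2),
        "b1/queues/%2f/events-stream/segment-0002.zst"
    );
}
```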
Restoring Stream Backups
Stream backups are restored using the same AMQP 0-9-1 publish path as classic/quorum queue backups. Since stream records have empty exchange and routing_key fields, you should use direct-to-queue publish mode:
```yaml
restore:
  publish_mode: direct-to-queue
```
This publishes messages to the queue via the default exchange, using the queue name as the routing key.
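In other words, the (empty) exchange and routing key recorded for stream messages are ignored, and the publish target is derived from the queue name alone. A minimal sketch (hypothetical helper, not the tool's code):

```rust
// In direct-to-queue mode, publish via the default exchange ("") with the
// queue name as the routing key; the broker routes to the queue of that name.
fn publish_target(queue_name: &str) -> (String, String) {
    (String::new(), queue_name.to_string()) // (exchange, routing_key)
}

fn main() {
    let (exchange, routing_key) = publish_target("events-stream");
    assert_eq!(exchange, "");
    assert_eq!(routing_key, "events-stream");
}
```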
Restoring to a stream queue via AMQP publish appends new messages to the stream. It does not restore the original stream offsets. The restored messages will have new offsets assigned by the broker.
Offset Tracking and Resumable Backups
The delivery_tag field in BackupRecord stores the stream offset. This enables resumable backups:
- After a backup completes, record the last offset from the final segment's last record.
- On the next backup, start from `OffsetSpecification::Offset(last_offset + 1)`.
- Only new messages (appended since the last backup) are captured.
The offset is persisted in the SQLite checkpoint database (offset_storage), allowing backup resumption even after process restarts.
Mixed Queue Types
rabbitmq-backup supports backing up both AMQP and stream queues in a single backup operation:
```yaml
source:
  queues:
    include:
      - "*"
    types:
      - classic
      - quorum
      - stream
backup:
  stream_enabled: true
```
The backup engine automatically partitions discovered queues:
- Classic and quorum queues are backed up via AMQP 0-9-1 (cancel strategy).
- Stream queues are backed up via the Stream Protocol.
Both types share the same concurrency semaphore (max_concurrent_queues), segment format, storage layout, and manifest.
Comparison: AMQP vs. Stream Backup
| Aspect | AMQP Backup (Classic/Quorum) | Stream Backup |
|---|---|---|
| Protocol | AMQP 0-9-1 (port 5672) | Stream Protocol (port 5552) |
| Non-destructive mechanism | Consumer cancel (requeue unacked) | Offset-based read (inherently non-destructive) |
| Impact on queue | Sets redelivered flag, increments x-delivery-count (quorum) | No impact |
| Impact on other consumers | Competes for messages during backup window | Zero impact |
| Resumability | Checkpoint-based | Offset-based (precise) |
| Throughput | High (push-based with cancel) | Very high (batch delivery, less overhead) |
| Message properties | Full AMQP 0-9-1 properties | Subset of AMQP 1.0 properties |
| Exchange/routing info | Preserved | Not available (streams don't route) |
Dependencies
Stream protocol support depends on:
| Crate | Version | Purpose |
|---|---|---|
| `rabbitmq-stream-client` | 0.5.x | Official Rust client for the RabbitMQ Stream Protocol. Handles connection, authentication, consumer creation, and message delivery. |
The rabbitmq-stream-client crate uses its own binary protocol (not AMQP) to communicate with the RabbitMQ stream plugin over TCP port 5552.
Troubleshooting
Stream Plugin Not Enabled
```
Stream protocol error: Failed to connect: connection refused
```
Enable the stream plugin:
```bash
rabbitmq-plugins enable rabbitmq_stream
```
Verify it is running:
```bash
rabbitmq-plugins list | grep stream
# [E*] rabbitmq_stream        3.13.2
# [E*] rabbitmq_stream_common 3.13.2
```
Wrong Port
If using Docker, ensure the stream port is mapped:
```bash
docker run -p 5672:5672 -p 15672:15672 -p 5552:5552 rabbitmq:3-management
```
Stream Does Not Exist
```
Stream protocol error: Failed to create consumer for my-stream: StreamDoesNotExist
```
Verify the stream exists and is of type stream:
```bash
rabbitmqctl list_queues name type | grep stream
```
Create a stream queue:
```bash
rabbitmqadmin declare queue name=my-stream queue_type=stream
```
Authentication Failure
```
Stream protocol error: Failed to connect: StreamError(Authentication)
```
The stream protocol uses the same user credentials as AMQP. Verify the user exists and has permissions:
```bash
rabbitmqctl authenticate_user guest guest
rabbitmqctl list_permissions -p /
```