
Stream Protocol Support

rabbitmq-backup provides native support for RabbitMQ Streams (queues declared with x-queue-type: stream) via the RabbitMQ Stream Protocol. This page explains how stream backup differs from AMQP backup, the offset-based consumption model, and integration details.

What Are RabbitMQ Streams?

RabbitMQ Streams, introduced in RabbitMQ 3.9, are a queue type designed for high-throughput, append-only, log-like workloads. Unlike classic and quorum queues, streams have fundamentally different characteristics:

| Property | Classic/Quorum Queue | Stream Queue |
| --- | --- | --- |
| Model | Competing consumers (messages deleted on ack) | Append-only log (messages retained) |
| Consumption | Destructive (ack removes message) | Non-destructive (offset-based read) |
| Concurrent consumers | Competing (each message delivered to one consumer) | Independent (each consumer reads from any offset) |
| Ordering | Per-consumer FIFO | Global FIFO |
| Retention | Until acknowledged | Time-based or size-based policy |
| Protocol | AMQP 0-9-1 | RabbitMQ Stream Protocol (binary, port 5552) |
| TTL | Per-message | Per-stream policy |
| Priority | Supported | Not supported |

Streams are ideal for event sourcing, audit logs, CDC (Change Data Capture), and any workload where messages need to be replayed or read by multiple independent consumers.


Why Use the Stream Protocol for Backup?

While streams can be consumed over AMQP 0-9-1 (using basic.consume with the x-stream-offset argument), the native Stream Protocol offers significant advantages for backup:

  1. Inherently non-destructive: Stream reads never modify or remove messages. No redelivered flag, no x-delivery-count, no consumer impact.
  2. Offset-based positioning: Start reading from any offset (first, last, specific offset, or timestamp). Enables resumable backups.
  3. Higher throughput: The binary stream protocol is optimized for bulk message transfer, with less overhead than AMQP frame-level encoding.
  4. No consumer coordination: Stream consumers are fully independent. Running a backup does not affect production consumers.
  5. Batch delivery: Messages are delivered in batches rather than individual frames, reducing per-message overhead.

Enabling Stream Backup

To back up stream queues, add these settings to your configuration:

source:
  amqp_url: "amqp://guest:guest@localhost:5672/%2f"
  management_url: "http://localhost:15672"
  management_username: guest
  management_password: guest
  stream_port: 5552 # Stream protocol port

  queues:
    include:
      - "events-*"
    types:
      - stream # Only stream queues

backup:
  stream_enabled: true # Enable stream protocol
  compression: zstd
  max_concurrent_queues: 4

Required Configuration

| Field | Description |
| --- | --- |
| source.stream_port | The RabbitMQ Stream Protocol port (default: 5552). |
| backup.stream_enabled | Must be true to use the stream protocol for stream-type queues. |
| source.queues.types | Include stream to select stream queues. If omitted, all queue types are selected. |

Prerequisites

  • RabbitMQ 3.9 or later
  • The rabbitmq_stream plugin must be enabled: rabbitmq-plugins enable rabbitmq_stream
  • Stream protocol port (default 5552) must be accessible

How Stream Backup Works

Queue Discovery

During the queue discovery phase, rabbitmq-backup queries the Management API for all queues matching the selection criteria. Queues are partitioned by type:

let (stream_queues, amqp_queues): (Vec<_>, Vec<_>) =
    queues.into_iter().partition(|q| q.queue_type == "stream");

  • AMQP queues (classic, quorum): Backed up using the AMQP 0-9-1 cancel strategy.
  • Stream queues: Backed up using the native Stream Protocol (if stream_enabled: true).

Connection

The StreamClient connects to the RabbitMQ stream endpoint using the rabbitmq-stream-client crate:

let client = StreamClient::connect(host, port, username, password).await?;

The host is extracted from the AMQP URL. The username and password are taken from the Management API credentials. The stream port defaults to 5552.
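As a rough illustration of how the host might be derived from the AMQP URL, here is a minimal sketch; the function name and the naive string parsing are assumptions for illustration (a real implementation would use a URL-parsing crate):

```rust
// Hypothetical sketch: derive the stream-endpoint host from an AMQP URL.
// Assumes the simple "amqp://user:pass@host:port/vhost" shape.
fn stream_host_from_amqp_url(amqp_url: &str) -> Option<String> {
    let after_scheme = amqp_url.split("://").nth(1)?;
    // Drop credentials if present ("user:pass@host:port/vhost" -> "host:port/vhost").
    let authority = after_scheme.rsplit('@').next()?;
    // Keep only the host part, stripping the port and vhost path.
    let host_port = authority.split('/').next()?;
    let host = host_port.split(':').next()?;
    Some(host.to_string())
}

fn main() {
    let host = stream_host_from_amqp_url("amqp://guest:guest@localhost:5672/%2f");
    assert_eq!(host.as_deref(), Some("localhost"));
    println!("stream endpoint: {}:5552", host.unwrap());
}
```

The same host then gets paired with the configured stream_port (default 5552) for the stream connection.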

Consumer Creation

A consumer is created for each stream, starting from the specified offset:

let consumer = client
    .create_consumer(stream_name, OffsetSpecification::First)
    .await?;

Offset Specifications

The OffsetSpecification enum controls where the consumer starts reading:

| Specification | Description |
| --- | --- |
| First | Start from the first available message in the stream. |
| Last | Start from the last chunk of messages (replays the most recent batch). |
| Next | Start from the next message written after the consumer attaches (only new messages). |
| Offset(u64) | Start from a specific numeric offset. |
| Timestamp(i64) | Start from the first message stored at or after the given timestamp. |

For initial backups, First is used to capture all available messages. For incremental backups, a stored offset can be used to resume from the last backed-up position.
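The initial-vs-incremental choice can be sketched with a simplified local enum mirroring the client's OffsetSpecification (the helper name and checkpoint value are illustrative, not the tool's actual API):

```rust
// Simplified mirror of the client's OffsetSpecification, to show how an
// initial vs. incremental backup might pick its starting position.
#[derive(Debug, PartialEq)]
enum OffsetSpec {
    First,
    Offset(u64),
}

// `last_backed_up` is a hypothetical value loaded from the checkpoint store.
fn starting_offset(last_backed_up: Option<u64>) -> OffsetSpec {
    match last_backed_up {
        // Resume one past the last offset already captured.
        Some(offset) => OffsetSpec::Offset(offset + 1),
        // No checkpoint yet: full backup from the beginning of the stream.
        None => OffsetSpec::First,
    }
}

fn main() {
    assert_eq!(starting_offset(None), OffsetSpec::First);
    assert_eq!(starting_offset(Some(41)), OffsetSpec::Offset(42));
}
```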

Message Reading Loop

The StreamReader reads messages in a loop, converting each delivery to a BackupRecord:

Stream Consumer
  ↓
Delivery (message + offset)
  ↓
Convert to BackupRecord
  ↓
Add to SegmentWriter
  ↓ (when threshold reached)
Finalize segment → Upload to storage → Update manifest

The loop exits when:

  • A shutdown signal is received (Ctrl+C)
  • A read timeout occurs (no new messages for 10 seconds)
  • The consumer stream ends
  • A delivery error occurs
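The exit conditions above can be sketched with a plain channel standing in for the stream consumer (the function and channel setup are illustrative; the real reader is async and uses a 10-second idle timeout):

```rust
use std::sync::mpsc;
use std::time::Duration;

// Sketch of the reader's exit logic: drain deliveries until either no
// message arrives within the idle timeout or the stream ends.
fn drain_until_idle(rx: mpsc::Receiver<u64>, idle: Duration) -> Vec<u64> {
    let mut offsets = Vec::new();
    loop {
        match rx.recv_timeout(idle) {
            Ok(offset) => offsets.push(offset), // convert + buffer the delivery
            Err(mpsc::RecvTimeoutError::Timeout) => break, // stream idle: done
            Err(mpsc::RecvTimeoutError::Disconnected) => break, // stream ended
        }
    }
    offsets
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for offset in 0u64..3 {
        tx.send(offset).unwrap();
    }
    drop(tx); // sender gone: the reader sees Disconnected and exits
    let offsets = drain_until_idle(rx, Duration::from_millis(50));
    assert_eq!(offsets, vec![0, 1, 2]);
}
```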

Message Conversion

Stream messages use AMQP 1.0 message encoding (different from AMQP 0-9-1). The StreamReader converts stream deliveries to BackupRecord:

fn delivery_to_record(delivery: &Delivery, stream_name: &str, vhost: &str) -> BackupRecord {
    let msg = delivery.message();
    BackupRecord {
        body: msg.data().map(|data| data.to_vec()),
        properties: extract_properties(msg.properties()),
        headers: extract_application_properties(msg.application_properties()),
        exchange: String::new(),         // Streams don't have exchange routing
        routing_key: String::new(),      // Streams don't have routing keys
        delivery_tag: delivery.offset(), // Use the stream offset as the delivery tag
        redelivered: false,              // Streams never mark redelivered
        backed_up_at: chrono::Utc::now().timestamp_millis(),
        source_queue: stream_name.to_string(),
        source_vhost: vhost.to_string(),
    }
}

Key differences from AMQP 0-9-1 records:

| Field | AMQP 0-9-1 | Stream Protocol |
| --- | --- | --- |
| exchange | Original exchange | Empty string (streams don't route via exchanges) |
| routing_key | Original routing key | Empty string |
| delivery_tag | Broker-assigned delivery tag | Stream offset (monotonically increasing u64) |
| redelivered | May be true after backup | Always false |
| headers | AMQP 0-9-1 headers (FieldTable) | AMQP 1.0 application properties |
| properties | AMQP 0-9-1 BasicProperties | AMQP 1.0 Properties (subset mapped) |

Property Mapping

AMQP 1.0 properties (used by streams) are mapped to the BackupProperties format:

| AMQP 1.0 Property | BackupProperties Field |
| --- | --- |
| content_type | content_type |
| content_encoding | content_encoding |
| message_id | message_id |
| correlation_id | correlation_id |
| reply_to | reply_to |

Properties that exist in AMQP 0-9-1 but not in AMQP 1.0 (e.g., delivery_mode, priority, expiration) are null for stream records.
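The mapping can be sketched with simplified local structs (the real types come from rabbitmq-stream-client and the backup format; the names and fields here are illustrative):

```rust
// Illustrative stand-ins for the AMQP 1.0 properties and the backup format.
#[derive(Default)]
struct Amqp10Properties {
    content_type: Option<String>,
    content_encoding: Option<String>,
    message_id: Option<String>,
    correlation_id: Option<String>,
    reply_to: Option<String>,
}

#[derive(Default, Debug, PartialEq)]
struct BackupProperties {
    content_type: Option<String>,
    content_encoding: Option<String>,
    message_id: Option<String>,
    correlation_id: Option<String>,
    reply_to: Option<String>,
    // AMQP 0-9-1-only fields stay None for stream records.
    delivery_mode: Option<u8>,
    priority: Option<u8>,
    expiration: Option<String>,
}

fn map_properties(p: &Amqp10Properties) -> BackupProperties {
    BackupProperties {
        content_type: p.content_type.clone(),
        content_encoding: p.content_encoding.clone(),
        message_id: p.message_id.clone(),
        correlation_id: p.correlation_id.clone(),
        reply_to: p.reply_to.clone(),
        ..Default::default() // everything AMQP 0-9-1-only remains None
    }
}

fn main() {
    let src = Amqp10Properties {
        content_type: Some("application/json".into()),
        ..Default::default()
    };
    let mapped = map_properties(&src);
    assert_eq!(mapped.content_type.as_deref(), Some("application/json"));
    assert_eq!(mapped.delivery_mode, None);
}
```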


Storage Layout

Stream queue backups use the same storage layout as AMQP queues:

{backup_id}/queues/{vhost}/{stream_name}/
├── segment-0001.zst
├── segment-0002.zst
└── segment-NNNN.zst

The manifest records the queue type as stream:

{
  "vhost": "/",
  "name": "events-stream",
  "queue_type": "stream",
  "segments": [...],
  "message_count": 10000
}

The same RBAK segment format is used for both AMQP and stream messages.


Restoring Stream Backups

Stream backups are restored using the same AMQP 0-9-1 publish path as classic/quorum queue backups. Since stream records have empty exchange and routing_key fields, you should use direct-to-queue publish mode:

restore:
  publish_mode: direct-to-queue

This publishes messages to the queue via the default exchange, using the queue name as the routing key.
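The routing choice can be sketched as a small helper; the function, the mode strings, and the fallback branch are illustrative assumptions, not the tool's actual API:

```rust
// Hypothetical helper showing how direct-to-queue mode resolves the AMQP
// publish target: the default exchange ("") with the queue name as the
// routing key, versus replaying the record's original exchange/routing key.
fn publish_target<'a>(
    publish_mode: &str,
    record_exchange: &'a str,
    record_routing_key: &'a str,
    queue_name: &'a str,
) -> (&'a str, &'a str) {
    match publish_mode {
        // Default exchange routes by queue name; this works even though
        // stream records carry empty exchange/routing_key fields.
        "direct-to-queue" => ("", queue_name),
        _ => (record_exchange, record_routing_key),
    }
}

fn main() {
    // A stream record: no exchange or routing key was preserved.
    let (exchange, routing_key) = publish_target("direct-to-queue", "", "", "events-stream");
    assert_eq!(exchange, "");
    assert_eq!(routing_key, "events-stream");
}
```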

note

Restoring to a stream queue via AMQP publish appends new messages to the stream. It does not restore the original stream offsets. The restored messages will have new offsets assigned by the broker.


Offset Tracking and Resumable Backups

The delivery_tag field in BackupRecord stores the stream offset. This enables resumable backups:

  1. After a backup completes, record the last offset from the final segment's last record.
  2. On the next backup, start from OffsetSpecification::Offset(last_offset + 1).
  3. Only new messages (appended since the last backup) are captured.

The offset is persisted in the SQLite checkpoint database (offset_storage), allowing backup resumption even after process restarts.
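The save/resume protocol can be sketched with an in-memory stand-in for the SQLite checkpoint store (the struct and method names here are illustrative):

```rust
use std::collections::HashMap;

// In-memory stand-in for the checkpoint store: persist the last captured
// offset per (vhost, stream), then resume from last + 1 on the next run.
struct OffsetStore {
    offsets: HashMap<(String, String), u64>,
}

impl OffsetStore {
    fn new() -> Self {
        Self { offsets: HashMap::new() }
    }

    // Record the offset of the final message written to the last segment.
    fn save(&mut self, vhost: &str, stream: &str, last_offset: u64) {
        self.offsets
            .insert((vhost.to_string(), stream.to_string()), last_offset);
    }

    // The next backup starts just past what was already captured;
    // None means no checkpoint exists, so a full backup runs from First.
    fn resume_from(&self, vhost: &str, stream: &str) -> Option<u64> {
        self.offsets
            .get(&(vhost.to_string(), stream.to_string()))
            .map(|last| last + 1)
    }
}

fn main() {
    let mut store = OffsetStore::new();
    assert_eq!(store.resume_from("/", "events-stream"), None); // first backup
    store.save("/", "events-stream", 9_999);
    assert_eq!(store.resume_from("/", "events-stream"), Some(10_000));
}
```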


Mixed Queue Types

rabbitmq-backup supports backing up both AMQP and stream queues in a single backup operation:

source:
  queues:
    include:
      - "*"
    types:
      - classic
      - quorum
      - stream

backup:
  stream_enabled: true

The backup engine automatically partitions discovered queues:

  • Classic and quorum queues are backed up via AMQP 0-9-1 (cancel strategy).
  • Stream queues are backed up via the Stream Protocol.

Both types share the same concurrency semaphore (max_concurrent_queues), segment format, storage layout, and manifest.


Comparison: AMQP vs. Stream Backup

| Aspect | AMQP Backup (Classic/Quorum) | Stream Backup |
| --- | --- | --- |
| Protocol | AMQP 0-9-1 (port 5672) | Stream Protocol (port 5552) |
| Non-destructive mechanism | Consumer cancel (requeue unacked) | Offset-based read (inherently non-destructive) |
| Impact on queue | Sets redelivered flag, increments x-delivery-count (quorum) | No impact |
| Impact on other consumers | Competes for messages during backup window | Zero impact |
| Resumability | Checkpoint-based | Offset-based (precise) |
| Throughput | High (push-based with cancel) | Very high (batch delivery, less overhead) |
| Message properties | Full AMQP 0-9-1 properties | Subset of AMQP 1.0 properties |
| Exchange/routing info | Preserved | Not available (streams don't route) |

Dependencies

Stream protocol support depends on:

| Crate | Version | Purpose |
| --- | --- | --- |
| rabbitmq-stream-client | 0.5.x | Official Rust client for the RabbitMQ Stream Protocol. Handles connection, authentication, consumer creation, and message delivery. |

The rabbitmq-stream-client crate uses its own binary protocol (not AMQP) to communicate with the RabbitMQ stream plugin over TCP port 5552.


Troubleshooting

Stream Plugin Not Enabled

Stream protocol error: Failed to connect: connection refused

Enable the stream plugin:

rabbitmq-plugins enable rabbitmq_stream

Verify it is running:

rabbitmq-plugins list | grep stream
# [E*] rabbitmq_stream 3.13.2
# [E*] rabbitmq_stream_common 3.13.2

Wrong Port

If using Docker, ensure the stream port is mapped:

docker run -p 5672:5672 -p 15672:15672 -p 5552:5552 rabbitmq:3-management

Stream Does Not Exist

Stream protocol error: Failed to create consumer for my-stream: StreamDoesNotExist

Verify the stream exists and is of type stream:

rabbitmqctl list_queues name type | grep stream

Create a stream queue:

rabbitmqadmin declare queue name=my-stream queue_type=stream

Authentication Failure

Stream protocol error: Failed to connect: StreamError(Authentication)

The stream protocol uses the same user credentials as AMQP. Verify the user exists and has permissions:

rabbitmqctl authenticate_user guest guest
rabbitmqctl list_permissions -p /