
Architecture Overview

rabbitmq-backup is a Rust-based CLI tool that non-destructively backs up and restores RabbitMQ messages and definitions to pluggable cloud storage. This page describes the high-level architecture, component relationships, and data flow.

Design Principles

  1. Non-destructive -- Messages remain in queues after backup. No broker shutdown, no filesystem access, no message loss.
  2. Deployment-agnostic -- Connects as an external AMQP client. Works with any RabbitMQ deployment (bare metal, Docker, Kubernetes, managed services).
  3. Multi-cloud storage -- Pluggable storage backends via the object_store crate. S3, Azure Blob, GCS, and local filesystem are supported identically.
  4. Resumable -- SQLite checkpoints enable interrupted backups to resume from the last known offset.
  5. Point-in-Time Recovery -- Every message is timestamped at capture time (backed_up_at), enabling time-window filtering during restore.

Design Heritage

This project follows the architecture established by osodevops/kafka-backup, adapting the segment-based storage format and metrics patterns from Kafka to the AMQP protocol.


Component Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                         rabbitmq-backup CLI                         │
│                          (clap 4.x parser)                          │
├──────────┬──────────┬──────────┬──────────┬─────────┬───────────────┤
│  backup  │ restore  │   list   │ describe │validate │ definitions-  │
│          │          │          │          │         │ export/import │
└─────┬────┴─────┬────┴──────────┴──────────┴─────────┴───────┬───────┘
      │          └─────────────────────────┐                  │
      ▼                                    ▼                  ▼
┌────────────────────────────────┐   ┌───────────┐   ┌────────────────┐
│                                │   │  Restore  │   │  Definitions   │
│         Backup Engine          │   │  Engine   │   │  Exporter /    │
│                                │   │           │   │  Importer      │
└───────┬────────────────┬───────┘   └─────┬─────┘   └────────┬───────┘
        │                │                 │                  │
        ▼                ▼                 ▼                  ▼
┌───────────────┐ ┌──────────────┐   ┌───────────┐    ┌───────────────┐
│               │ │              │   │           │    │  Management   │
│ Stream Reader │ │ Queue Reader │   │ Publisher │    │    Client     │
│               │ │              │   │           │    │   (reqwest)   │
└───────┬───────┘ └──────┬───────┘   └─────┬─────┘    └───────┬───────┘
        │                │                 │                  │
        ▼                ▼                 ▼                  ▼
┌───────────────────┐ ┌──────────────────────────┐  ┌─────────────────┐
│  Stream Protocol  │ │    AMQP 0-9-1 Client     │  │ Management HTTP │
│      Client       │ │   (amq-protocol 10.x)    │  │       API       │
│ (rabbitmq-stream- │ │                          │  │  (port 15672)   │
│   client 0.5.x)   │ │                          │  └─────────────────┘
└─────────┬─────────┘ └────────────┬─────────────┘
          │                        │
          ▼                        ▼
┌────────────────────────────────────────────────┐
│                RabbitMQ Broker                 │
│               AMQP 0-9-1 (5672)                │
│             Stream Protocol (5552)             │
│             Management API (15672)             │
└────────────────────────────────────────────────┘

     ┌──────────────┐
     │   Segment    │
     │   Writer /   │
     │   Reader     │
     └──────┬───────┘
            │
            ▼
┌─────────────────────────┐
│   Storage Abstraction   │
│   (object_store 0.11)   │
├────┬────┬────┬──────────┤
│ S3 │Azr │GCS │Filesystem│
└────┴────┴────┴──────────┘

Crate Structure

The project is organized as a Cargo workspace with two crates:

rabbitmq-backup-core

The core library containing all business logic. It has no CLI dependencies and can be embedded in other Rust applications.

| Module | Responsibility |
|---|---|
| backup/engine.rs | Backup orchestrator: queue discovery, parallel backup, manifest generation. |
| backup/queue_reader.rs | AMQP 0-9-1 message reader with frame-level message assembly (Deliver + Header + Body). |
| backup/stream_reader.rs | RabbitMQ Stream Protocol reader for x-queue-type: stream queues. |
| restore/engine.rs | Restore orchestrator: manifest loading, PITR filtering, parallel publish. |
| restore/publisher.rs | AMQP message publisher with publisher confirms. |
| amqp/client.rs | Low-level AMQP 0-9-1 client: connection, channel, consume, cancel, QoS. |
| amqp/channel_pool.rs | Channel pool for managing multiple AMQP channels. |
| amqp/tls.rs | TLS configuration for AMQP connections (rustls). |
| stream/client.rs | Stream Protocol client wrapper around rabbitmq-stream-client. |
| definitions/management.rs | RabbitMQ Management API HTTP client. |
| definitions/exporter.rs | Definitions export logic. |
| definitions/importer.rs | Definitions import logic. |
| definitions/types.rs | Type definitions for Management API responses. |
| segment/writer.rs | RBAK segment writer: batching, compression, header/footer generation. |
| segment/reader.rs | RBAK segment reader: decompression, CRC32 verification, record parsing. |
| storage/config.rs | Storage backend configuration (tagged enum for S3/Azure/GCS/filesystem/memory). |
| storage/backend.rs | Storage backend trait and implementations. |
| manifest.rs | Backup manifest, queue metadata, segment metadata, and BackupRecord types. |
| config.rs | Top-level configuration structures (serde YAML). |
| compression.rs | zstd and LZ4 compression/decompression utilities. |
| offset_store/ | SQLite checkpoint persistence for resumable backups. |
| metrics/ | Prometheus metrics registry, labels, and HTTP server. |
| circuit_breaker.rs | Transient failure handling with exponential backoff. |
| health.rs | Health check types. |
| error.rs | Structured error type with variants for each subsystem. |

rabbitmq-backup-cli

The binary crate providing the CLI interface. It depends on rabbitmq-backup-core and clap for argument parsing.

| Module | Responsibility |
|---|---|
| main.rs | CLI entry point, subcommand dispatch, logging setup. |
| commands/backup.rs | backup subcommand handler. |
| commands/restore.rs | restore subcommand handler. |
| commands/list.rs | list subcommand handler. |
| commands/describe.rs | describe subcommand handler. |
| commands/validate.rs | validate subcommand handler. |
| commands/definitions_export.rs | definitions-export subcommand handler. |
| commands/definitions_import.rs | definitions-import subcommand handler. |

Data Flow: Backup

RabbitMQ Broker                 rabbitmq-backup                     Object Storage
│                               │                                    │
│  1. GET /api/overview         │                                    │
│◄──────────────────────────────│                                    │
│     cluster info              │                                    │
│──────────────────────────────►│                                    │
│                               │                                    │
│  2. GET /api/definitions      │                                    │
│◄──────────────────────────────│                                    │
│     definitions JSON          │                                    │
│──────────────────────────────►│  3. PUT definitions.json.zst       │
│                               │───────────────────────────────────►│
│                               │                                    │
│  4. GET /api/queues           │                                    │
│◄──────────────────────────────│                                    │
│     queue list                │                                    │
│──────────────────────────────►│                                    │
│                               │                                    │
│  5. basic.consume (per queue) │                                    │
│◄──────────────────────────────│                                    │
│                               │                                    │
│  6. Deliver + Header + Body   │                                    │
│──────────────────────────────►│  7. Add record to SegmentWriter    │
│     ... (repeat for N msgs)   │     ... (batch until threshold)    │
│                               │                                    │
│                               │  8. Finalize segment               │
│                               │     Compress + CRC32 + SHA-256     │
│                               │───────────────────────────────────►│
│                               │     PUT segment-NNNN.zst           │
│                               │                                    │
│  9. basic.cancel              │                                    │
│◄──────────────────────────────│                                    │
│     (all unacked msgs requeue)│                                    │
│                               │                                    │
│                               │ 10. PUT manifest.json              │
│                               │───────────────────────────────────►│
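
Steps 7 and 8 of the diagram (batch records until a threshold, then finalize) can be illustrated with a small rollover buffer. This is a minimal sketch under stated assumptions, not the project's SegmentWriter: the SegmentBuffer type and its methods are hypothetical, the two limits stand in for the segment_max_bytes and segment_max_interval_ms settings, and compression and upload are left out.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory buffer for one open segment (illustration only).
pub struct SegmentBuffer {
    max_bytes: usize,       // stands in for segment_max_bytes
    max_interval: Duration, // stands in for segment_max_interval_ms
    opened_at: Instant,
    records: Vec<Vec<u8>>,
    bytes: usize,
}

impl SegmentBuffer {
    pub fn new(max_bytes: usize, max_interval: Duration, now: Instant) -> Self {
        Self { max_bytes, max_interval, opened_at: now, records: Vec::new(), bytes: 0 }
    }

    /// Buffer one record. Returns the finished batch when either the size
    /// limit or the age limit has been reached, otherwise `None`.
    pub fn push(&mut self, record: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        self.bytes += record.len();
        self.records.push(record);
        let full = self.bytes >= self.max_bytes;
        let stale = now.duration_since(self.opened_at) >= self.max_interval;
        if full || stale { Some(self.finalize(now)) } else { None }
    }

    /// Hand back the buffered records and start a fresh segment.
    pub fn finalize(&mut self, now: Instant) -> Vec<Vec<u8>> {
        self.bytes = 0;
        self.opened_at = now;
        std::mem::take(&mut self.records)
    }
}
```

Passing the current time in as an argument keeps the rollover decision deterministic, which makes the size and age limits easy to exercise in tests. A caller would also invoke finalize once at shutdown so that a partially filled segment is not lost.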

Step-by-Step

  1. Cluster info: Fetch the cluster name and RabbitMQ version from GET /api/overview.
  2. Definitions export: Fetch the full definitions from GET /api/definitions, compress with zstd, and store.
  3. Queue discovery: Fetch queue list from GET /api/queues, apply include/exclude filters, partition into AMQP and stream queues.
  4. Parallel queue backup: For each queue (up to max_concurrent_queues in parallel):
    • Open an AMQP connection and channel.
    • Issue basic.consume with no_ack=false.
    • Receive Deliver + ContentHeader + ContentBody frames.
    • Assemble complete messages in the MessageAssembler state machine.
    • Add records to the SegmentWriter buffer.
    • When the buffer reaches segment_max_bytes, or segment_max_interval_ms elapses, finalize the segment (compress, add header/footer), upload it to storage, and record its metadata in the manifest.
  5. Cancel consumer: Issue basic.cancel. Because no delivery is ever acknowledged, the broker requeues every delivered message once the channel closes.
  6. Finalize manifest: Write manifest.json to storage with totals and checksums.
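
The frame assembly in step 4 can be pictured as a three-state machine: wait for Deliver, then for the content header, then for body frames until the announced size is reached. The sketch below is an illustration under assumptions, not the project's MessageAssembler: the Frame and Message types are simplified stand-ins invented here (they are not the amq-protocol types), and message properties are omitted.

```rust
/// Simplified stand-ins for AMQP 0-9-1 frames (hypothetical, not amq-protocol types).
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Deliver { delivery_tag: u64, routing_key: String },
    Header { body_size: u64 },
    Body(Vec<u8>),
}

#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub delivery_tag: u64,
    pub routing_key: String,
    pub body: Vec<u8>,
}

#[derive(Debug)]
enum State {
    Idle,
    AwaitingHeader { delivery_tag: u64, routing_key: String },
    AwaitingBody { delivery_tag: u64, routing_key: String, remaining: u64, body: Vec<u8> },
}

#[derive(Debug)]
pub struct MessageAssembler {
    state: State,
}

impl MessageAssembler {
    pub fn new() -> Self {
        Self { state: State::Idle }
    }

    /// Feed one frame. Returns `Ok(Some(message))` when a message is complete,
    /// `Ok(None)` when more frames are needed, and `Err` on an out-of-order
    /// frame (the assembler resets to Idle in that case).
    pub fn push(&mut self, frame: Frame) -> Result<Option<Message>, String> {
        let state = std::mem::replace(&mut self.state, State::Idle);
        match (state, frame) {
            (State::Idle, Frame::Deliver { delivery_tag, routing_key }) => {
                self.state = State::AwaitingHeader { delivery_tag, routing_key };
                Ok(None)
            }
            (State::AwaitingHeader { delivery_tag, routing_key }, Frame::Header { body_size }) => {
                if body_size == 0 {
                    // An empty message carries no body frames at all.
                    return Ok(Some(Message { delivery_tag, routing_key, body: Vec::new() }));
                }
                self.state = State::AwaitingBody {
                    delivery_tag,
                    routing_key,
                    remaining: body_size,
                    body: Vec::new(),
                };
                Ok(None)
            }
            (State::AwaitingBody { delivery_tag, routing_key, remaining, mut body }, Frame::Body(chunk)) => {
                let len = chunk.len() as u64;
                if len > remaining {
                    return Err(format!("body overflow: got {len} bytes, expected at most {remaining}"));
                }
                body.extend_from_slice(&chunk);
                if len == remaining {
                    Ok(Some(Message { delivery_tag, routing_key, body }))
                } else {
                    self.state = State::AwaitingBody {
                        delivery_tag,
                        routing_key,
                        remaining: remaining - len,
                        body,
                    };
                    Ok(None)
                }
            }
            (state, frame) => Err(format!("unexpected {frame:?} in state {state:?}")),
        }
    }
}
```

Tracking the remaining byte count from the header, rather than waiting for some end marker, matches how AMQP 0-9-1 delimits content: a large body may arrive split across several body frames, and the header's body size is what signals completion.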

Data Flow: Restore

Object Storage                  rabbitmq-backup                     RabbitMQ Broker
│                               │                                    │
│  1. GET manifest.json         │                                    │
│◄──────────────────────────────│                                    │
│                               │                                    │
│  2. GET definitions.json.zst  │                                    │
│◄──────────────────────────────│                                    │
│                               │  3. POST /api/definitions          │
│                               │───────────────────────────────────►│
│                               │                                    │
│  4. GET segment-NNNN.zst      │                                    │
│◄──────────────────────────────│                                    │
│                               │  5. Verify CRC32 + decompress      │
│                               │  6. Apply PITR filter              │
│                               │  7. basic.publish (per message)    │
│                               │───────────────────────────────────►│
│                               │  8. publisher confirm              │
│                               │◄───────────────────────────────────│
│     ... (repeat for all segs) │                                    │

Step-by-Step

  1. Load manifest: Read manifest.json from storage.
  2. Restore definitions: Download and decompress the definitions file, POST to the target Management API.
  3. Parallel queue restore: For each queue in the manifest (up to max_concurrent_queues):
    • Download each segment file.
    • Verify CRC32 integrity.
    • Decompress and parse the length-prefixed JSON records.
    • Apply PITR filtering: include only records where backed_up_at falls within [time_window_start, time_window_end].
    • Apply queue/vhost/exchange remapping.
    • Publish messages to the target broker in batches, with optional rate limiting.
    • Wait for publisher confirms.

Technology Choices

| Technology | Crate | Version | Purpose |
|---|---|---|---|
| AMQP 0-9-1 codec | amq-protocol | 10.x | Wire-level protocol encoding/decoding. Provides frame types, method types, and property accessors without imposing a specific connection model. |
| Stream Protocol | rabbitmq-stream-client | 0.5.x | Native RabbitMQ Stream Protocol client for offset-based non-destructive reads from stream queues. |
| HTTP client | reqwest | 0.12 | Management API HTTP calls (definitions export/import, queue discovery, cluster overview). Uses rustls for TLS. |
| Object storage | object_store | 0.11 | Unified abstraction over AWS S3, Azure Blob Storage, Google Cloud Storage, and local filesystem. Handles authentication, retry, and streaming. |
| Checkpoint | sqlx | 0.8 | SQLite persistence for offset/checkpoint data, enabling resumable backups. |
| CLI | clap | 4.x | Command-line argument parsing with derive macros. |
| Async runtime | tokio | 1.x | Multi-threaded async runtime. Used for concurrent queue backup/restore, I/O, and timers. |
| Serialization | serde + serde_yaml + serde_json | 1.x / 0.9 / 1.x | Configuration (YAML), manifest and record (JSON), and storage format serialization. |
| Compression | zstd + lz4_flex | 0.13 / 0.11 | Segment payload compression. zstd is the default (good ratio at moderate CPU cost); LZ4 is available when speed matters most. |
| Integrity | crc32fast + sha2 | 1.4 / 0.10 | CRC32 for segment footer checksums. SHA-256 for manifest-level checksums. |
| Metrics | prometheus-client | 0.24 | Prometheus metrics collection with the prometheus-client crate (the client maintained under the Prometheus organization, rather than the separate prometheus crate). Uses the Family<Labels, Counter/Gauge> pattern. |
| HTTP server | hyper | 1.x | Lightweight HTTP server for the /metrics and /health endpoints. |
| Logging | tracing + tracing-subscriber | 0.1 / 0.3 | Structured logging with environment filter support. |
| Error handling | thiserror | 2.x | Derive macros for the structured Error enum. |
| TLS | tokio-rustls + rustls | 0.26 / 0.23 | TLS support for AMQP connections. Supports CA certificates and mutual TLS (mTLS). |

Concurrency Model

rabbitmq-backup uses Tokio for async concurrency. The key concurrency patterns are:

  1. Semaphore-based parallelism: A tokio::sync::Semaphore limits concurrent queue backups/restores to max_concurrent_queues. Each queue runs as an independent Tokio task.

  2. Shared manifest: The BackupManifest is wrapped in Arc<Mutex<BackupManifest>> and shared across all queue backup tasks. Each task locks the manifest briefly to add segment metadata.

  3. Shutdown coordination: A tokio::sync::broadcast channel propagates shutdown signals (Ctrl+C) to all concurrent tasks, enabling graceful shutdown with segment flush.

  4. Per-queue connections: Each queue backup/restore task opens its own AMQP connection and channel, avoiding contention on a shared connection.
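
The first two patterns (bounded parallelism and a briefly locked shared manifest) can be shown with the standard library. This is an analog, not the project's code: a fixed pool of scoped threads stands in for the Tokio semaphore and tasks, backup_queue is a hypothetical stub that merely returns a number, and the BackupManifest here is reduced to a single map.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Reduced stand-in for the real manifest: queue name -> segment count.
#[derive(Debug, Default, Clone)]
pub struct BackupManifest {
    pub segments_per_queue: BTreeMap<String, u64>,
}

/// Hypothetical stub for the per-queue backup; returns a fake segment count.
fn backup_queue(queue: &str) -> u64 {
    queue.len() as u64
}

/// Back up every queue with at most `max_concurrent_queues` running at once.
pub fn backup_all(queues: Vec<String>, max_concurrent_queues: usize) -> BackupManifest {
    let manifest = Arc::new(Mutex::new(BackupManifest::default()));
    let pending = Arc::new(Mutex::new(queues.into_iter()));

    thread::scope(|scope| {
        // The worker count plays the role of the semaphore's permit count.
        for _ in 0..max_concurrent_queues.max(1) {
            let manifest = Arc::clone(&manifest);
            let pending = Arc::clone(&pending);
            scope.spawn(move || loop {
                // Take the next queue; the lock is released before the backup runs.
                let next = pending.lock().unwrap().next();
                let Some(queue) = next else { break };
                let segments = backup_queue(&queue);
                // Lock the shared manifest only long enough to record the result.
                manifest.lock().unwrap().segments_per_queue.insert(queue, segments);
            });
        }
    });

    let result = manifest.lock().unwrap().clone();
    result
}
```

The point the sketch preserves is that the manifest lock is held only for the insert, never for the duration of a queue backup, so slow queues do not block others from recording their segments.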


Error Handling Strategy

The crate uses a central Error enum with variants for each subsystem (AMQP, Stream, Storage, Config, etc.). The thiserror derive macro generates Display and Error implementations.

Key design decisions:

  • Non-fatal queue errors: If a single queue fails during backup, the error is logged but other queues continue. The final manifest reports partial results.
  • Non-fatal definitions errors: If definitions export/import fails, the backup/restore continues with a warning.
  • Fatal config errors: Missing required configuration causes an immediate exit.
  • Circuit breaker: The circuit_breaker.rs module provides transient failure handling with exponential backoff for storage and connection operations.
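
As a rough illustration of the last point, the sketch below pairs a capped exponential backoff with a breaker that opens after a run of consecutive failures. It is written under assumptions and does not reproduce circuit_breaker.rs: the CircuitBreaker type, backoff_delay, retry_with_backoff, and the threshold semantics are all hypothetical, and the sleep function is injected so the logic can be exercised without real delays.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Hypothetical breaker: opens after `threshold` consecutive failures.
pub struct CircuitBreaker {
    failures: u32,
    threshold: u32,
}

impl CircuitBreaker {
    pub fn new(threshold: u32) -> Self {
        Self { failures: 0, threshold }
    }
    pub fn record_success(&mut self) {
        self.failures = 0;
    }
    pub fn record_failure(&mut self) {
        self.failures = self.failures.saturating_add(1);
    }
    pub fn is_open(&self) -> bool {
        self.failures >= self.threshold
    }
}

/// Retry `op` with exponential backoff until it succeeds or the breaker opens.
/// `sleep` is a parameter so callers (and tests) decide how waiting happens.
pub fn retry_with_backoff<T, E>(
    breaker: &mut CircuitBreaker,
    base: Duration,
    max: Duration,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => {
                breaker.record_success();
                return Ok(value);
            }
            Err(err) => {
                breaker.record_failure();
                if breaker.is_open() {
                    // Too many consecutive failures: stop retrying and surface the error.
                    return Err(err);
                }
                sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

With a base of 100 ms and a cap of 1 s, the delays in this sketch run 100 ms, 200 ms, 400 ms, 800 ms, and then stay at 1 s. Production code would typically add random jitter so that many tasks failing together do not retry in lockstep.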