A producer publishes messages to an exchange. It’s a subclass of Lepus::Producer with a configure call and whatever convenience methods you want on top.
Minimal example

```ruby
class OrdersProducer < Lepus::Producer
  configure(
    exchange: { name: 'orders', type: :topic, durable: true },
    publish: { persistent: true }
  )

  use :json
  use :correlation_id
end
```

Publish from anywhere in your app:

```ruby
OrdersProducer.publish(
  { order_id: 42, total: 99.99 },
  routing_key: 'order.created'
)
```

configure DSL

```ruby
configure(
  exchange: { name: 'orders', type: :topic, durable: true },
  publish: { persistent: true, mandatory: false }
)
```

| Key | Purpose |
|---|---|
| exchange | String (name only) or hash (name, type, durable, auto_delete). Auto-declared on first publish. |
| publish | Default publish options merged into every publish call: persistent, mandatory, content_type, expiration, etc. |
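
The merge of configured defaults into per-call options can be pictured with plain hashes. The sketch below is not Lepus's internals: it assumes that options passed at the call site override the configured defaults, and `DEFAULT_PUBLISH_OPTIONS` and `effective_publish_options` are hypothetical names used only for illustration.

```ruby
# Hypothetical illustration, not Lepus internals: defaults from
# `configure(publish: ...)` combined with the options of one publish call.
DEFAULT_PUBLISH_OPTIONS = { persistent: true, mandatory: false }.freeze

# Assumption: options given at the call site win over configured defaults.
def effective_publish_options(call_options)
  DEFAULT_PUBLISH_OPTIONS.merge(call_options)
end

effective_publish_options(routing_key: 'order.created', mandatory: true)
# => { persistent: true, mandatory: true, routing_key: 'order.created' }
```

If the real merge order differs, only the direction of the `merge` call in this sketch would change.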
Publishing

```ruby
OrdersProducer.publish(payload, routing_key: 'order.created')
OrdersProducer.publish(payload, routing_key: 'order.paid', headers: { tenant_id: 123 })
OrdersProducer.publish(payload, routing_key: 'order.created', expiration: 60_000)
```

All Bunny publish options are supported. Frequently useful:
| Option | Purpose |
|---|---|
| routing_key | Routing key the exchange uses to select queues. |
| persistent | Mark the message persistent so it survives a RabbitMQ restart (the queue must be durable too). |
| headers | Custom headers. |
| correlation_id | Correlates a reply with its request in the RPC pattern. Auto-set by :correlation_id middleware. |
| content_type | MIME type. Auto-set to application/json by :json middleware. |
| expiration | Per-message TTL in milliseconds. |
| mandatory | Return the message to the publisher if it can’t be routed to any queue. |
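
Because the examples here publish to a topic exchange, it may help to see how a routing key such as order.created is matched against queue binding patterns. The following is a plain-Ruby illustration of the standard AMQP topic rules (`*` matches exactly one word, `#` matches zero or more words). It is neither Lepus nor RabbitMQ code, and `topic_match?` and `match_words` are hypothetical helper names.

```ruby
# Illustration of AMQP topic-exchange matching; not Lepus or RabbitMQ code.
# Words are separated by dots: '*' matches exactly one word,
# '#' matches zero or more words.
def topic_match?(binding_pattern, routing_key)
  match_words(binding_pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern_words, key_words)
  return key_words.empty? if pattern_words.empty?

  head = pattern_words.first
  rest = pattern_words.drop(1)

  case head
  when '#'
    # Try letting '#' consume 0, 1, 2, ... of the remaining words.
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when '*'
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

topic_match?('order.*', 'order.created')    # => true
topic_match?('order.*', 'order.created.eu') # => false
topic_match?('order.#', 'order.created.eu') # => true
```

A message published with `mandatory: true` would be returned when no binding pattern matches its routing key.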
Payload types
- A Hash: pair with the :json middleware to serialize.
- A String: sent as-is.
- Anything else: pair with a middleware that turns it into bytes.
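
The three cases above can be pictured as a small dispatch on the payload's class. This is only a sketch of the idea, not how Lepus implements it: `encode_payload` is a hypothetical helper, and raising for unsupported types is an assumption made for the example.

```ruby
require 'json'

# Hypothetical helper, not part of Lepus: returns the body that would go on
# the wire together with a content type (nil when none is implied).
def encode_payload(payload)
  case payload
  when Hash
    # Conceptually what a JSON-serializing middleware does.
    [JSON.generate(payload), 'application/json']
  when String
    # Strings pass through unchanged.
    [payload, nil]
  else
    # Assumption: anything else needs its own serializer.
    raise ArgumentError, "no serializer for #{payload.class}"
  end
end

body, content_type = encode_payload({ order_id: 42, total: 99.99 })
# body         => "{\"order_id\":42,\"total\":99.99}"
# content_type => "application/json"
```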
Middleware

```ruby
class OrdersProducer < Lepus::Producer
  use :json
  use :correlation_id
  use :instrumentation
  use :header, 'X-Service', 'orders-api'
end
```

Built-in producer middlewares:
| Name | Purpose |
|---|---|
| :json | Serialize a hash into JSON; sets content_type. |
| :correlation_id | Auto-generate a UUID correlation_id if absent. |
| :header | Add a static or dynamic header: use :header, 'X-Foo', 'bar'. |
| :instrumentation | Emit ActiveSupport::Notifications events around each publish. |
| :unique | Reject duplicate publishes (by correlation id; needs external storage). |
Write your own — see middleware.md.
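
To make the idea of a middleware chain concrete, here is a generic pipeline in plain Ruby. It deliberately does not use Lepus's middleware interface (that is described in middleware.md). The step signature, `ENSURE_CORRELATION_ID`, `ADD_SERVICE_HEADER`, and `run_pipeline` are all assumptions invented for this sketch.

```ruby
require 'securerandom'

# Assumed shape for illustration: each step takes (payload, options) and
# returns a possibly modified [payload, options] pair.
ENSURE_CORRELATION_ID = lambda do |payload, options|
  # Only generate an id when the caller did not supply one.
  options = options.merge(correlation_id: SecureRandom.uuid) unless options[:correlation_id]
  [payload, options]
end

ADD_SERVICE_HEADER = lambda do |payload, options|
  headers = (options[:headers] || {}).merge('X-Service' => 'orders-api')
  [payload, options.merge(headers: headers)]
end

# Runs the steps in order, feeding each step the previous step's result.
def run_pipeline(steps, payload, options)
  steps.reduce([payload, options]) { |(body, opts), step| step.call(body, opts) }
end

_body, opts = run_pipeline(
  [ENSURE_CORRELATION_ID, ADD_SERVICE_HEADER],
  '{"order_id":42}',
  { routing_key: 'order.created' }
)
# opts now carries :routing_key, a generated :correlation_id, and :headers.
```

Ordering matters in a chain like this: a step that depends on a value set by another step has to run after it.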
Convenience class methods
Wrap publish with domain-specific signatures:

```ruby
class OrdersProducer < Lepus::Producer
  configure(exchange: { name: 'orders', type: :topic, durable: true })

  use :json
  use :correlation_id

  def self.order_created(order)
    publish(
      { order_id: order.id, total: order.total, created_at: order.created_at },
      routing_key: 'order.created'
    )
  end

  def self.order_shipped(order)
    publish(
      { order_id: order.id, shipped_at: order.shipped_at },
      routing_key: 'order.shipped'
    )
  end
end

# In your code:
OrdersProducer.order_created(order)
```

Connection pooling
Producers share a single connection pool across the process:

```ruby
Lepus.configure do |config|
  config.producer do |p|
    p.pool_size = 5
    p.pool_timeout = 5.0
  end
end
```

Enable / disable publishing
Useful when you want to disable side effects in tests or a specific environment.

```ruby
# Globally
Lepus::Producers.disable!
Lepus::Producers.enabled?(OrdersProducer) # => false

# By producer class
Lepus::Producers.disable!(OrdersProducer)

# By exchange name
Lepus::Producers.disable!('orders')

# Block-scoped
Lepus::Producers.without_publishing do
  OrdersProducer.publish(...) # no-op
end

Lepus::Producers.with_publishing do
  OrdersProducer.publish(...) # forced through even if disabled
end
```

Error handling
A publish can fail for many reasons — connection down, channel closed, etc. By default, Bunny retries with exponential backoff within recovery_attempts. For anything beyond that, you catch and handle it:

```ruby
begin
  OrdersProducer.order_created(order)
rescue Bunny::Exception => e
  Rails.logger.error("publish failed: #{e.message}")
  # queue for later retry, fall back to a DB outbox, etc.
end
```

Testing
See testing.md — there’s a Lepus::Testing.producer_messages(ProducerClass) helper that captures publishes in-memory.