Quickstart
This guide walks you through publishing and consuming your first typed event using Rabbit Relay.
What you will build
A small publisher creates a typed event and a consumer handles it from RabbitMQ.
Install
npm install @bitspacerlabs/rabbit-relay
Define an event contract
export const SchedulerEvents = {
ScheduleTask: "scheduler.scheduleTask",
} as const;
export type ScheduleTaskData = {
id: string;
when: number;
};
Publish and consume
import { RabbitMQBroker, event } from "@bitspacerlabs/rabbit-relay";
import { SchedulerEvents, type ScheduleTaskData } from "./events";
const broker = new RabbitMQBroker("scheduler_service");
const pub = await broker
.queue("scheduler_publish_queue")
.exchange("scheduler_exchange", {
exchangeType: "topic",
publisherConfirms: true,
});
const scheduleTask = event(
SchedulerEvents.ScheduleTask,
"v1"
).of<ScheduleTaskData>();
await pub.produce(
scheduleTask({
id: "task-1",
when: Date.now() + 5000,
})
);
await broker.close();

import {
RabbitMQBroker,
type EventEnvelope,
} from "@bitspacerlabs/rabbit-relay";
import { SchedulerEvents, type ScheduleTaskData } from "./events";
const broker = new RabbitMQBroker("scheduler_worker");
const sub = await broker
.queue("scheduler_worker_queue")
.exchange<{
[SchedulerEvents.ScheduleTask]: EventEnvelope<ScheduleTaskData>;
}>("scheduler_exchange", {
exchangeType: "topic",
routingKey: "scheduler.*",
});
sub.handle(SchedulerEvents.ScheduleTask, async (_id, ev) => {
console.log("Task received:", ev.data);
});
await sub.consume({
prefetch: 10,
concurrency: 5,
});
Publish with the with() API
with() converts event factories into a small typed publish API. Calling a generated method creates the event and publishes it, so it returns a Promise and should be awaited.
const api = pub.with({ scheduleTask });
await api.scheduleTask({
id: "task-2",
when: Date.now() + 10_000,
});
Recommended style
Use with() when a service owns a group of events and publishes them often. Use produce() or publish() directly for one-off publishing.
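
To make the idea behind with() concrete, here is a minimal sketch of how a typed publish API could be derived from event factories. It is not Rabbit Relay's implementation: `Envelope`, `Factory`, `BoundApi`, `bindFactories`, and the in-memory `publish` callback are hypothetical names used only for illustration.

```typescript
// Hypothetical shapes for illustration; not Rabbit Relay's real types.
type Envelope<T> = { name: string; version: string; data: T };
type Factory<T> = (data: T) => Envelope<T>;

// Turns each factory into an async method that accepts the same payload.
type BoundApi<F extends Record<string, Factory<any>>> = {
  [K in keyof F]: F[K] extends Factory<infer T>
    ? (data: T) => Promise<void>
    : never;
};

function bindFactories<F extends Record<string, Factory<any>>>(
  factories: F,
  publish: (envelope: Envelope<unknown>) => Promise<void>,
): BoundApi<F> {
  const api: Record<string, (data: unknown) => Promise<void>> = {};
  for (const key of Object.keys(factories)) {
    // Each generated method builds the envelope, then publishes it.
    api[key] = (data) => publish(factories[key](data));
  }
  return api as unknown as BoundApi<F>;
}

// Usage with an in-memory stand-in for the broker.
const sent: Envelope<unknown>[] = [];
const scheduleTask: Factory<{ id: string; when: number }> = (data) => ({
  name: "scheduler.scheduleTask",
  version: "v1",
  data,
});
const api = bindFactories({ scheduleTask }, async (envelope) => {
  sent.push(envelope);
});
api.scheduleTask({ id: "task-2", when: Date.now() + 10_000 }).catch(console.error);
```

Because each generated method returns a Promise, a forgotten await shows up as an unhandled Promise instead of a publish that was silently skipped.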
Add retries and DLQ
For production consumers, prefer bounded retries with a dead-letter queue.
const sub = await broker
.queue("scheduler_worker_queue")
.exchange<{
[SchedulerEvents.ScheduleTask]: EventEnvelope<ScheduleTaskData>;
}>("scheduler_exchange", {
exchangeType: "topic",
routingKey: "scheduler.*",
deadLetter: {
exchange: "scheduler.dlx",
queue: "scheduler.dlq",
routingKey: "scheduler.dead",
autoDeclare: true,
},
});
sub.handle(SchedulerEvents.ScheduleTask, async (_id, ev) => {
console.log("Task received:", ev.data);
});
await sub.consume({
prefetch: 10,
concurrency: 5,
onError: "retry",
retry: {
attempts: 3,
delayMs: 5000,
then: "dead-letter",
},
});
Queue arguments are immutable
RabbitMQ does not allow changing queue arguments after a queue already exists. If you change DLQ settings, retry delay, or queue type in local development, recreate the queue or reset the local RabbitMQ volume.
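
One way to spot this early in local development is to compare the arguments a queue already has with the ones you are about to declare. The sketch below is a plain helper, not part of Rabbit Relay; `QueueArguments` and `changedArguments` are hypothetical names, and the argument keys are standard RabbitMQ queue arguments.

```typescript
type QueueArguments = Record<string, string | number | boolean>;

// Returns the argument keys whose values differ between two declarations,
// including keys that exist on only one side.
function changedArguments(
  existing: QueueArguments,
  desired: QueueArguments,
): string[] {
  return Object.keys({ ...existing, ...desired })
    .filter((key) => existing[key] !== desired[key])
    .sort();
}

// Example: a retry delay was added after the queue had been created.
const existingArgs: QueueArguments = {
  "x-dead-letter-exchange": "scheduler.dlx",
};
const desiredArgs: QueueArguments = {
  "x-dead-letter-exchange": "scheduler.dlx",
  "x-message-ttl": 5000,
};
const changed = changedArguments(existingArgs, desiredArgs);
if (changed.length > 0) {
  console.warn("Recreate the queue; changed arguments:", changed);
}
```

A non-empty result means redeclaring the queue would fail with a PRECONDITION_FAILED channel error, so the queue has to be deleted and recreated first.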
delayMs schedules delayed retries through retry queues built on RabbitMQ message TTL and dead-letter exchanges (DLX). If you omit delayMs, retries happen immediately.
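
The TTL + DLX technique can be sketched as follows: a delay queue holds the failed message for the TTL, then RabbitMQ dead-letters it back to the work exchange for another attempt. This is an illustration under stated assumptions, not Rabbit Relay's internals. `delayQueueArguments`, `RetryPolicy`, and `nextStep` are hypothetical names, and the sketch assumes `attempts` counts the retries allowed after the first failure.

```typescript
// Arguments for a hypothetical delay queue: messages wait `delayMs`, then
// RabbitMQ dead-letters them to `workExchange` with `routingKey`.
function delayQueueArguments(
  delayMs: number,
  workExchange: string,
  routingKey: string,
) {
  return {
    "x-message-ttl": delayMs,
    "x-dead-letter-exchange": workExchange,
    "x-dead-letter-routing-key": routingKey,
  };
}

type RetryPolicy = { attempts: number; delayMs?: number };
type NextStep = "retry-now" | "retry-after-delay" | "dead-letter";

// `failures` is how many times the handler has failed for this message.
// Assumption: `attempts` is the number of retries allowed after a failure.
function nextStep(failures: number, policy: RetryPolicy): NextStep {
  if (failures > policy.attempts) return "dead-letter";
  return policy.delayMs === undefined ? "retry-now" : "retry-after-delay";
}

const policy: RetryPolicy = { attempts: 3, delayMs: 5000 };
const delayArgs = delayQueueArguments(
  5000,
  "scheduler_exchange",
  "scheduler.scheduleTask",
);
console.log(delayArgs, nextStep(1, policy), nextStep(4, policy));
```

Bounding the retries is what keeps a permanently failing message from cycling forever: once the limit is exceeded, it moves to the DLQ for inspection.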
Typed RPC
Use request<TReply>() for request/reply flows.
type Reply = {
ok: boolean;
};
const reply = await pub.request<Reply>(
scheduleTask({
id: "task-rpc",
when: Date.now(),
}),
{
timeoutMs: 5000,
}
);
console.log(reply);
Use RPC deliberately
RPC creates tighter service coupling than events. Prefer events when the workflow does not need an immediate reply.
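
Part of that coupling is state: the caller has to remember every request in flight until a reply arrives or the timeout fires. The sketch below shows the usual correlation-id pattern with an in-memory tracker. It is a simplified illustration, not Rabbit Relay's implementation, and `ReplyTracker`, `waitFor`, and `deliver` are hypothetical names.

```typescript
type Pending<T> = {
  resolve: (reply: T) => void;
  timer: ReturnType<typeof setTimeout>;
};

class ReplyTracker<T> {
  private readonly pending = new Map<string, Pending<T>>();

  // Registers a request and returns a promise for its reply.
  // The promise rejects if no reply arrives within `timeoutMs`.
  waitFor(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error(`RPC timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      this.pending.set(correlationId, { resolve, timer });
    });
  }

  // Called when a reply arrives; returns false for unknown or expired ids.
  deliver(correlationId: string, reply: T): boolean {
    const entry = this.pending.get(correlationId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    entry.resolve(reply);
    return true;
  }

  get size(): number {
    return this.pending.size;
  }
}

// Usage: one matched reply and one reply nobody is waiting for.
const tracker = new ReplyTracker<{ ok: boolean }>();
tracker
  .waitFor("req-1", 5000)
  .then((r) => console.log("reply", r))
  .catch(console.error);
const matched = tracker.deliver("req-1", { ok: true });
const unmatched = tracker.deliver("req-unknown", { ok: false });
```

With plain events none of this bookkeeping exists on the publisher side, which is why events are the better default when no immediate reply is needed.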
Message metadata
Use withHeaders() when you want to attach metadata to a message.
import { withHeaders } from "@bitspacerlabs/rabbit-relay";
await pub.produce(
withHeaders(
scheduleTask({
id: "task-with-headers",
when: Date.now(),
}),
{
source: "scheduler_service",
}
)
);
Use traceFrom() when creating a child event from an existing event.
import { traceFrom } from "@bitspacerlabs/rabbit-relay";
sub.handle(SchedulerEvents.ScheduleTask, async (_id, ev) => {
const trace = traceFrom(ev);
console.log("Trace metadata:", trace);
});
Operations helpers
Rabbit Relay includes operations helpers for production visibility and support workflows.
broker.on("retry.scheduled", (event) => {
console.log("retry scheduled", event);
});

const plan = broker.planTopology();
console.log(plan);
const validation = await broker.validateTopology();
console.log(validation);

const redrive = await broker.redriveDlq({
fromQueue: "scheduler.dlq",
toExchange: "scheduler_exchange",
routingKey: SchedulerEvents.ScheduleTask,
limit: 10,
dryRun: true,
});
console.log(redrive);
OpenTelemetry
To enable OpenTelemetry tracing, pass your own tracer.
import { attachOpenTelemetry } from "@bitspacerlabs/rabbit-relay";
import { trace } from "@opentelemetry/api";
attachOpenTelemetry(broker, {
tracer: trace.getTracer("rabbit-relay"),
serviceName: "scheduler-service",
});
Publish with native AMQP options
Use publish() when you need per-message RabbitMQ options.
await pub.publish(scheduleTask({ id: "task-3", when: Date.now() }), {
maxMessageBytes: 64 * 1024,
amqp: {
publish: {
persistent: true,
priority: 5,
},
},
});
Health checks
const health = await broker.health();
console.log(health);
Graceful shutdown
process.on("SIGTERM", async () => {
await broker.close();
process.exit(0);
});
Summary
- Publishers create typed event envelopes
- Consumers explicitly declare what they handle
- produce() is the simplest way to publish an event
- with() creates a small typed publish API
- request<TReply>() supports typed RPC
- withHeaders() and traceFrom() help with metadata
- retry + delayed retry + DLQ gives safer production failure handling
- lifecycle hooks, topology planning, validation, and DLQ redrive help operations
- native amqplib options remain available when needed