Producer-Consumer Pattern
A fundamental pattern where producers create work items and consumers process them independently.
The Factory Assembly Line
Picture an assembly department placing finished parts on a conveyor belt while a packaging department takes them off at its own pace. Neither department waits for the other. The conveyor belt (queue) buffers the difference in their speeds.
The Producer-Consumer Pattern decouples the creation of work (producing) from its execution (consuming). A shared buffer (queue) holds work items, allowing producers and consumers to operate at different rates.
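The decoupling is easy to see in a minimal in-memory sketch. This is illustrative only — `queue`, `produce`, and `consume` are hypothetical names, and a real system would use a message broker rather than an array:

```javascript
// Minimal in-memory sketch: a shared queue decouples producer from consumer.
// (Illustrative only; real systems use a broker such as SQS, RabbitMQ, or Kafka.)
const queue = [];

function produce(item) {
  queue.push(item); // producer returns immediately; it never waits for processing
}

function consume() {
  const processed = [];
  while (queue.length > 0) {
    processed.push(queue.shift()); // FIFO: oldest item first
  }
  return processed;
}

produce('order-1');
produce('order-2');
console.log(consume()); // ['order-1', 'order-2']
```

The producer's only job is to enqueue and move on; the consumer drains the queue whenever it runs, at whatever rate it can sustain.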
Pattern Components

Producer — creates messages
- Create work items/messages
- Push to queue
- Handle backpressure
- Don't wait for processing

Queue — holds messages
- Store messages safely
- FIFO ordering (usually)
- Handle concurrent access
- Persist until consumed

Consumer — processes messages
- Pull from queue
- Process messages
- Acknowledge completion
- Handle failures/retries
Scaling Patterns
Single Producer, Single Consumer
Simplest form. Good for low throughput.
Single Producer, Multiple Consumers
Scale processing by adding consumers. Each message processed once.
Multiple Producers, Multiple Consumers
Full scale. Many services produce, many workers consume.
Add consumers when queue depth grows; remove them when the queue stays empty. Auto-scaling based on queue length is common.
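A naive scaling rule along these lines can be sketched as a pure function. `desiredConsumers` and the `targetPerConsumer` tuning parameter are illustrative names, not part of any queue API:

```javascript
// Hedged sketch of queue-depth-based scaling: aim for a target backlog per
// consumer, clamped between a minimum and maximum fleet size.
function desiredConsumers(queueDepth, targetPerConsumer, min = 1, max = 10) {
  const wanted = Math.ceil(queueDepth / targetPerConsumer);
  return Math.min(max, Math.max(min, wanted));
}

console.log(desiredConsumers(0, 100));    // 1  (queue empty: scale to minimum)
console.log(desiredConsumers(450, 100));  // 5
console.log(desiredConsumers(5000, 100)); // 10 (capped at maximum)
```

In practice the queue depth would come from a broker metric (for example, SQS's approximate message count), and the thresholds would be tuned to the workload.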
Implementation Considerations
Ordering: FIFO queues guarantee order but limit throughput. Standard queues are faster but may deliver out of order. Use message timestamps when order matters.
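When a standard queue may reorder a batch, a consumer can restore order from producer-assigned timestamps. A minimal sketch, with illustrative field names:

```javascript
// Restore producer order inside a received batch by sorting on a
// producer-assigned timestamp (field names are illustrative).
function sortByTimestamp(messages) {
  return [...messages].sort((a, b) => a.timestamp - b.timestamp);
}

const batch = [
  { id: 'b', timestamp: 200 },
  { id: 'a', timestamp: 100 },
];
console.log(sortByTimestamp(batch).map(m => m.id)); // ['a', 'b']
```

Note this only orders messages within what the consumer has already received; it cannot recover global order across batches or consumers.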
Delivery guarantees: Most queues provide at-least-once delivery, so a message may be duplicated on retry. Design consumers to be idempotent: processing the same message twice should be safe.
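One common way to make a consumer idempotent is to track processed message IDs and skip duplicates. A minimal in-memory sketch — in production the seen-set would live in a durable store such as a database or Redis:

```javascript
// Idempotent consumer sketch: remember processed IDs so a redelivered
// message becomes a no-op. (In-memory Set for illustration only.)
const seen = new Set();

function handleOnce(message, handler) {
  if (seen.has(message.id)) return false; // duplicate: safe to skip
  handler(message);
  seen.add(message.id); // mark processed only after the handler succeeds
  return true;
}

let count = 0;
const msg = { id: 'order-42' };
handleOnce(msg, () => count++); // processed
handleOnce(msg, () => count++); // duplicate ignored
console.log(count); // 1
```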
Backpressure: When the queue rejects writes or slows down, the producer should retry with backoff, drop messages, or block. Don't overwhelm the queue.
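The retry-with-backoff option might look like this sketch; `send` is a placeholder for any enqueue call that can fail transiently:

```javascript
// Producer-side retry with exponential backoff (sketch).
// `send` stands in for any enqueue call that may throw transiently.
async function sendWithBackoff(send, message, maxAttempts = 5, baseMs = 100) {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await send(message);
    } catch (err) {
      if (attempt === maxAttempts - 1) throw err; // give up: caller may drop or block
      const delay = baseMs * 2 ** attempt;        // 100, 200, 400, ... with defaults
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
```

Real producers often add jitter to the delay so that many retrying clients don't hammer the queue in lockstep.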
Poison messages: After N failed retries, move the message to a dead letter queue (DLQ). Don't let one bad message block the entire queue.
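The retry-then-dead-letter flow can be sketched with plain arrays standing in for queues (names are illustrative; managed brokers like SQS do this routing for you via a redrive policy):

```javascript
// Poison-message sketch: after maxRetries failures, route the message to a
// dead letter queue instead of retrying forever.
function processWithDlq(message, handler, dlq, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      handler(message);
      return 'processed';
    } catch (err) {
      // swallow the error and retry until attempts are exhausted
    }
  }
  dlq.push(message); // park it for later inspection; the main queue keeps moving
  return 'dead-lettered';
}

const dlq = [];
const alwaysFails = () => { throw new Error('bad payload'); };
console.log(processWithDlq({ id: 'poison' }, alwaysFails, dlq)); // 'dead-lettered'
console.log(dlq.length); // 1
```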
Code Example
// AWS SDK for JavaScript v2 (with v3, use @aws-sdk/client-sqs instead)
const AWS = require('aws-sdk');
const sqs = new AWS.SQS();

async function produce(message) {
  await sqs.sendMessage({
    QueueUrl: process.env.QUEUE_URL,
    MessageBody: JSON.stringify(message),
    MessageAttributes: {
      'Type': { DataType: 'String', StringValue: 'order' }
    }
  }).promise();
  console.log('Message sent:', message.id);
}

async function consume() {
  while (true) {
    const result = await sqs.receiveMessage({
      QueueUrl: process.env.QUEUE_URL,
      WaitTimeSeconds: 20, // Long polling: fewer empty responses
      MaxNumberOfMessages: 10
    }).promise();

    for (const msg of result.Messages || []) {
      try {
        // processMessage is the application-specific handler (not shown)
        await processMessage(JSON.parse(msg.Body));
        // Delete only after successful processing — this is the "acknowledge" step
        await sqs.deleteMessage({
          QueueUrl: process.env.QUEUE_URL,
          ReceiptHandle: msg.ReceiptHandle
        }).promise();
      } catch (err) {
        console.error('Processing failed:', err);
        // Message becomes visible again after the visibility timeout and is retried
      }
    }
  }
}