Working with Queues in Stelvio
Stelvio supports creating and managing Amazon SQS (Simple Queue Service) queues using the Queue component. This allows you to build decoupled, event-driven architectures with reliable message delivery.
Creating a Queue
Create a queue by instantiating the Queue component in your stlv_app.py:
from stelvio.aws.queue import Queue
from stelvio.aws.function import Function
@app.run
def run() -> None:
# Create a standard queue
orders_queue = Queue("orders")
# Link it to a function
order_processor = Function(
"process-orders",
handler="functions/orders.handler",
links=[orders_queue],
)
Queue Configuration
Configure your queue with custom settings:
from stelvio.aws.queue import Queue, QueueConfig
# Using keyword arguments
orders_queue = Queue(
"orders",
delay=5, # Delay delivery by 5 seconds
visibility_timeout=60, # Message hidden for 60 seconds after read
)
# Or using QueueConfig
orders_queue = Queue(
"orders",
config=QueueConfig(
delay=5,
visibility_timeout=60,
)
)
Configuration Options
| Option | Default | Description |
|---|---|---|
fifo |
False |
Enable FIFO (First-In-First-Out) queue ordering |
delay |
0 |
Default delay (in seconds) before messages become visible |
visibility_timeout |
30 |
Time (in seconds) a message is hidden after being read |
retention |
345600 |
Message retention period in seconds (default: 4 days) |
dlq |
None |
Dead-letter queue configuration |
FIFO Queues
FIFO queues guarantee exactly-once processing and preserve message order:
When you create a FIFO queue, Stelvio automatically:
- Adds the
.fifosuffix to the queue name (required by AWS) - Enables content-based deduplication
FIFO Queue Naming
AWS requires FIFO queue names to end with .fifo. Stelvio handles this automatically when you set fifo=True.
FIFO Throughput
FIFO queues have lower throughput than standard queues (300 messages/second without batching, 3,000 with high-throughput mode). Use standard queues when message order isn't critical.
Dead-Letter Queues
Configure a dead-letter queue (DLQ) to capture messages that fail processing:
from stelvio.aws.queue import Queue, DlqConfig
# First, create the DLQ
orders_dlq = Queue("orders-dlq")
# Reference DLQ by Queue component
orders_queue = Queue("orders", dlq=DlqConfig(queue=orders_dlq))
# Custom retry count (default is 3)
orders_queue = Queue(
"orders",
dlq=DlqConfig(queue=orders_dlq, retry=5)
)
# Using dictionary syntax
orders_queue = Queue(
"orders",
dlq={"queue": orders_dlq, "retry": 5}
)
DLQ Best Practices
- Always configure a DLQ for production queues to capture failed messages
- Set up alerts on your DLQ to detect processing failures
- Choose retry counts based on your use case (typically 3-5 retries)
Queue Subscriptions
Subscribe Lambda functions to process messages from your queue:
orders_queue = Queue("orders")
# Simple subscription
orders_queue.subscribe("processor", "functions/orders.process")
# Multiple subscriptions with different names
orders_queue.subscribe("analytics", "functions/analytics.track_order")
Each subscription creates a separate Lambda function, so you can subscribe the same handler multiple times with different configurations:
orders_queue = Queue("orders")
# Same handler, different batch sizes for different throughput needs
orders_queue.subscribe("fast-processor", "functions/orders.process", batch_size=1)
orders_queue.subscribe("batch-processor", "functions/orders.process", batch_size=100)
Lambda Configuration
Customize the Lambda function for your subscription:
# With direct options
orders_queue.subscribe(
"processor",
"functions/orders.process",
memory=512,
timeout=60,
)
# With FunctionConfig
from stelvio.aws.function import FunctionConfig
orders_queue.subscribe(
"processor",
FunctionConfig(
handler="functions/orders.process",
memory=512,
timeout=60,
)
)
# With dictionary
orders_queue.subscribe(
"processor",
{"handler": "functions/orders.process", "memory": 256}
)
Batch Size
Control how many messages Lambda receives per invocation:
orders_queue.subscribe(
"batch-processor",
"functions/orders.process",
batch_size=5, # Process 5 messages at a time (default: 10)
)
Choosing Batch Size
- Smaller batches (1-5): Lower latency, faster processing of individual messages
- Larger batches (10+): Higher throughput, more efficient for high-volume queues
- Consider your Lambda timeout when choosing batch size
Subscription Permissions
Stelvio automatically configures the necessary IAM permissions for queue subscriptions:
- EventSourceMapping: Connects the SQS queue to your Lambda function
- SQS IAM permissions: Grants read access (
sqs:ReceiveMessage,sqs:DeleteMessage,sqs:GetQueueAttributes)
Sending Messages
Use the linking mechanism to send messages to your queue from Lambda functions:
import boto3
import json
from stlv_resources import Resources
def handler(event, context):
sqs = boto3.client('sqs')
# Access the linked queue URL
queue_url = Resources.orders.queue_url
# Send a message
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
"order_id": "12345",
"customer": "john@example.com",
"items": [{"sku": "WIDGET-001", "qty": 2}]
})
)
return {"statusCode": 200, "body": "Message sent!"}
Sending to FIFO Queues
FIFO queues require additional parameters when sending messages:
import boto3
import json
from stlv_resources import Resources
def handler(event, context):
sqs = boto3.client('sqs')
queue_url = Resources.orders.queue_url
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({"order_id": "12345"}),
# Required for FIFO queues - messages with same group ID are processed in order
MessageGroupId="order-processing",
# Optional if content-based deduplication is enabled (Stelvio enables this by default)
# MessageDeduplicationId="unique-id-12345",
)
return {"statusCode": 200, "body": "Message sent!"}
FIFO Message Parameters
- MessageGroupId (required): Messages with the same group ID are processed in order. Use different group IDs for messages that can be processed in parallel.
- MessageDeduplicationId (optional): When content-based deduplication is enabled (default in Stelvio), SQS uses a hash of the message body. Provide this explicitly if you need custom deduplication logic.
Link Properties
When you link a queue to a Lambda function, these properties are available:
| Property | Description |
|---|---|
queue_url |
The queue URL for sending messages |
queue_arn |
The queue ARN |
queue_name |
The queue name |
Link Permissions
Linked Lambda functions receive these SQS permissions for sending messages:
sqs:SendMessage- Send messages to the queuesqs:GetQueueAttributes- Read queue metadata
Receiving Messages
For processing messages from a queue, use queue.subscribe() instead of linking.
Subscriptions automatically configure the necessary permissions (sqs:ReceiveMessage,
sqs:DeleteMessage, sqs:GetQueueAttributes) for the Lambda event source mapping.
Processing Messages
Your Lambda function receives SQS events with batched messages:
import json
def process(event, context):
"""Process SQS messages."""
for record in event.get('Records', []):
# Parse the message body
body = json.loads(record['body'])
order_id = body.get('order_id')
customer = body.get('customer')
print(f"Processing order {order_id} for {customer}")
# Process the order...
# Return success - SQS will delete processed messages
return {"statusCode": 200}
Error Handling
- If your Lambda raises an exception, SQS will retry the message after the visibility timeout
- Successfully processed messages are automatically deleted
- Failed messages eventually move to the DLQ (if configured)
Linking vs Subscriptions
| Use Case | Approach |
|---|---|
| Process queue messages | Use queue.subscribe() - creates Lambda triggered by queue |
| Send messages to queue | Use links=[queue] - grants permissions to send messages |
| Pipeline (read → write) | Subscribe to one queue, link to another for forwarding |
# Example: Process orders, send to fulfillment queue
orders_queue = Queue("orders")
fulfillment_queue = Queue("fulfillment")
# Subscribe to process incoming orders
orders_queue.subscribe(
"process-orders",
"functions/orders.process",
links=[fulfillment_queue], # Grant permission to send to fulfillment
)
Next Steps
Now that you understand SQS queues, you might want to explore:
- Working with Lambda Functions - Learn more about Lambda configuration
- Working with DynamoDB - Store processed message data
- Linking - Understand how Stelvio automates IAM permissions