Correlation IDs
Correlation IDs
Correlation IDs link related logs across requests, services, and time boundaries. They are essential for understanding the flow of a transaction through your entire system.
What Are Correlation IDs?
A correlation ID is a unique identifier that tracks a single logical operation (like a user request) as it flows through multiple services, threads, or time periods. Instead of searching for scattered logs with common attributes, you can track a single ID.
Adding Correlation IDs
Use with_correlation_id() to associate logs with a tracking ID:
from bytehide_logs import Log
correlation_id = "req_12345_xyz"
Log.with_correlation_id(correlation_id).info("Request started")
Log.with_correlation_id(correlation_id).info("Processing request")
Log.with_correlation_id(correlation_id).info("Request completed")
from bytehide_logs import Log
correlation_id = "req_12345_xyz"
Log.with_correlation_id(correlation_id).info("Request started")
Log.with_correlation_id(correlation_id).info("Processing request")
Log.with_correlation_id(correlation_id).info("Request completed")
All logs with the same correlation ID can be queried together:
correlation_id:req_12345_xyz
correlation_id:req_12345_xyz
Generating Correlation IDs
Create correlation IDs at the entry point of your application:
import uuid
from bytehide_logs import Log
def handle_http_request(request):
    """Handle an incoming HTTP request under a freshly generated correlation ID.

    Logs receipt of the request (method, path, client IP), delegates the work
    to ``process_request`` and logs the outcome. Every log entry written here
    carries the same correlation ID.

    Args:
        request: Request object exposing ``method``, ``path`` and
            ``remote_addr``.

    Returns:
        Whatever ``process_request(request, correlation_id)`` returns.

    Raises:
        Exception: Any exception raised while processing is logged with the
            correlation ID and then re-raised unchanged.
    """
    # Generate the ID at the entry point so every later log can reuse it.
    correlation_id = str(uuid.uuid4())

    Log.with_correlation_id(correlation_id).info(
        "Request received",
        context={
            "method": request.method,
            "path": request.path,
            "client_ip": request.remote_addr,
        },
    )

    try:
        result = process_request(request, correlation_id)
        Log.with_correlation_id(correlation_id).info("Request completed successfully")
        return result
    except Exception as e:
        # Record the failure against the same ID, then let the caller decide
        # how to respond.
        Log.with_correlation_id(correlation_id).error(
            "Request failed",
            exception=e,
        )
        raise


import uuid
from bytehide_logs import Log
def handle_http_request(request):
"""Handle incoming HTTP request with correlation ID."""
# Generate unique ID for this request
correlation_id = str(uuid.uuid4())
Log.with_correlation_id(correlation_id).info(
"Request received",
context={
"method": request.method,
"path": request.path,
"client_ip": request.remote_addr
}
)
try:
result = process_request(request, correlation_id)
Log.with_correlation_id(correlation_id).info("Request completed successfully")
return result
except Exception as e:
Log.with_correlation_id(correlation_id).error(
"Request failed",
exception=e
)
raise
Request Tracking Flow
Track a request through multiple services:
from bytehide_logs import Log
def api_endpoint(request):
    """API endpoint that calls multiple services under one correlation ID.

    Reuses the caller's ``X-Correlation-ID`` header when it is present and
    non-empty; otherwise a new UUID is generated. The ID is passed explicitly
    to each downstream call and attached to every log entry.

    Args:
        request: Request object exposing a ``headers`` mapping.

    Returns:
        dict: ``{"data": result}`` where ``result`` comes from the business
        service call.
    """
    import uuid  # local import keeps the example self-contained

    # Continue the caller's trace when an ID was supplied; only mint a new
    # one when the header is missing or empty.
    correlation_id = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())

    # Log receiving request
    Log.with_correlation_id(correlation_id).info("API request received")

    # Call authentication service
    # NOTE(review): auth_service and business_service are not defined in this
    # example; presumably service clients provided elsewhere - confirm.
    user = auth_service.authenticate(correlation_id)
    Log.with_correlation_id(correlation_id).info("User authenticated")

    # Call business logic service
    result = business_service.process(user.id, correlation_id)
    Log.with_correlation_id(correlation_id).info("Business logic completed")

    # Return response
    return {"data": result}
def auth_service_authenticate(correlation_id):
    """Log the start and end of credential verification for one request.

    The authentication logic itself is elided in this example; only the
    correlation-aware logging around it is shown.

    Args:
        correlation_id: ID attached to both log entries.
    """
    Log.with_correlation_id(correlation_id).info("Verifying credentials")
    # ... authentication logic ...
    Log.with_correlation_id(correlation_id).info("Authentication successful")
def business_service_process(user_id, correlation_id):
    """Log the start and end of business processing for one request.

    The business logic itself is elided in this example; only the
    correlation-aware logging around it is shown.

    Args:
        user_id: Identifier interpolated into the first log message.
        correlation_id: ID attached to both log entries.
    """
    Log.with_correlation_id(correlation_id).info(f"Processing request for user {user_id}")
    # ... business logic ...
    Log.with_correlation_id(correlation_id).info("Processing complete")


from bytehide_logs import Log
def api_endpoint(request):
"""API endpoint that calls multiple services."""
correlation_id = request.headers.get("X-Correlation-ID", generate_id())
# Log receiving request
Log.with_correlation_id(correlation_id).info("API request received")
# Call authentication service
user = auth_service.authenticate(correlation_id)
Log.with_correlation_id(correlation_id).info("User authenticated")
# Call business logic service
result = business_service.process(user.id, correlation_id)
Log.with_correlation_id(correlation_id).info("Business logic completed")
# Return response
return {"data": result}
def auth_service_authenticate(correlation_id):
"""Authentication service logs with correlation ID."""
Log.with_correlation_id(correlation_id).info("Verifying credentials")
# ... authentication logic ...
Log.with_correlation_id(correlation_id).info("Authentication successful")
def business_service_process(user_id, correlation_id):
"""Business service logs with correlation ID."""
Log.with_correlation_id(correlation_id).info(f"Processing request for user {user_id}")
# ... business logic ...
Log.with_correlation_id(correlation_id).info("Processing complete")
Combining with Other Context
Use correlation IDs alongside tags, user context, and other metadata:
Log.identify(user) \
.with_tags("order", "processing") \
.with_correlation_id("req_98765") \
.with_context("order_id", "ord_42") \
.info("Order processing started")
Log.identify(user) \
.with_tags("order", "processing") \
.with_correlation_id("req_98765") \
.with_context("order_id", "ord_42") \
.info("Order processing started")
Correlation IDs in Microservices
Pass correlation IDs between services:
from bytehide_logs import Log
import requests
def call_downstream_service(service_url, correlation_id, data, timeout=10):
    """Call another service over HTTP, forwarding the correlation ID.

    The ID is sent in the ``X-Correlation-ID`` header so the downstream
    service can attach it to its own logs.

    Args:
        service_url: URL to POST to.
        correlation_id: ID forwarded in the header and attached to every log
            entry written here.
        data: JSON-serialisable payload sent as the request body.
        timeout: Seconds to wait for the downstream service before giving
            up. Without a timeout ``requests`` can block indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: Any error from the HTTP call or JSON decoding is logged
            with the correlation ID and re-raised unchanged.
    """
    headers = {
        "X-Correlation-ID": correlation_id,
        "Content-Type": "application/json",
    }

    Log.with_correlation_id(correlation_id).info(
        f"Calling downstream service: {service_url}"
    )

    try:
        response = requests.post(
            service_url, json=data, headers=headers, timeout=timeout
        )
        Log.with_correlation_id(correlation_id).info(
            "Downstream service responded",
            context={"status_code": response.status_code},
        )
        return response.json()
    except Exception as e:
        # Log against the same ID so the failure shows up in the request trace.
        Log.with_correlation_id(correlation_id).error(
            "Downstream service call failed",
            exception=e,
        )
        raise


from bytehide_logs import Log
import requests
def call_downstream_service(service_url, correlation_id, data):
"""Call another service with correlation ID."""
headers = {
"X-Correlation-ID": correlation_id,
"Content-Type": "application/json"
}
Log.with_correlation_id(correlation_id).info(
f"Calling downstream service: {service_url}"
)
try:
response = requests.post(service_url, json=data, headers=headers)
Log.with_correlation_id(correlation_id).info(
"Downstream service responded",
context={"status_code": response.status_code}
)
return response.json()
except Exception as e:
Log.with_correlation_id(correlation_id).error(
"Downstream service call failed",
exception=e
)
raise
Ensure downstream services extract and use the correlation ID:
def downstream_endpoint(request):
"""Downstream service endpoint."""
# Extract correlation ID from request header
correlation_id = request.headers.get("X-Correlation-ID")
Log.with_correlation_id(correlation_id).info("Processing request")
# Process request...
Log.with_correlation_id(correlation_id).info("Request processing complete")
return response
def downstream_endpoint(request):
"""Downstream service endpoint."""
# Extract correlation ID from request header
correlation_id = request.headers.get("X-Correlation-ID")
Log.with_correlation_id(correlation_id).info("Processing request")
# Process request...
Log.with_correlation_id(correlation_id).info("Request processing complete")
return response
Correlation IDs with Asynchronous Operations
Track asynchronous tasks with correlation IDs:
from bytehide_logs import Log
import asyncio
async def process_order_async(order_id, correlation_id):
    """Process an order asynchronously with correlation tracking.

    Runs validation, payment and shipping in sequence, logging each stage
    with the same correlation ID.

    Args:
        order_id: Identifier passed to each processing coroutine.
        correlation_id: ID attached to every log entry written here.

    Raises:
        Exception: Any exception from a stage is logged with the correlation
            ID and re-raised unchanged.
    """
    Log.with_correlation_id(correlation_id).info("Starting async order processing")

    try:
        # Validate order
        Log.with_correlation_id(correlation_id).info("Validating order")
        await validate_order_async(order_id)

        # Process payment (the result is not needed by the later stages)
        Log.with_correlation_id(correlation_id).info("Processing payment")
        await process_payment_async(order_id)

        # Ship order
        Log.with_correlation_id(correlation_id).info("Shipping order")
        await ship_order_async(order_id)

        Log.with_correlation_id(correlation_id).info("Order processing complete")
    except Exception as e:
        Log.with_correlation_id(correlation_id).error(
            "Async order processing failed",
            exception=e,
        )
        raise


from bytehide_logs import Log
import asyncio
async def process_order_async(order_id, correlation_id):
"""Process order asynchronously with correlation tracking."""
Log.with_correlation_id(correlation_id).info("Starting async order processing")
try:
# Validate order
Log.with_correlation_id(correlation_id).info("Validating order")
await validate_order_async(order_id)
# Process payment
Log.with_correlation_id(correlation_id).info("Processing payment")
payment_result = await process_payment_async(order_id)
# Ship order
Log.with_correlation_id(correlation_id).info("Shipping order")
await ship_order_async(order_id)
Log.with_correlation_id(correlation_id).info("Order processing complete")
except Exception as e:
Log.with_correlation_id(correlation_id).error(
"Async order processing failed",
exception=e
)
raise
Correlation ID Format
Use consistent formats for correlation IDs:
import uuid
from datetime import datetime, timezone

# UUID-based (recommended)
correlation_id = str(uuid.uuid4())  # "550e8400-e29b-41d4-a716-446655440000"

# Request-based with timestamp. Use an aware UTC datetime: utcnow() is
# deprecated, and calling .timestamp() on its naive result treats the value
# as local time, which skews the epoch on hosts that are not set to UTC.
correlation_id = f"req_{datetime.now(timezone.utc).timestamp()}_{random_string()}"

# Service-based identifier
correlation_id = f"svc_payment_{order_id}_{uuid.uuid4().hex[:8]}"

# Simple incrementing ID (in high-throughput systems, use UUID instead)
correlation_id = f"req_{request_counter}"
import uuid
from datetime import datetime
# UUID-based (recommended)
correlation_id = str(uuid.uuid4()) # "550e8400-e29b-41d4-a716-446655440000"
# Request-based with timestamp
correlation_id = f"req_{datetime.utcnow().timestamp()}_{random_string()}"
# Service-based identifier
correlation_id = f"svc_payment_{order_id}_{uuid.uuid4().hex[:8]}"
# Simple incrementing ID (in high-throughput systems, use UUID instead)
correlation_id = f"req_{request_counter}"
Best Practices
Generate correlation IDs early:
# Good - generate at application entry point
def handle_request(request):
correlation_id = generate_or_extract_correlation_id(request)
# ... rest of processing ...
# Good - generate at application entry point
def handle_request(request):
correlation_id = generate_or_extract_correlation_id(request)
# ... rest of processing ...
Pass correlation IDs to all downstream calls:
# Good - explicit passing
result = downstream_service(data, correlation_id=correlation_id)
# Avoid - implicit global state (harder to test)
global CURRENT_CORRELATION_ID
CURRENT_CORRELATION_ID = correlation_id
# Good - explicit passing
result = downstream_service(data, correlation_id=correlation_id)
# Avoid - implicit global state (harder to test)
global CURRENT_CORRELATION_ID
CURRENT_CORRELATION_ID = correlation_id
Use consistent header names:
# Good - standard header name
correlation_id = request.headers.get("X-Correlation-ID")
# Document your convention
"""
Standard headers:
- X-Correlation-ID: Unique request identifier
- X-Request-ID: Alias for X-Correlation-ID
"""
# Good - standard header name
correlation_id = request.headers.get("X-Correlation-ID")
# Document your convention
"""
Standard headers:
- X-Correlation-ID: Unique request identifier
- X-Request-ID: Alias for X-Correlation-ID
"""
Include correlation ID in error responses:
def error_response(error, correlation_id):
"""Return error response with correlation ID for support."""
return {
"error": str(error),
"correlation_id": correlation_id,
"message": "Please reference this ID when contacting support"
}
def error_response(error, correlation_id):
"""Return error response with correlation ID for support."""
return {
"error": str(error),
"correlation_id": correlation_id,
"message": "Please reference this ID when contacting support"
}
Complete Example
from bytehide_logs import Log
import uuid
def handle_checkout(request):
    """Complete checkout flow with correlation ID tracking.

    Reuses the caller's ``X-Correlation-ID`` header when present, otherwise
    generates a UUID. Each stage of the checkout is logged with that ID, and
    the ID is returned to the caller on both success and failure.

    Args:
        request: Request object exposing a ``headers`` mapping; also passed
            to ``authenticate_user`` and ``validate_cart``.

    Returns:
        dict: ``{"order_id": ..., "correlation_id": ...}`` on success, or
        ``{"error": "Checkout failed", "correlation_id": ...}`` when any
        stage raises.
    """
    # Generate correlation ID
    correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))

    Log.with_correlation_id(correlation_id).info("Checkout initiated")

    # Track the stage in progress so the failure log names the step that
    # actually raised instead of always reporting "payment".
    step = "authentication"
    try:
        # Identify user
        user = authenticate_user(request)
        Log.identify(user)

        # Start transaction
        step = "transaction"
        Log.with_correlation_id(correlation_id).info("Starting payment transaction")
        transaction = start_transaction()

        # Validate cart
        step = "cart_validation"
        Log.with_correlation_id(correlation_id).info("Validating shopping cart")
        cart = validate_cart(request)

        # Process payment
        step = "payment"
        Log.with_correlation_id(correlation_id).info("Processing payment")
        payment = process_payment(user, cart, transaction)

        # Fulfill order
        step = "fulfillment"
        Log.with_correlation_id(correlation_id).info("Fulfilling order")
        order = create_order(user, cart, payment)

        Log.with_correlation_id(correlation_id).info("Checkout completed successfully")

        return {
            "order_id": order.id,
            "correlation_id": correlation_id,
        }
    except Exception as e:
        Log.with_correlation_id(correlation_id).error(
            "Checkout failed",
            exception=e,
            context={"step": step},
        )
        # Return the ID so the user can quote it when contacting support.
        return {"error": "Checkout failed", "correlation_id": correlation_id}


from bytehide_logs import Log
import uuid
def handle_checkout(request):
"""Complete checkout flow with correlation ID tracking."""
# Generate correlation ID
correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
Log.with_correlation_id(correlation_id).info("Checkout initiated")
try:
# Identify user
user = authenticate_user(request)
Log.identify(user)
# Start transaction
Log.with_correlation_id(correlation_id).info("Starting payment transaction")
transaction = start_transaction()
# Validate cart
Log.with_correlation_id(correlation_id).info("Validating shopping cart")
cart = validate_cart(request)
# Process payment
Log.with_correlation_id(correlation_id).info("Processing payment")
payment = process_payment(user, cart, transaction)
# Fulfill order
Log.with_correlation_id(correlation_id).info("Fulfilling order")
order = create_order(user, cart, payment)
Log.with_correlation_id(correlation_id).info("Checkout completed successfully")
return {
"order_id": order.id,
"correlation_id": correlation_id
}
except Exception as e:
Log.with_correlation_id(correlation_id).error(
"Checkout failed",
exception=e,
context={"step": "payment"}
)
return {"error": "Checkout failed", "correlation_id": correlation_id}
Next Steps
- Learn about user identification to track which users are affected
- Explore tags to categorize operations
- Discover metadata context to add operational details