Integrations¶
Real-world examples of integrating aiocop with popular frameworks and tools.
Table of Contents¶
Standalone (No Framework)¶
A minimal example with no dependencies - just Python and aiocop:
# test_aiocop.py
import asyncio
import aiocop
def on_slow_task(event):
    """Print a short report for a slow-task event.

    Args:
        event: Object exposing ``elapsed_ms``, ``severity_level`` and
            ``blocking_events``; each blocking event is indexed with the
            ``"event"`` and ``"entry_point"`` keys.
    """
    print(f"SLOW TASK DETECTED: {event.elapsed_ms:.1f}ms")
    print(f" Severity: {event.severity_level}")
    print(f" Blocking events: {len(event.blocking_events)}")
    # One line per blocking call that was recorded for this task.
    for evt in event.blocking_events:
        print(f" - {evt['event']} at {evt['entry_point']}")
async def blocking_task():
    """Coroutine that deliberately performs a synchronous file write.

    It exists only to give the detector something to report.
    """
    print("Executing task...")
    # This synchronous open() will block the loop!
    with open("/dev/null", "w") as f:
        f.write("data")
    await asyncio.sleep(0.1)
async def main():
    """Configure aiocop, run two blocking tasks concurrently, then deactivate."""
    # Setup aiocop
    aiocop.patch_audit_functions()
    aiocop.start_blocking_io_detection(trace_depth=5)
    aiocop.detect_slow_tasks(threshold_ms=10, on_slow_task=on_slow_task)
    aiocop.activate()

    # Run your code as normal
    await asyncio.gather(blocking_task(), blocking_task())

    aiocop.deactivate()


if __name__ == "__main__":
    asyncio.run(main())
Run it:
python test_aiocop.py
Output:
Executing task...
Executing task...
SLOW TASK DETECTED: 102.3ms
Severity: medium
Blocking events: 1
- open(/dev/null, w) at test_aiocop.py:15:blocking_task
SLOW TASK DETECTED: 103.1ms
Severity: medium
Blocking events: 1
- open(/dev/null, w) at test_aiocop.py:15:blocking_task
FastAPI¶
Basic Integration¶
from contextlib import asynccontextmanager
from fastapi import FastAPI
import aiocop
def setup_aiocop() -> None:
    """Configure aiocop monitoring."""
    aiocop.patch_audit_functions()
    aiocop.start_blocking_io_detection(trace_depth=20)
    # log_slow_task is looked up when this function runs, so it may be
    # defined later in the module.
    aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=log_slow_task)
def log_slow_task(event: aiocop.SlowTaskEvent) -> None:
    """Log slow task events.

    Nothing is printed unless ``event.exceeded_threshold`` is truthy.
    """
    if event.exceeded_threshold:
        print(f"Slow task: {event.elapsed_ms:.1f}ms, severity: {event.severity_level}")
        for evt in event.blocking_events:
            print(f" - {evt['event']} at {evt['entry_point']}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Activate aiocop for the lifetime of the application.

    Args:
        app: The FastAPI application (unused here).
    """
    # Startup: activate monitoring
    aiocop.activate()
    try:
        yield
    finally:
        # Shutdown: deactivate monitoring, even if an exception is thrown
        # into the generator at the yield point.
        aiocop.deactivate()
# Setup before creating the app: setup_aiocop() only configures detection;
# activation happens inside the lifespan handler passed to FastAPI below.
setup_aiocop()
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
    """Return a static greeting payload."""
    return {"message": "Hello World"}
With Request Context¶
Capture request information in aiocop events:
from contextvars import ContextVar
from typing import Any
from uuid import uuid4
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import aiocop
# Context variable for request ID. Defaults to the empty string, so reading
# it outside a request returns "" instead of raising LookupError.
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
class RequestIdMiddleware(BaseHTTPMiddleware):
    """Attach a request ID to each request and echo it on the response."""

    async def dispatch(self, request: Request, call_next):
        """Store the request ID in ``request_id_var`` and forward the request.

        The ID is taken from the ``X-Request-ID`` header when present,
        otherwise a fresh UUID4 string is generated.
        """
        request_id = request.headers.get("X-Request-ID", str(uuid4()))
        request_id_var.set(request_id)
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response
def request_context_provider() -> dict[str, Any]:
    """Provide request context to aiocop callbacks.

    Returns:
        A dict with a single ``"request_id"`` key holding the current value
        of ``request_id_var``.
    """
    return {
        "request_id": request_id_var.get(),
    }
def log_slow_task(event: aiocop.SlowTaskEvent) -> None:
    """Print slow tasks prefixed with the request ID found in the event context.

    Falls back to ``"unknown"`` when the context has no ``"request_id"`` key.
    """
    if event.exceeded_threshold:
        request_id = event.context.get("request_id", "unknown")
        print(f"[{request_id}] Slow task: {event.elapsed_ms:.1f}ms")
# Setup: configure detection, register the slow-task callback and the
# context provider whose dict is read back via event.context in log_slow_task.
# NOTE(review): this snippet never calls aiocop.activate() — confirm whether
# activation is expected elsewhere (e.g. in a lifespan handler).
aiocop.patch_audit_functions()
aiocop.start_blocking_io_detection()
aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=log_slow_task)
aiocop.register_context_provider(request_context_provider)
app = FastAPI()
app.add_middleware(RequestIdMiddleware)
Starlette¶
from contextlib import asynccontextmanager
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse
import aiocop
def setup_aiocop() -> None:
    """Configure aiocop detection and register ``log_slow_task`` as the callback."""
    aiocop.patch_audit_functions()
    aiocop.start_blocking_io_detection()
    aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=log_slow_task)
def log_slow_task(event: aiocop.SlowTaskEvent) -> None:
    """Print the elapsed time of tasks that exceeded the threshold."""
    if event.exceeded_threshold:
        print(f"Blocking detected: {event.elapsed_ms:.1f}ms")
@asynccontextmanager
async def lifespan(app):
    """Activate aiocop on startup and deactivate it on shutdown.

    Args:
        app: The application instance (unused here).
    """
    aiocop.activate()
    try:
        yield
    finally:
        # Runs even if an exception is thrown into the generator at the yield.
        aiocop.deactivate()
async def homepage(request):
    """Return a static JSON payload."""
    return JSONResponse({"hello": "world"})
# Configure aiocop before building the app; activation itself happens in the
# lifespan handler passed to Starlette below.
setup_aiocop()
app = Starlette(
    routes=[Route("/", homepage)],
    lifespan=lifespan,
)
aiohttp¶
from aiohttp import web
import aiocop
def setup_aiocop() -> None:
    """Configure aiocop detection and register ``log_slow_task`` as the callback."""
    aiocop.patch_audit_functions()
    aiocop.start_blocking_io_detection()
    aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=log_slow_task)
def log_slow_task(event: aiocop.SlowTaskEvent) -> None:
    """Print elapsed time and reason for tasks that exceeded the threshold."""
    if event.exceeded_threshold:
        print(f"Blocking: {event.elapsed_ms:.1f}ms - {event.reason}")
async def on_startup(app):
    """Startup hook: activate aiocop. ``app`` is unused."""
    aiocop.activate()
async def on_cleanup(app):
    """Cleanup hook: deactivate aiocop. ``app`` is unused."""
    aiocop.deactivate()
async def handle(request):
    """Return a static JSON greeting."""
    return web.json_response({"message": "Hello"})
# Configure aiocop first; activation and deactivation are tied to the
# application's startup and cleanup signals registered below.
setup_aiocop()
app = web.Application()
app.router.add_get("/", handle)
app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)

if __name__ == "__main__":
    web.run_app(app)
Datadog¶
APM Integration¶
Capture aiocop events as Datadog span tags:
from typing import Any
from ddtrace import tracer
import aiocop
def datadog_context_provider() -> dict[str, Any]:
    """Capture the current Datadog span.

    Returns:
        A dict with a single ``"dd_span"`` key holding whatever
        ``tracer.current_span()`` returns.
    """
    return {"dd_span": tracer.current_span()}
def send_to_datadog(event: aiocop.SlowTaskEvent) -> None:
    """Tag the Datadog span with slow task info.

    Does nothing when the event did not exceed the threshold or when the
    event context carries no ``"dd_span"`` entry.
    """
    if not event.exceeded_threshold:
        return

    span = event.context.get("dd_span")
    if span is None:
        return

    # Tag the span
    span.set_tag("aiocop.slow_task", True)
    span.set_tag("aiocop.severity_level", event.severity_level)
    span.set_tag("aiocop.reason", event.reason)

    # Add metrics
    span.set_metric("aiocop.elapsed_ms", event.elapsed_ms)
    span.set_metric("aiocop.severity_score", event.severity_score)
    span.set_metric("aiocop.blocking_events_count", len(event.blocking_events))

    # Add blocking event details (only the first five, joined with "; ")
    if event.blocking_events:
        events_summary = "; ".join(e["event"] for e in event.blocking_events[:5])
        span.set_tag("aiocop.blocking_events", events_summary)
# Setup: register the span-tagging callback together with the context
# provider that supplies the "dd_span" entry it reads, then activate.
aiocop.patch_audit_functions()
aiocop.start_blocking_io_detection()
aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=send_to_datadog)
aiocop.register_context_provider(datadog_context_provider)
aiocop.activate()
StatsD Metrics¶
from datadog import statsd
import aiocop
def send_metrics(event: aiocop.SlowTaskEvent) -> None:
    """Send metrics to Datadog StatsD.

    The elapsed-time histogram is emitted for every event; the slow-task
    counter and severity-score histogram only when the threshold was exceeded.
    """
    tags = [
        f"severity:{event.severity_level}",
        f"reason:{event.reason}",
        f"exceeded_threshold:{event.exceeded_threshold}",
    ]

    # Always send elapsed time
    statsd.histogram("aiocop.task.elapsed_ms", event.elapsed_ms, tags=tags)

    # Count slow tasks
    if event.exceeded_threshold:
        statsd.increment("aiocop.slow_task.count", tags=tags)
        statsd.histogram("aiocop.slow_task.severity_score", event.severity_score, tags=tags)


aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=send_metrics)
Prometheus¶
from prometheus_client import Counter, Histogram
import aiocop
# Metric definitions used by the Prometheus callback.

# Counter of slow tasks, labelled by severity level and reason.
SLOW_TASK_COUNT = Counter(
    "aiocop_slow_tasks_total",
    "Total number of slow tasks detected",
    labelnames=["severity_level", "reason"],
)

# Task duration in milliseconds, labelled by whether the threshold was exceeded.
TASK_DURATION = Histogram(
    "aiocop_task_duration_milliseconds",
    "Task duration in milliseconds",
    labelnames=["exceeded_threshold"],
    buckets=[10, 25, 50, 100, 250, 500, 1000],
)

# Unlabelled histogram of severity scores.
SEVERITY_SCORE = Histogram(
    "aiocop_severity_score",
    "Severity score of blocking events",
    buckets=[1, 10, 25, 50, 100, 200],
)
def prometheus_callback(event: aiocop.SlowTaskEvent) -> None:
    """Record metrics in Prometheus.

    Duration is observed for every event; the slow-task counter is bumped
    only when the threshold was exceeded; the severity score is observed
    whenever it is positive.
    """
    # Record duration for all tasks ("true"/"false" label value)
    TASK_DURATION.labels(
        exceeded_threshold=str(event.exceeded_threshold).lower()
    ).observe(event.elapsed_ms)

    # Record slow tasks
    if event.exceeded_threshold:
        SLOW_TASK_COUNT.labels(
            severity_level=event.severity_level,
            reason=event.reason,
        ).inc()

    # Record severity score
    if event.severity_score > 0:
        SEVERITY_SCORE.observe(event.severity_score)


aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=prometheus_callback)
Structured Logging¶
With structlog¶
import structlog
import aiocop
logger = structlog.get_logger()
def structured_log_callback(event: aiocop.SlowTaskEvent) -> None:
    """Log slow tasks with structured logging.

    Events that did not exceed the threshold are ignored. The summary line
    is logged at error/warning/info for "high"/"medium"/other severity
    levels, followed by one debug line per blocking event.
    """
    if not event.exceeded_threshold:
        return

    log = logger.bind(
        elapsed_ms=event.elapsed_ms,
        threshold_ms=event.threshold_ms,
        severity_score=event.severity_score,
        severity_level=event.severity_level,
        reason=event.reason,
        blocking_events_count=len(event.blocking_events),
        request_id=event.context.get("request_id"),
    )

    if event.severity_level == "high":
        log.error("High severity blocking I/O detected")
    elif event.severity_level == "medium":
        log.warning("Medium severity blocking I/O detected")
    else:
        log.info("Low severity blocking I/O detected")

    # Log individual blocking events
    for evt in event.blocking_events:
        log.debug(
            "Blocking event",
            event=evt["event"],
            entry_point=evt["entry_point"],
            severity=evt["severity"],
        )


aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=structured_log_callback)
With Python logging (JSON)¶
import json
import logging
import aiocop
logger = logging.getLogger("aiocop")
def json_log_callback(event: aiocop.SlowTaskEvent) -> None:
    """Log slow tasks as JSON.

    Events that did not exceed the threshold are ignored. "high" severity
    is logged at ERROR, everything else at WARNING.
    """
    if not event.exceeded_threshold:
        return

    log_data = {
        "message": "Slow task detected",
        "elapsed_ms": event.elapsed_ms,
        "threshold_ms": event.threshold_ms,
        "severity_score": event.severity_score,
        "severity_level": event.severity_level,
        "reason": event.reason,
        # Keep only the two fields needed to locate each blocking call.
        "blocking_events": [
            {"event": e["event"], "entry_point": e["entry_point"]}
            for e in event.blocking_events
        ],
        # NOTE(review): json.dumps raises TypeError if the context holds
        # non-serializable values — confirm what context providers return.
        "context": event.context,
    }

    if event.severity_level == "high":
        logger.error(json.dumps(log_data))
    else:
        logger.warning(json.dumps(log_data))


aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=json_log_callback)
Sentry¶
import sentry_sdk
import aiocop
def sentry_callback(event: aiocop.SlowTaskEvent) -> None:
    """Report high-severity blocking to Sentry.

    Events whose severity level is not "high" are ignored.
    """
    if event.severity_level != "high":
        return

    # Tags, extras, breadcrumbs and the message are all set inside the
    # pushed scope.
    with sentry_sdk.push_scope() as scope:
        scope.set_tag("aiocop.severity_level", event.severity_level)
        scope.set_tag("aiocop.reason", event.reason)
        scope.set_extra("elapsed_ms", event.elapsed_ms)
        scope.set_extra("threshold_ms", event.threshold_ms)
        scope.set_extra("severity_score", event.severity_score)
        scope.set_extra("blocking_events", event.blocking_events)
        scope.set_extra("context", event.context)

        # Create a breadcrumb for each blocking event
        for evt in event.blocking_events:
            sentry_sdk.add_breadcrumb(
                category="aiocop",
                message=evt["event"],
                data={"trace": evt["trace"], "severity": evt["severity"]},
                level="warning",
            )

        sentry_sdk.capture_message(
            f"High severity blocking I/O: {event.elapsed_ms:.1f}ms",
            level="warning",
        )


aiocop.detect_slow_tasks(threshold_ms=30, on_slow_task=sentry_callback)
Combining Multiple Integrations¶
import aiocop
def combined_callback(event: aiocop.SlowTaskEvent) -> None:
    """Route events to multiple systems.

    NOTE(review): send_to_prometheus, send_to_sentry and send_to_pagerduty
    are not defined on this page — presumably user-supplied helpers; confirm.
    """
    # Always send metrics
    send_to_prometheus(event)

    if event.exceeded_threshold:
        # Log all slow tasks
        log_slow_task(event)

        # Send high severity to alerting
        if event.severity_level == "high":
            send_to_sentry(event)
            send_to_pagerduty(event)
# Or register multiple callbacks. Each name below is a callback defined in
# an earlier section of this page (Prometheus, structlog, Datadog APM).
aiocop.register_slow_task_callback(prometheus_callback)
aiocop.register_slow_task_callback(structured_log_callback)
aiocop.register_slow_task_callback(send_to_datadog)
CI/CD Integration - Fail Tests on Blocking I/O¶
Use aiocop to prevent blocking code from being merged by raising exceptions in your integration tests.
pytest Integration¶
# conftest.py
import importlib
import pytest
import aiocop
# Libraries that do lazy-loading (would cause false positives)
LAZY_LOAD_LIBS = ["anyio", "httpx", "httpcore"]


@pytest.fixture(scope="session", autouse=True)
def preload_lazy_libraries():
    """
    Pre-import libraries that do lazy loading to avoid false positives.
    Import-time blocking is expected; runtime blocking is not.
    """
    for lib in LAZY_LOAD_LIBS:
        try:
            importlib.import_module(lib)
        except ImportError:
            # Best effort: a library that is not installed is simply skipped.
            pass
@pytest.fixture(scope="session", autouse=True)
def setup_aiocop():
    """Set up aiocop for the test session."""
    aiocop.patch_audit_functions()
    aiocop.start_blocking_io_detection()
    aiocop.detect_slow_tasks(threshold_ms=50)
    aiocop.activate()
    # Don't raise by default - enable per-test or per-view
    aiocop.disable_raise_on_violations()
Context Manager for Async Views¶
Wrap async view execution to catch blocking I/O:
# io_blocking_context.py
import aiocop
class BlockingIODetector:
    """
    Context manager that raises HighSeverityBlockingIoException
    when blocking I/O is detected in async code.
    Use in integration tests to catch blocking code before it's merged.
    """

    def __init__(self):
        # Holds the object returned by aiocop.raise_on_violations() while
        # the detector is entered; None before the first __enter__.
        self._context_manager = None

    def __enter__(self):
        """Reset recorded blocking events, then enter aiocop's raising context."""
        aiocop.reset_blocking_events()
        self._context_manager = aiocop.raise_on_violations()
        self._context_manager.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the wrapped aiocop context, forwarding the exception details."""
        if self._context_manager is not None:
            self._context_manager.__exit__(exc_type, exc_val, exc_tb)
Why Wrap Only the View (Not the Entire Test)?¶
Tests often contain blocking code that's perfectly fine:
@pytest.mark.asyncio
async def test_my_endpoint(client, db):
    """Only the request to the view runs under BlockingIODetector."""
    # Test setup - blocking is OK here
    user = User.objects.create(name="test")  # Sync DB call (fine)
    with open("fixtures/data.json") as f:  # File I/O (fine)
        data = json.load(f)

    # The view execution - blocking is NOT OK here
    with BlockingIODetector():
        response = await client.post("/api/users", json=data)

    # Assertions - blocking is OK here
    assert response.status_code == 201
    assert User.objects.count() == 2  # Sync DB call (fine)
We only care about blocking inside the async view itself, not in test setup/teardown. Enabling raise_on_violations for the entire test would cause false positives from:
- Database fixtures and factory setup
- File-based test data loading
- Synchronous assertions and verifications
- Test cleanup operations
By wrapping only the view execution, you get precise detection of production-relevant blocking code.
Using in Tests¶
# test_views.py
import pytest
from io_blocking_context import BlockingIODetector
@pytest.mark.asyncio
async def test_my_async_endpoint(client):
    """This test will FAIL if the endpoint has blocking I/O."""
    with BlockingIODetector():
        response = await client.get("/api/my-endpoint")

    assert response.status_code == 200
Framework Integration (Django Ninja Example)¶
Automatically wrap all async views in your test environment:
# In your framework wrapper
from functools import wraps
TESTING = False  # Set to True in conftest.py


def wrap_async_view(view_func):
    """Wrap an async view so it runs under BlockingIODetector when TESTING is set.

    Args:
        view_func: The coroutine function to wrap.

    Returns:
        A coroutine function with the same metadata (via ``functools.wraps``)
        that awaits ``view_func`` and returns its result.
    """
    @wraps(view_func)
    async def wrapper(*args, **kwargs):
        # TESTING is read on every call, so flipping it after wrapping works.
        if TESTING:
            with BlockingIODetector():
                return await view_func(*args, **kwargs)
        return await view_func(*args, **kwargs)

    return wrapper
Benefits¶
- Catch blocking code in CI before it reaches production
- Self-documenting tests - failures clearly indicate blocking I/O
- Gradual adoption - enable per-test or per-view
- No production overhead - only active during testing
Handling False Positives¶
Some blocking is expected (e.g., lazy imports). Handle with:
# Pre-import lazy libraries in conftest.py
LAZY_LOAD_LIBS = ["anyio", "httpx", "httpcore", "your_lazy_lib"]


@pytest.fixture(scope="session", autouse=True)
def preload_lazy_libraries():
    """Import every library in LAZY_LOAD_LIBS once per session, skipping missing ones."""
    for lib in LAZY_LOAD_LIBS:
        try:
            importlib.import_module(lib)
        except ImportError:
            # Best effort: a library that is not installed is simply skipped.
            pass
Gradual Rollout¶
Roll out aiocop monitoring gradually:
import os
import random
import aiocop
# Configuration
# Monitoring is enabled unless AIOCOP_ENABLED is set to something other than
# "true" (case-insensitive); the sample rate defaults to 1.0.
AIOCOP_ENABLED = os.getenv("AIOCOP_ENABLED", "true").lower() == "true"
AIOCOP_SAMPLE_RATE = float(os.getenv("AIOCOP_SAMPLE_RATE", "1.0"))


def maybe_activate():
    """Activate monitoring based on sample rate.

    Does nothing when AIOCOP_ENABLED is false. Otherwise draws a random
    number: below AIOCOP_SAMPLE_RATE activates aiocop, else deactivates it.
    """
    if not AIOCOP_ENABLED:
        return

    if random.random() < AIOCOP_SAMPLE_RATE:
        aiocop.activate()
    else:
        aiocop.deactivate()
# In your request middleware
class AiocopMiddleware:
    """ASGI middleware that makes the sampling decision for each HTTP request."""

    def __init__(self, app):
        """Store the wrapped ASGI application.

        Args:
            app: The downstream ASGI callable that every scope is forwarded to.
        """
        # Without this assignment, __call__ fails with AttributeError on self.app.
        self.app = app

    async def __call__(self, scope, receive, send):
        """Call ``maybe_activate()`` for HTTP scopes, then forward to the app."""
        if scope["type"] == "http":
            maybe_activate()
        # Non-HTTP scopes (e.g. lifespan, websocket) are forwarded untouched.
        await self.app(scope, receive, send)