microservice-patternsProvides guidance and patterns for decomposing monoliths, inter-service communication, data management, and resilience in microservice architectures.
Install via ClawdBot CLI:
clawdbot install wpank/microservice-patternsPatterns for building distributed systems: service decomposition, inter-service communication, data management, and resilience. Helps you avoid the "distributed monolith" anti-pattern.
microservices, service mesh, event-driven, saga, circuit breaker, API gateway, service discovery, distributed transactions, eventual consistency, CQRS
| If you have... | Then... |
|----------------|---------|
| Small team (<5 devs), simple domain | Start with monolith |
| Need independent deployment/scaling | Consider microservices |
| Multiple teams, clear domain boundaries | Microservices work well |
| Tight deadlines, unknown requirements | Monolith first, extract later |
Rule of thumb: If you can't define clear service boundaries, you're not ready for microservices.
Organize services around business functions, not technical layers.
E-commerce Example:
βββ order-service # Order lifecycle
βββ payment-service # Payment processing
βββ inventory-service # Stock management
βββ shipping-service # Fulfillment
βββ notification-service # Emails, SMS
Gradually extract from monolith without big-bang rewrites.
1. Identify bounded context to extract
2. Create new microservice
3. Route new traffic to microservice
4. Gradually migrate existing functionality
5. Remove from monolith when complete
# API Gateway routing during migration
async def route_orders(request):
if request.path.startswith("/api/orders/v2"):
return await new_order_service.forward(request)
else:
return await legacy_monolith.forward(request)
Use for: Queries, when you need immediate response.
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=5.0)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def get(self, path: str):
"""GET with automatic retries."""
response = await self.client.get(f"{self.base_url}{path}")
response.raise_for_status()
return response.json()
# Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get(f"/payments/{payment_id}")
Use for: Commands, when eventual consistency is acceptable.
from aiokafka import AIOKafkaProducer
import json
@dataclass
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
occurred_at: datetime
data: dict
class EventBus:
def __init__(self, bootstrap_servers: List[str]):
self.producer = AIOKafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode()
)
async def publish(self, event: DomainEvent):
await self.producer.send_and_wait(
event.event_type, # Topic = event type
value=asdict(event),
key=event.aggregate_id.encode()
)
# Order service publishes
await event_bus.publish(DomainEvent(
event_id=str(uuid.uuid4()),
event_type="OrderCreated",
aggregate_id=order.id,
occurred_at=datetime.now(),
data={"order_id": order.id, "customer_id": order.customer_id}
))
# Inventory service subscribes and reacts
async def handle_order_created(event_data: dict):
order_id = event_data["data"]["order_id"]
items = event_data["data"]["items"]
await reserve_inventory(order_id, items)
| Synchronous | Asynchronous |
|-------------|--------------|
| Need immediate response | Fire-and-forget |
| Simple query/response | Long-running operations |
| Low latency required | Decoupling is priority |
| Tight coupling acceptable | Eventual consistency OK |
Each service owns its data. No shared databases.
order-service β orders_db (PostgreSQL)
payment-service β payments_db (PostgreSQL)
product-service β products_db (MongoDB)
analytics-service β analytics_db (ClickHouse)
For operations spanning multiple services that need rollback capability.
class SagaStep:
def __init__(self, name: str, action: Callable, compensation: Callable):
self.name = name
self.action = action
self.compensation = compensation
class OrderFulfillmentSaga:
def __init__(self):
self.steps = [
SagaStep("create_order", self.create_order, self.cancel_order),
SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
SagaStep("process_payment", self.process_payment, self.refund_payment),
SagaStep("confirm_order", self.confirm_order, self.cancel_confirmation),
]
async def execute(self, order_data: dict) -> SagaResult:
completed_steps = []
context = {"order_data": order_data}
for step in self.steps:
try:
result = await step.action(context)
if not result.success:
await self.compensate(completed_steps, context)
return SagaResult(status="failed", error=result.error)
completed_steps.append(step)
context.update(result.data)
except Exception as e:
await self.compensate(completed_steps, context)
return SagaResult(status="failed", error=str(e))
return SagaResult(status="completed", data=context)
async def compensate(self, completed_steps: List[SagaStep], context: dict):
"""Execute compensating actions in reverse order."""
for step in reversed(completed_steps):
try:
await step.compensation(context)
except Exception as e:
# Log but continue compensating
logger.error(f"Compensation failed for {step.name}: {e}")
Fail fast when a service is down. Prevents cascade failures.
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None
async def call(self, func: Callable, *args, **kwargs):
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpen("Service unavailable")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def _on_failure(self):
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
def _should_attempt_reset(self) -> bool:
return datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout)
# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(data: dict):
return await breaker.call(payment_client.post, "/payments", json=data)
For transient failures.
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError))
)
async def fetch_user(user_id: str):
response = await client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
Isolate resources to limit impact of failures.
import asyncio
class Bulkhead:
def __init__(self, max_concurrent: int):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def call(self, func: Callable, *args, **kwargs):
async with self.semaphore:
return await func(*args, **kwargs)
# Limit concurrent calls to each service
payment_bulkhead = Bulkhead(max_concurrent=10)
inventory_bulkhead = Bulkhead(max_concurrent=20)
result = await payment_bulkhead.call(payment_service.charge, amount)
Single entry point for all clients.
from fastapi import FastAPI, Depends, HTTPException
from circuitbreaker import circuit
app = FastAPI()
class APIGateway:
def __init__(self):
self.clients = {
"orders": httpx.AsyncClient(base_url="http://order-service:8000"),
"payments": httpx.AsyncClient(base_url="http://payment-service:8001"),
"inventory": httpx.AsyncClient(base_url="http://inventory-service:8002"),
}
@circuit(failure_threshold=5, recovery_timeout=30)
async def forward(self, service: str, path: str, **kwargs):
client = self.clients[service]
response = await client.request(**kwargs, url=path)
response.raise_for_status()
return response.json()
async def aggregate(self, order_id: str) -> dict:
"""Aggregate data from multiple services."""
results = await asyncio.gather(
self.forward("orders", f"/orders/{order_id}", method="GET"),
self.forward("payments", f"/payments/order/{order_id}", method="GET"),
self.forward("inventory", f"/reservations/order/{order_id}", method="GET"),
return_exceptions=True
)
return {
"order": results[0] if not isinstance(results[0], Exception) else None,
"payment": results[1] if not isinstance(results[1], Exception) else None,
"inventory": results[2] if not isinstance(results[2], Exception) else None,
}
gateway = APIGateway()
@app.get("/api/orders/{order_id}")
async def get_order_aggregate(order_id: str):
return await gateway.aggregate(order_id)
Every service needs liveness and readiness probes.
@app.get("/health/live")
async def liveness():
"""Is the process running?"""
return {"status": "alive"}
@app.get("/health/ready")
async def readiness():
"""Can we serve traffic?"""
checks = {
"database": await check_database(),
"cache": await check_redis(),
}
all_healthy = all(checks.values())
status = "ready" if all_healthy else "not_ready"
return {"status": status, "checks": checks}
AI Usage Analysis
Analysis is being generated⦠refresh in a few seconds.
Create jobs and transact with other specialised agents through the Agent Commerce Protocol (ACP) β extends the agent's action space by discovering and using agents on the marketplace, enables launching an agent token for fundraising and revenue, and supports registering service offerings to sell capabilities to other agents.
Write, structure, and update a business plan for a solopreneur. Use when creating a plan from scratch, updating an existing plan after a pivot or new phase, or preparing a plan to share with investors, partners, or even just to clarify your own strategy. Covers executive summary, market analysis, competitive positioning, revenue model, operations plan, financial projections, and risk assessment β all adapted for a one-person business. Trigger on "write a business plan", "business plan", "create my plan", "business plan template", "update my business plan", "plan for my business", "investor pitch plan".
Executive leadership guidance for strategic decision-making, organizational development, and stakeholder management. Includes strategy analyzer, financial scenario modeling, board governance frameworks, and investor relations playbooks. Use when planning strategy, preparing board presentations, managing investors, developing organizational culture, making executive decisions, or when user mentions CEO, strategic planning, board meetings, investor updates, organizational leadership, or executive strategy.
Strategic product leadership toolkit for Head of Product including OKR cascade generation, market analysis, vision setting, and team scaling. Use for strategic planning, goal alignment, competitive analysis, and organizational design.
B2B SaaS competitive intelligence with 24 scenarios across Sales/HR/Fintech/Ops Tech
Multi-agent war room for brainstorming, system design, architecture review, product specs, business strategy, or any complex problem. Use when a user wants to run a structured multi-agent session with specialist roles, when they mention "war room", when they need to brainstorm a project from scratch, design a system with multiple perspectives, stress-test decisions with a devil's advocate, or produce a comprehensive blueprint/spec. Works for software, hardware, content, business β any domain.