Event-Driven Architecture: Building Reactive and Scalable Systems
Comprehensive guide to event-driven architecture patterns, from basic pub/sub to complex event sourcing and CQRS implementations with real-world examples.
Event-Driven Architecture: Building Reactive and Scalable Systems
Event-driven architecture (EDA) is a paradigm that promotes the production and detection of, consumption of, and reaction to events. This approach enables systems to be more responsive, resilient, and scalable by decoupling components and enabling asynchronous communication. This comprehensive guide explores EDA patterns, implementations, and best practices.
Understanding Event-Driven Architecture
Core Concepts
Events are immutable facts about something that happened in the system. Unlike commands (which express intent), events represent completed actions.
Event Producers generate events when state changes occur.
Event Consumers react to events by performing business logic, updating views, or triggering other events.
Event Sourcing Implementation
Event Sourcing stores all changes as a sequence of events rather than just the current state.
# Standard library imports (PEP 8 grouping). `threading` is required by the
# in-memory stores below (it was previously imported only mid-file, after
# its first use); `field` is needed for dataclass default factories.
import json
import threading
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Type
# Base Event Classes
@dataclass
class Event:
    """Immutable record of something that happened in the system.

    Events are facts, not intents: once created, consumers must not
    mutate them.
    """

    event_id: str                # globally unique id (uuid4 string by convention)
    aggregate_id: str            # id of the aggregate this event belongs to
    event_type: str              # discriminator used to route to handlers
    event_data: Dict[str, Any]   # event-specific payload
    timestamp: float             # epoch seconds when the event was created
    version: int                 # aggregate version after this event is applied
    # Fix: use field(default_factory=dict) rather than a `None` default —
    # the idiomatic way to give a dataclass a fresh mutable default.
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Preserve the legacy contract: an explicit metadata=None becomes {}.
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (suitable for json.dumps)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Event':
        """Rebuild an event from a dict produced by to_dict()."""
        return cls(**data)
# Domain Events
@dataclass
class UserRegistered(Event):
    """Domain event: a new user account was created."""

    def __init__(self, user_id: str, email: str, name: str, version: int):
        payload = {"email": email, "name": name}
        super().__init__(
            event_id=str(uuid.uuid4()),
            aggregate_id=user_id,
            event_type="UserRegistered",
            event_data=payload,
            timestamp=time.time(),
            version=version,
        )
@dataclass
class UserEmailChanged(Event):
    """Domain event: a user's email address was replaced."""

    def __init__(self, user_id: str, old_email: str, new_email: str, version: int):
        payload = {"old_email": old_email, "new_email": new_email}
        super().__init__(
            event_id=str(uuid.uuid4()),
            aggregate_id=user_id,
            event_type="UserEmailChanged",
            event_data=payload,
            timestamp=time.time(),
            version=version,
        )
@dataclass
class UserDeactivated(Event):
    """Domain event: a user account was deactivated."""

    def __init__(self, user_id: str, reason: str, version: int):
        payload = {"reason": reason}
        super().__init__(
            event_id=str(uuid.uuid4()),
            aggregate_id=user_id,
            event_type="UserDeactivated",
            event_data=payload,
            timestamp=time.time(),
            version=version,
        )
# Event Store Interface
class EventStore(ABC):
    """Abstract append-only store for domain events."""

    @abstractmethod
    def append_events(self, aggregate_id: str, events: List[Event], expected_version: int) -> None:
        """Append events to an aggregate's stream.

        Implementations are expected to enforce optimistic concurrency
        against expected_version.
        """
        pass

    @abstractmethod
    def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
        """Return one aggregate's events starting at from_version."""
        pass

    @abstractmethod
    def get_all_events(self, from_timestamp: float = 0) -> List[Event]:
        """Return every stored event at or after from_timestamp."""
        pass
# In-Memory Event Store Implementation
class InMemoryEventStore(EventStore):
    """Thread-safe in-memory EventStore with optimistic concurrency control.

    Events are kept both per-aggregate (for rehydration) and in a global
    list (for projections/replay). Snapshots are stored per aggregate.
    """

    def __init__(self):
        self.events: Dict[str, List[Event]] = {}   # aggregate_id -> ordered events
        self.global_events: List[Event] = []       # all events in append order
        self.snapshots: Dict[str, Dict] = {}       # aggregate_id -> latest snapshot
        self._lock = threading.Lock()              # guards all three structures

    def append_events(self, aggregate_id: str, events: List[Event], expected_version: int) -> None:
        """Append events iff the stream is still at expected_version.

        Raises:
            ConcurrencyError: another writer advanced the stream first.
        """
        with self._lock:
            if aggregate_id not in self.events:
                self.events[aggregate_id] = []
            current_version = len(self.events[aggregate_id])
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, but current version is {current_version}"
                )
            # Append events
            for event in events:
                self.events[aggregate_id].append(event)
                self.global_events.append(event)

    def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
        """Return the aggregate's events from from_version on ([] if unknown)."""
        with self._lock:
            if aggregate_id not in self.events:
                return []
            return self.events[aggregate_id][from_version:]

    def get_all_events(self, from_timestamp: float = 0) -> List[Event]:
        """Return all events with timestamp >= from_timestamp."""
        with self._lock:
            return [event for event in self.global_events if event.timestamp >= from_timestamp]

    def create_snapshot(self, aggregate_id: str, version: int, snapshot_data: Dict) -> None:
        """Store (overwrite) the aggregate's snapshot at the given version."""
        with self._lock:
            self.snapshots[aggregate_id] = {
                'version': version,
                'data': snapshot_data,
                'timestamp': time.time()
            }

    def get_snapshot(self, aggregate_id: str) -> Optional[Dict]:
        """Return the latest snapshot for the aggregate, or None.

        Fix: read under the same lock create_snapshot writes under; the
        original read was unsynchronized.
        """
        with self._lock:
            return self.snapshots.get(aggregate_id)
class ConcurrencyError(Exception):
    """Raised when an event append races with a concurrent writer."""
# Aggregate Base Class
class AggregateRoot(ABC):
    """Base class for event-sourced aggregates.

    State changes only by applying events; newly raised events are
    buffered in uncommitted_events until a repository persists them.
    """

    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.version = 0                           # version of last applied event
        self.uncommitted_events: List[Event] = []  # raised but not yet persisted

    def apply_event(self, event: Event):
        """Advance the version and dispatch to `_on_<eventtype>`, if defined."""
        self.version = event.version
        handler = getattr(self, f"_on_{event.event_type.lower()}", None)
        if handler is not None:
            handler(event)

    def raise_event(self, event: Event):
        """Stamp the next version onto event, apply it, and buffer it."""
        event.version = self.version + 1
        self.apply_event(event)
        self.uncommitted_events.append(event)

    def mark_events_as_committed(self):
        """Drop the buffer once a repository has persisted the events."""
        del self.uncommitted_events[:]

    def load_from_history(self, events: List[Event]):
        """Rebuild aggregate state by replaying persisted events in order."""
        for event in events:
            self.apply_event(event)
# User Aggregate Implementation
class User(AggregateRoot):
    """Event-sourced user aggregate (email, name, active flag)."""

    def __init__(self, user_id: str):
        super().__init__(user_id)
        self.email = None
        self.name = None
        self.is_active = False

    @classmethod
    def register(cls, user_id: str, email: str, name: str) -> 'User':
        """Create a new user and raise its UserRegistered event."""
        instance = cls(user_id)
        instance.raise_event(UserRegistered(user_id, email, name, version=1))
        return instance

    def change_email(self, new_email: str):
        """Raise UserEmailChanged, unless the address is unchanged.

        Raises:
            ValueError: the user is not active.
        """
        if not self.is_active:
            raise ValueError("Cannot change email of inactive user")
        if self.email != new_email:
            self.raise_event(
                UserEmailChanged(self.aggregate_id, self.email, new_email, self.version + 1)
            )

    def deactivate(self, reason: str):
        """Raise UserDeactivated if the user is currently active."""
        if self.is_active:
            self.raise_event(
                UserDeactivated(self.aggregate_id, reason, self.version + 1)
            )

    # --- event handlers (dispatched by AggregateRoot.apply_event) ---

    def _on_userregistered(self, event: Event):
        self.email = event.event_data['email']
        self.name = event.event_data['name']
        self.is_active = True

    def _on_useremailchanged(self, event: Event):
        self.email = event.event_data['new_email']

    def _on_userdeactivated(self, event: Event):
        self.is_active = False
# Repository Pattern for Aggregates
class Repository:
    """Loads and saves event-sourced aggregates against an EventStore."""

    def __init__(self, event_store: EventStore, aggregate_class: Type[AggregateRoot]):
        self.event_store = event_store
        self.aggregate_class = aggregate_class

    def save(self, aggregate: AggregateRoot) -> None:
        """Persist pending events with optimistic concurrency, then clear them."""
        pending = aggregate.uncommitted_events
        if not pending:
            return
        # The stream version before these events were raised.
        base_version = aggregate.version - len(pending)
        self.event_store.append_events(aggregate.aggregate_id, pending, base_version)
        aggregate.mark_events_as_committed()

    def get_by_id(self, aggregate_id: str) -> Optional[AggregateRoot]:
        """Rehydrate an aggregate from its history, or None if no events exist."""
        history = self.event_store.get_events(aggregate_id)
        if not history:
            return None
        aggregate = self.aggregate_class(aggregate_id)
        aggregate.load_from_history(history)
        return aggregate
# Command Pattern
@dataclass
class Command:
command_id: str
timestamp: float = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
@dataclass
class RegisterUserCommand(Command):
    """Request to create a new user account.

    NOTE(review): Command declares `timestamp` with a default, so these
    non-default fields make dataclass creation raise "non-default argument
    follows default argument" — confirm and make Command.timestamp
    keyword-only (or give these fields defaults).
    """
    user_id: str
    email: str
    name: str
@dataclass
class ChangeUserEmailCommand(Command):
    """Request to change an existing user's email address.

    NOTE(review): same dataclass field-ordering issue as
    RegisterUserCommand — non-default fields after Command's defaulted
    `timestamp` raise TypeError at class creation; confirm and fix.
    """
    user_id: str
    new_email: str
@dataclass
class DeactivateUserCommand(Command):
    """Request to deactivate a user, with a reason for the audit trail.

    NOTE(review): same dataclass field-ordering issue as
    RegisterUserCommand — non-default fields after Command's defaulted
    `timestamp` raise TypeError at class creation; confirm and fix.
    """
    user_id: str
    reason: str
# Command Handler
class UserCommandHandler:
    """Executes user commands against the write model (aggregates)."""

    def __init__(self, user_repository: Repository):
        self.user_repository = user_repository

    def _load_or_fail(self, user_id: str):
        # Shared guard: fetch the aggregate or raise a uniform error.
        user = self.user_repository.get_by_id(user_id)
        if user is None:
            raise ValueError(f"User {user_id} not found")
        return user

    def handle_register_user(self, command: RegisterUserCommand):
        """Create a new user.

        Raises:
            ValueError: a user with this id already exists.
        """
        if self.user_repository.get_by_id(command.user_id) is not None:
            raise ValueError(f"User {command.user_id} already exists")
        new_user = User.register(command.user_id, command.email, command.name)
        self.user_repository.save(new_user)

    def handle_change_email(self, command: ChangeUserEmailCommand):
        """Change a user's email address.

        Raises:
            ValueError: the user does not exist.
        """
        user = self._load_or_fail(command.user_id)
        user.change_email(command.new_email)
        self.user_repository.save(user)

    def handle_deactivate_user(self, command: DeactivateUserCommand):
        """Deactivate a user.

        Raises:
            ValueError: the user does not exist.
        """
        user = self._load_or_fail(command.user_id)
        user.deactivate(command.reason)
        self.user_repository.save(user)
CQRS (Command Query Responsibility Segregation)
CQRS separates read and write operations, allowing for optimized data models for each.
import threading
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
# Read Models (Query Side)
@dataclass
class UserReadModel:
    """Denormalized user record for the CQRS query side."""

    user_id: str
    email: str
    name: str
    is_active: bool
    registration_date: float  # epoch seconds (timestamp of the registration event)
    last_updated: float       # epoch seconds of the last projected change
    profile_completion: int = 0  # defaults to 0; not updated by the projections shown here
class ReadModelStore(ABC):
    """Abstract storage interface for user read models (query side)."""

    @abstractmethod
    def save_user(self, user: UserReadModel) -> None:
        """Insert or update a user read model."""
        pass

    @abstractmethod
    def get_user(self, user_id: str) -> Optional[UserReadModel]:
        """Return the user with user_id, or None."""
        pass

    @abstractmethod
    def get_users_by_email(self, email: str) -> List[UserReadModel]:
        """Return all users stored under email."""
        pass

    @abstractmethod
    def get_active_users(self) -> List[UserReadModel]:
        """Return all users whose is_active flag is set."""
        pass

    @abstractmethod
    def search_users(self, query: str) -> List[UserReadModel]:
        """Return users matching query (matching is implementation-defined)."""
        pass
class InMemoryReadModelStore(ReadModelStore):
    """Thread-safe in-memory read-model store with an email -> user_ids index."""

    def __init__(self):
        self.users: Dict[str, UserReadModel] = {}    # user_id -> read model
        self.email_index: Dict[str, List[str]] = {}  # email -> [user_id, ...]
        self._lock = threading.Lock()                # guards users + email_index

    def save_user(self, user: UserReadModel) -> None:
        """Upsert a user and keep the email index in sync."""
        with self._lock:
            # Update main store
            old_user = self.users.get(user.user_id)
            self.users[user.user_id] = user
            # Update email index
            if old_user and old_user.email != user.email:
                # Remove the stale email -> user_id mapping.
                old_ids = self.email_index.get(old_user.email)
                if old_ids is not None:
                    remaining = [uid for uid in old_ids if uid != user.user_id]
                    if remaining:
                        self.email_index[old_user.email] = remaining
                    else:
                        # Fix: prune empty lists instead of leaking index keys.
                        del self.email_index[old_user.email]
            # Add new email mapping
            self.email_index.setdefault(user.email, [])
            if user.user_id not in self.email_index[user.email]:
                self.email_index[user.email].append(user.user_id)

    def get_user(self, user_id: str) -> Optional[UserReadModel]:
        """Return a user by id, or None.

        Fix: reads now take the same lock save_user writes under; the
        original read methods were unsynchronized.
        """
        with self._lock:
            return self.users.get(user_id)

    def get_users_by_email(self, email: str) -> List[UserReadModel]:
        """Return all users currently mapped to email."""
        with self._lock:
            user_ids = self.email_index.get(email, [])
            return [self.users[uid] for uid in user_ids if uid in self.users]

    def get_active_users(self) -> List[UserReadModel]:
        """Return users whose is_active flag is set."""
        with self._lock:
            return [user for user in self.users.values() if user.is_active]

    def search_users(self, query: str) -> List[UserReadModel]:
        """Case-insensitive substring search over name and email."""
        needle = query.lower()
        with self._lock:
            return [
                user for user in self.users.values()
                if needle in user.name.lower() or needle in user.email.lower()
            ]
# Event Handlers (Projections)
class UserProjectionHandler:
    """Projects domain events onto the user read model (CQRS query side)."""

    def __init__(self, read_model_store: ReadModelStore):
        self.read_model_store = read_model_store

    def handle_user_registered(self, event: Event):
        """Create the read-model row for a newly registered user."""
        data = event.event_data
        self.read_model_store.save_user(UserReadModel(
            user_id=event.aggregate_id,
            email=data['email'],
            name=data['name'],
            is_active=True,
            registration_date=event.timestamp,
            last_updated=event.timestamp
        ))

    def handle_user_email_changed(self, event: Event):
        """Update the stored email; events for unknown users are ignored."""
        model = self.read_model_store.get_user(event.aggregate_id)
        if model is None:
            return
        model.email = event.event_data['new_email']
        model.last_updated = event.timestamp
        self.read_model_store.save_user(model)

    def handle_user_deactivated(self, event: Event):
        """Clear the active flag; events for unknown users are ignored."""
        model = self.read_model_store.get_user(event.aggregate_id)
        if model is None:
            return
        model.is_active = False
        model.last_updated = event.timestamp
        self.read_model_store.save_user(model)
# Query Handlers
class UserQueryHandler:
    """Read-side query API over the user read models."""

    def __init__(self, read_model_store: ReadModelStore):
        self.read_model_store = read_model_store

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the full profile dict for user_id, or None if unknown."""
        user = self.read_model_store.get_user(user_id)
        if user is None:
            return None
        return {
            'user_id': user.user_id,
            'email': user.email,
            'name': user.name,
            'is_active': user.is_active,
            'registration_date': user.registration_date,
            'profile_completion': user.profile_completion
        }

    def search_users(self, query: str, active_only: bool = True) -> List[Dict[str, Any]]:
        """Search users by name/email substring; optionally active users only."""
        matches = self.read_model_store.search_users(query)
        return [
            {
                'user_id': u.user_id,
                'email': u.email,
                'name': u.name,
                'is_active': u.is_active
            }
            for u in matches
            if u.is_active or not active_only
        ]

    def get_user_statistics(self) -> Dict[str, Any]:
        """Return aggregate counts over all users.

        NOTE(review): this reaches into `read_model_store.users`, an
        attribute only the in-memory implementation provides — confirm
        before swapping in a different ReadModelStore.
        """
        all_users = list(self.read_model_store.users.values())
        active_count = sum(1 for u in all_users if u.is_active)
        day_ago = time.time() - 86400  # last 24 hours
        recent_count = sum(1 for u in all_users if u.registration_date > day_ago)
        return {
            'total_users': len(all_users),
            'active_users': active_count,
            'inactive_users': len(all_users) - active_count,
            'recent_registrations': recent_count
        }
# Event Bus for CQRS
class EventBus:
    """Minimal synchronous in-process pub/sub bus keyed by event_type."""

    def __init__(self):
        self.handlers: Dict[str, List[callable]] = {}  # event_type -> handlers
        self._lock = threading.Lock()                  # guards the handler table

    def subscribe(self, event_type: str, handler: callable):
        """Register handler to be invoked for every event of event_type."""
        with self._lock:
            self.handlers.setdefault(event_type, []).append(handler)

    def publish(self, event: 'Event'):
        """Invoke all handlers subscribed to event.event_type.

        Fix: the handler list is snapshotted under the lock but handlers
        run OUTSIDE it. The original called handlers while holding the
        non-reentrant lock, so any handler that called subscribe() (or
        published another event) deadlocked.
        """
        with self._lock:
            handlers = list(self.handlers.get(event.event_type, []))
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                # Best-effort delivery: report and continue with the rest.
                print(f"Error handling event {event.event_type}: {e}")
                # In production, you'd want proper error handling/dead letter queues
# Complete CQRS System
class CQRSSystem:
    """Facade wiring the write side (commands, event store) to the read
    side (projections, queries) through an in-process event bus.
    """

    def __init__(self):
        self.event_store = InMemoryEventStore()
        self.read_model_store = InMemoryReadModelStore()
        self.event_bus = EventBus()
        # Set up repositories and handlers
        self.user_repository = Repository(self.event_store, User)
        self.command_handler = UserCommandHandler(self.user_repository)
        self.projection_handler = UserProjectionHandler(self.read_model_store)
        self.query_handler = UserQueryHandler(self.read_model_store)
        # Subscribe projection handlers to events
        self.event_bus.subscribe('UserRegistered', self.projection_handler.handle_user_registered)
        self.event_bus.subscribe('UserEmailChanged', self.projection_handler.handle_user_email_changed)
        self.event_bus.subscribe('UserDeactivated', self.projection_handler.handle_user_deactivated)
        # Set up event publishing from event store
        self._setup_event_publishing()

    def _setup_event_publishing(self):
        """Set up automatic event publishing when events are stored.

        NOTE(review): this monkey-patches the event-store INSTANCE by
        replacing its bound append_events with a wrapper, so any other
        holder of this store instance also publishes through this bus —
        confirm that is intended.
        """
        original_append = self.event_store.append_events

        def append_and_publish(aggregate_id: str, events: List[Event], expected_version: int):
            # Persist first; only successfully stored events get published.
            original_append(aggregate_id, events, expected_version)
            # Publish events to event bus
            for event in events:
                self.event_bus.publish(event)

        self.event_store.append_events = append_and_publish

    # Command Interface
    def execute_command(self, command: Command):
        """Dispatch a command to its handler by concrete type.

        Raises:
            ValueError: the command type has no registered handler.
        """
        if isinstance(command, RegisterUserCommand):
            self.command_handler.handle_register_user(command)
        elif isinstance(command, ChangeUserEmailCommand):
            self.command_handler.handle_change_email(command)
        elif isinstance(command, DeactivateUserCommand):
            self.command_handler.handle_deactivate_user(command)
        else:
            raise ValueError(f"Unknown command type: {type(command)}")

    # Query Interface
    def execute_query(self, query_name: str, **params):
        """Dispatch a named query to the read side.

        Raises:
            ValueError: query_name is not a supported query.
        """
        if query_name == 'get_user_profile':
            return self.query_handler.get_user_profile(params['user_id'])
        elif query_name == 'search_users':
            return self.query_handler.search_users(
                params['query'],
                params.get('active_only', True)
            )
        elif query_name == 'get_user_statistics':
            return self.query_handler.get_user_statistics()
        else:
            raise ValueError(f"Unknown query: {query_name}")
Saga Pattern for Distributed Transactions
Sagas manage long-running transactions across multiple services using compensating actions.
from enum import Enum
from typing import Dict, List, Any, Optional, Callable
import time
import threading
class SagaStepStatus(Enum):
    """Lifecycle states of a single saga step."""

    PENDING = "pending"            # not yet executed
    COMPLETED = "completed"        # forward action succeeded
    FAILED = "failed"              # forward action raised
    COMPENSATING = "compensating"  # compensation in progress
    COMPENSATED = "compensated"    # compensation succeeded
@dataclass
class SagaStep:
    """One step of a saga: a forward action plus its compensating action."""

    step_id: str
    saga_id: str
    action: Callable        # forward action: action(context) -> result
    compensation: Callable  # undo: compensation(context, result) -> dict
    status: SagaStepStatus = SagaStepStatus.PENDING
    result: Any = None              # value returned by a successful action
    error: Optional[str] = None     # failure message, if the step failed
    timestamp: float = None         # creation time; filled in by __post_init__

    def __post_init__(self):
        # Default the creation timestamp when the caller omitted it.
        if self.timestamp is None:
            self.timestamp = time.time()
class SagaOrchestrator:
    """Runs saga workflows: executes steps in order, compensating on failure.

    Each saga instance runs on its own background thread started by
    start_saga; all instance state lives in the self.sagas dict.
    """

    def __init__(self):
        self.sagas: Dict[str, Dict] = {}                       # saga_id -> instance state
        self.saga_definitions: Dict[str, List[SagaStep]] = {}  # saga_type -> step templates
        self._lock = threading.Lock()                          # guards self.sagas

    def define_saga(self, saga_type: str, steps: List[SagaStep]):
        """Define a saga workflow (a named, ordered list of step templates)."""
        self.saga_definitions[saga_type] = steps

    def start_saga(self, saga_type: str, saga_id: str, context: Dict[str, Any]) -> str:
        """Start a saga instance and begin executing it on a background thread.

        Raises:
            ValueError: saga_type was never defined via define_saga.
        """
        if saga_type not in self.saga_definitions:
            raise ValueError(f"Saga type {saga_type} not defined")
        with self._lock:
            self.sagas[saga_id] = {
                'saga_type': saga_type,
                'saga_id': saga_id,
                'context': context,
                'current_step': 0,
                'status': 'running',
                'steps': [],
                'start_time': time.time()
            }
            # Create step instances
            # (fresh SagaStep objects so the shared templates are never mutated)
            for i, step_template in enumerate(self.saga_definitions[saga_type]):
                step = SagaStep(
                    step_id=f"{saga_id}_step_{i}",
                    saga_id=saga_id,
                    action=step_template.action,
                    compensation=step_template.compensation
                )
                self.sagas[saga_id]['steps'].append(step)
        # Start execution
        # NOTE(review): the worker thread is non-daemon, so a hung step keeps
        # the process alive — confirm this is acceptable.
        threading.Thread(target=self._execute_saga, args=(saga_id,)).start()
        return saga_id

    def _execute_saga(self, saga_id: str):
        """Execute saga steps sequentially; on step failure, compensate."""
        saga = self.sagas[saga_id]
        steps = saga['steps']
        context = saga['context']
        try:
            # Execute steps forward
            for i, step in enumerate(steps):
                saga['current_step'] = i
                step.status = SagaStepStatus.PENDING
                try:
                    # Execute step action
                    result = step.action(context)
                    step.result = result
                    step.status = SagaStepStatus.COMPLETED
                    # Update context with result
                    # (dict results feed parameters into later steps)
                    if isinstance(result, dict):
                        context.update(result)
                except Exception as e:
                    step.error = str(e)
                    step.status = SagaStepStatus.FAILED
                    # Start compensation
                    self._compensate_saga(saga_id, i)
                    return
            # All steps completed successfully
            saga['status'] = 'completed'
        except Exception as e:
            saga['status'] = 'failed'
            saga['error'] = str(e)

    def _compensate_saga(self, saga_id: str, failed_step_index: int):
        """Compensate completed steps in reverse order.

        Stops early (status 'compensation_failed') if a compensation raises.
        """
        saga = self.sagas[saga_id]
        steps = saga['steps']
        context = saga['context']
        saga['status'] = 'compensating'
        # Compensate completed steps in reverse order
        for i in range(failed_step_index - 1, -1, -1):
            step = steps[i]
            if step.status == SagaStepStatus.COMPLETED:
                step.status = SagaStepStatus.COMPENSATING
                try:
                    compensation_result = step.compensation(context, step.result)
                    step.status = SagaStepStatus.COMPENSATED
                    # Update context with compensation result
                    if isinstance(compensation_result, dict):
                        context.update(compensation_result)
                except Exception as e:
                    step.error = f"Compensation failed: {str(e)}"
                    saga['status'] = 'compensation_failed'
                    return
        saga['status'] = 'compensated'

    def get_saga_status(self, saga_id: str) -> Optional[Dict[str, Any]]:
        """Return a JSON-serializable snapshot of the saga, or None if unknown."""
        saga = self.sagas.get(saga_id)
        if not saga:
            return None
        return {
            'saga_id': saga_id,
            'saga_type': saga['saga_type'],
            'status': saga['status'],
            'current_step': saga['current_step'],
            'steps': [
                {
                    'step_id': step.step_id,
                    'status': step.status.value,
                    'error': step.error
                }
                for step in saga['steps']
            ]
        }
# Example: E-commerce Order Saga
class OrderService:
    """Order lifecycle for the saga demo (create / cancel)."""

    def __init__(self):
        self.orders = {}  # order_id -> order dict

    def create_order(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Saga action: record the order and return its computed total."""
        order_id = context['order_id']
        items = context['items']
        total = sum(entry['price'] * entry['quantity'] for entry in items)
        self.orders[order_id] = {
            'order_id': order_id,
            'user_id': context['user_id'],
            'items': items,
            'status': 'created',
            'total_amount': total
        }
        print(f"Order {order_id} created")
        return {'total_amount': total}

    def cancel_order(self, context: Dict[str, Any], result: Any) -> Dict[str, Any]:
        """Saga compensation: mark a previously created order cancelled."""
        order_id = context['order_id']
        if order_id in self.orders:
            self.orders[order_id]['status'] = 'cancelled'
            print(f"Order {order_id} cancelled")
        return {}
class PaymentService:
    """Payment processing for the saga demo, with simulated failures."""

    def __init__(self):
        self.payments = {}  # payment_id -> payment dict

    def process_payment(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Saga action: charge the order total.

        Raises:
            Exception: simulated gateway failure (~30% of calls).
        """
        import random
        order_id = context['order_id']
        amount = context['total_amount']
        # Simulate payment failure for demonstration
        if random.random() < 0.3:  # 30% failure rate
            raise Exception("Payment processing failed")
        payment_id = f"payment_{order_id}"
        self.payments[payment_id] = {
            'payment_id': payment_id,
            'order_id': order_id,
            'user_id': context['user_id'],
            'amount': amount,
            'status': 'completed'
        }
        print(f"Payment {payment_id} processed for ${amount}")
        return {'payment_id': payment_id}

    def refund_payment(self, context: Dict[str, Any], result: Any) -> Dict[str, Any]:
        """Saga compensation: refund the payment recorded by the action, if any."""
        payment_id = result.get('payment_id') if result else None
        if payment_id and payment_id in self.payments:
            self.payments[payment_id]['status'] = 'refunded'
            print(f"Payment {payment_id} refunded")
        return {}
class InventoryService:
    """Inventory reservation for the saga demo."""

    def __init__(self):
        # item_id -> units on hand
        self.inventory = {
            'item1': 100,
            'item2': 50,
            'item3': 25
        }
        self.reservations = {}  # order_id -> list of reserved item dicts

    def reserve_items(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Saga action: reserve every line item, or none.

        Raises:
            Exception: stock is insufficient (earlier reservations from
                this call are rolled back first).
        """
        order_id = context['order_id']
        reserved = []
        for line in context['items']:
            item_id = line['item_id']
            quantity = line['quantity']
            if self.inventory.get(item_id, 0) < quantity:
                # Roll back what this call already took before failing.
                for taken in reserved:
                    self.inventory[taken['item_id']] += taken['quantity']
                raise Exception(f"Insufficient inventory for item {item_id}")
            self.inventory[item_id] -= quantity
            reserved.append(line)
        self.reservations[order_id] = reserved
        print(f"Items reserved for order {order_id}: {reserved}")
        return {'reserved_items': reserved}

    def release_reservation(self, context: Dict[str, Any], result: Any) -> Dict[str, Any]:
        """Saga compensation: return reserved units to stock."""
        order_id = context['order_id']
        if order_id in self.reservations:
            for line in self.reservations.pop(order_id):
                self.inventory[line['item_id']] += line['quantity']
            print(f"Released reservations for order {order_id}")
        return {}
class ShippingService:
    """Shipment creation for the saga demo."""

    def __init__(self):
        self.shipments = {}  # shipment_id -> shipment dict

    def create_shipment(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Saga action: create a shipment covering the reserved items."""
        order_id = context['order_id']
        shipment_id = f"shipment_{order_id}"
        self.shipments[shipment_id] = {
            'shipment_id': shipment_id,
            'order_id': order_id,
            'items': context.get('reserved_items', []),
            'status': 'created'
        }
        print(f"Shipment {shipment_id} created")
        return {'shipment_id': shipment_id}

    def cancel_shipment(self, context: Dict[str, Any], result: Any) -> Dict[str, Any]:
        """Saga compensation: mark the shipment cancelled, if one was created."""
        shipment_id = result.get('shipment_id') if result else None
        if shipment_id and shipment_id in self.shipments:
            self.shipments[shipment_id]['status'] = 'cancelled'
            print(f"Shipment {shipment_id} cancelled")
        return {}
# Set up Order Processing Saga
def setup_order_saga():
order_service = OrderService()
payment_service = PaymentService()
inventory_service = InventoryService()
shipping_service = ShippingService()
orchestrator = SagaOrchestrator()
# Define order processing saga steps
steps = [
SagaStep(
step_id="create_order",
saga_id="", # Will be set when saga starts
action=order_service.create_order,
compensation=order_service.cancel_order
),
SagaStep(
step_id="reserve_inventory",
saga_id="",
action=inventory_service.reserve_items,
compensation=inventory_service.release_reservation
),
SagaStep(
step_id="process_payment",
saga_id="",
action=payment_service.process_payment,
compensation=payment_service.refund_payment
),
SagaStep(
step_id="create_shipment",
saga_id="",
action=shipping_service.create_shipment,
compensation=shipping_service.cancel_shipment
)
]
orchestrator.define_saga("order_processing", steps)
return orchestrator, {
'order_service': order_service,
'payment_service': payment_service,
'inventory_service': inventory_service,
'shipping_service': shipping_service
}
# Usage Example
# Demo entry point: run one order-processing saga end to end.
if __name__ == "__main__":
    orchestrator, services = setup_order_saga()
    # Start an order saga. The context dict is shared: each step reads its
    # inputs from it and merges its dict result back into it.
    order_context = {
        'order_id': 'order_123',
        'user_id': 'user_456',
        'items': [
            {'item_id': 'item1', 'quantity': 2, 'price': 10.00},
            {'item_id': 'item2', 'quantity': 1, 'price': 25.00}
        ]
    }
    saga_id = orchestrator.start_saga("order_processing", "saga_123", order_context)
    # Wait for saga to complete (it runs on a background thread; this
    # fixed sleep is a crude stand-in for real completion signaling).
    time.sleep(2)
    # Check saga status
    status = orchestrator.get_saga_status(saga_id)
    print(f"Saga status: {json.dumps(status, indent=2)}")
Event Streaming and Processing
Real-time Event Processing
import asyncio
from typing import AsyncIterator, Dict, Any, List, Callable
from dataclasses import dataclass
import json
import time
@dataclass
class StreamEvent:
    """Event flowing through an EventStream, assigned to a partition."""

    stream_id: str
    event_id: str
    event_type: str
    data: Dict[str, Any]     # event payload; numeric 'value' keys feed analytics
    timestamp: float         # epoch seconds
    partition_key: str = None  # derived from stream_id when not supplied

    def __post_init__(self):
        # Default partitioning: hash the stream id into one of 10 buckets.
        # NOTE(review): str hashes are randomized per process (PYTHONHASHSEED),
        # so default keys are stable within a run but not across runs — confirm
        # that is acceptable.
        if self.partition_key is None:
            self.partition_key = str(hash(self.stream_id) % 10)
class EventStream:
    """Partitioned, append-only in-memory event stream with polling readers."""

    def __init__(self, stream_id: str, partition_count: int = 10):
        self.stream_id = stream_id
        self.partition_count = partition_count
        # partition key ("0".."N-1") -> ordered events
        self.partitions: Dict[str, List['StreamEvent']] = {
            str(i): [] for i in range(partition_count)
        }
        self.subscribers: List[Callable] = []  # push-notified async callbacks
        self.positions: Dict[str, int] = {}    # consumer_id -> next read offset
        self._lock = asyncio.Lock()

    async def append(self, event: 'StreamEvent'):
        """Append event to its partition and notify subscribers as tasks."""
        async with self._lock:
            partition = event.partition_key
            if partition not in self.partitions:
                # Fold out-of-range keys back into the configured partitions.
                partition = str(int(partition) % self.partition_count)
            self.partitions[partition].append(event)
        # Schedule subscriber callbacks outside the lock so a slow
        # subscriber cannot block further appends.
        for subscriber in self.subscribers:
            asyncio.create_task(subscriber(event))

    async def read(self, consumer_id: str, from_position: int = 0) -> AsyncIterator['StreamEvent']:
        """Yield events in timestamp order, then keep polling for new ones.

        Fix: the local read offset now advances after each batch. The
        original computed `position` once before the loop and never
        updated it, so every batch after the first was re-read from the
        stale offset and events were delivered more than once.
        """
        position = max(from_position, self.positions.get(consumer_id, 0))
        while True:
            batch = []
            async with self._lock:
                # Merge all partitions and order globally by timestamp.
                merged = []
                for partition_events in self.partitions.values():
                    merged.extend(partition_events)
                merged.sort(key=lambda ev: ev.timestamp)
                if position < len(merged):
                    batch = merged[position:]
                    position = len(merged)
                    self.positions[consumer_id] = position
            for event in batch:
                yield event
            # Poll interval before checking for newly appended events.
            await asyncio.sleep(0.1)

    def subscribe(self, callback: Callable[['StreamEvent'], None]):
        """Register an async callback invoked (as a task) per appended event."""
        self.subscribers.append(callback)
class EventProcessor:
    """Dispatches stream events to per-type async handlers until stopped."""

    def __init__(self, processor_id: str):
        self.processor_id = processor_id
        self.handlers: Dict[str, Callable] = {}  # event_type -> async handler
        self.running = False

    def register_handler(self, event_type: str, handler: Callable):
        """Route events of event_type to handler (one handler per type)."""
        self.handlers[event_type] = handler

    async def process_stream(self, stream: EventStream, from_position: int = 0):
        """Consume the stream and dispatch events; exits after stop() is called."""
        self.running = True
        async for event in stream.read(self.processor_id, from_position):
            if not self.running:
                break
            handler = self.handlers.get(event.event_type)
            if handler is not None:
                try:
                    await handler(event)
                except Exception as e:
                    print(f"Error processing event {event.event_id}: {e}")
            # Small delay so one busy stream cannot starve other tasks.
            await asyncio.sleep(0.01)

    def stop(self):
        """Ask process_stream to exit before handling the next event."""
        self.running = False
# Complex Event Processing (CEP)
class EventPattern:
    """A CEP pattern: every condition must match within a sliding time window."""

    def __init__(self, pattern_id: str):
        self.pattern_id = pattern_id
        self.conditions: List[Callable] = []  # each: StreamEvent -> bool
        self.window_size: float = 60.0        # seconds; default one minute
        self.action: Callable = None          # invoked with the matching events

    def add_condition(self, condition: Callable[[StreamEvent], bool]):
        """Require that at least one windowed event satisfies condition."""
        self.conditions.append(condition)

    def set_action(self, action: Callable[[List[StreamEvent]], None]):
        """Set the coroutine to run when the pattern matches."""
        self.action = action

    def matches(self, events: List[StreamEvent]) -> bool:
        """True when every condition is satisfied by some in-window event."""
        if not events:
            return False
        # Restrict to the sliding time window ending now.
        cutoff = time.time() - self.window_size
        recent = [event for event in events if event.timestamp >= cutoff]
        if not recent:
            return False
        return all(
            any(condition(event) for event in recent)
            for condition in self.conditions
        )
class ComplexEventProcessor:
    """Matches registered EventPatterns against a rolling event buffer."""

    def __init__(self):
        self.patterns: Dict[str, EventPattern] = {}  # pattern_id -> pattern
        self.event_buffer: List[StreamEvent] = []    # most recent events
        self.buffer_size = 1000                      # max buffered events
        self._lock = asyncio.Lock()

    def add_pattern(self, pattern: EventPattern):
        """Register an event pattern to evaluate on every incoming event."""
        self.patterns[pattern.pattern_id] = pattern

    async def process_event(self, event: StreamEvent):
        """Buffer the event, then evaluate every pattern against the buffer.

        NOTE(review): pattern actions are awaited while the buffer lock is
        held, so a slow action blocks all further event processing —
        confirm this is acceptable, or release the lock before invoking
        actions.
        """
        async with self._lock:
            # Add to buffer
            self.event_buffer.append(event)
            # Maintain buffer size (keep only the newest buffer_size events)
            if len(self.event_buffer) > self.buffer_size:
                self.event_buffer = self.event_buffer[-self.buffer_size:]
            # Check patterns
            for pattern in self.patterns.values():
                if pattern.matches(self.event_buffer):
                    if pattern.action:
                        # Pass only the events inside the pattern's window.
                        relevant_events = [
                            e for e in self.event_buffer
                            if time.time() - e.timestamp <= pattern.window_size
                        ]
                        await pattern.action(relevant_events)
# Real-time Analytics Engine
class AnalyticsEngine:
def __init__(self):
self.metrics: Dict[str, Any] = {}
self.aggregations: Dict[str, Dict] = {}
self.time_windows = [60, 300, 3600] # 1min, 5min, 1hour
self._lock = asyncio.Lock()
async def process_event(self, event: StreamEvent):
"""Process event for analytics"""
async with self._lock:
now = time.time()
event_type = event.event_type
# Initialize metrics for event type
if event_type not in self.aggregations:
self.aggregations[event_type] = {
'count': 0,
'timestamps': [],
'values': []
}
agg = self.aggregations[event_type]
agg['count'] += 1
agg['timestamps'].append(event.timestamp)
# Extract numeric values for aggregation
if 'value' in event.data:
agg['values'].append(event.data['value'])
# Clean old data
for window in self.time_windows:
cutoff = now - window
agg['timestamps'] = [ts for ts in agg['timestamps'] if ts > cutoff]
if len(agg['values']) > len(agg['timestamps']):
agg['values'] = agg['values'][-len(agg['timestamps']):]
def get_metrics(self, event_type: str, window: int = 60) -> Dict[str, Any]:
"""Get aggregated metrics for event type"""
if event_type not in self.aggregations:
return {}
agg = self.aggregations[event_type]
now = time.time()
cutoff = now - window
# Filter data within window
recent_timestamps = [ts for ts in agg['timestamps'] if ts > cutoff]
recent_values = agg['values'][-len(recent_timestamps):]
metrics = {
'count': len(recent_timestamps),
'rate': len(recent_timestamps) / window if window > 0 else 0
}
if recent_values:
metrics.update({
'avg': sum(recent_values) / len(recent_values),
'min': min(recent_values),
'max': max(recent_values),
'sum': sum(recent_values)
})
return metrics
# Example: Real-time User Activity Monitoring
async def user_activity_monitoring_example():
    """Demo: wire analytics and CEP fraud detection onto an activity stream."""
    # Set up event stream
    activity_stream = EventStream("user_activity")
    # Set up processors
    analytics = AnalyticsEngine()
    cep = ComplexEventProcessor()
    # Create fraud detection pattern: requires both a login attempt and a
    # login failure inside the window before the action can fire.
    fraud_pattern = EventPattern("fraud_detection")
    fraud_pattern.add_condition(lambda e: e.event_type == "login_attempt")
    fraud_pattern.add_condition(lambda e: e.event_type == "login_failed")
    fraud_pattern.window_size = 300  # 5 minutes

    async def fraud_action(events):
        # Alert only on a burst of failures within the pattern window.
        login_attempts = [e for e in events if e.event_type == "login_attempt"]
        failed_attempts = [e for e in events if e.event_type == "login_failed"]
        if len(failed_attempts) > 5:  # More than 5 failed attempts
            print(f"FRAUD ALERT: {len(failed_attempts)} failed login attempts detected")

    fraud_pattern.set_action(fraud_action)
    cep.add_pattern(fraud_pattern)
    # Subscribe processors to stream (both run per appended event)
    activity_stream.subscribe(analytics.process_event)
    activity_stream.subscribe(cep.process_event)
    # Simulate user activity events
    user_events = [
        StreamEvent("user_activity", "1", "login_attempt", {"user_id": "user123", "ip": "192.168.1.1"}, time.time()),
        StreamEvent("user_activity", "2", "login_failed", {"user_id": "user123", "ip": "192.168.1.1"}, time.time()),
        StreamEvent("user_activity", "3", "page_view", {"user_id": "user456", "page": "/dashboard", "value": 1}, time.time()),
        StreamEvent("user_activity", "4", "login_failed", {"user_id": "user123", "ip": "192.168.1.1"}, time.time()),
    ]
    # Send events to stream
    for event in user_events:
        await activity_stream.append(event)
        await asyncio.sleep(0.1)
    # Wait for processing (subscriber callbacks run as scheduled tasks)
    await asyncio.sleep(1)
    # Get analytics
    login_metrics = analytics.get_metrics("login_attempt", 300)
    print(f"Login metrics: {login_metrics}")
# Run example
# Entry point: run the async monitoring demo.
if __name__ == "__main__":
    asyncio.run(user_activity_monitoring_example())
Conclusion
Event-driven architecture provides powerful patterns for building scalable, resilient systems:
- Event Sourcing enables complete audit trails and temporal queries
- CQRS optimizes read and write operations separately
- Sagas manage distributed transactions with compensating actions
- Event Streaming enables real-time processing and analytics
- Complex Event Processing detects patterns in event streams
Key benefits include:
- Loose Coupling: Components communicate through events
- Scalability: Easy to scale individual components
- Resilience: Natural error boundaries and recovery mechanisms
- Auditability: Complete history of state changes
- Flexibility: Easy to add new features without modifying existing code
When implementing EDA, consider:
- Event Design: Make events immutable and self-contained
- Ordering: Decide on global vs. per-partition ordering
- Error Handling: Implement proper retry and dead letter policies
- Schema Evolution: Plan for backward/forward compatibility
- Monitoring: Track event flow and processing latencies
Event-driven architecture is particularly well-suited for microservices, real-time systems, and applications requiring high scalability and resilience.