CQRS & Event Sourcing
What Is Wrong with Traditional CRUD?
Traditional CRUD (Create, Read, Update, Delete) is simple and elegant, but in a complex domain it creates four fundamental problems. When a banking system or an e-commerce platform tries to scale, these problems become acute.
The read model and the write model often have different shapes. Writes want normalized tables; reads want denormalized, pre-joined views. A single database schema cannot serve both efficiently.
When reads and writes hit the same rows at the same time, they contend for database locks. Under high traffic, read queries can block writes and write queries can block reads, which creates a performance bottleneck.
An UPDATE discards the previous value. "What was the account balance yesterday?" A plain CRUD system does not know, because it keeps no history.
"Show me the state from 3 months ago" cannot be answered from a plain CRUD table. Data is deleted or overwritten, so there is no way to reconstruct a past state.
📌 Solution: CQRS + Event Sourcing
CQRS = separate reads from writes. Commands (writes) get their own model and queries (reads) get their own model, so each can be optimized for its own job.
Event Sourcing = store events, not state. Instead of storing "Balance = 5,000 taka", store "Deposited 10,000, Withdrawn 5,000". Replaying the events yields the state at any point in time.
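The replay idea can be sketched in a few lines. This is a minimal illustration, not a production design: the `LedgerEvent` type and the `replayBalance` function are hypothetical names introduced here, and the amounts are the ones from the example above.

```typescript
// Hypothetical event shapes for illustration only; amounts are in taka.
type LedgerEvent =
  | { type: 'Deposited'; amount: number }
  | { type: 'Withdrawn'; amount: number };

// Fold the event list into a balance: the state is derived, never stored.
function replayBalance(events: LedgerEvent[]): number {
  return events.reduce(
    (balance, event) =>
      event.type === 'Deposited' ? balance + event.amount : balance - event.amount,
    0
  );
}

const ledger: LedgerEvent[] = [
  { type: 'Deposited', amount: 10000 },
  { type: 'Withdrawn', amount: 5000 },
];

const balanceNow = replayBalance(ledger);                    // 5000
const balanceAfterFirst = replayBalance(ledger.slice(0, 1)); // 10000
console.log(balanceNow, balanceAfterFirst);
```

Replaying only a prefix of the list gives the state as it was after that prefix, which is what makes past-state questions answerable.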
CQRS — Command Query Responsibility Segregation
CQRS is an architectural pattern in which the two responsibilities, Command (changes data) and Query (reads data), are fully separated. Each side gets its own model, and often its own database and its own service as well.
CQRS Architecture — Command Side + Query Side
| Aspect | Command (Write) | Query (Read) |
|---|---|---|
| Job | Changes state | Reads state |
| Return | void / success-failure | DTO / data |
| Example | PlaceOrder, CancelOrder | GetOrderById, ListOrders |
| Database | PostgreSQL (normalized) | Elasticsearch / Redis |
| Consistency | Strong (ACID) | Eventual (async sync) |
| Scale | Write-optimized | Read-optimized |
// ============================================================
// COMMAND SIDE — Write Model
// ============================================================
interface PlaceOrderCommand {
type: 'PLACE_ORDER';
customerId: string;
items: { productId: string; quantity: number; price: number }[];
shippingAddress: string;
}
class PlaceOrderCommandHandler {
constructor(
private readonly orderRepository: OrderWriteRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<void> {
// Validate command
if (command.items.length === 0) {
throw new Error('Order must have at least one item');
}
// Create aggregate and persist to Write DB (PostgreSQL)
const order = Order.create({
customerId: command.customerId,
items: command.items,
shippingAddress: command.shippingAddress,
});
await this.orderRepository.save(order);
// Publish domain event to Kafka — Query side reads this
await this.eventBus.publish({
type: 'OrderPlaced',
orderId: order.id,
customerId: order.customerId,
totalAmount: order.totalAmount,
timestamp: new Date().toISOString(),
});
// Command handler returns void — no data back
}
}
// ============================================================
// QUERY SIDE — Read Model
// ============================================================
interface GetOrderQuery {
orderId: string;
}
interface OrderReadModel {
orderId: string;
customerName: string; // Denormalized — no JOIN needed
customerEmail: string; // Denormalized — no JOIN needed
items: { productName: string; quantity: number; price: number }[];
totalAmount: number;
status: string;
createdAt: string;
}
class GetOrderQueryHandler {
constructor(
private readonly orderReadRepository: OrderReadRepository // Elasticsearch
) {}
async handle(query: GetOrderQuery): Promise<OrderReadModel> {
// Read from optimized Read DB — super fast, no JOIN
const order = await this.orderReadRepository.findById(query.orderId);
if (!order) {
throw new Error(`Order ${query.orderId} not found`);
}
return order; // Returns full data object
}
}
Event Sourcing: Store Events, Not State
Traditional CRUD stores only the current state. Event Sourcing stores every event (everything that happened). Current state = the result of replaying all events in order.
❌ CRUD Approach — Current State Only
accounts table:
| id | balance |
|---|---|
| ACC-001 | ৳ 5,000 |
❌ How did the balance become 5,000? Unknown.
❌ What was the balance 3 days ago? Unknown.
❌ Who withdrew the money? There is no audit trail.
✅ Event Sourcing — Full History
✅ Balance = Replay → ৳ 5,000
Event Timeline — Bank Account
// ============================================================
// Domain Events
// ============================================================
interface AccountOpened {
type: 'AccountOpened';
accountId: string;
ownerId: string;
timestamp: string;
}
interface MoneyDeposited {
type: 'MoneyDeposited';
accountId: string;
amount: number;
timestamp: string;
}
interface MoneyWithdrawn {
type: 'MoneyWithdrawn';
accountId: string;
amount: number;
timestamp: string;
}
type BankAccountEvent = AccountOpened | MoneyDeposited | MoneyWithdrawn;
// ============================================================
// Aggregate: applies events to manage its own state
// ============================================================
class BankAccountAggregate {
private accountId: string = '';
private balance: number = 0;
private ownerId: string = '';
private isOpen: boolean = false;
private uncommittedEvents: BankAccountEvent[] = [];
// Reconstruct state from event history (replay)
static rehydrate(events: BankAccountEvent[]): BankAccountAggregate {
const account = new BankAccountAggregate();
for (const event of events) {
account.apply(event); // Apply each event in order
}
return account;
}
// Command: Open account
openAccount(accountId: string, ownerId: string): void {
if (this.isOpen) throw new Error('Account already open');
const event: AccountOpened = {
type: 'AccountOpened',
accountId, ownerId,
timestamp: new Date().toISOString(),
};
this.apply(event);
this.uncommittedEvents.push(event);
}
// Command: Deposit money
deposit(amount: number): void {
if (!this.isOpen) throw new Error('Account is closed');
if (amount <= 0) throw new Error('Amount must be positive');
const event: MoneyDeposited = {
type: 'MoneyDeposited',
accountId: this.accountId, amount,
timestamp: new Date().toISOString(),
};
this.apply(event);
this.uncommittedEvents.push(event);
}
// Command: Withdraw money
withdraw(amount: number): void {
if (!this.isOpen) throw new Error('Account is closed');
if (amount <= 0) throw new Error('Amount must be positive');
if (amount > this.balance) throw new Error('Insufficient funds');
const event: MoneyWithdrawn = {
type: 'MoneyWithdrawn',
accountId: this.accountId, amount,
timestamp: new Date().toISOString(),
};
this.apply(event);
this.uncommittedEvents.push(event);
}
// Internal: apply event to update in-memory state
private apply(event: BankAccountEvent): void {
switch (event.type) {
case 'AccountOpened':
this.accountId = event.accountId;
this.ownerId = event.ownerId;
this.isOpen = true;
break;
case 'MoneyDeposited':
this.balance += event.amount;
break;
case 'MoneyWithdrawn':
this.balance -= event.amount;
break;
}
}
getBalance(): number { return this.balance; }
getUncommittedEvents(): BankAccountEvent[] { return this.uncommittedEvents; }
clearUncommittedEvents(): void { this.uncommittedEvents = []; }
}
Event Store Design
An Event Store is an append-only log: events are only ever added, never updated or deleted. That makes it fundamentally different from a traditional database.
| Aspect | Event Store | Regular DB |
|---|---|---|
| Mutability | Immutable (append-only) | Mutable (UPDATE/DELETE) |
| Query Style | Read events by aggregateId | Query by any column |
| History | Complete history preserved | Overwritten |
| Undo | Append a compensating event | Previous value is gone |
| Schema | Flexible (JSON payload) | Fixed columns |
| Time Travel | Any past state can be reconstructed | Not possible from current state alone |
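The "Time Travel" row can be illustrated with an in-memory sketch. The `TimedEvent` type, the `balanceAt` function, and the sample dates are all made up for this example; the only assumption carried over from the text is that events are kept in the order they occurred.

```typescript
// Hypothetical event shape; timestamps are ISO 8601 strings in UTC.
interface TimedEvent {
  type: 'MoneyDeposited' | 'MoneyWithdrawn';
  amount: number;
  timestamp: string;
}

// Replay only the events that happened at or before the requested instant.
// Assumes `events` is already sorted by time, as an append-only log would be.
function balanceAt(events: TimedEvent[], pointInTime: Date): number {
  let balance = 0;
  for (const event of events) {
    if (Date.parse(event.timestamp) > pointInTime.getTime()) break;
    balance += event.type === 'MoneyDeposited' ? event.amount : -event.amount;
  }
  return balance;
}

// Sample data invented for the illustration.
const accountHistory: TimedEvent[] = [
  { type: 'MoneyDeposited', amount: 10000, timestamp: '2024-01-01T09:00:00Z' },
  { type: 'MoneyWithdrawn', amount: 3000, timestamp: '2024-02-15T12:00:00Z' },
  { type: 'MoneyWithdrawn', amount: 2000, timestamp: '2024-04-01T08:30:00Z' },
];

const onMarchFirst = balanceAt(accountHistory, new Date('2024-03-01T00:00:00Z')); // 7000
const onMayFirst = balanceAt(accountHistory, new Date('2024-05-01T00:00:00Z'));   // 5000
console.log(onMarchFirst, onMayFirst);
```

A table that only holds the current balance has no equivalent of the first query, because the intermediate value of 7,000 was overwritten.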
// ============================================================
// Event Store Schema (PostgreSQL — append-only table)
// ============================================================
/*
CREATE TABLE event_store (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id VARCHAR(255) NOT NULL, -- e.g., "ACC-001"
aggregate_type VARCHAR(100) NOT NULL, -- e.g., "BankAccount"
event_type VARCHAR(100) NOT NULL, -- e.g., "MoneyDeposited"
event_data JSONB NOT NULL, -- Serialized event payload
version INT NOT NULL, -- Optimistic concurrency control
metadata JSONB, -- correlation_id, user_id, etc.
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Ensure version is sequential per aggregate
UNIQUE(aggregate_id, version)
);
-- Index for loading events of a specific aggregate
CREATE INDEX idx_event_store_aggregate ON event_store(aggregate_id, version ASC);
*/
// ============================================================
// Event Store Implementation
// ============================================================
interface StoredEvent {
id: string;
aggregateId: string;
aggregateType: string;
eventType: string;
eventData: unknown;
version: number;
metadata?: Record<string, unknown>;
timestamp: string;
}
class EventStore {
constructor(private readonly db: Database) {}
// Save new events — append-only, never update
async appendEvents(
aggregateId: string,
aggregateType: string,
events: BankAccountEvent[],
expectedVersion: number // Optimistic concurrency
): Promise<void> {
await this.db.transaction(async (tx) => {
// Check for concurrent modification
const currentVersion = await tx.query<{ max: number }>(
'SELECT MAX(version) as max FROM event_store WHERE aggregate_id = $1',
[aggregateId]
);
const latestVersion = currentVersion.rows[0]?.max ?? -1;
if (latestVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, got ${latestVersion}`
);
}
// Append each event
for (let i = 0; i < events.length; i++) {
await tx.query(
`INSERT INTO event_store
(aggregate_id, aggregate_type, event_type, event_data, version)
VALUES ($1, $2, $3, $4, $5)`,
[aggregateId, aggregateType, events[i].type,
JSON.stringify(events[i]), expectedVersion + i + 1]
);
}
});
}
// Load all events for an aggregate (to reconstruct state)
async loadEvents(aggregateId: string): Promise<StoredEvent[]> {
const result = await this.db.query<StoredEvent>(
`SELECT * FROM event_store
WHERE aggregate_id = $1
ORDER BY version ASC`,
[aggregateId]
);
return result.rows;
}
// Time travel: load events up to a specific point in time
async loadEventsUntil(aggregateId: string, until: Date): Promise<StoredEvent[]> {
const result = await this.db.query<StoredEvent>(
`SELECT * FROM event_store
WHERE aggregate_id = $1 AND timestamp <= $2
ORDER BY version ASC`,
[aggregateId, until.toISOString()]
);
return result.rows;
}
}
🔒 Event Immutability: The Most Important Rule
Events must never be DELETEd or UPDATEd. Once stored, an event stays there permanently. This immutability is the foundation of every benefit Event Sourcing offers.
What to do when something goes wrong: use a compensating event, that is, append a new event that reverses the effect of the earlier one. For example, a WithdrawalReversed event for a mistaken withdrawal.
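A compensating event might look like the sketch below. Only the `WithdrawalReversed` name comes from the text; the `withdrawalId` and `reason` fields, the `AccountEvent` union, and the `reverseWithdrawal` helper are assumptions made for illustration.

```typescript
// Hypothetical event union; withdrawalId and reason are illustrative fields.
type AccountEvent =
  | { type: 'MoneyDeposited'; amount: number }
  | { type: 'MoneyWithdrawn'; withdrawalId: string; amount: number }
  | { type: 'WithdrawalReversed'; withdrawalId: string; amount: number; reason: string };

function applyEvent(balance: number, event: AccountEvent): number {
  switch (event.type) {
    case 'MoneyDeposited': return balance + event.amount;
    case 'MoneyWithdrawn': return balance - event.amount;
    case 'WithdrawalReversed': return balance + event.amount; // undoes the withdrawal
  }
}

// Correct a mistake by appending a new event; the original log is not touched.
function reverseWithdrawal(
  events: readonly AccountEvent[],
  withdrawalId: string,
  reason: string
): AccountEvent[] {
  const alreadyReversed = events.some(
    (e) => e.type === 'WithdrawalReversed' && e.withdrawalId === withdrawalId
  );
  if (alreadyReversed) throw new Error(`Withdrawal ${withdrawalId} is already reversed`);
  for (const event of events) {
    if (event.type === 'MoneyWithdrawn' && event.withdrawalId === withdrawalId) {
      const reversal: AccountEvent = {
        type: 'WithdrawalReversed', withdrawalId, amount: event.amount, reason,
      };
      return [...events, reversal];
    }
  }
  throw new Error(`Withdrawal ${withdrawalId} not found`);
}

const accountLog: AccountEvent[] = [
  { type: 'MoneyDeposited', amount: 10000 },
  { type: 'MoneyWithdrawn', withdrawalId: 'W-1', amount: 5000 },
  { type: 'MoneyWithdrawn', withdrawalId: 'W-2', amount: 700 }, // the mistaken one
];

const correctedLog = reverseWithdrawal(accountLog, 'W-2', 'duplicate charge');
const balanceBefore = accountLog.reduce(applyEvent, 0);  // 4300
const balanceAfter = correctedLog.reduce(applyEvent, 0); // 5000
console.log(balanceBefore, balanceAfter, correctedLog.length); // 4300 5000 4
```

The mistaken withdrawal stays in the log, so an auditor can still see both the error and its correction.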
Projections: Building Views from Events
The Event Store holds raw events, but the UI needs to display data quickly. A projection is the mechanism that listens to events and builds optimized read models (views). Multiple projections can be built from the same events.
Same Events → Multiple Projections
Event Store: OrderPlaced, OrderShipped, OrderDelivered, PaymentReceived...
| Projection | Key → contents |
|---|---|
| Order Status View | orderId → current status, ETA |
| Customer Order History | customerId → all past orders |
| Inventory View | productId → sold qty, remaining stock |
| Revenue Dashboard | date → total revenue, order count |
// ============================================================
// Projection: Order Status View
// Listens to events → builds denormalized read model
// ============================================================
interface OrderStatusReadModel {
orderId: string;
customerId: string;
customerName: string; // Denormalized
status: 'pending' | 'confirmed' | 'shipped' | 'delivered' | 'cancelled';
totalAmount: number;
itemCount: number;
shippingAddress: string;
placedAt: string;
shippedAt?: string;
deliveredAt?: string;
lastUpdated: string;
}
class OrderStatusProjection {
constructor(
private readonly readDb: ReadDatabase // Elasticsearch or PostgreSQL read replica
) {}
// Handles OrderPlaced event
async onOrderPlaced(event: {
orderId: string;
customerId: string;
customerName: string;
items: { productId: string; quantity: number; price: number }[];
shippingAddress: string;
timestamp: string;
}): Promise<void> {
const totalAmount = event.items.reduce(
(sum, item) => sum + item.price * item.quantity, 0
);
// Build and upsert the read model
const readModel: OrderStatusReadModel = {
orderId: event.orderId,
customerId: event.customerId,
customerName: event.customerName, // Already denormalized in event
status: 'pending',
totalAmount,
itemCount: event.items.length,
shippingAddress: event.shippingAddress,
placedAt: event.timestamp,
lastUpdated: event.timestamp,
};
await this.readDb.upsert('order_status_view', event.orderId, readModel);
}
// Handles OrderShipped event
async onOrderShipped(event: {
orderId: string;
trackingNumber: string;
timestamp: string;
}): Promise<void> {
await this.readDb.update('order_status_view', event.orderId, {
status: 'shipped',
shippedAt: event.timestamp,
lastUpdated: event.timestamp,
});
}
// Handles OrderDelivered event
async onOrderDelivered(event: {
orderId: string;
timestamp: string;
}): Promise<void> {
await this.readDb.update('order_status_view', event.orderId, {
status: 'delivered',
deliveredAt: event.timestamp,
lastUpdated: event.timestamp,
});
}
}
// ============================================================
// Projection: Inventory View — same events, different read model
// ============================================================
class InventoryProjection {
constructor(private readonly readDb: ReadDatabase) {}
async onOrderPlaced(event: {
items: { productId: string; quantity: number }[];
}): Promise<void> {
// Reserve stock: increase reserved_qty for each ordered item
for (const item of event.items) {
await this.readDb.increment(
'inventory_view',
item.productId,
{ reserved_qty: item.quantity }
);
}
}
async onOrderDelivered(event: {
items: { productId: string; quantity: number }[];
}): Promise<void> {
for (const item of event.items) {
await this.readDb.increment(
'inventory_view',
item.productId,
{ sold_qty: item.quantity, reserved_qty: -item.quantity }
);
}
}
}
💡 Projections Can Be Rebuilt
If a read model becomes corrupted, or a new view is needed, replay all events from the beginning of the Event Store to rebuild the projection. This is one of the main benefits of Event Sourcing: a read model is never truly lost, because it can be recreated from the events at any time.
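A rebuild can be sketched as "start from an empty view and feed it every event again". The event shapes and function names below are hypothetical, and a `Map` stands in for the read database.

```typescript
// Hypothetical, simplified order events for illustration.
type OrderEvent =
  | { type: 'OrderPlaced'; orderId: string; customerId: string; totalAmount: number }
  | { type: 'OrderShipped'; orderId: string }
  | { type: 'OrderDelivered'; orderId: string };

interface OrderStatusRow {
  orderId: string;
  customerId: string;
  status: 'pending' | 'shipped' | 'delivered';
  totalAmount: number;
}

// Apply one event to the order status view.
function projectOrderStatus(view: Map<string, OrderStatusRow>, event: OrderEvent): void {
  switch (event.type) {
    case 'OrderPlaced':
      view.set(event.orderId, {
        orderId: event.orderId,
        customerId: event.customerId,
        status: 'pending',
        totalAmount: event.totalAmount,
      });
      break;
    case 'OrderShipped':
    case 'OrderDelivered': {
      const row = view.get(event.orderId);
      if (row) row.status = event.type === 'OrderShipped' ? 'shipped' : 'delivered';
      break;
    }
  }
}

// Rebuild: discard the old view and replay the whole log from the start.
function rebuildOrderStatusView(events: readonly OrderEvent[]): Map<string, OrderStatusRow> {
  const view = new Map<string, OrderStatusRow>();
  for (const event of events) projectOrderStatus(view, event);
  return view;
}

// A brand-new view added later, built from the very same log.
function rebuildRevenueByCustomer(events: readonly OrderEvent[]): Map<string, number> {
  const revenue = new Map<string, number>();
  for (const event of events) {
    if (event.type === 'OrderPlaced') {
      revenue.set(event.customerId, (revenue.get(event.customerId) ?? 0) + event.totalAmount);
    }
  }
  return revenue;
}

const orderLog: OrderEvent[] = [
  { type: 'OrderPlaced', orderId: 'O-1', customerId: 'C-1', totalAmount: 2200 },
  { type: 'OrderPlaced', orderId: 'O-2', customerId: 'C-1', totalAmount: 800 },
  { type: 'OrderShipped', orderId: 'O-1' },
  { type: 'OrderDelivered', orderId: 'O-1' },
  { type: 'OrderPlaced', orderId: 'O-3', customerId: 'C-2', totalAmount: 500 },
];

const statusView = rebuildOrderStatusView(orderLog);
const revenueView = rebuildRevenueByCustomer(orderLog);
console.log(statusView.get('O-1')?.status, revenueView.get('C-1')); // delivered 3000
```

Because the rebuild is a pure function of the log, running it twice gives the same view, which is what makes it safe to throw a corrupted read model away.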
CQRS + Event Sourcing Together
CQRS and Event Sourcing can each be used on their own, but used together they complement each other. Event Sourcing is a natural implementation of the CQRS write side, and its events can be pushed directly to the query side.
Step 1 — User sends Command
The user takes an action (PlaceOrder, TransferMoney). The application builds a Command object and sends it to the Command Bus, which routes it to the right Command Handler.
Step 2 — Command Handler Validates
The Command Handler validates the business rules. If the command is invalid, it throws an exception (a command returns no value, so failure is signaled by an error). If it is valid, the handler loads the Aggregate from the Event Store.
Step 3 — Aggregate Applies Event
The Aggregate (a domain object) executes the command and produces a domain event. It applies the event to update its own in-memory state and adds it to its list of uncommitted events.
Step 4 — Event Stored
The Command Handler takes the uncommitted events from the Aggregate and appends them to the Event Store, with an optimistic concurrency check. The events are persisted immutably.
Step 5 — Event Published to Bus
The events are published to Kafka. Multiple consumers (Projections, Notification Service, Analytics) subscribe to them and each does its own work.
Step 6 — Projections Update Read DB
Projection handlers consume the events and update the Read DB (Elasticsearch/Redis). Read models are denormalized and query-optimized. Consistency is eventual: a write shows up on the read side a short while later.
Step 7 — Query Handler Reads
When the user wants to see data, the Query Handler reads directly from the Read DB. No JOINs, no complex SQL: it returns a pre-built view, typically within milliseconds, and puts no load on the write side.
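The optimistic concurrency check in Step 4 is easy to get wrong, so here is a small in-memory sketch of it. `InMemoryEventStore` and `VersionConflictError` are hypothetical names; the version convention (-1 for an empty stream, then 0, 1, 2...) is chosen to match the PostgreSQL example earlier in this article.

```typescript
class VersionConflictError extends Error {
  constructor(expected: number, actual: number) {
    super(`Expected version ${expected}, but the stream is at ${actual}`);
    this.name = 'VersionConflictError';
  }
}

// In-memory stand-in for an event store; one array of events per aggregate.
class InMemoryEventStore<E> {
  private readonly streams = new Map<string, E[]>();

  // Version = index of the last event; -1 means the stream has no events yet.
  currentVersion(aggregateId: string): number {
    return (this.streams.get(aggregateId)?.length ?? 0) - 1;
  }

  // Append only if nobody else has written since the caller loaded the stream.
  append(aggregateId: string, events: E[], expectedVersion: number): void {
    const actual = this.currentVersion(aggregateId);
    if (actual !== expectedVersion) throw new VersionConflictError(expectedVersion, actual);
    const stream = this.streams.get(aggregateId) ?? [];
    stream.push(...events);
    this.streams.set(aggregateId, stream);
  }

  load(aggregateId: string): E[] {
    return [...(this.streams.get(aggregateId) ?? [])];
  }
}

const store = new InMemoryEventStore<string>();
store.append('ACC-001', ['AccountOpened'], -1); // new stream, now at version 0

// Two handlers load the aggregate at the same time and both see version 0.
const versionSeenByA = store.currentVersion('ACC-001');
const versionSeenByB = store.currentVersion('ACC-001');

store.append('ACC-001', ['MoneyDeposited'], versionSeenByA); // succeeds, now version 1

let conflictName = '';
try {
  store.append('ACC-001', ['MoneyWithdrawn'], versionSeenByB); // stale version
} catch (error) {
  conflictName = (error as Error).name;
}
console.log(conflictName, store.load('ACC-001'));
```

In this sketch the losing handler would typically reload the aggregate, re-run its business rules against the new state, and try again; whether a retry is appropriate depends on the command.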
// ============================================================
// Complete CQRS + Event Sourcing Flow
// ============================================================
// 1. Command arrives
const command: PlaceOrderCommand = {
type: 'PLACE_ORDER',
customerId: 'CUST-001',
items: [
{ productId: 'PROD-A', quantity: 2, price: 500 },
{ productId: 'PROD-B', quantity: 1, price: 1200 },
],
shippingAddress: 'Dhaka, Bangladesh',
};
// 2. Command Handler orchestrates everything
class PlaceOrderCommandHandler {
constructor(
private eventStore: EventStore,
private eventBus: EventBus
) {}
async handle(cmd: PlaceOrderCommand): Promise<void> {
// Load Aggregate from Event Store (rehydrate from history)
const pastEvents = await this.eventStore.loadEvents(`ORDER-${cmd.customerId}`);
const order = OrderAggregate.rehydrate(pastEvents);
// Execute command on aggregate — raises domain events
order.placeOrder({
customerId: cmd.customerId,
items: cmd.items,
shippingAddress: cmd.shippingAddress,
});
// Persist new events to Event Store (append-only)
const newEvents = order.getUncommittedEvents();
await this.eventStore.appendEvents(
order.id,
'Order',
newEvents,
order.version - newEvents.length // optimistic concurrency
);
// Publish to Kafka — Projections will consume
for (const event of newEvents) {
await this.eventBus.publish('order-events', event);
}
// Return void — command has no return value
}
}
// 3. Projection consumes event → updates Read DB
class OrderProjectionConsumer {
constructor(private readonly readDb: ReadDatabase) {}
async consume(event: DomainEvent): Promise<void> {
switch (event.type) {
case 'OrderPlaced':
await this.readDb.upsert('orders_view', event.orderId, {
orderId: event.orderId,
customerId: event.customerId,
status: 'PENDING',
totalAmount: event.totalAmount,
placedAt: event.timestamp,
});
break;
case 'OrderShipped':
await this.readDb.update('orders_view', event.orderId, {
status: 'SHIPPED',
shippedAt: event.timestamp,
});
break;
}
}
}
// 4. Query Handler reads from optimized Read DB
class GetOrderQueryHandler {
constructor(private readonly readDb: ReadDatabase) {}
async handle(query: { orderId: string }) {
// Instant read — no JOIN, pre-built view
return await this.readDb.findById('orders_view', query.orderId);
}
}
Trade-offs and When to Use It
CQRS and Event Sourcing are powerful, but using them everywhere creates unnecessary complexity. Knowing when to use them and when not to is the real skill.
| Situation | Use CQRS/ES | Avoid CQRS/ES |
|---|---|---|
| Read/Write Load | Read and write loads differ widely | Similar load, simple CRUD |
| Audit Requirement | Banking, Healthcare, Legal | No audit needed (e.g., temp data) |
| Domain Complexity | Complex business rules, DDD | Simple CRUD app, small team |
| Team Size | Large team, multiple services | Small team, tight deadline |
| Scalability | 10M+ users, high throughput | Early-stage startup, MVP |
| Time Travel | Past-state queries are required | Current state is enough |
⚠️ Eventual Consistency: The Biggest Challenge
With CQRS, after data lands in the Write DB it takes some milliseconds or seconds to sync to the Read DB through Kafka. During that window the user will not see the new data.
Problem scenario: a user places an order (write) and immediately opens "My Orders" (read). The new order may not appear yet because the projection has not been updated.
Solutions: (1) show an optimistic update in the UI, (2) add a short delay after the command, (3) show the user "Processing..." until the sync completes.
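Options (1) and (3) can be combined on the client, as in the sketch below. It assumes the client generates the order ID before sending the command (so it knows which order it is waiting for); `OrderListItem` and `mergeWithPending` are hypothetical names.

```typescript
interface OrderListItem {
  orderId: string;
  status: string;
}

// Merge what the read model returned with orders the client has just submitted.
// A pending order that the projection has not caught up with yet is shown as
// "processing"; once the read model contains it, the placeholder disappears.
function mergeWithPending(
  fromReadModel: OrderListItem[],
  pendingOrderIds: string[]
): OrderListItem[] {
  const known = new Set(fromReadModel.map((order) => order.orderId));
  const placeholders = pendingOrderIds
    .filter((id) => !known.has(id))
    .map((id) => ({ orderId: id, status: 'processing' }));
  return [...placeholders, ...fromReadModel];
}

// Right after the command: the projection has not seen O-2 yet.
const beforeSync = mergeWithPending([{ orderId: 'O-1', status: 'shipped' }], ['O-2']);

// A moment later: the read model now contains O-2.
const afterSync = mergeWithPending(
  [{ orderId: 'O-2', status: 'pending' }, { orderId: 'O-1', status: 'shipped' }],
  ['O-2']
);
console.log(beforeSync, afterSync);
```

This keeps the UI honest: the user sees the order immediately, but with a status that makes it clear the system is still catching up.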
💡 Pragmatic Advice: Start Simple
Rule of thumb: if you are not sure, start with CQRS alone, without Event Sourcing. Separate the Command and Query models but keep them in the same database. Add Event Sourcing later, when you actually need event history.
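"CQRS without Event Sourcing, on one database" could look roughly like this. The `OrderTable` class is an in-memory stand-in for the single shared database, and all names here are illustrative assumptions.

```typescript
interface OrderRow {
  orderId: string;
  customerId: string;
  items: { productId: string; quantity: number; price: number }[];
  status: 'pending' | 'cancelled';
}

// Stand-in for the one shared database.
class OrderTable {
  readonly rows = new Map<string, OrderRow>();
}

// Write model: validates and changes state, returns nothing.
class OrderCommands {
  constructor(private readonly table: OrderTable) {}

  placeOrder(orderId: string, customerId: string, items: OrderRow['items']): void {
    if (items.length === 0) throw new Error('Order must have at least one item');
    if (this.table.rows.has(orderId)) throw new Error(`Order ${orderId} already exists`);
    this.table.rows.set(orderId, { orderId, customerId, items, status: 'pending' });
  }

  cancelOrder(orderId: string): void {
    const row = this.table.rows.get(orderId);
    if (!row) throw new Error(`Order ${orderId} not found`);
    row.status = 'cancelled';
  }
}

// Read model: shapes data for display, never changes state.
interface OrderSummary {
  orderId: string;
  status: string;
  itemCount: number;
  totalAmount: number;
}

class OrderQueries {
  constructor(private readonly table: OrderTable) {}

  getOrderSummary(orderId: string): OrderSummary | undefined {
    const row = this.table.rows.get(orderId);
    return row ? OrderQueries.toSummary(row) : undefined;
  }

  listOrdersForCustomer(customerId: string): OrderSummary[] {
    return Array.from(this.table.rows.values())
      .filter((row) => row.customerId === customerId)
      .map(OrderQueries.toSummary);
  }

  private static toSummary(row: OrderRow): OrderSummary {
    const totalAmount = row.items.reduce((sum, i) => sum + i.price * i.quantity, 0);
    return { orderId: row.orderId, status: row.status, itemCount: row.items.length, totalAmount };
  }
}

const table = new OrderTable();
const commands = new OrderCommands(table);
const queries = new OrderQueries(table);

commands.placeOrder('O-1', 'CUST-001', [
  { productId: 'PROD-A', quantity: 2, price: 500 },
  { productId: 'PROD-B', quantity: 1, price: 1200 },
]);
const summary = queries.getOrderSummary('O-1'); // totalAmount: 2200, itemCount: 2
console.log(summary);
```

Because both sides share one store, reads are immediately consistent with writes; the separation is purely in the code, which keeps the door open for splitting the databases later.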
Cost: adding Event Sourcing increases codebase complexity roughly 2-3x. Debugging gets harder, and a snapshot strategy becomes necessary once replay takes too long. Use it only where there is clear business value.
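A snapshot strategy, in its simplest form, stores the state as of some version and replays only the events after it. The sketch below uses hypothetical names (`BalanceEvent`, `Snapshot`, `loadBalance`) and generated sample data.

```typescript
interface BalanceEvent {
  type: 'MoneyDeposited' | 'MoneyWithdrawn';
  amount: number;
  version: number; // position in the aggregate's stream, starting at 0
}

// State of the aggregate as of (and including) the event with this version.
interface Snapshot {
  version: number;
  balance: number;
}

function applyAll(balance: number, events: readonly BalanceEvent[]): number {
  return events.reduce(
    (b, e) => (e.type === 'MoneyDeposited' ? b + e.amount : b - e.amount),
    balance
  );
}

function takeSnapshot(events: readonly BalanceEvent[]): Snapshot {
  const last = events[events.length - 1];
  return { version: last ? last.version : -1, balance: applyAll(0, events) };
}

// Start from the snapshot if there is one, then replay only the newer events.
function loadBalance(
  snapshot: Snapshot | undefined,
  allEvents: readonly BalanceEvent[]
): { balance: number; replayed: number } {
  const start = snapshot ?? { version: -1, balance: 0 };
  const tail = allEvents.filter((e) => e.version > start.version);
  return { balance: applyAll(start.balance, tail), replayed: tail.length };
}

// Generated sample data: 1,000 deposits of 10 each.
const manyEvents: BalanceEvent[] = Array.from({ length: 1000 }, (_, i) => ({
  type: 'MoneyDeposited' as const,
  amount: 10,
  version: i,
}));

const snapshotAt989 = takeSnapshot(manyEvents.slice(0, 990)); // { version: 989, balance: 9900 }
const withSnapshot = loadBalance(snapshotAt989, manyEvents);  // replays 10 events
const withoutSnapshot = loadBalance(undefined, manyEvents);   // replays 1000 events
console.log(withSnapshot, withoutSnapshot);
```

Both paths arrive at the same balance; the snapshot only changes how many events have to be read. In a real store the tail would be fetched with a query such as "version greater than the snapshot version" rather than by filtering in memory.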
Real World: Where Is It Used?
CQRS and Event Sourcing are widely used in real production systems, especially in domains where the audit trail is critical and history must not be lost.
Audit trail: who made which transaction, and when, must all be tracked. Regulatory compliance.
→ Account transactions, Fund transfers, Loan processing
Order lifecycle: Placed, Confirmed, Packed, Shipped, Delivered, Returned. Every state transition is an event.
→ Amazon, Daraz order management
Score history, achievement unlocks, inventory changes: all of these need to be replayable. Anti-cheat audit.
→ Player stats, Match history, Leaderboards
In a collaborative editor such as Google Docs, every edit can be treated as an event. Undo/redo, version history, and conflict resolution are all event-based.
→ Google Docs, Figma, Notion
| Company / System | Pattern | Why CQRS/ES? |
|---|---|---|
| Microsoft Azure | CQRS + ES | Documents both patterns in the Azure Architecture Center (EventStoreDB itself comes from Event Store Ltd, not Microsoft) |
|  | Event Sourcing | Activity feed, notifications; Kafka-based event log |
| Netflix | CQRS | Streaming data read/write separation, playback history |
| Airbnb | Event Sourcing | Booking state machine — 15+ states, full audit trail |
| Axon Framework | CQRS + ES | A widely used CQRS/ES framework for Java |
🎯 Interview Tips for CQRS/ES Questions
1) When explaining CQRS, say: "Read and write loads differ, and separating them lets each side scale independently."
2) The key insight of Event Sourcing: "Store facts (events), not current state. A recorded fact never becomes untrue."
3) Bring up the eventual-consistency trade-off yourself; it tends to impress interviewers.
4) Mention the ability to rebuild projections: "A new read model can be created at any time."
5) Also say when not to use it: it is overkill for a simple CRUD app. This shows maturity.
SUMMARY: What We Learned Today
| Pattern | Core idea | Benefits | Challenges | Use cases |
|---|---|---|---|---|
| CQRS | Separate models for reads and writes | Independent scaling, optimized queries | Eventual consistency, two codebases | High read/write disparity |
| Event Sourcing | Store events, not state | Full audit trail, time travel, replay | Complex querying, event schema evolution | Banking, audit-heavy domains |
| CQRS + ES | Both of the above together | Maximum flexibility, full history | High complexity, steep learning curve | Complex domains, large teams |
| Projection | Events → Read models | Multiple views, rebuild anytime | Async lag, projection failures | Dashboard, reporting views |
| Event Store | Append-only event log | Immutable history, time travel | Storage growth, snapshot needed | All Event Sourcing systems |