Databases (7): Distributed Transactions — 2PC, Saga, and Why Consensus Is Hard
How distributed databases coordinate transactions across machines — two-phase commit, Raft consensus, the Saga pattern, and practical patterns like outbox and CDC.
Everything we covered about transactions in Article 3 assumed a single database server: one machine, one transaction log, one lock manager. When your data spans multiple machines (through sharding, microservices with separate databases, or strongly consistent replication), you face the hardest problem in distributed systems: how do you get multiple machines to agree?
If the order insert succeeds but the inventory update fails (because of a network issue, constraint violation, or crash), you have a problem: an order exists for a product that was never reserved. Without coordination, this leads to inconsistency.
On a single database, wrapping both in a BEGIN ... COMMIT solves this. Across two databases, that is not possible — they have separate transaction logs, separate crash recovery, separate clocks.
The critical weakness of 2PC is that if the coordinator crashes after sending PREPARE but before sending COMMIT/ROLLBACK, the participants are stuck. They have voted “Yes” and hold locks, but they don’t know the final decision.
Coordinator                Participant A        Participant B
    │                            │                    │
    ├──── PREPARE ──────────────►│                    │
    ├──── PREPARE ───────────────────────────────────►│
    │                            │                    │
    │◄─── YES ───────────────────┤                    │
    │◄─── YES ────────────────────────────────────────┤
    │                            │                    │
    ╳ CRASH                      │                    │
                                 │                    │
                          "I voted YES         "I voted YES
                           but don't know       but don't know
                           the decision.        the decision.
                           Can't commit.        Can't commit.
                           Can't rollback.      Can't rollback.
                           Locks held..."       Locks held..."
This is called the blocking problem. Participants must wait (potentially forever) until the coordinator recovers and reveals its decision. In practice, this means:
Locks held indefinitely, blocking other transactions
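The blocking window can be reproduced in a few lines. The following is a minimal in-memory sketch, not a real transaction manager: the `Participant` class, the `State` enum, and the `crash_after_prepare` flag are hypothetical names introduced for illustration, and no networking, durable logging, or recovery is modeled.

```python
from enum import Enum


class State(Enum):
    INIT = "init"
    PREPARED = "prepared"    # voted YES, locks held, outcome unknown
    COMMITTED = "committed"
    ABORTED = "aborted"


class Participant:
    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.state = State.INIT

    def prepare(self) -> bool:
        """Phase 1: vote. A YES vote is a promise to commit if told to."""
        if self.can_commit:
            self.state = State.PREPARED
            return True
        self.state = State.ABORTED
        return False

    def finish(self, commit: bool) -> None:
        """Phase 2: apply the coordinator's decision."""
        if self.state is State.PREPARED:
            self.state = State.COMMITTED if commit else State.ABORTED


def two_phase_commit(participants, crash_after_prepare: bool = False):
    """Run 2PC; optionally simulate a coordinator crash between the phases."""
    votes = [p.prepare() for p in participants]
    if crash_after_prepare:
        return None  # decision never delivered: YES voters stay PREPARED
    decision = all(votes)
    for p in participants:
        p.finish(decision)
    return decision
```

Calling `two_phase_commit([a, b], crash_after_prepare=True)` leaves both participants in `PREPARED`. Neither can commit or abort on its own, which is the state shown in the diagram.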
Despite its limitations, 2PC is still used in practice:
-- PostgreSQL: prepared transactions (2PC participant)
-- The application/coordinator calls these:

-- Phase 1: Prepare
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
PREPARE TRANSACTION 'transfer_001_debit';

-- Phase 2: Commit (if all participants prepared successfully)
COMMIT PREPARED 'transfer_001_debit';

-- Or rollback
ROLLBACK PREPARED 'transfer_001_debit';

-- Check for orphaned prepared transactions (stuck 2PC)
SELECT gid, prepared, owner, database FROM pg_prepared_xacts;
// Java XA transactions (JTA): standard API for 2PC
UserTransaction ut = (UserTransaction) ctx.lookup("java:comp/UserTransaction");
ut.begin();

// Enlist two different databases in the same transaction
Connection conn1 = ds1.getConnection(); // Order database
Connection conn2 = ds2.getConnection(); // Inventory database

conn1.prepareStatement("INSERT INTO orders ...").execute();
conn2.prepareStatement("UPDATE inventory SET stock = stock - 1 ...").execute();

ut.commit(); // Transaction manager runs 2PC protocol
3PC adds a PRE-COMMIT phase between PREPARE and COMMIT, which allows participants to recover without the coordinator:
Phase 1: CAN-COMMIT? → participants check if they can commit
Phase 2: PRE-COMMIT → coordinator tells participants to prepare (but not commit yet)
Phase 3: DO-COMMIT → final commit
If coordinator crashes after PRE-COMMIT:
Participants can time out and commit (they know everyone was ready)
In theory, 3PC is non-blocking, but in practice, it is rarely used because:
Network partitions can still cause inconsistency (a participant might not receive PRE-COMMIT)
The additional round trip adds latency
Raft/Paxos consensus protocols solve the problem more robustly
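The first objection is easy to see once the timeout rule is written out. This is a simplified sketch of what a participant does when the coordinator goes silent; the state names and the functions `on_coordinator_timeout` and `outcomes_after_timeout` are illustrative assumptions, not taken from any specific implementation.

```python
from typing import Dict


def on_coordinator_timeout(state: str) -> str:
    """Decide alone, based only on the last message this participant saw."""
    if state == "PRE_COMMITTED":
        return "COMMIT"  # PRE-COMMIT implies every participant voted yes
    if state == "READY":
        return "ABORT"   # voted yes but never saw PRE-COMMIT
    raise ValueError(f"unknown state: {state}")


def outcomes_after_timeout(states: Dict[str, str]) -> Dict[str, str]:
    """Apply the timeout rule independently on every participant."""
    return {name: on_coordinator_timeout(s) for name, s in states.items()}
```

If a partition lets PRE-COMMIT reach participant A but not participant B, `outcomes_after_timeout({"A": "PRE_COMMITTED", "B": "READY"})` has A committing while B aborts: the protocol stays non-blocking but loses atomicity.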
Consensus is the problem of getting multiple nodes to agree on a value, even when some nodes fail. It is the foundation of strongly consistent distributed databases.
Paxos (invented by Leslie Lamport in 1989) was the first proven consensus algorithm. It uses three roles:
Proposers: Propose values
Acceptors: Vote on proposals
Learners: Learn the decided value
A simplified view of Single-Decree Paxos:
Phase 1: Prepare
  Proposer → Acceptors: "Prepare(proposal_number=5)"
  Acceptors → Proposer: "Promise: I won't accept proposals < 5"
                        + "Last accepted: (proposal=3, value='X')" if any

Phase 2: Accept
  Proposer → Acceptors: "Accept(proposal=5, value=v)"
                        v = value of the highest-numbered proposal reported in Phase 1 ('X' here),
                            or the proposer's own value ('Y') if nothing was reported
  Acceptors → Proposer: "Accepted" (if no higher proposal seen)

When a majority of acceptors accept → value is decided
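To make the two phases concrete, here is a minimal single-process sketch of Single-Decree Paxos. It is an illustration under strong assumptions: the `Acceptor` class and the `propose` function are hypothetical names, messages are plain method calls that never get lost, and nothing is persisted to disk.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Acceptor:
    promised: int = -1                          # highest proposal number promised
    accepted: Optional[Tuple[int, str]] = None  # (proposal number, value) last accepted

    def on_prepare(self, n: int):
        """Phase 1: promise to ignore proposals numbered below n."""
        if n > self.promised:
            self.promised = n
            return ("promise", self.accepted)
        return ("reject", None)

    def on_accept(self, n: int, value: str) -> bool:
        """Phase 2: accept unless a higher-numbered prepare was promised."""
        if n >= self.promised:
            self.promised = n
            self.accepted = (n, value)
            return True
        return False


def propose(acceptors: List[Acceptor], n: int, value: str) -> Optional[str]:
    """Run one proposer round; return the decided value, or None on failure."""
    majority = len(acceptors) // 2 + 1
    replies = [a.on_prepare(n) for a in acceptors]
    promises = [prev for kind, prev in replies if kind == "promise"]
    if len(promises) < majority:
        return None
    # If any acceptor already accepted a value, the proposer must adopt the
    # one with the highest proposal number instead of its own value.
    prior = [p for p in promises if p is not None]
    if prior:
        value = max(prior)[1]
    acks = sum(a.on_accept(n, value) for a in acceptors)
    return value if acks >= majority else None
```

The adoption step is what keeps a decided value stable: once 'X' has been accepted by a majority, a later proposer that wants 'Y' ends up re-proposing 'X'.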
Paxos is correct but notoriously difficult to implement. As Lamport noted, it took years for the community to understand his paper. This difficulty led to Raft.
Raft (2014, by Diego Ongaro and John Ousterhout) was designed to be equivalent to Paxos but easier to understand. It decomposes consensus into three sub-problems: leader election, log replication, and safety.
Every node starts as a follower. If a follower does not hear from a leader within a random timeout (150-300 ms), it becomes a candidate and initiates an election.
Node States:
Follower → times out, no heartbeat → Candidate
Candidate → receives majority votes → Leader
Candidate → discovers a leader with the same or a higher term → Follower
Leader → discovers higher-term leader → Follower
Election Process:
1. Candidate increments its term number
2. Votes for itself
3. Sends RequestVote RPCs to all other nodes
4. If majority responds with vote: becomes Leader
5. Starts sending periodic heartbeats to prevent new elections
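The election steps above can be sketched as follows. This is a simplified, single-process illustration: `Node`, `run_election`, and `election_timeout_ms` are hypothetical names, RPCs are direct method calls, and the log up-to-date check that real Raft performs before granting a vote is deliberately omitted.

```python
import random
from dataclasses import dataclass
from typing import List, Optional


def election_timeout_ms(low: int = 150, high: int = 300) -> int:
    """Randomized timeout so that followers rarely become candidates together."""
    return random.randint(low, high)


@dataclass
class Node:
    node_id: str
    current_term: int = 0
    voted_for: Optional[str] = None
    role: str = "follower"

    def on_request_vote(self, term: int, candidate_id: str) -> bool:
        if term < self.current_term:
            return False              # stale candidate
        if term > self.current_term:
            self.current_term = term  # newer term: forget the old vote
            self.voted_for = None
            self.role = "follower"
        if self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id  # at most one vote per term
            return True
        return False


def run_election(candidate: Node, peers: List[Node]) -> bool:
    candidate.current_term += 1                 # step 1
    candidate.role = "candidate"
    candidate.voted_for = candidate.node_id     # step 2
    votes = 1
    for peer in peers:                          # step 3
        if peer.on_request_vote(candidate.current_term, candidate.node_id):
            votes += 1
    if votes > (len(peers) + 1) // 2:           # step 4: strict majority
        candidate.role = "leader"
        return True
    return False
```

The one-vote-per-term rule in `on_request_vote` is what prevents two leaders from being elected in the same term.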
Term 1: Node A is leader
Node A ──heartbeat──► Node B
Node A ──heartbeat──► Node C
Term 2: Node A crashes. Node B times out.
Node B: "I'm a candidate for term 2. Vote for me."
Node C: "OK, you have my vote for term 2."
Node B: Now leader. Starts sending heartbeats.
Once elected, the leader accepts client requests, appends them to its log, and then replicates the entries to followers:
Leader Log:  [term1:SET x=1] [term1:SET y=2] [term2:SET x=3]
                    │               │               │
                    ▼               ▼               ▼
Follower B:  [term1:SET x=1] [term1:SET y=2] [term2:SET x=3]  ✓ up to date
Follower C:  [term1:SET x=1] [term1:SET y=2]                  ✗ catching up

A log entry is "committed" when replicated to a majority of nodes.
The leader then applies the entry to its state machine and responds to the client.
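The majority rule reduces to a small calculation over the highest log index each node is known to hold. The sketch below is an illustration only: `commit_index` is a hypothetical helper, and it leaves out Raft's additional requirement that a leader only commits entries from its own term by counting replicas.

```python
from typing import List


def commit_index(leader_last_index: int, follower_match_index: List[int]) -> int:
    """Highest log index stored on a majority of the cluster (leader included)."""
    indexes = sorted([leader_last_index, *follower_match_index], reverse=True)
    majority = len(indexes) // 2 + 1
    # The majority-th largest index is present on at least `majority` nodes.
    return indexes[majority - 1]
```

For the log shown above, `commit_index(3, [3, 2])` is 3: the third entry is on the leader and Follower B, which is two of three nodes, so it is committed even though Follower C is still catching up.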
Client ─── "SET x=3" ──► Leader
                            │
                   1. Append to log
                   2. Send AppendEntries RPC to followers
                            │
                     ┌──────┼──────┐
                     ▼      ▼      ▼
                  Foll.B Foll.C Foll.D
                     │      │      │
                     └──────┼──────┘
                   3. Wait for majority acknowledgment
                            │
                   4. Commit entry
                   5. Apply to state machine
                   6. Respond to client: "OK"
When 2PC is too expensive or impractical (often the case in microservices), the Saga pattern provides an alternative. Instead of one large distributed transaction, it breaks the process into a sequence of local transactions, each with a compensating transaction that undoes its work if a later step fails.
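The core of the pattern is small enough to sketch in memory. This is a minimal illustration, assuming each step is a plain Python callable paired with its compensation; `run_saga` and the `Step` alias are hypothetical names, and nothing here is persisted, retried, or timed out.

```python
from typing import Callable, List, Tuple

# (step name, forward action, compensating action)
Step = Tuple[str, Callable[[dict], None], Callable[[dict], None]]


def run_saga(steps: List[Step], context: dict) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    completed: List[Step] = []
    for step in steps:
        name, action, _compensate = step
        try:
            action(context)
            completed.append(step)
        except Exception as exc:
            context["failed_step"] = name
            context["error"] = str(exc)
            for _, _, undo in reversed(completed):
                undo(context)  # compensation must itself be safe to retry
            return False
    return True
```

Note that compensation is a new local transaction, not a rollback: between the forward step and its undo, other services can observe the intermediate state.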
Distributed transactions require agreement on ordering. But distributed systems have no shared clock — each node has its own, and they drift. This section explains how production systems solve the ordering problem.
Node A's clock: 14:00:00.000
Node B's clock: 14:00:00.003 (3ms ahead)
Node C's clock: 13:59:59.997 (3ms behind)
Transaction T1 commits on Node A at "14:00:00.001"
Transaction T2 commits on Node B at "14:00:00.002"
Did T1 happen before T2? We cannot know from timestamps alone.
If A's clock is slow, T2 might have actually committed first.
Physical clocks are insufficient for ordering. Three approaches solve this:
Lamport clock: single counter, incremented on every event
- If event A causes event B → L(A) < L(B)
- BUT: L(A) < L(B) does NOT mean A caused B (concurrent events get arbitrary order)
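A Lamport clock takes only a few lines. The class below is a minimal sketch; the name `LamportClock` and its method names are chosen for illustration.

```python
class LamportClock:
    def __init__(self) -> None:
        self.time = 0

    def tick(self) -> int:
        """Local event: advance the counter."""
        self.time += 1
        return self.time

    def send(self) -> int:
        """Sending is an event; the returned value travels with the message."""
        return self.tick()

    def receive(self, msg_time: int) -> int:
        """Jump past both our own history and the sender's."""
        self.time = max(self.time, msg_time) + 1
        return self.time
```

Because `receive` always moves past the sender's timestamp, a message's receipt is ordered after its send. Two nodes that never exchange messages can still produce timestamps 1 and 2, which is the "arbitrary order" case for concurrent events.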
Vector clock: one counter per node, tracks causal history
Node A: [3, 1, 2] → "I've seen 3 of my events, 1 from B, 2 from C"
Node B: [2, 4, 2] → "I've seen 2 from A, 4 of my events, 2 from C"
Comparing: if every VA[i] ≤ VB[i] and at least one VA[i] < VB[i] → A causally precedes B
           if some VA[i] > VB[i] and some VA[j] < VB[j] → concurrent
Vector clocks enable causal consistency but don’t provide a total order. For serializable distributed transactions, you need something stronger.
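The comparison rule can be written directly. This is a small sketch with hypothetical function names (`compare`, `merge`); it assumes both vectors list the same nodes in the same order.

```python
from typing import List, Sequence


def compare(va: Sequence[int], vb: Sequence[int]) -> str:
    """Return 'before', 'after', 'equal', or 'concurrent' for va relative to vb."""
    if len(va) != len(vb):
        raise ValueError("vector clocks must cover the same set of nodes")
    a_le_b = all(x <= y for x, y in zip(va, vb))
    b_le_a = all(y <= x for x, y in zip(va, vb))
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # va causally precedes vb
    if b_le_a:
        return "after"
    return "concurrent"      # neither dominates: no causal relationship


def merge(va: Sequence[int], vb: Sequence[int]) -> List[int]:
    """Element-wise maximum, applied when a node receives a message."""
    return [max(x, y) for x, y in zip(va, vb)]
```

With the two clocks from the example, `compare([3, 1, 2], [2, 4, 2])` reports `concurrent`: Node A is ahead on its own counter, Node B is ahead on its own, so neither has seen the other's latest events.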
CockroachDB and YugabyteDB use HLCs — a combination of physical time and a logical counter:
HLC = (physical_time, logical_counter)
Rules:
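1. On local event: if wall_clock > hlc.physical: hlc.physical = wall_clock; hlc.logical = 0
   otherwise: keep hlc.physical and increment hlc.logical
2. On send: advance as for a local event, then include hlc in message
3. On receive: hlc.physical = max(local.physical, msg.physical, wall_clock)
   if local.physical and msg.physical both equal the new value: hlc.logical = max(local.logical, msg.logical) + 1
   if only one of them equals the new value: hlc.logical = that side's logical + 1
   if the wall clock alone is highest: hlc.logical = 0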
Result: HLC ≈ wall clock time, but with causal ordering guarantees
Bound: HLC is always within max_clock_offset of real time
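A compact way to check the rules is to implement them. The sketch below follows the update rules of the published HLC algorithm, which increments the logical counter whenever the physical component does not advance. The class name `HLC` and the injectable `now` function are assumptions for illustration; this is not CockroachDB's or YugabyteDB's actual code.

```python
import time
from typing import Callable, Tuple


def wall_clock_ms() -> int:
    return time.time_ns() // 1_000_000


class HLC:
    def __init__(self, now: Callable[[], int] = wall_clock_ms) -> None:
        self.now = now
        self.physical = 0
        self.logical = 0

    def tick(self) -> Tuple[int, int]:
        """Local event or send."""
        wall = self.now()
        if wall > self.physical:
            self.physical, self.logical = wall, 0
        else:
            self.logical += 1  # wall clock stalled or went backwards
        return (self.physical, self.logical)

    def receive(self, msg: Tuple[int, int]) -> Tuple[int, int]:
        msg_physical, msg_logical = msg
        new_physical = max(self.physical, msg_physical, self.now())
        if new_physical == self.physical == msg_physical:
            self.logical = max(self.logical, msg_logical) + 1
        elif new_physical == self.physical:
            self.logical += 1
        elif new_physical == msg_physical:
            self.logical = msg_logical + 1
        else:
            self.logical = 0   # our wall clock is ahead of both
        self.physical = new_physical
        return (self.physical, self.logical)
```

Timestamps compare as ordinary tuples, so `(101, 1) < (101, 2) < (102, 0)`. A node whose wall clock is behind still orders its events after any message it has received.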
// CockroachDB's uncertainty interval
// When reading, a transaction must consider values written by other nodes
// within the clock uncertainty window
type ReadTimestamp struct {
    ReadTS    hlc.Timestamp // "I started reading at this time"
    MaxOffset time.Duration // "Clocks might be off by this much"
    // Uncertainty interval: [ReadTS, ReadTS + MaxOffset]
    // Values in this interval might have been written before us
}

// If a value is found with timestamp in the uncertainty interval:
// Option 1: Push read timestamp forward (restart transaction at higher ts)
// Option 2: If the writing transaction is still in-flight, wait for it
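The decision a reader faces can be reduced to a three-way classification. The function below is a deliberately simplified sketch of that idea with hypothetical names (`classify`, integer millisecond timestamps); CockroachDB's real logic works on HLC timestamps and is considerably more involved.

```python
def classify(value_ts: int, read_ts: int, max_offset: int) -> str:
    """Classify a stored value's timestamp relative to a reader's timestamp."""
    if value_ts <= read_ts:
        return "visible"    # definitely written before the read started
    if value_ts <= read_ts + max_offset:
        return "uncertain"  # may have been written first by a node with a fast clock
    return "invisible"      # definitely after the read: safe to ignore
```

Only the `uncertain` case forces extra work (pushing the read timestamp or waiting), which is why a smaller maximum clock offset means fewer restarts.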
Spanner solves the clock problem with hardware: GPS receivers and atomic clocks in every data center, providing a bounded-uncertainty time API.
TrueTime API:
TT.now() → returns an interval [earliest, latest]
TT.after(t) → true if t is definitely in the past
TT.before(t) → true if t is definitely in the future
Typical uncertainty: ε ≈ 1-7ms (average ~4ms)
GPS + atomic clock synchronization keeps drift minimal
Spanner’s commit protocol uses TrueTime to assign globally meaningful timestamps:
Commit-wait protocol:
1. Transaction T acquires all locks, performs writes
2. Coordinator picks commit timestamp s = TT.now().latest
3. Coordinator WAITS until TT.after(s) is true
   (waits roughly 2ε, about 8 ms at the average ε of 4 ms, for the uncertainty to pass)
4. Release locks, respond to client
Guarantee: if T1 commits before T2 starts (real time),
then T1's timestamp < T2's timestamp
→ externally consistent (linearizable)
Why commit-wait works:
T1 commits at s1 = TT.now().latest at time t_commit
→ real commit time ≤ s1 (because s1 is the latest possible time)
→ waits until TT.after(s1): real time is now definitely > s1
T2 starts at time t_start > t_commit (real time)
→ T2 picks s2 = TT.now().latest at some point ≥ t_start
→ s2 ≥ real_time at t_start > s1 (because we waited)
→ s1 < s2 guaranteed!
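The argument can be exercised with a toy model of TrueTime. Everything here is an assumption made for illustration: `FakeTrueTime` is a hypothetical stand-in whose local clock happens to equal real time, time only advances when `sleep` is called, and ε is fixed at 4 ms.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class FakeTrueTime:
    real_time: float = 0.0  # "true" time in ms, advanced by hand
    epsilon: float = 4.0    # uncertainty bound in ms

    def now(self) -> Tuple[float, float]:
        """Interval [earliest, latest] guaranteed to contain real time."""
        return (self.real_time - self.epsilon, self.real_time + self.epsilon)

    def after(self, t: float) -> bool:
        """True once t is definitely in the past."""
        earliest, _ = self.now()
        return earliest > t

    def sleep(self, ms: float) -> None:
        self.real_time += ms


def commit(tt: FakeTrueTime) -> float:
    """Pick s = TT.now().latest, then wait until s is definitely in the past."""
    _, s = tt.now()
    while not tt.after(s):
        tt.sleep(0.5)
    return s  # only now are locks released and the client answered
```

Starting at real time 100 ms, the first commit picks s1 = 104 and returns once real time passes 108, a wait of about 2ε. Any transaction that starts afterwards picks a timestamp of at least 112, so s1 < s2 holds.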
-- CockroachDB: check clock offset health
SHOW CLUSTER SETTING server.clock.max_offset; -- default 500ms
-- If clocks drift beyond max_offset, nodes self-terminate to protect correctness

-- Monitor NTP offset on all nodes:
-- $ chronyc tracking | grep "Last offset"

-- Spanner: no clock concerns for the user, but you pay for it
-- Read-only transactions can avoid commit-wait by reading at a snapshot:
SELECT * FROM orders
WHERE created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR);
-- Spanner picks a safe read timestamp automatically
The pseudocode Saga shown earlier captures the concept, but production sagas need: persistent state, idempotent steps, retries with backoff, timeout handling, and observability. Here is a production-grade pattern.
import asyncio


async def sweep_stuck_sagas(db_pool, orchestrator, interval_seconds=60):
    """Background task that recovers stuck sagas."""
    while True:
        async with db_pool.acquire() as conn:
            stuck = await conn.fetch("""
                SELECT saga_id, state, current_step, saga_type
                FROM sagas
                WHERE state IN ('RUNNING', 'COMPENSATING')
                  AND updated_at < NOW() - INTERVAL '2 minutes'
                FOR UPDATE SKIP LOCKED
                LIMIT 10
            """)
            for saga in stuck:
                if saga["state"] == "RUNNING":
                    await orchestrator._run_forward(saga["saga_id"])
                elif saga["state"] == "COMPENSATING":
                    await orchestrator._compensate(saga["saga_id"], saga["current_step"])

            # Also handle deadline-exceeded sagas
            expired = await conn.fetch("""
                UPDATE sagas SET state = 'COMPENSATING'
                WHERE state = 'RUNNING' AND deadline_at < NOW()
                RETURNING saga_id, current_step
            """)
            for saga in expired:
                await orchestrator._compensate(saga["saga_id"], saga["current_step"])

        await asyncio.sleep(interval_seconds)
Every saga step must be idempotent: retrying it must not repeat its side effects.
async def charge_payment(context: dict, idempotency_key: str) -> dict:
    """Saga step: charge the customer. Idempotent via idempotency_key."""
    response = await payment_api.create_charge(
        amount=context["total"],
        customer_id=context["user_id"],
        idempotency_key=str(idempotency_key),  # payment provider deduplicates
    )
    return {"payment_id": response["id"], "charge_status": response["status"]}


async def refund_payment(context: dict, idempotency_key: str):
    """Compensation: refund the charge. Also idempotent."""
    if "payment_id" not in context:
        return  # charge never happened
    await payment_api.refund(
        payment_id=context["payment_id"],
        idempotency_key=f"refund-{idempotency_key}",
    )
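The code above relies on the payment provider to deduplicate. When a downstream service offers no such guarantee, the same idea can be applied on your side. The following is a minimal in-memory sketch; `IdempotentExecutor` is a hypothetical helper, and a production version would need a durable store and an atomic check-and-record step.

```python
from typing import Any, Callable, Dict


class IdempotentExecutor:
    """Remember results by idempotency key so that retries are no-ops."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def run(self, key: str, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if key in self._results:
            return self._results[key]  # retry: return the recorded result
        result = fn(*args, **kwargs)   # first attempt: perform the side effect
        self._results[key] = result
        return result
```

A key such as `"saga-42:charge"` ties the recorded result to one step of one saga, so a sweeper that re-runs the step after a crash gets the original payment back instead of charging twice.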
These two terms are frequently confused but describe different things:
Serializability (from transactions): The result of executing concurrent transactions is equivalent to some serial execution of those transactions. This is about transactions and databases.
Linearizability (from distributed systems): Every operation appears to take effect instantaneously at some point between its invocation and completion. Once a write is acknowledged, all subsequent reads see it. This is about individual operations and real-time ordering.
Linearizable system (register with value initially 0):

Client A:  write(1) ─────────────────► OK
                                       │ (from this point, all reads must return 1)
Client B:                              read() ──────────► 1 ✓
Client C:                                    read() ──► 1 ✓

Non-linearizable:

Client A:  write(1) ─────────────────► OK
Client B:                              read() ──────────► 0 ✗ (stale!)
Client C:                                    read() ──► 1 ✓
Property         | Serializability                       | Linearizability
Scope            | Multi-operation transactions          | Single operations
Ordering         | Some serial order (any order is fine) | Real-time order
Where it matters | Databases                             | Distributed key-value stores, locks
Example systems  | Any SERIALIZABLE database             | ZooKeeper, etcd, Spanner
Strict serializability = serializability + linearizability. This is the strongest guarantee and what Google Spanner provides.
At the opposite end of the spectrum from linearizability is eventual consistency: if no new writes are made, all replicas will eventually converge to the same value.
“Eventually” is vague — it could be milliseconds or minutes. In practice:
How do you atomically update a database AND publish a message to a message broker? You cannot use a distributed transaction between your database and Kafka.
The outbox pattern: write the message to an “outbox” table in the same database transaction. A separate process reads the outbox and publishes to the message broker.
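As a compact end-to-end illustration of both halves (the atomic write and the separate publisher), here is a minimal sketch that uses Python's built-in `sqlite3` module in place of a production database and a plain callback in place of a message broker client. The table layout and the function names `create_schema`, `place_order`, and `publish_pending` are assumptions made for this example.

```python
import json
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    conn.executescript("""
        CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, total REAL, status TEXT);
        CREATE TABLE outbox (
            id INTEGER PRIMARY KEY,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            published_at TEXT
        );
    """)


def place_order(conn: sqlite3.Connection, user_id: int, total: float) -> int:
    with conn:  # one local transaction: commits on success, rolls back on error
        cur = conn.execute(
            "INSERT INTO orders (user_id, total, status) VALUES (?, ?, 'created')",
            (user_id, total),
        )
        order_id = cur.lastrowid
        payload = json.dumps({"order_id": order_id, "user_id": user_id, "total": total})
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES ('OrderCreated', ?)",
            (payload,),
        )
    return order_id


def publish_pending(conn: sqlite3.Connection, publish, batch_size: int = 100) -> int:
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox "
        "WHERE published_at IS NULL ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish(event_type, payload)  # a crash after this line causes a re-send
        with conn:
            conn.execute(
                "UPDATE outbox SET published_at = datetime('now') WHERE id = ?",
                (row_id,),
            )
    return len(rows)
```

Because publishing and marking the row are two separate steps, delivery is at-least-once: consumers must tolerate duplicates, which is where idempotent handling comes in.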
-- Single database transaction (atomic!)
BEGIN;

-- Business logic
INSERT INTO orders (user_id, total, status)
VALUES (1, 99.99, 'created');

-- Outbox entry (same transaction, same database)
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload, created_at)
VALUES ('Order', currval('orders_order_id_seq'), 'OrderCreated',
        '{"user_id": 1, "total": 99.99}', NOW());

COMMIT;
A separate publisher process (or Debezium with CDC) reads the outbox table and publishes events to Kafka:
# Outbox publisher (runs continuously)
while True:
    rows = db.query("""
        SELECT id, event_type, payload
        FROM outbox
        WHERE published_at IS NULL
        ORDER BY created_at
        LIMIT 100
    """)
    for row in rows:
        kafka.produce(topic=f"events.{row.event_type}", value=row.payload)
        db.execute("UPDATE outbox SET published_at = NOW() WHERE id = %s", row.id)
The best distributed transaction is the one you do not need. Strategies:
Keep data that transacts together on the same node: Design partitioning keys so related data co-locates.
Accept eventual consistency: Many business processes are naturally asynchronous (email, notifications, analytics).
Use idempotent operations: Design operations so retrying is safe.
-- Idempotent insert (PostgreSQL)
INSERT INTO processed_events (event_id, processed_at)
VALUES ('evt-123', NOW())
ON CONFLICT (event_id) DO NOTHING;
-- Safe to retry: duplicate inserts are silently ignored
Design for compensation: Instead of preventing inconsistency, detect and fix it. This is what banks actually do — reconciliation processes run nightly.
Use a single database: If your microservices share the same database (heresy, but practical), use regular transactions.
We have covered the theory: how data is stored, queried, replicated, partitioned, and transacted. But theory is not enough. In the final article, we will get practical: databases in production — migrations, monitoring, connection pooling, backups, capacity planning, and real war stories from production incidents.