CDN Cache, Invalidation and Performance: Enterprise Architecture

Enterprise-scale cache architecture patterns, multi-CDN strategies, distributed invalidation systems, and capacity planning for teams operating at global scale.

Enterprise cache architecture requires coordinating multiple CDN providers, invalidating millions of objects atomically, and maintaining performance under extreme load. This guide covers architecture patterns, scale considerations, and decision frameworks for teams operating globally.

Last updated: 2026-06-18

Global Scale Cache Architectures

Multi-CDN Strategies

A single CDN is a single point of failure. Multi-CDN architectures spread that risk and let you optimize for cost, performance, and resilience.

Active-Active Configuration

Both CDNs serve traffic simultaneously. Load balancer distributes based on performance, cost, or weighted ratios.

┌──────────────────────────────────────────────────────────────────┐
│                        DNS Load Balancer                         │
│                   (Route 53, Cloudflare, NS1)                    │
└─────────────────────┬────────────────────┬───────────────────────┘
                      │                    │
                      ▼                    ▼
              ┌───────────────┐    ┌───────────────┐
              │     CDN A     │    │     CDN B     │
              │  60% traffic  │    │  40% traffic  │
              └───────┬───────┘    └───────┬───────┘
                      │                    │
                      └────────┬───────────┘
                       ┌───────────────┐
                       │    Origin     │
                       └───────────────┘

Weighted distribution algorithm:

def select_cdn(request, weights):
    """
    weights = {'cdn_a': 0.6, 'cdn_b': 0.4}
    """
    cdn = weighted_choice(weights)
    # Override if one CDN has issues
    if health_check(cdn) == 'degraded':
        cdn = fallback_cdn(cdn, weights)
    return cdn
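
The `weighted_choice` helper is not defined above. One way it might look, as a minimal sketch: it assumes non-negative weights, and the optional `rng` parameter is an addition for reproducible testing rather than part of the original design.

```python
import random


def weighted_choice(weights, rng=random):
    """Return one key from `weights`, picked with probability proportional to its weight."""
    names = list(weights)
    # random.choices normalizes the weights, so they need not sum to 1.0
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


if __name__ == "__main__":
    picks = [weighted_choice({'cdn_a': 0.6, 'cdn_b': 0.4}) for _ in range(10_000)]
    # Share of requests routed to cdn_a in this run
    print(picks.count('cdn_a') / len(picks))
```

Because the choice is random per request, the observed split only approaches the configured ratio over many requests.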

Failover Configuration

The primary CDN serves all traffic. The secondary takes over when the primary fails.

┌─────────────────────────────────────────────────────────────────┐
│                         Health Monitor                          │
│                   (checks /health every 10s)                    │
└─────────────────────────┬───────────────────────────────────────┘
              ┌───────────────────────┐
              │      Primary CDN      │ ← Normal: 100% traffic
              │    Status: Healthy    │
              └───────────┬───────────┘
                          │ On failure
              ┌───────────────────────┐
              │     Secondary CDN     │ ← Failover: 100% traffic
              │    Status: Standby    │
              └───────────────────────┘

Failover decision matrix:

| Metric | Threshold | Action | Recovery |
|---|---|---|---|
| Error rate | > 5% | Route to secondary | < 1% for 5 min |
| P95 latency | > 2s | Route to secondary | < 500ms for 5 min |
| Availability | < 99% | Route to secondary | > 99.9% for 10 min |
| RUM complaints | Spike > 3x | Investigate + potential failover | Baseline |
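
The matrix has separate trip and recovery thresholds, which gives the failover hysteresis. A minimal sketch of that behavior, covering only the error-rate and P95-latency rows and using their 5-minute recovery window; the `Sample` and `FailoverController` names are hypothetical:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Sample:
    error_rate: float      # fraction: 0.05 means 5%
    p95_latency_s: float   # seconds


class FailoverController:
    """Trip to the secondary on a breach; return only after sustained recovery."""

    def __init__(self, recovery_window_s: float = 300.0):
        self.active = 'primary'
        self.recovery_window_s = recovery_window_s
        self._healthy_since: Optional[float] = None

    def observe(self, now: float, primary: Sample) -> str:
        breached = primary.error_rate > 0.05 or primary.p95_latency_s > 2.0
        recovered = primary.error_rate < 0.01 and primary.p95_latency_s < 0.5
        if self.active == 'primary':
            if breached:
                self.active = 'secondary'
                self._healthy_since = None
        elif recovered:
            # Start (or continue) the recovery clock
            if self._healthy_since is None:
                self._healthy_since = now
            if now - self._healthy_since >= self.recovery_window_s:
                self.active = 'primary'
                self._healthy_since = None
        else:
            # Any sample outside the recovery band resets the clock
            self._healthy_since = None
        return self.active
```

The gap between the two thresholds (trip at 5% errors, recover below 1%) is what keeps traffic from flapping between providers when the primary hovers near a limit.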

Performance-Based Routing

Route each request to the CDN with the lowest latency for the user's location.

def select_best_cdn(user_ip, cdn_providers, cost_cap):
    """
    Latency-aware CDN selection.
    cost_cap: maximum monthly spend allowed per CDN.
    """
    latencies = {}
    for cdn in cdn_providers:
        # Use real user measurements or synthetic probes
        latencies[cdn] = get_p95_latency(user_ip, cdn)
    # Select CDN with lowest latency
    best_cdn = min(latencies, key=latencies.get)
    # Apply business rules (cost caps, contract minimums)
    if get_monthly_spend(best_cdn) > cost_cap:
        best_cdn = get_next_best(latencies, exclude=best_cdn)
    return best_cdn

Cache Hierarchy

Enterprise CDNs implement hierarchical caching: origin → regional edge → local edge.

┌─────────────────────────────────────────────────────────────────────┐
│                               Origin                                │
│                      (Single source of truth)                       │
└──────────────────────────────────┬──────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│                         Regional Edge Cache                         │
│                   (US-East, EU-West, APAC, LATAM)                   │
│                 TTL: Base TTL × 1.5 (longer-lived)                  │
└───────┬─────────────────┬─────────────────┬─────────────────┬───────┘
        │                 │                 │                 │
        ▼                 ▼                 ▼                 ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│  Local Edge   │ │  Local Edge   │ │  Local Edge   │ │  Local Edge   │
│   New York    │ │    London     │ │   São Paulo   │ │     Tokyo     │
│   TTL: Base   │ │   TTL: Base   │ │   TTL: Base   │ │   TTL: Base   │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
        │                 │                 │                 │
        ▼                 ▼                 ▼                 ▼
     [Users]           [Users]           [Users]           [Users]

Hierarchy TTL configuration:

| Level | Purpose | TTL Multiplier | Storage |
|---|---|---|---|
| Origin | Source | N/A | Full dataset |
| Regional | Reduce cross-region requests | 1.5x base TTL | Hot + warm |
| Local | Serve users | Base TTL | Hot only |

Cache lookup flow:

def hierarchical_lookup(request, cache_hierarchy):
    """
    Lookup through cache hierarchy.
    """
    for level in ['local', 'regional', 'origin']:
        cache = cache_hierarchy[level]
        if cache.has(request.key):
            response = cache.get(request.key)
            if not response.is_stale():
                return response
            # Stale-while-revalidate: serve the stale copy now and
            # refresh it in the background
            if level != 'origin':
                async_revalidate(request, cache_hierarchy)
                return response
    # Full miss - fetch from origin
    return fetch_from_origin(request)

Geographic Load Balancing with Cache Awareness

Traditional geo-load balancing routes to nearest region. Cache-aware routing considers both proximity and cache state.

def cache_aware_routing(user_location, cdn_regions):
    """
    Route based on location + cache hit probability.
    """
    candidates = []
    for region in cdn_regions:
        proximity_score = calculate_proximity(user_location, region)
        cache_score = estimate_cache_hit_rate(region)
        # Weighted combination
        combined_score = (proximity_score * 0.4) + (cache_score * 0.6)
        candidates.append((region, combined_score))
    return max(candidates, key=lambda x: x[1])[0]


def estimate_cache_hit_rate(region):
    """
    Predict cache hit rate based on recent metrics.
    """
    metrics = get_recent_metrics(region, window='5min')
    # Factor in: object popularity, TTL, recent purges
    popularity = metrics.get('object_popularity', 0.5)
    ttl_factor = metrics.get('avg_ttl_remaining', 0.5)
    purge_impact = 1 - metrics.get('recent_purge_ratio', 0)
    return (popularity + ttl_factor + purge_impact) / 3

Cache Synchronization Across CDN Providers

Multi-CDN requires cache synchronization to prevent stale content in one provider while another is updated.

Invalidation Broadcast Pattern

┌─────────────────────────────────────────────────────────────────┐
│                       Invalidation Event                        │
│                        (content updated)                        │
└─────────────────────────┬───────────────────────────────────────┘
              ┌───────────────────────┐
              │       Event Bus       │
              │      (Kafka/SNS)      │
              └───────┬───────┬───────┘
                      │       │
          ┌───────────┘       └───────────┐
          │                               │
          ▼                               ▼
  ┌───────────────┐               ┌───────────────┐
  │ CDN A Worker  │               │ CDN B Worker  │
  │   Purge API   │               │   Purge API   │
  └───────────────┘               └───────────────┘

Implementation:

import json
from kafka import KafkaConsumer, KafkaProducer


class CacheInvalidator:
    def __init__(self, cdn_clients):
        self.cdn_clients = cdn_clients
        self.consumer = KafkaConsumer(
            'cache-invalidation',
            bootstrap_servers=['kafka:9092'],
            group_id='cdn-invalidator'
        )
        # Producer for the retry topic
        self.retry_queue = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            value_serializer=lambda v: json.dumps(v).encode()
        )

    def process_invalidations(self):
        for message in self.consumer:
            event = json.loads(message.value)
            # Parallel purge across all CDNs
            # (parallel_purge is an application helper, not shown here)
            results = parallel_purge(
                self.cdn_clients,
                event['keys'],
                event.get('soft_purge', False)
            )
            # Requeue the event if any CDN failed; purges are idempotent,
            # so replaying it against every CDN is safe
            failed = [r for r in results if not r.success]
            if failed:
                self.retry_queue.send('invalidation-retry', event)

Invalidation SLAs

| Priority | Target SLA | Use Case | Implementation |
|---|---|---|---|
| Critical | < 1s | Security patches, legal removal | Synchronous purge, verify |
| High | < 10s | Price changes, product updates | Async with confirmation |
| Normal | < 60s | Content updates | Async, best effort |
| Low | < 300s | Non-critical updates | Batch processing |

Advanced Invalidation Patterns

Event-Sourced Invalidation

Traditional invalidation: application calls purge API directly. Event-sourced: all invalidations flow through event bus, enabling audit trails, replay, and guaranteed delivery.

Kafka Pattern

# Kafka topic configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: cache-invalidation
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 86400000   # 24 hours
    cleanup.policy: compact  # Keep latest per key

Producer (application):

import json
import time
import uuid
from kafka import KafkaProducer


class CacheInvalidationProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            value_serializer=lambda v: json.dumps(v).encode()
        )

    def invalidate_content(self, keys, priority='normal'):
        """
        Emit invalidation event.
        """
        event = {
            'keys': keys,
            'priority': priority,
            'timestamp': time.time(),
            'source': 'cms',
            'correlation_id': str(uuid.uuid4())
        }
        # Key the record by the first cache key so the default partitioner
        # keeps events for that key in order. Python's built-in hash() is
        # salted per process for strings, so hash(keys[0]) % 12 would not be
        # stable across producers. A compacted topic also requires a key.
        future = self.producer.send(
            'cache-invalidation',
            value=event,
            key=keys[0].encode(),
            # kafka-python expects headers as a list of (str, bytes) tuples
            headers=[
                ('priority', priority.encode()),
                ('source', b'cms')
            ]
        )
        return future

Consumer (CDN worker):

import asyncio
import json
from kafka import KafkaConsumer


class CacheInvalidationWorker:
    def __init__(self, cdn_client):
        self.cdn_client = cdn_client
        self.consumer = KafkaConsumer(
            'cache-invalidation',
            bootstrap_servers=['kafka:9092'],
            group_id='cdn-purger',
            enable_auto_commit=False
        )

    async def process_batch(self):
        """
        Batch process invalidations for efficiency.
        """
        batch = []
        # Iterating the consumer blocks until messages arrive, so run this
        # worker in its own process or thread
        for message in self.consumer:
            batch.append(json.loads(message.value))
            if len(batch) >= 100:  # Batch size
                break
        # Dedupe keys across batch
        all_keys = set()
        for event in batch:
            all_keys.update(event['keys'])
        # Single purge call for batch
        result = await self.cdn_client.purge_bulk(list(all_keys))
        if result.success:
            # Commit offsets only after the purge succeeded
            self.consumer.commit()
        else:
            # Retry failed keys
            await self.retry_failed(result.failed_keys)

RabbitMQ Pattern

import pika
import json


class CacheInvalidationRabbitMQ:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('rabbitmq')
        )
        self.channel = self.connection.channel()
        # Declare the main exchange
        self.channel.exchange_declare(
            exchange='cache-invalidation',
            exchange_type='direct',
            durable=True
        )
        # Priority queue; rejected messages go to the dead-letter exchange,
        # which must be declared separately
        args = {
            'x-max-priority': 3,
            'x-dead-letter-exchange': 'cache-invalidation-dlx'
        }
        self.channel.queue_declare(
            queue='cdn-purge',
            durable=True,
            arguments=args
        )
        # Bind the queue, otherwise the direct exchange drops every message
        self.channel.queue_bind(
            queue='cdn-purge',
            exchange='cache-invalidation',
            routing_key='cdn-purge'
        )

    def publish(self, keys, priority=1):
        """
        priority: 1=normal, 2=high, 3=critical
        """
        self.channel.basic_publish(
            exchange='cache-invalidation',
            routing_key='cdn-purge',
            body=json.dumps({'keys': keys}),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
                priority=priority
            )
        )

Invalidation at Scale (Millions of Objects)

Purging millions of objects requires batching, rate limiting, and async processing.

Surrogate Key Strategy

class SurrogateKeyManager:
    """
    Organize content by surrogate keys for efficient bulk invalidation.
    """
    def __init__(self):
        self.key_registry = {}  # content_id -> [surrogate_keys]

    def assign_surrogate_keys(self, content_id, keys):
        """
        Assign surrogate keys to content.
        Keys are hierarchical: site:category:subcategory
        """
        self.key_registry[content_id] = keys

    def get_keys_to_purge(self, surrogate_key):
        """
        Get all content IDs matching surrogate key.
        """
        return [
            cid for cid, keys in self.key_registry.items()
            if surrogate_key in keys
        ]

Surrogate key patterns:

| Pattern | Example | Invalidation Scope |
|---|---|---|
| Site-level | site:acme | Entire site |
| Section | site:acme:blog | Blog section |
| Content type | site:acme:product | All products |
| Entity | product:12345 | Single product |
| Dependency | product:12345:reviews | Product reviews |
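
For a site-level purge to reach a blog page, that page has to carry the broader keys as well as its most specific one. A minimal sketch of expanding a hierarchical key into the set of tags to attach; both function names are hypothetical, and the space separator is an assumption because tag header names and separators vary by provider.

```python
from typing import List


def expand_surrogate_key(key: str, sep: str = ':') -> List[str]:
    """Return the key plus its ancestors, broadest first.

    The first two segments stay together because, in the patterns above,
    'site:acme' (not 'site') is the broadest purge scope.
    """
    parts = key.split(sep)
    if len(parts) < 2:
        return [key]
    return [sep.join(parts[:n]) for n in range(2, len(parts) + 1)]


def surrogate_key_header(keys: List[str]) -> str:
    """Join expanded keys into one tag list, dropping duplicates and keeping order."""
    expanded: List[str] = []
    for key in keys:
        for candidate in expand_surrogate_key(key):
            if candidate not in expanded:
                expanded.append(candidate)
    return ' '.join(expanded)


if __name__ == "__main__":
    print(surrogate_key_header(['site:acme:blog', 'product:12345:reviews']))
```

With this tagging, purging `site:acme` invalidates every object under the site, while purging `site:acme:blog` leaves product pages cached.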

Bulk Purge Implementation

import asyncio
from dataclasses import dataclass


class RateLimitError(Exception):
    """Placeholder for the CDN client's rate-limit exception."""


@dataclass
class PurgeResult:
    total: int
    success: int
    failed: int
    rate_limited: int


async def bulk_purge(cdn_client, keys, batch_size=1000, rate_limit=100, max_retries=3):
    """
    Purge millions of keys with rate limiting.
    rate_limit: maximum purge calls per second.
    """
    results = PurgeResult(total=len(keys), success=0, failed=0, rate_limited=0)
    # Process in batches
    for i in range(0, len(keys), batch_size):
        batch = keys[i:i + batch_size]
        for attempt in range(max_retries + 1):
            try:
                result = await cdn_client.purge(batch)
                results.success += result.purged
                results.failed += result.failed
                break
            except RateLimitError:
                # Count the keys held back, then back off and retry the batch
                results.rate_limited += len(batch)
                await asyncio.sleep(2 ** attempt)
        else:
            # Retries exhausted: count the whole batch as failed
            results.failed += len(batch)
        await asyncio.sleep(1 / rate_limit)  # Pace calls to the rate limit
    return results

Conflict Resolution for Distributed Cache

Multiple edges may have different versions of cached content during invalidation propagation.

Version Vector Pattern

import json
from typing import Dict, List


class VersionedCache:
    """
    Track content versions across distributed cache.
    """
    def __init__(self):
        self.versions: Dict[str, Dict[str, int]] = {}  # key -> {edge: version}

    def update(self, key: str, edge: str, version: int):
        if key not in self.versions:
            self.versions[key] = {}
        self.versions[key][edge] = version

    def get_latest_version(self, key: str) -> int:
        if key not in self.versions:
            return 0
        return max(self.versions[key].values())

    def is_stale(self, key: str, edge: str, version: int) -> bool:
        latest = self.get_latest_version(key)
        return version < latest

    def resolve_conflict(self, key: str, versions: List[tuple]) -> tuple:
        """
        versions: [(edge, version, content), ...]
        Returns: (winning_edge, winning_content)
        """
        # Highest version wins
        # If tie, use edge priority (lower number = higher priority)
        edge_priority = {'edge-primary': 1, 'edge-secondary': 2}
        winner = max(
            versions,
            key=lambda x: (x[1], -edge_priority.get(x[0], 999))
        )
        return winner[0], winner[2]

Cost Optimization for Purges

Some CDN providers charge per purge request or per invalidated path once a free allowance is used up. Design the purge strategy to keep those costs down.

Purge Cost Model

Purge Cost = (Number of objects × Cost per object) + API calls
Example (illustrative pricing; check your provider's current rates):
- First 1,000 purges/month: Free
- Next 99,000: $0.005 per purge
- 100,000+: $0.01 per purge
Cost for 1M purges:
- Free tier: 1,000
- Tier 2: 99,000 × $0.005 = $495
- Tier 3: 900,000 × $0.01 = $9,000
- Total: $9,495
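
The tier arithmetic above can be checked with a short function. This is a sketch that hard-codes the example's three tiers; `purge_cost` is a hypothetical name, and real rate cards differ by provider.

```python
def purge_cost(purges: int) -> float:
    """Monthly purge cost in dollars under the example's three pricing tiers."""
    tiers = [
        (1_000, 0.0),     # first 1,000 purges are free
        (99_000, 0.005),  # next 99,000
        (None, 0.01),     # everything beyond 100,000
    ]
    remaining, total = purges, 0.0
    for size, price in tiers:
        # The last tier (size None) absorbs whatever is left
        in_tier = remaining if size is None else min(remaining, size)
        total += in_tier * price
        remaining -= in_tier
        if remaining <= 0:
            break
    return total


if __name__ == "__main__":
    print(f"${purge_cost(1_000_000):,.2f}")
```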

Cost Reduction Strategies

| Strategy | Savings | Trade-off |
|---|---|---|
| Use surrogate keys | 10-100x fewer API calls | Requires planning |
| Batch invalidations | 50-80% fewer calls | Delayed propagation |
| TTL-based expiration | 100% purge savings | Stale until TTL |
| Soft purge | 30-50% fewer revalidations | Serves stale briefly |
| Invalidation deduplication | 20-40% fewer calls | Processing overhead |

Batching implementation:

import asyncio
from collections import defaultdict


class PurgeBatcher:
    """
    Batch purges over time window to reduce API calls.
    """
    def __init__(self, window_seconds=5, max_batch_size=1000):
        self.window = window_seconds
        self.max_batch = max_batch_size
        self.pending = defaultdict(list)
        self.timers = {}

    async def queue_purge(self, key: str, priority: str = 'normal'):
        self.pending[priority].append(key)
        # Start timer if not running
        if priority not in self.timers:
            self.timers[priority] = asyncio.create_task(
                self._process_after_window(priority)
            )
        # Immediate flush if batch full
        if len(self.pending[priority]) >= self.max_batch:
            await self._flush(priority)

    async def _process_after_window(self, priority: str):
        await asyncio.sleep(self.window)
        await self._flush(priority)

    async def _flush(self, priority: str):
        # Drop the timer first so keys queued during the purge start a new
        # window; do not cancel the timer task if it is the one flushing
        timer = self.timers.pop(priority, None)
        if timer is not None and timer is not asyncio.current_task():
            timer.cancel()
        if self.pending[priority]:
            keys = self.pending.pop(priority)
            # _execute_purge wraps the CDN purge call (not shown)
            await self._execute_purge(keys, priority)

Enterprise Scale Performance

Capacity Planning Formulas

Cache Storage Calculation

Storage Required = Σ(Object Size × Popularity Factor) × Replication Factor
Where:
- Object Size: Average size per object
- Popularity Factor: 0.0-1.0 based on access frequency
- Replication Factor: Number of edge locations storing the object
Example:
- 10M objects
- Average size: 50KB
- Hot objects (20%): PF = 1.0, replicated to 50 edges
- Warm objects (30%): PF = 0.5, replicated to 10 edges
- Cold objects (50%): PF = 0.1, replicated to 3 edges
Storage = (2M × 50KB × 50) + (3M × 50KB × 10 × 0.5) + (5M × 50KB × 3 × 0.1)
= 5,000GB + 750GB + 75GB
= 5,825GB ≈ 5.8TB
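
The same calculation as a small function, so the tier mix can be varied. A sketch using decimal units (1KB = 1,000 bytes) to match the worked example; the `Tier` class and `storage_bytes` function are hypothetical names.

```python
from dataclasses import dataclass

KB = 1_000            # decimal units, matching the worked example
GB = 1_000_000_000


@dataclass
class Tier:
    objects: int       # number of objects in the tier
    popularity: float  # popularity factor, 0.0-1.0
    replicas: int      # edge locations holding each object


def storage_bytes(tiers, avg_object_bytes):
    """Sum object size x popularity factor x replication factor over all tiers."""
    return sum(t.objects * avg_object_bytes * t.popularity * t.replicas for t in tiers)


tiers = [
    Tier(objects=2_000_000, popularity=1.0, replicas=50),  # hot
    Tier(objects=3_000_000, popularity=0.5, replicas=10),  # warm
    Tier(objects=5_000_000, popularity=0.1, replicas=3),   # cold
]
total_gb = storage_bytes(tiers, 50 * KB) / GB
print(f"{total_gb:,.0f} GB")
```

The hot tier dominates the total because of its replication factor, so the number of edges that hold hot objects is the main storage lever in this model.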

Origin Load Calculation

Origin Requests/sec = Total Requests/sec × (1 - Hit Ratio)
TTL does not appear directly: a longer TTL lowers origin load by raising the hit ratio.
Example:
- 100,000 requests/sec peak
- 90% hit ratio
- 300 second average TTL
Origin load = 100,000 × 0.10 = 10,000 req/sec to origin
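
The worked example comes down to multiplying the peak request rate by the miss ratio. A short sketch (with a hypothetical `origin_rps` helper) shows how sensitive origin load is to the hit ratio:

```python
def origin_rps(total_rps: float, hit_ratio: float) -> float:
    """Requests per second that miss the cache and reach the origin."""
    if not 0.0 <= hit_ratio <= 1.0:
        raise ValueError("hit_ratio must be between 0 and 1")
    return total_rps * (1.0 - hit_ratio)


if __name__ == "__main__":
    for ratio in (0.95, 0.90, 0.80):
        print(f"hit ratio {ratio:.0%}: {origin_rps(100_000, ratio):,.0f} req/sec to origin")
```

A drop from 90% to 80% hit ratio doubles origin traffic, which is why origin capacity is usually sized for the degraded case (for example, right after a large purge) and not for the steady state.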

Bandwidth Savings

Savings = (Total Traffic × Hit Ratio × Edge Cost Premium) - Purge Costs
Where:
- Edge Cost Premium: Cost difference between origin and edge bandwidth
Example:
- 10TB/day traffic
- 90% hit ratio
- Origin: $0.08/GB, Edge: $0.02/GB
- Purge costs: $500/month
Savings = (10,000GB × 30 × 0.90 × ($0.08 - $0.02)) - $500
= $16,200 - $500
= $15,700/month

Cache Partitioning Strategies

Partition cache by access patterns: hot, warm, cold.

┌─────────────────────────────────────────────────────────────────┐
│                           Total Cache                           │
│                         (100TB Example)                         │
├─────────────────┬─────────────────┬─────────────────────────────┤
│       HOT       │      WARM       │            COLD             │
│     (20TB)      │     (30TB)      │           (50TB)            │
│                 │                 │                             │
│ • 5% of objects │ • 15% objects   │ • 80% of objects            │
│ • 80% of hits   │ • 15% of hits   │ • 5% of hits                │
│ • TTL: 1 day    │ • TTL: 1 hour   │ • TTL: 5 min                │
│ • Memory/SSD    │ • SSD           │ • HDD/Object store          │
│ • All edges     │ • Regional      │ • On-demand load            │
└─────────────────┴─────────────────┴─────────────────────────────┘

Partitioning logic:

from collections import defaultdict
import time


class CachePartitioner:
    def __init__(self, thresholds):
        self.thresholds = thresholds  # {'hot': 1000, 'warm': 100}
        self.access_counts = defaultdict(int)
        self.last_access = {}

    def classify(self, key: str) -> str:
        """
        Classify object based on access patterns.
        """
        accesses = self.access_counts.get(key, 0)
        if accesses >= self.thresholds['hot']:
            return 'hot'
        elif accesses >= self.thresholds['warm']:
            return 'warm'
        else:
            return 'cold'

    def record_access(self, key: str):
        self.access_counts[key] += 1
        self.last_access[key] = time.time()

    def promote(self, key: str, from_tier: str, to_tier: str):
        """
        Move object to higher tier.
        """
        # Pre-fetch to all edges if promoting to hot
        if to_tier == 'hot':
            prefetch_to_edges(key)

    def demote(self, key: str, from_tier: str, to_tier: str):
        """
        Move object to lower tier.
        """
        # Evict from edge caches if demoting from hot
        if from_tier == 'hot':
            evict_from_edges(key)

Handling Traffic Spikes

Black Friday, product launches, and viral events create 10-100x traffic spikes.

Pre-warming Strategy

class CacheWarmer:
    def __init__(self, cdn_client, origin_client):
        self.cdn = cdn_client
        self.origin = origin_client

    async def warm_cache(self, urls: list, priority: str = 'normal'):
        """
        Pre-populate cache before expected spike.
        """
        results = []
        for url in urls:
            try:
                # Fetch from origin (not from cache)
                content = await self.origin.fetch(url)
                # Push to CDN cache
                await self.cdn.store(url, content)
                results.append({'url': url, 'status': 'warmed'})
            except Exception as e:
                results.append({'url': url, 'status': 'failed', 'error': str(e)})
        return results

    async def warm_by_pattern(self, patterns: list):
        """
        Warm all URLs matching patterns.
        """
        for pattern in patterns:
            urls = await self.get_urls_matching(pattern)
            await self.warm_cache(urls)

Spike Detection and Auto-scaling

class TrafficSpikeHandler:
    def __init__(self, thresholds):
        self.thresholds = thresholds
        self.baseline = None

    def detect_spike(self, current_rps: float) -> dict:
        if not self.baseline:
            self.baseline = current_rps
            return {'spike': False}
        multiplier = current_rps / self.baseline
        if multiplier > self.thresholds['spike']:
            return {
                'spike': True,
                'severity': 'high' if multiplier > 10 else 'medium',
                'multiplier': multiplier
            }
        return {'spike': False}

    async def handle_spike(self, severity: str):
        """
        Automatic actions during spike.
        """
        if severity == 'high':
            # Extend TTLs
            await self.extend_ttls(multiplier=2)
            # Enable stale-while-revalidate for all
            await self.enable_stale_handlers()
            # Scale origin capacity
            await self.scale_origin(replicas=3)
        elif severity == 'medium':
            # Moderate TTL extension
            await self.extend_ttls(multiplier=1.5)
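
In `detect_spike` the baseline is set from the first sample and never updated, so normal daily growth would eventually register as a spike. One possible refinement, sketched here as an assumption and not as part of the original handler, tracks the baseline with an exponentially weighted moving average and freezes it while a spike is in progress. The `EwmaSpikeDetector` name and the `alpha` default are hypothetical.

```python
class EwmaSpikeDetector:
    """Spike detection against a slowly adapting baseline."""

    def __init__(self, spike_multiplier: float = 3.0, alpha: float = 0.1):
        self.spike_multiplier = spike_multiplier
        self.alpha = alpha      # weight given to the newest sample
        self.baseline = None

    def observe(self, current_rps: float) -> dict:
        if self.baseline is None:
            self.baseline = current_rps
            return {'spike': False}
        if self.baseline > 0:
            multiplier = current_rps / self.baseline
        else:
            # Avoid dividing by a zero baseline
            multiplier = float('inf') if current_rps > 0 else 1.0
        if multiplier > self.spike_multiplier:
            # Do not fold spike traffic into the baseline
            return {
                'spike': True,
                'severity': 'high' if multiplier > 10 else 'medium',
                'multiplier': multiplier,
            }
        # Normal traffic: move the baseline toward the new sample
        self.baseline = (1 - self.alpha) * self.baseline + self.alpha * current_rps
        return {'spike': False}
```

A small `alpha` makes the baseline follow weekly trends without chasing short bursts; the right value depends on how often samples arrive.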

Cache Performance Under DDoS

DDoS attacks overwhelm origin through cache misses.

Cache-aware DDoS mitigation

from collections import defaultdict


class CacheAwareDDoSMitigation:
    def __init__(self, rate_limits):
        self.limits = rate_limits
        self.request_counts = defaultdict(int)

    def analyze_request(self, request) -> dict:
        """
        Analyze request for DDoS patterns.
        """
        key = request.path
        # Check if cached
        if self.is_cache_hit(key):
            # Cached content - low risk
            return {'action': 'serve', 'risk': 'low'}
        # Check cache bypass attempts
        if self.is_cache_bypass(request):
            return {'action': 'block', 'reason': 'cache_bypass_attempt'}
        # Check for cache poisoning attempt
        if self.has_malicious_headers(request):
            return {'action': 'sanitize', 'headers': self.sanitize(request)}
        # Rate limit cache misses
        self.request_counts[key] += 1
        if self.request_counts[key] > self.limits['miss_per_path']:
            return {'action': 'rate_limit', 'ttl': 60}
        return {'action': 'forward', 'risk': 'medium'}

    def is_cache_bypass(self, request) -> bool:
        """
        Detect attempts to bypass cache.
        """
        # random_query_string() is an application helper (not shown) that
        # flags query strings that look randomly generated
        bypass_indicators = [
            request.headers.get('Pragma') == 'no-cache',
            request.headers.get('Cache-Control') == 'no-cache',
            '?' in request.path and random_query_string(request),
        ]
        return any(bypass_indicators)

Cost Modeling

Total Cost of Ownership

TCO = (Origin Bandwidth × Origin Cost)
+ (Edge Bandwidth × Edge Cost)
+ (Purge Operations × Purge Cost)
+ (Storage × Storage Cost)
- (Savings from Reduced Origin Load)
Break-even Hit Ratio:
Hit ratio at which edge delivery cost equals the origin bandwidth cost it saves.
Edge Cost × Traffic = Origin Cost × Traffic × Hit Ratio
Break-even = Edge Cost / Origin Cost
Example:
- Origin: $0.08/GB
- Edge: $0.02/GB
Break-even = $0.02 / $0.08 = 25% hit ratio
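
The 25% break-even can be cross-checked by comparing bandwidth cost with and without the CDN. A sketch under simplifying assumptions: bandwidth costs only (purge and storage ignored), every byte is delivered through the edge, and only cache misses also incur origin bandwidth. All function names are hypothetical.

```python
def cost_without_cdn(traffic_gb: float, origin_cost: float) -> float:
    """All traffic is served straight from the origin."""
    return traffic_gb * origin_cost


def cost_with_cdn(traffic_gb: float, hit_ratio: float,
                  origin_cost: float, edge_cost: float) -> float:
    """Edge delivers everything; misses additionally pull from the origin."""
    return traffic_gb * edge_cost + traffic_gb * (1 - hit_ratio) * origin_cost


def break_even_hit_ratio(origin_cost: float, edge_cost: float) -> float:
    """Hit ratio at which cost_with_cdn equals cost_without_cdn."""
    return edge_cost / origin_cost


if __name__ == "__main__":
    h = break_even_hit_ratio(origin_cost=0.08, edge_cost=0.02)
    print(f"break-even hit ratio: {h:.0%}")
    print(cost_with_cdn(1000, h, 0.08, 0.02), cost_without_cdn(1000, 0.08))
```

Under these assumptions any hit ratio above the break-even lowers the bandwidth bill, and the saving grows linearly with the hit ratio.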

Cost Optimization Decision Matrix

| Hit Ratio | Action | Rationale |
|---|---|---|
| < 60% | Investigate cacheability | Cost exceeding value |
| 60-80% | Optimize TTLs | Moderate savings |
| 80-95% | Monitor and tune | Good performance |
| > 95% | Consider TTL reduction | May be over-caching |

Cache for Specific Workloads

Video Streaming Cache Patterns

HLS/DASH Segment Caching

HLS Manifest Structure:
├── master.m3u8      → TTL: 10s (frequent updates)
├── playlist.m3u8    → TTL: 5s (segment list changes)
└── segments/
    ├── seg001.ts    → TTL: 1 year (immutable)
    ├── seg002.ts    → TTL: 1 year (immutable)
    └── ...

DASH Manifest Structure:
├── manifest.mpd     → TTL: 10s
└── segments/
    ├── init.mp4     → TTL: 1 year
    └── seg_001.m4s  → TTL: 1 year

Configuration:

# Cache rules for streaming
rules:
  - match: "*.m3u8"
    ttl: 10
    stale_if_error: 300
  - match: "*.mpd"
    ttl: 10
    stale_if_error: 300
  - match: "*.ts"
    ttl: 31536000  # 1 year
    immutable: true
  - match: "*.m4s"
    ttl: 31536000
    immutable: true

Live Streaming Considerations

class LiveStreamCacheManager:
    def __init__(self, segment_window=10):
        self.window = segment_window  # Keep last N segments

    def configure_live_cache(self, stream_id: str):
        """
        Live streaming needs short TTL + stale handlers.
        """
        return {
            'manifest_ttl': 2,       # 2 seconds
            'segment_ttl': 300,      # 5 minutes (live segments expire)
            'stale_if_error': 10,    # Brief fallback
            'purge_on_stream_end': True
        }

    async def purge_expired_segments(self, stream_id: str):
        """
        Remove old segments from live stream.
        """
        segments = await self.get_segment_list(stream_id)
        if len(segments) > self.window:
            old_segments = segments[:-self.window]
            await self.purge(old_segments)

API Response Caching at Scale

Public API Caching

from functools import wraps
import hashlib

# `cache` is assumed to be a module-level async cache client with get/set


def cache_response(ttl=60, vary_on=None):
    """
    Decorator for API response caching.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request, *args, **kwargs):
            # Build cache key
            key = build_cache_key(request, vary_on)
            # Check cache (compare to None so empty responses still count as hits)
            cached = await cache.get(key)
            if cached is not None:
                return cached
            # Execute function
            response = await func(request, *args, **kwargs)
            # Store in cache
            await cache.set(key, response, ttl=ttl)
            return response
        return wrapper
    return decorator


def build_cache_key(request, vary_on):
    """
    Build cache key from request and vary headers.
    """
    key_parts = [request.path]
    if vary_on:
        for header in vary_on:
            key_parts.append(request.headers.get(header, ''))
    if request.query_string:
        # Normalize query string
        normalized = normalize_query_string(request.query_string)
        key_parts.append(normalized)
    return hashlib.sha256('|'.join(key_parts).encode()).hexdigest()
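
`normalize_query_string` is referenced above but not shown. A minimal sketch of what it might do: sort parameters so that equivalent URLs share one cache key, and drop parameters that do not affect the response. The `TRACKING_PARAMS` deny-list is a hypothetical example, not a recommendation for any particular site.

```python
from urllib.parse import parse_qsl, urlencode

# Hypothetical deny-list of parameters that do not change the response
TRACKING_PARAMS = frozenset({'utm_source', 'utm_medium', 'utm_campaign', 'fbclid', 'gclid'})


def normalize_query_string(query_string) -> str:
    """Return a canonical form: tracking parameters removed, pairs sorted."""
    if isinstance(query_string, bytes):
        query_string = query_string.decode('utf-8')
    # keep_blank_values preserves parameters such as "?debug="
    pairs = parse_qsl(query_string, keep_blank_values=True)
    kept = [(k, v) for k, v in pairs if k not in TRACKING_PARAMS]
    return urlencode(sorted(kept))


if __name__ == "__main__":
    print(normalize_query_string('b=2&utm_source=news&a=1'))
```

Sorting assumes parameter order carries no meaning for the endpoint; an API that treats repeated parameters as an ordered list would need a different rule.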

Authenticated API Caching

class AuthenticatedAPICache:
    """
    Cache responses per-user with privacy guarantees.
    """
    def __init__(self, cache_client):
        # cache_client: async cache exposing get/set/delete_many
        # (a plain dict cannot be awaited)
        self.cache = cache_client

    async def get_user_response(self, user_id: str, endpoint: str):
        key = f"user:{user_id}:{endpoint}"
        return await self.cache.get(key)

    async def set_user_response(self, user_id: str, endpoint: str,
                                response: dict, ttl: int):
        key = f"user:{user_id}:{endpoint}"
        await self.cache.set(key, response, ttl)
        # Track for invalidation
        await self.track_user_key(user_id, key)

    async def invalidate_user_cache(self, user_id: str):
        """
        Invalidate all cache for a user.
        """
        keys = await self.get_user_keys(user_id)
        await self.cache.delete_many(keys)

GraphQL Query Caching

Persisted Queries

import hashlib


class PersistedQueryCache:
    """
    Cache GraphQL queries by hash.
    """
    def __init__(self):
        self.query_registry = {}  # hash -> query

    async def register_query(self, query: str) -> str:
        """
        Register query and return hash.
        """
        query_hash = hashlib.sha256(query.encode()).hexdigest()
        self.query_registry[query_hash] = query
        return query_hash

    async def execute_persisted(self, query_hash: str, variables: dict):
        """
        Execute persisted query with caching.
        """
        if query_hash not in self.query_registry:
            raise QueryNotFoundError(query_hash)
        query = self.query_registry[query_hash]
        # Cache key includes query hash + variables hash
        cache_key = f"gql:{query_hash}:{hash_variables(variables)}"
        cached = await cache.get(cache_key)
        if cached:
            return cached
        result = await self.execute(query, variables)
        await cache.set(cache_key, result, ttl=self.get_query_ttl(query))
        return result

    def get_query_ttl(self, query: str) -> int:
        """
        Determine TTL based on query type.
        """
        if 'getUser(' in query:
            return 60
        elif 'getProduct(' in query:
            return 300
        elif 'search(' in query:
            return 30
        return 60
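
`hash_variables` is used in the cache key above but not defined. A minimal sketch, assuming the variables are JSON-serializable: serialize them canonically so that key order does not change the hash.

```python
import hashlib
import json


def hash_variables(variables) -> str:
    """Stable SHA-256 hex digest of GraphQL variables, independent of key order."""
    canonical = json.dumps(variables or {}, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()


if __name__ == "__main__":
    same = hash_variables({'id': 1, 'lang': 'en'}) == hash_variables({'lang': 'en', 'id': 1})
    print(same)
```

Without the canonical form, two clients sending the same variables in a different order would create separate cache entries for identical results.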

Automatic Persisted Queries (APQ)

class AutomaticPersistedQueries:
    """
    Client sends hash first, server responds with query if not found.
    """
    async def handle_request(self, request):
        query_hash = request.get('extensions', {}).get('persistedQuery', {}).get('sha256Hash')
        if query_hash:
            # Try to get from cache
            result = await self.try_persisted(query_hash, request.get('variables'))
            if result:
                return result
            # Query not found - client must send full query
            if not request.get('query'):
                return {'errors': [{'message': 'PersistedQueryNotFound'}]}
            # Register for future
            await self.register(query_hash, request['query'])
        # Execute normally
        return await self.execute(request['query'], request.get('variables'))

Real-time Data Caching Challenges

Real-time data (stock prices, sports scores) has conflicting requirements: freshness vs cacheability.

import asyncio
import time


class RealtimeCacheStrategy:
    """
    Cache real-time data with strict invalidation.
    """
    def __init__(self, max_staleness_ms=100):
        self.max_staleness = max_staleness_ms
        self.last_update = {}

    async def get_with_freshness_check(self, key: str):
        """
        Return cached data, flagged as stale when it is older than max_staleness.
        """
        cached = await cache.get(key)
        if not cached:
            return None
        # Entries with no recorded update time are treated as stale
        staleness_ms = (time.time() - self.last_update.get(key, 0)) * 1000
        if staleness_ms > self.max_staleness:
            # Stale - trigger async refresh
            asyncio.create_task(self.refresh(key))
            # Return stale with warning
            cached['_stale'] = True
            cached['_staleness_ms'] = staleness_ms
        return cached

    async def refresh(self, key: str):
        """
        Background refresh from source.
        """
        fresh_data = await self.fetch_from_source(key)
        await cache.set(key, fresh_data)
        self.last_update[key] = time.time()

Edge Compute + Cache Patterns

Execute logic at edge before/after cache lookup.

// Edge function: Pre-cache logic
async function handleRequest(event) {
  const request = event.request;
  // Pre-cache: Check authentication
  const user = await authenticate(request);
  if (!user) {
    return new Response('Unauthorized', { status: 401 });
  }
  // Personalized cache key. The Cache API keys on a Request or an
  // http(s) URL, so the user ID goes into a synthetic query parameter.
  const keyUrl = new URL(request.url);
  keyUrl.searchParams.set('__user', user.id);
  const cacheKey = new Request(keyUrl.toString(), { method: 'GET' });
  // Check cache
  const cache = caches.default;
  let response = await cache.match(cacheKey);
  if (response) {
    // Post-cache: Add personalization header
    response = new Response(response.body, response);
    response.headers.set('X-User-ID', user.id);
    return response;
  }
  // Cache miss - fetch from origin
  response = await fetch(request);
  // Post-cache: Transform response
  response = await transformResponse(response, user);
  // Cache with user-specific key
  const headers = new Headers(response.headers);
  headers.set('Cache-Control', 'private, max-age=300');
  // Clone before caching: a response body can only be read once
  event.waitUntil(
    cache.put(cacheKey, new Response(response.clone().body, { headers }))
  );
  return response;
}

Advanced Observability

Distributed Tracing Across Cache Layers

from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

tracer = trace.get_tracer(__name__)
propagator = TraceContextTextMapPropagator()


class TracedCacheLookup:
    async def lookup(self, request):
        """
        Distributed trace through cache hierarchy.
        """
        with tracer.start_as_current_span("cache.lookup") as span:
            span.set_attribute("cache.key", request.cache_key)
            span.set_attribute("cache.tier", "edge")
            # Local edge lookup
            with tracer.start_as_current_span("edge_cache.get"):
                result = await self.edge_cache.get(request.cache_key)
            span.set_attribute("cache.hit", result is not None)
            if result:
                span.set_attribute("cache.result", "hit")
                return result
            # Regional edge lookup
            with tracer.start_as_current_span("regional_cache.get"):
                result = await self.regional_cache.get(request.cache_key)
            if result:
                span.set_attribute("cache.result", "regional_hit")
                return result
            # Origin fetch
            with tracer.start_as_current_span("origin.fetch") as origin_span:
                # Propagate trace context to origin: inject() writes the
                # current span's traceparent header into the carrier dict
                headers = {}
                propagator.inject(headers)
                result = await self.origin.fetch(request, headers=headers)
                origin_span.set_attribute("http.status_code", result.status)
            span.set_attribute("cache.result", "miss")
            return result

ML-Based Cache Prediction

import numpy as np
from sklearn.ensemble import RandomForestClassifier


class CachePredictor:
    """
    Predict cache hits to optimize routing.
    """
    def __init__(self):
        self.model = RandomForestClassifier(n_estimators=100)
        self.feature_history = []
        self.label_history = []

    def extract_features(self, request):
        """
        Features for prediction.
        """
        return [
            len(request.path),
            request.path.count('/'),
            1 if '?' in request.path else 0,
            self.get_path_popularity(request.path),
            self.get_recent_hit_rate(request.path),
            request.headers.get('User-Agent', '').count('bot'),
        ]

    def train(self, requests, outcomes):
        """
        Train on historical data.
        outcomes: 1 for hit, 0 for miss
        """
        features = [self.extract_features(r) for r in requests]
        self.model.fit(features, outcomes)

    def predict_hit_probability(self, request):
        """
        Predict probability of cache hit.
        """
        features = np.array([self.extract_features(request)])
        return self.model.predict_proba(features)[0][1]

    async def route_with_prediction(self, request, cdn_options):
        """
        Route to best CDN based on hit prediction.
        """
        predictions = {}
        for cdn in cdn_options:
            # CDN-specific prediction
            features = self.extract_features_for_cdn(request, cdn)
            predictions[cdn] = self.model.predict_proba([features])[0][1]
        return max(predictions, key=predictions.get)

Anomaly Detection in Cache Patterns

from collections import deque
import statistics

import numpy as np


class CacheAnomalyDetector:
    """
    Detect anomalies in cache behavior.
    """
    def __init__(self, window_size=1000, z_threshold=3):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.hit_ratios = deque(maxlen=window_size)
        self.latencies = deque(maxlen=window_size)
        self.error_rates = deque(maxlen=window_size)

    def check_hit_ratio_anomaly(self, current_ratio):
        """
        Detect sudden hit ratio changes.
        """
        self.hit_ratios.append(current_ratio)
        if len(self.hit_ratios) < 100:
            return None
        mean = statistics.mean(self.hit_ratios)
        stdev = statistics.stdev(self.hit_ratios)
        if stdev == 0:
            return None
        z_score = (current_ratio - mean) / stdev
        if abs(z_score) > self.z_threshold:
            return {
                'type': 'hit_ratio_anomaly',
                'current': current_ratio,
                'expected': mean,
                'z_score': z_score,
                'direction': 'drop' if z_score < 0 else 'spike'
            }
        return None

    def check_latency_anomaly(self, current_latency):
        """
        Detect latency spikes.
        """
        self.latencies.append(current_latency)
        if len(self.latencies) < 100:
            return None
        p50 = statistics.median(self.latencies)
        p95 = np.percentile(list(self.latencies), 95)
        if current_latency > p95 * 2:
            return {
                'type': 'latency_anomaly',
                'current': current_latency,
                'p50': p50,
                'p95': p95,
                'severity': 'high' if current_latency > p95 * 5 else 'medium'
            }
        return None

    def detect_cache_bypass_attack(self, request_pattern):
        """
        Detect systematic cache bypass attempts.
        """
        indicators = [
            request_pattern.get('unique_paths_per_ip', 0) > 100,
            request_pattern.get('cache_bypass_headers_rate', 0) > 0.5,
            request_pattern.get('random_query_strings_rate', 0) > 0.7,
        ]
        # Require at least two of the three signals to reduce false positives
        if sum(indicators) >= 2:
            return {
                'type': 'cache_bypass_attack',
                'confidence': sum(indicators) / 3,
                'indicators': indicators
            }
        return None

Capacity Forecasting Models

import numpy as np  # needed for np.arange below
import pandas as pd
from scipy import stats


class CacheCapacityForecaster:
    """
    Forecast cache capacity needs.
    """

    def __init__(self, history_days=90):
        self.history_days = history_days
        self.data = []

    def add_daily_metrics(self, date, metrics):
        """
        Add daily metrics for forecasting.
        """
        self.data.append({
            'date': date,
            'traffic_gb': metrics['traffic_gb'],
            'hit_ratio': metrics['hit_ratio'],
            'unique_objects': metrics['unique_objects'],
            'storage_gb': metrics['storage_gb'],
            'origin_requests': metrics['origin_requests'],
        })

    def forecast_traffic(self, days_ahead=30):
        """
        Linear regression forecast for traffic.
        """
        df = pd.DataFrame(self.data)
        dates = pd.to_datetime(df['date'])
        df['day'] = (dates - dates.min()).dt.days
        slope, intercept, r_value, p_value, std_err = stats.linregress(
            df['day'], df['traffic_gb']
        )
        future_days = df['day'].max() + np.arange(1, days_ahead + 1)
        forecast = slope * future_days + intercept
        return {
            'forecast_gb': forecast.tolist(),
            'confidence_interval': self.calculate_ci(forecast, std_err),
            'r_squared': r_value ** 2,
            'slope': slope,
        }

    def calculate_ci(self, forecast, std_err, z=1.96):
        """
        Rough 95% band around the forecast.

        Assumption: the original helper is not shown, so this sketch
        propagates only the slope's standard error over the horizon.
        """
        horizon = np.arange(1, len(forecast) + 1)
        margin = z * std_err * horizon
        return {
            'lower': (forecast - margin).tolist(),
            'upper': (forecast + margin).tolist(),
        }

    def forecast_storage_needs(self, days_ahead=30):
        """
        Forecast storage capacity needs.
        """
        df = pd.DataFrame(self.data)
        # This model uses unique-object growth only; traffic growth and
        # hit ratio changes are not factored in.
        avg_object_size = df['storage_gb'].sum() / df['unique_objects'].sum()
        # Storage grows with unique objects
        slope, intercept, _, _, _ = stats.linregress(
            np.arange(len(df)), df['unique_objects']
        )
        # The last observed index is len(df) - 1
        future_objects = intercept + slope * (len(df) - 1 + days_ahead)
        storage_needed = float(future_objects * avg_object_size)
        current_storage = float(df['storage_gb'].iloc[-1])
        return {
            'current_storage_gb': current_storage,
            'forecast_storage_gb': storage_needed,
            'additional_storage_gb': storage_needed - current_storage,
            'recommendation': self.get_capacity_recommendation(storage_needed),
        }

    def get_capacity_recommendation(self, forecast_gb):
        """
        Get actionable recommendation.
        """
        current = self.data[-1]['storage_gb']
        # Assumes provisioned capacity is 1.5x current usage (50% headroom)
        utilization = forecast_gb / (current * 1.5)
        if utilization > 0.9:
            return 'CRITICAL: Provision additional storage immediately'
        elif utilization > 0.75:
            return 'WARNING: Plan storage expansion within 30 days'
        elif utilization > 0.6:
            return 'Monitor: Consider expansion planning'
        else:
            return 'OK: Sufficient capacity'

Architecture Decision Records

ADR-001: When to Use Multiple CDNs

Status: Accepted

Context: A single CDN creates vendor lock-in and a single point of failure. Multi-CDN adds complexity but improves resilience and can reduce cost.

Decision:

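| Traffic Level    | SLA Requirement | Recommendation           |
|------------------|-----------------|--------------------------|
| < 10 TB/month    | 99%             | Single CDN sufficient    |
| 10-100 TB/month  | 99.5%           | Two CDNs, failover       |
| > 100 TB/month   | 99.9%           | Two+ CDNs, active-active |
| Global audience  | 99.99%          | Three CDNs, geo-routing  |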
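
The table can be read as a small lookup. The sketch below is one possible encoding, not a rule from the ADR itself: the function name and parameters are hypothetical, and it assumes that when traffic and SLA point to different rows, the stricter recommendation wins.

```python
def recommend_cdn_setup(monthly_traffic_tb, sla_percent, global_audience=False):
    """Map traffic volume and SLA target to an ADR-001 recommendation.

    Assumption: the stricter of the two criteria decides the row.
    """
    if global_audience or sla_percent >= 99.99:
        return "Three CDNs, geo-routing"
    if monthly_traffic_tb > 100 or sla_percent >= 99.9:
        return "Two+ CDNs, active-active"
    if monthly_traffic_tb >= 10 or sla_percent >= 99.5:
        return "Two CDNs, failover"
    return "Single CDN sufficient"


# Low traffic, but a 99.9% SLA still pushes the design to active-active.
print(recommend_cdn_setup(monthly_traffic_tb=5, sla_percent=99.9))
```

Treat the thresholds as starting points for discussion; contract terms and team capacity often matter as much as raw traffic.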

Consequences:

  • Increased operational complexity (monitoring, configuration sync)
  • Negotiating leverage with vendors
  • Potential for inconsistent cache states
  • Higher engineering overhead

ADR-002: TTL Strategy by Business Criticality

Status: Accepted

Context: TTL directly impacts freshness vs performance. Different content types have different tolerance for staleness.

Decision:

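| Content Type    | Business Impact | Max Staleness | TTL (s)  | Stale Handler               |
|-----------------|-----------------|---------------|----------|-----------------------------|
| Pricing/Payment | Critical        | 0s            | 0        | stale-if-error=60           |
| Inventory       | High            | 30s           | 30       | stale-while-revalidate=300  |
| Product details | Medium          | 5min          | 300      | stale-while-revalidate=3600 |
| Static assets   | Low             |               | 31536000 | N/A                         |
| User content    | Variable        | 0-60s         | Per-user | private                     |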
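
As an illustration, the rows above could be turned into Cache-Control header values as follows. This is a minimal sketch: the policy keys, the function name, and the `per_user_ttl` parameter are hypothetical, and how a given CDN honors `stale-while-revalidate` and `stale-if-error` should be checked against its documentation.

```python
# TTL and stale-handler values copied from the ADR-002 table.
TTL_POLICY = {
    "pricing": {"ttl": 0, "stale": "stale-if-error=60"},
    "inventory": {"ttl": 30, "stale": "stale-while-revalidate=300"},
    "product_details": {"ttl": 300, "stale": "stale-while-revalidate=3600"},
    "static_assets": {"ttl": 31536000, "stale": None},
}


def cache_control_for(content_type, per_user_ttl=0):
    """Build a Cache-Control header value for a content type."""
    if content_type == "user_content":
        # Per-user responses must not be stored by shared caches.
        if not 0 <= per_user_ttl <= 60:
            raise ValueError("per-user TTL must be within 0-60 seconds")
        return f"private, max-age={per_user_ttl}"
    policy = TTL_POLICY[content_type]
    directives = [f"max-age={policy['ttl']}"]
    if policy["stale"]:
        directives.append(policy["stale"])
    return ", ".join(directives)


for name in TTL_POLICY:
    print(name, "->", cache_control_for(name))
```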

Consequences:

  • Higher origin load for critical content
  • More complex cache rules
  • Need for proactive invalidation on critical updates
  • Reduced stale risk for important business flows

ADR-003: Cache Warming vs On-Demand Caching

Status: Accepted

Context: Cache warming pre-populates content before user requests. On-demand caching populates on first request. Each has costs and benefits.

Decision:

| Scenario               | Strategy                          | Rationale                               |
|------------------------|-----------------------------------|-----------------------------------------|
| Product launch         | Cache warming                     | Predictable spike, known URLs           |
| Daily news             | On-demand + warm top 100          | Unpredictable content, warm high-traffic |
| E-commerce catalog     | Warm top 10K products             | Predictable traffic patterns            |
| User-generated content | On-demand                         | Cannot predict access                   |
| Video on demand        | Warm manifest, on-demand segments | Hybrid approach                         |

Cache warming decision tree:

Is traffic predictable?
├── Yes → Can you identify top content?
│   ├── Yes → Warm top N items
│   └── No → Monitor and warm on access pattern
└── No → Is first-hit latency critical?
    ├── Yes → Speculative warming
    └── No → On-demand caching
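
The same tree can be written as a function, which makes it easier to apply consistently in runbooks or automation. This is a sketch; the function and parameter names are hypothetical.

```python
def choose_warming_strategy(traffic_predictable,
                            top_content_known=False,
                            first_hit_latency_critical=False):
    """Walk the cache warming decision tree and return the leaf it reaches."""
    if traffic_predictable:
        if top_content_known:
            return "Warm top N items"
        return "Monitor and warm on access pattern"
    if first_hit_latency_critical:
        return "Speculative warming"
    return "On-demand caching"


# A product launch: predictable spike with known URLs.
print(choose_warming_strategy(traffic_predictable=True, top_content_known=True))
```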

Consequences:

  • Warming requires origin capacity for bulk fetches
  • Need to track warming success/failure
  • May warm content never accessed (waste)
  • First user experience improved for warmed content

ADR-004: Edge Compute vs Origin Processing

Status: Accepted

Context: Some processing (authentication, personalization, A/B testing) can run at the edge instead of at the origin. Edge compute reduces latency but adds complexity.

Decision:

| Processing Type        | Edge vs Origin | Factors                      |
|------------------------|----------------|------------------------------|
| Authentication         | Edge           | Latency, offload origin      |
| A/B test routing       | Edge           | Consistency, speed           |
| Personalization        | Edge           | Latency, cache per-user      |
| Complex business logic | Origin         | Code complexity, data access |
| Database writes        | Origin         | Consistency, transactions    |
| Image resize           | Edge           | Compute cost vs bandwidth    |

Decision matrix:

def decide_processing_location(requirements):
    """
    Determine if processing should be edge or origin.
    """
    score = 0
    # Latency sensitivity
    if requirements.latency_sensitivity == 'high':
        score += 3
    elif requirements.latency_sensitivity == 'medium':
        score += 1
    # Data access needs
    if requirements.needs_database_write:
        score -= 5
    if requirements.needs_complex_query:
        score -= 3
    # Code complexity
    if requirements.code_complexity == 'high':
        score -= 2
    # Cacheability
    if requirements.response_cacheable:
        score += 2
    return 'edge' if score > 0 else 'origin'
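
To try the scoring on concrete workloads, the function needs an object with the attributes it reads. The sketch below assumes a hypothetical `ProcessingRequirements` dataclass and repeats the scoring in compact form so that it runs on its own.

```python
from dataclasses import dataclass


@dataclass
class ProcessingRequirements:
    """Hypothetical container for the attributes the scoring reads."""
    latency_sensitivity: str = "low"   # 'high', 'medium', or 'low'
    needs_database_write: bool = False
    needs_complex_query: bool = False
    code_complexity: str = "low"       # 'high' or 'low'
    response_cacheable: bool = False


def decide_processing_location(requirements):
    # Same weights as the decision matrix above.
    score = {"high": 3, "medium": 1}.get(requirements.latency_sensitivity, 0)
    score -= 5 if requirements.needs_database_write else 0
    score -= 3 if requirements.needs_complex_query else 0
    score -= 2 if requirements.code_complexity == "high" else 0
    score += 2 if requirements.response_cacheable else 0
    return "edge" if score > 0 else "origin"


auth = ProcessingRequirements(latency_sensitivity="high")
image_resize = ProcessingRequirements(latency_sensitivity="medium",
                                      response_cacheable=True)
db_write = ProcessingRequirements(latency_sensitivity="high",
                                  needs_database_write=True)

for name, req in [("auth", auth), ("image_resize", image_resize),
                  ("db_write", db_write)]:
    print(name, "->", decide_processing_location(req))
```

Note that a score of exactly 0 falls to the origin, so ties favor the simpler deployment model.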

Consequences:

  • Edge compute requires different deployment model
  • Debugging more complex across locations
  • Reduced origin load for edge processing
  • Need to manage state at edge

Frequently Asked Questions

How do I estimate cache storage for a new product? Use the formula: Storage = (Expected Objects × Avg Size × Replication Factor). For a product with 100K pages averaging 50KB, replicated to 20 edges: 100K × 50KB × 20 = 100GB. Add 20% for metadata and indexes.
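
The arithmetic can be checked with a few lines of code. This sketch assumes decimal units (1 GB = 1,000,000 KB); the function name is hypothetical.

```python
def estimate_cache_storage_gb(objects, avg_size_kb, replication_factor,
                              overhead=0.20):
    """Return (raw_gb, total_gb) for the storage formula above.

    Assumes decimal units: 1 GB = 1,000,000 KB.
    """
    raw_gb = objects * avg_size_kb * replication_factor / 1_000_000
    total_gb = raw_gb * (1 + overhead)  # metadata and index overhead
    return raw_gb, total_gb


raw, total = estimate_cache_storage_gb(100_000, 50, 20)
print(f"raw: {raw:.0f} GB, with overhead: {total:.0f} GB")
```

For the example in the answer, this gives 100 GB of raw storage and roughly 120 GB once the 20% overhead is included.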

What hit ratio should I target for APIs? Public APIs: 80-90% with 1-5 minute TTLs. Authenticated APIs: 30-60% with user-specific cache keys. Real-time APIs: below 20% is acceptable when freshness is critical.

How do I handle cache invalidation during deployments? Use content hashes in URLs for static assets (auto-invalidation). For dynamic content, implement deployment webhooks that trigger cache purges. Version your API endpoints (v1, v2) to allow parallel caching.
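
Content-hashed URLs work because any change to the file changes its name, so caches never serve an old copy under the new URL. A minimal sketch of the naming step, assuming a hypothetical `hashed_asset_name` helper and an 8-character SHA-256 prefix (build tools usually do this for you):

```python
import hashlib
from pathlib import PurePosixPath


def hashed_asset_name(path, content, digest_len=8):
    """Insert a short content hash before the file extension."""
    digest = hashlib.sha256(content).hexdigest()[:digest_len]
    p = PurePosixPath(path)
    return str(p.with_name(f"{p.stem}.{digest}{p.suffix}"))


v1 = hashed_asset_name("/static/app.js", b"console.log('v1');")
v2 = hashed_asset_name("/static/app.js", b"console.log('v2');")
print(v1)
print(v2)  # different content yields a different URL
```

Because each deployment references new URLs, the old objects can simply age out of cache and no purge is needed for static assets.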

What’s the cost of invalidating 1M objects? Where the provider bills per purged object (roughly $0.005-0.01 each), that is $5,000-10,000. Use surrogate keys to group objects and reduce individual purges. Batch invalidations over time windows.
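
The effect of grouping can be sketched as follows. The function names are hypothetical, and whether a surrogate-key purge is billed as one request depends on the provider, so treat the key count as the number of purge calls rather than as a price.

```python
from collections import defaultdict


def group_by_surrogate_key(objects):
    """Group (url, surrogate_key) pairs so each key needs one purge call."""
    groups = defaultdict(list)
    for url, key in objects:
        groups[key].append(url)
    return dict(groups)


def per_object_purge_cost_usd(object_count, price_per_object):
    """Cost when every object is purged and billed individually."""
    return object_count * price_per_object


objects = [(f"/products/{i}", "products") for i in range(1000)]
objects.append(("/home", "home"))
groups = group_by_surrogate_key(objects)

print(f"{len(objects)} objects -> {len(groups)} surrogate-key purges")
print(f"1M individual purges at $0.005: "
      f"${per_object_purge_cost_usd(1_000_000, 0.005):,.0f}")
```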

How do multi-CDN architectures handle cache inconsistency? Implement event-sourced invalidation with confirmation from all CDNs. Use version vectors for conflict resolution. Accept eventual consistency for non-critical content.
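
Confirmation tracking is the core of that approach: an invalidation is complete only when every CDN has acknowledged it. The in-memory sketch below illustrates the bookkeeping only. The class names are hypothetical, it does not call any CDN API, and it does not implement version vectors.

```python
from dataclasses import dataclass, field


@dataclass
class InvalidationEvent:
    event_id: str
    surrogate_key: str
    pending: set = field(default_factory=set)    # CDNs yet to confirm
    confirmed: set = field(default_factory=set)  # CDNs that confirmed


class InvalidationLog:
    """Append-only log of invalidation events and per-CDN confirmations."""

    def __init__(self, cdns):
        self.cdns = set(cdns)
        self.events = {}

    def publish(self, event_id, surrogate_key):
        event = InvalidationEvent(event_id, surrogate_key,
                                  pending=set(self.cdns))
        self.events[event_id] = event
        return event

    def confirm(self, event_id, cdn):
        """Record a confirmation; return True once all CDNs have confirmed."""
        if cdn not in self.cdns:
            raise ValueError(f"unknown CDN: {cdn}")
        event = self.events[event_id]
        event.pending.discard(cdn)
        event.confirmed.add(cdn)
        return not event.pending

    def unconfirmed(self):
        """Events still waiting on at least one CDN, for retries or alerts."""
        return {eid: sorted(ev.pending)
                for eid, ev in self.events.items() if ev.pending}


log = InvalidationLog(["cdn_a", "cdn_b"])
log.publish("evt-1", "products")
log.confirm("evt-1", "cdn_a")
print(log.unconfirmed())  # cdn_b has not confirmed yet
```

In production the log would live in a durable store, and entries returned by `unconfirmed()` would feed a retry loop.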

Should I use stale-while-revalidate for all content? No. Use it for content where stale is acceptable (product descriptions, blog posts). Don’t use for critical real-time data (stock prices, inventory counts) where staleness has business impact.

How do I plan capacity for Black Friday? Forecast 5-10x normal traffic. Pre-warm cache with expected hot content. Extend TTLs temporarily. Enable aggressive stale handlers. Scale origin capacity 3x. Test with simulated load.
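
These numbers interact: if traffic grows faster than origin capacity, the hit ratio has to rise to cover the gap, which is why extended TTLs and stale handlers are part of the plan. A back-of-the-envelope sketch, assuming origin load is proportional to cache misses (the function name is hypothetical):

```python
def required_hit_ratio(baseline_hit_ratio, traffic_multiplier,
                       origin_multiplier):
    """Hit ratio needed so origin load stays within its scaled capacity.

    Assumes origin requests = total requests x miss ratio.
    """
    baseline_miss = 1.0 - baseline_hit_ratio
    allowed_miss = baseline_miss * origin_multiplier / traffic_multiplier
    return 1.0 - allowed_miss


for traffic in (5, 10):
    needed = required_hit_ratio(0.90, traffic, origin_multiplier=3)
    print(f"{traffic}x traffic, 3x origin: need hit ratio >= {needed:.2%}")
```

With a 90% baseline and origin scaled 3x, 10x traffic requires a hit ratio of about 97%, and 5x traffic about 94%.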

What metrics indicate cache problems? Hit ratio dropping > 5% suddenly. Origin traffic spiking without user traffic spike. TTFB P95 increasing. Purge queue backing up. Cache storage near capacity (> 85%).
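
These signals can be checked mechanically by comparing two metric snapshots. In the sketch below the metric names and most thresholds are assumptions: only the 5% hit ratio drop (read here as 5 percentage points) and the 85% storage limit come from the answer above.

```python
def cache_health_alerts(previous, current):
    """Compare two metric snapshots and return the names of triggered alerts."""
    alerts = []
    # Hit ratio dropped by more than 5 percentage points
    if previous["hit_ratio"] - current["hit_ratio"] > 0.05:
        alerts.append("hit_ratio_drop")
    # Origin traffic grew sharply while user traffic stayed roughly flat
    # (1.5x and 1.2x are illustrative thresholds)
    origin_growth = current["origin_rps"] / max(previous["origin_rps"], 1e-9)
    user_growth = current["user_rps"] / max(previous["user_rps"], 1e-9)
    if origin_growth > 1.5 and user_growth < 1.2:
        alerts.append("origin_spike_without_user_spike")
    # TTFB P95 rose by more than 20% (illustrative threshold)
    if current["ttfb_p95_ms"] > previous["ttfb_p95_ms"] * 1.2:
        alerts.append("ttfb_p95_increase")
    # Purge queue is growing
    if current["purge_queue_depth"] > previous["purge_queue_depth"]:
        alerts.append("purge_queue_backlog")
    # Storage above 85% of capacity
    if current["storage_utilization"] > 0.85:
        alerts.append("storage_near_capacity")
    return alerts


previous = {"hit_ratio": 0.92, "origin_rps": 100, "user_rps": 1000,
            "ttfb_p95_ms": 200, "purge_queue_depth": 5,
            "storage_utilization": 0.60}
current = {"hit_ratio": 0.80, "origin_rps": 300, "user_rps": 1050,
           "ttfb_p95_ms": 450, "purge_queue_depth": 40,
           "storage_utilization": 0.90}
print(cache_health_alerts(previous, current))
```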

How This Applies in Practice

Enterprise cache architecture balances performance, cost, and consistency. Multi-CDN provides resilience but requires synchronization. Event-sourced invalidation ensures reliable propagation. Capacity planning prevents outages during spikes.

Key metrics to track: hit ratio by content type, origin load ratio, invalidation latency, storage utilization, cost per GB served. Set alerts for anomalies. Review ADRs quarterly as traffic patterns evolve.

How to Implement with Azion

Azion provides enterprise cache capabilities:

  1. Edge Cache: Hierarchical caching with configurable TTLs and stale handlers
  2. Functions: JavaScript/Wasm for custom cache logic at edge
  3. Cache API: Bulk invalidation with surrogate keys
  4. Real-Time Metrics: Per-path hit ratio, TTFB distribution, storage usage
  5. Multi-CDN Integration: Configure origin shields and cache-aware routing

Example enterprise configuration:

Rules Engine:
  - If: Path matches /api/v1/products/*
    Then:
      Cache TTL = 300
      Stale-While-Revalidate = 3600
      Surrogate-Key = products
  - If: Path matches /static/*
    Then:
      Cache TTL = 31536000
      Immutable = true
  - If: Path matches /live/*
    Then:
      Cache TTL = 2
      Stale-If-Error = 10
Functions:
  - Pre-cache: Authentication check, personalized cache keys
  - Post-cache: Header modification, response transformation
Cache API:
  - Purge by surrogate key: POST /cache/purge {"keys": ["products"]}
  - Bulk purge: POST /cache/purge {"urls": ["..."], "soft": true}

Learn more in the Azion Serverless Applications documentation.

