CDN Cache, Invalidation and Performance: Enterprise Architecture
Enterprise cache architecture requires coordinating multiple CDN providers, invalidating millions of objects atomically, and maintaining performance under extreme load. This guide covers architecture patterns, scale considerations, and decision frameworks for teams operating globally.
Last updated: 2026-06-18
Global Scale Cache Architectures
Multi-CDN Strategies
A single CDN is a single point of failure. Multi-CDN architectures spread that risk and let you optimize for cost, performance, and resilience.
Active-Active Configuration
Both CDNs serve traffic simultaneously. A DNS-level load balancer splits traffic between them by measured performance, cost, or fixed weights.
    ┌──────────────────────────────────────────────────────────────────┐
    │                        DNS Load Balancer                         │
    │                   (Route 53, Cloudflare, NS1)                    │
    └─────────────────────┬────────────────────┬───────────────────────┘
                          │                    │
                          ▼                    ▼
                  ┌───────────────┐    ┌───────────────┐
                  │     CDN A     │    │     CDN B     │
                  │  60% traffic  │    │  40% traffic  │
                  └───────┬───────┘    └───────┬───────┘
                          │                    │
                          └────────┬───────────┘
                                   │
                                   ▼
                           ┌───────────────┐
                           │    Origin     │
                           └───────────────┘

Weighted distribution algorithm:
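    import random

    def select_cdn(request, weights):
        """
        weights = {'cdn_a': 0.6, 'cdn_b': 0.4}
        """
        # Weighted random pick across providers
        cdn = random.choices(list(weights), weights=list(weights.values()), k=1)[0]

        # Override if the chosen CDN has issues.
        # health_check() and fallback_cdn() are deployment-specific helpers.
        if health_check(cdn) == 'degraded':
            cdn = fallback_cdn(cdn, weights)

        return cdn

Failover Configuration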
The primary CDN serves all traffic. The secondary takes over only when the primary fails.
    ┌─────────────────────────────────────────────────────────────────┐
    │                         Health Monitor                          │
    │                   (checks /health every 10s)                    │
    └─────────────────────────┬───────────────────────────────────────┘
                              │
                              ▼
                  ┌───────────────────────┐
                  │      Primary CDN      │ ← Normal: 100% traffic
                  │    Status: Healthy    │
                  └───────────┬───────────┘
                              │
                              │ On failure
                              ▼
                  ┌───────────────────────┐
                  │     Secondary CDN     │ ← Failover: 100% traffic
                  │    Status: Standby    │
                  └───────────────────────┘

Failover decision matrix:
| Metric | Threshold | Action | Recovery |
|---|---|---|---|
| Error rate | > 5% | Route to secondary | < 1% for 5 min |
| P95 latency | > 2s | Route to secondary | < 500ms for 5 min |
| Availability | < 99% | Route to secondary | > 99.9% for 10 min |
| RUM complaints | Spike > 3x | Investigate + potential failover | Baseline |
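The matrix uses stricter recovery thresholds than trip thresholds, which keeps traffic from flapping between providers. A minimal sketch of that hysteresis follows. The class and field names are illustrative, and it simplifies the table by using one recovery window for all metrics (30 consecutive healthy samples, roughly 5 minutes at a 10s check interval) rather than the separate 5 and 10 minute windows.

```python
from dataclasses import dataclass


@dataclass
class CdnMetrics:
    error_rate: float      # fraction, e.g. 0.05 means 5%
    p95_latency_s: float   # seconds
    availability: float    # fraction, e.g. 0.999 means 99.9%


def should_fail_over(m: CdnMetrics) -> bool:
    """Trip thresholds from the matrix: any single breach triggers failover."""
    return m.error_rate > 0.05 or m.p95_latency_s > 2.0 or m.availability < 0.99


def is_recovered(m: CdnMetrics) -> bool:
    """Recovery thresholds: all metrics must be back inside the stricter limits."""
    return m.error_rate < 0.01 and m.p95_latency_s < 0.5 and m.availability > 0.999


class FailoverController:
    def __init__(self, recovery_samples: int = 30):
        self.recovery_samples = recovery_samples
        self.active = "primary"
        self._healthy_streak = 0

    def observe(self, primary_metrics: CdnMetrics) -> str:
        """Feed one health sample for the primary; returns the CDN to route to."""
        if self.active == "primary":
            if should_fail_over(primary_metrics):
                self.active = "secondary"
                self._healthy_streak = 0
        else:
            # Count consecutive healthy samples; any bad sample resets the streak
            if is_recovered(primary_metrics):
                self._healthy_streak += 1
            else:
                self._healthy_streak = 0
            if self._healthy_streak >= self.recovery_samples:
                self.active = "primary"
        return self.active
```

The RUM row is left out on purpose: the table treats it as a prompt for investigation rather than an automatic trigger.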
Performance-Based Routing
Route each request to the CDN with the lowest latency for the user's location.
    def select_best_cdn(user_ip, cdn_providers, cost_cap):
        """
        Latency-aware CDN selection.
        """
        latencies = {}

        for cdn in cdn_providers:
            # Use real user measurements or synthetic probes
            latencies[cdn] = get_p95_latency(user_ip, cdn)

        # Select CDN with lowest latency
        best_cdn = min(latencies, key=latencies.get)

        # Apply business rules (cost caps, contract minimums)
        if get_monthly_spend(best_cdn) > cost_cap:
            best_cdn = get_next_best(latencies, exclude=best_cdn)

        return best_cdn

Cache Hierarchy
Enterprise CDNs implement hierarchical caching: origin → regional edge → local edge.
    ┌─────────────────────────────────────────────────────────────────────┐
    │                               Origin                                │
    │                      (Single source of truth)                       │
    └───────────────────────────────┬─────────────────────────────────────┘
                                    │
                                    ▼
    ┌─────────────────────────────────────────────────────────────────────┐
    │                         Regional Edge Cache                         │
    │                   (US-East, EU-West, APAC, LATAM)                   │
    │                 TTL: Base TTL × 1.5 (longer-lived)                  │
    └───────┬─────────────────┬─────────────────┬─────────────────┬───────┘
            │                 │                 │                 │
            ▼                 ▼                 ▼                 ▼
    ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
    │  Local Edge   │ │  Local Edge   │ │  Local Edge   │ │  Local Edge   │
    │   New York    │ │    London     │ │   São Paulo   │ │     Tokyo     │
    │   TTL: Base   │ │   TTL: Base   │ │   TTL: Base   │ │   TTL: Base   │
    └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
            │                 │                 │                 │
            ▼                 ▼                 ▼                 ▼
         [Users]           [Users]           [Users]           [Users]

Hierarchy TTL configuration:
| Level | Purpose | TTL Multiplier | Storage |
|---|---|---|---|
| Origin | Source | N/A | Full dataset |
| Regional | Reduce cross-region requests | 1.5x base TTL | Hot + warm |
| Local | Serve users | Base TTL | Hot only |
Cache lookup flow:
    def hierarchical_lookup(request, cache_hierarchy):
        """
        Lookup through cache hierarchy.
        """
        for level in ['local', 'regional', 'origin']:
            cache = cache_hierarchy[level]

            if cache.has(request.key):
                response = cache.get(request.key)

                if not response.is_stale():
                    return response

                # Stale-while-revalidate
                if level != 'origin':
                    async_revalidate(request, cache_hierarchy)
                    return response

        # Full miss - fetch from origin
        return fetch_from_origin(request)

Geographic Load Balancing with Cache Awareness
Traditional geo-load balancing routes to nearest region. Cache-aware routing considers both proximity and cache state.
    def cache_aware_routing(user_location, cdn_regions):
        """
        Route based on location + cache hit probability.
        """
        candidates = []

        for region in cdn_regions:
            proximity_score = calculate_proximity(user_location, region)
            cache_score = estimate_cache_hit_rate(region)

            # Weighted combination
            combined_score = (proximity_score * 0.4) + (cache_score * 0.6)
            candidates.append((region, combined_score))

        return max(candidates, key=lambda x: x[1])[0]

    def estimate_cache_hit_rate(region):
        """
        Predict cache hit rate based on recent metrics.
        """
        metrics = get_recent_metrics(region, window='5min')

        # Factor in: object popularity, TTL, recent purges
        popularity = metrics.get('object_popularity', 0.5)
        ttl_factor = metrics.get('avg_ttl_remaining', 0.5)
        purge_impact = 1 - metrics.get('recent_purge_ratio', 0)

        return (popularity + ttl_factor + purge_impact) / 3

Cache Synchronization Across CDN Providers
Multi-CDN requires cache synchronization to prevent stale content in one provider while another is updated.
Invalidation Broadcast Pattern
    ┌─────────────────────────────────────────────────────────────────┐
    │                       Invalidation Event                        │
    │                        (content updated)                        │
    └─────────────────────────┬───────────────────────────────────────┘
                              │
                              ▼
                  ┌───────────────────────┐
                  │       Event Bus       │
                  │      (Kafka/SNS)      │
                  └───────┬───────┬───────┘
                          │       │
              ┌───────────┘       └───────────┐
              │                               │
              ▼                               ▼
      ┌───────────────┐               ┌───────────────┐
      │ CDN A Worker  │               │ CDN B Worker  │
      │   Purge API   │               │   Purge API   │
      └───────────────┘               └───────────────┘

Implementation:
    import json
    from kafka import KafkaConsumer, KafkaProducer

    class CacheInvalidator:
        def __init__(self, cdn_clients):
            self.cdn_clients = cdn_clients
            self.consumer = KafkaConsumer(
                'cache-invalidation',
                bootstrap_servers=['kafka:9092'],
                group_id='cdn-invalidator'
            )
            # Producer for the retry topic
            self.retry_queue = KafkaProducer(bootstrap_servers=['kafka:9092'])

        def process_invalidations(self):
            for message in self.consumer:
                event = json.loads(message.value)

                # Parallel purge across all CDNs
                # (parallel_purge is an application-level helper)
                results = parallel_purge(
                    self.cdn_clients,
                    event['keys'],
                    event.get('soft_purge', False)
                )

                # Queue any failures for retry
                failed = [r for r in results if not r.success]
                if failed:
                    self.retry_queue.send(
                        'invalidation-retry',
                        json.dumps(failed, default=str).encode()
                    )

Invalidation SLAs
| Priority | Target SLA | Use Case | Implementation |
|---|---|---|---|
| Critical | < 1s | Security patches, legal removal | Synchronous purge, verify |
| High | < 10s | Price changes, product updates | Async with confirmation |
| Normal | < 60s | Content updates | Async, best effort |
| Low | < 300s | Non-critical updates | Batch processing |
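One way to act on these targets is to turn each priority into a deadline and always work on the purge whose deadline is nearest. The sketch below assumes an in-process queue; `SLA_SECONDS`, `DeadlineQueue`, and `sla_met` are illustrative names, not part of any CDN SDK.

```python
import heapq

# Target SLA per priority, taken from the table above (seconds)
SLA_SECONDS = {"critical": 1, "high": 10, "normal": 60, "low": 300}


def sla_met(priority: str, issued_at: float, completed_at: float) -> bool:
    """True if the purge finished inside its target SLA."""
    return (completed_at - issued_at) < SLA_SECONDS[priority]


class DeadlineQueue:
    """Earliest-deadline-first queue of pending invalidations."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker so equal deadlines pop in insertion order

    def push(self, keys, priority: str, issued_at: float) -> None:
        deadline = issued_at + SLA_SECONDS[priority]
        heapq.heappush(self._heap, (deadline, self._seq, list(keys)))
        self._seq += 1

    def pop(self):
        """Return (deadline, keys) for the most urgent pending purge."""
        deadline, _, keys = heapq.heappop(self._heap)
        return deadline, keys

    def __len__(self):
        return len(self._heap)
```

Ordering by deadline rather than by priority alone means an old low-priority purge is eventually served ahead of a freshly queued high-priority one, so nothing waits indefinitely.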
Advanced Invalidation Patterns
Event-Sourced Invalidation
Traditional invalidation: application calls purge API directly. Event-sourced: all invalidations flow through event bus, enabling audit trails, replay, and guaranteed delivery.
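The audit and replay properties come from treating invalidations as an append-only log rather than as fire-and-forget API calls. A minimal in-memory sketch of that idea follows; `InvalidationLog` is a hypothetical stand-in for a durable event bus, not a replacement for one.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass(frozen=True)
class InvalidationEvent:
    offset: int             # position in the log, doubles as an audit ID
    keys: Tuple[str, ...]   # cache keys to purge
    source: str             # which system requested the purge


class InvalidationLog:
    """Append-only log of invalidation events."""

    def __init__(self):
        self._events: List[InvalidationEvent] = []

    def append(self, keys, source: str) -> InvalidationEvent:
        event = InvalidationEvent(len(self._events), tuple(keys), source)
        self._events.append(event)
        return event

    def replay(self, from_offset: int,
               handler: Callable[[InvalidationEvent], None]) -> int:
        """Re-deliver every event at or after from_offset.

        Returns the next offset, which a worker would persist as its checkpoint.
        """
        for event in self._events[from_offset:]:
            handler(event)
        return len(self._events)
```

A CDN worker that crashed after processing offset 0 can resume with `replay(1, purge_handler)` and see every event it missed, in order.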
Kafka Pattern
    # Kafka topic configuration
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: cache-invalidation
    spec:
      partitions: 12
      replicas: 3
      config:
        retention.ms: 86400000   # 24 hours
        # Events are produced without a record key, and compacted topics
        # require keys, so use time-based retention instead of compaction
        cleanup.policy: delete

Producer (application):
    import json
    import time
    import uuid
    import zlib
    from kafka import KafkaProducer

    class CacheInvalidationProducer:
        def __init__(self):
            self.producer = KafkaProducer(
                bootstrap_servers=['kafka:9092'],
                value_serializer=lambda v: json.dumps(v).encode()
            )

        def invalidate_content(self, keys, priority='normal'):
            """
            Emit invalidation event.
            """
            event = {
                'keys': keys,
                'priority': priority,
                'timestamp': time.time(),
                'source': 'cms',
                'correlation_id': str(uuid.uuid4())
            }

            # Partition by first key for ordering. Use a stable hash:
            # the built-in hash() of a str is randomized per process.
            partition = zlib.crc32(keys[0].encode()) % 12

            future = self.producer.send(
                'cache-invalidation',
                value=event,
                partition=partition,
                # kafka-python expects headers as a list of (str, bytes) tuples
                headers=[
                    ('priority', priority.encode()),
                    ('source', b'cms')
                ]
            )

            return future

Consumer (CDN worker):
    import asyncio
    import json
    from kafka import KafkaConsumer

    class CacheInvalidationWorker:
        def __init__(self, cdn_client):
            self.cdn_client = cdn_client
            self.consumer = KafkaConsumer(
                'cache-invalidation',
                bootstrap_servers=['kafka:9092'],
                group_id='cdn-purger',
                enable_auto_commit=False
            )

        async def process_batch(self):
            """
            Batch process invalidations for efficiency.
            """
            batch = []

            # Note: iterating KafkaConsumer is a blocking call; run this
            # worker on its own event loop or thread.
            for message in self.consumer:
                batch.append(json.loads(message.value))

                if len(batch) >= 100:  # Batch size
                    break

            # Dedupe keys across batch
            all_keys = set()
            for event in batch:
                all_keys.update(event['keys'])

            # Single purge call for batch
            result = await self.cdn_client.purge_bulk(list(all_keys))

            if result.success:
                # Commit offsets only after the purge succeeded
                self.consumer.commit()
            else:
                # Retry failed keys
                await self.retry_failed(result.failed_keys)

RabbitMQ Pattern
    import json
    import pika

    class CacheInvalidationRabbitMQ:
        def __init__(self):
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters('rabbitmq')
            )
            self.channel = self.connection.channel()

            # Declare exchange
            self.channel.exchange_declare(
                exchange='cache-invalidation',
                exchange_type='direct',
                durable=True
            )

            # Priority queue with dead-letter exchange
            args = {
                'x-max-priority': 3,
                'x-dead-letter-exchange': 'cache-invalidation-dlx'
            }
            self.channel.queue_declare(
                queue='cdn-purge',
                durable=True,
                arguments=args
            )

            # Bind the queue; without a binding a direct exchange drops messages
            self.channel.queue_bind(
                queue='cdn-purge',
                exchange='cache-invalidation',
                routing_key='cdn-purge'
            )

        def publish(self, keys, priority=1):
            """
            priority: 1=normal, 2=high, 3=critical
            """
            self.channel.basic_publish(
                exchange='cache-invalidation',
                routing_key='cdn-purge',
                body=json.dumps({'keys': keys}),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # Persistent
                    priority=priority
                )
            )

Invalidation at Scale (Millions of Objects)
Purging millions of objects requires batching, rate limiting, and async processing.
Surrogate Key Strategy
    class SurrogateKeyManager:
        """
        Organize content by surrogate keys for efficient bulk invalidation.
        """
        def __init__(self):
            self.key_registry = {}  # content_id -> [surrogate_keys]

        def assign_surrogate_keys(self, content_id, keys):
            """
            Assign surrogate keys to content.
            Keys are hierarchical: site:category:subcategory
            """
            self.key_registry[content_id] = keys

        def get_keys_to_purge(self, surrogate_key):
            """
            Get all content IDs tagged with exactly this surrogate key.
            """
            return [
                cid for cid, keys in self.key_registry.items()
                if surrogate_key in keys
            ]

Surrogate key patterns:
| Pattern | Example | Invalidation Scope |
|---|---|---|
| Site-level | site:acme | Entire site |
| Section | site:acme:blog | Blog section |
| Content type | site:acme:product | All products |
| Entity | product:12345 | Single product |
| Dependency | product:12345:reviews | Product reviews |
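Because the keys are hierarchical, purging a parent such as `site:acme` is expected to cover everything tagged beneath it. The sketch below shows one way to resolve that with segment-aware prefix matching. The function names and sample registry are illustrative, and the colon is assumed to be the only segment delimiter.

```python
def matches(surrogate_key: str, purge_key: str) -> bool:
    """True if purge_key equals surrogate_key or is one of its ancestors.

    The trailing ':' in the prefix test keeps 'site:acme' from matching
    an unrelated key such as 'site:acmecorp:blog'.
    """
    return surrogate_key == purge_key or surrogate_key.startswith(purge_key + ":")


def content_to_purge(registry: dict, purge_key: str) -> list:
    """Return sorted content IDs with at least one key under purge_key."""
    return sorted(
        cid for cid, keys in registry.items()
        if any(matches(k, purge_key) for k in keys)
    )


# Sample registry: content_id -> surrogate keys
registry = {
    "post-1": ["site:acme:blog"],
    "prod-12345": ["site:acme:product", "product:12345"],
    "rev-9": ["product:12345:reviews"],
    "other": ["site:acmecorp:blog"],
}

site_wide = content_to_purge(registry, "site:acme")
one_product = content_to_purge(registry, "product:12345")
```

Here `site_wide` contains `post-1` and `prod-12345`, while `one_product` contains `prod-12345` and `rev-9`, so a product purge also takes its dependent reviews with it.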
Bulk Purge Implementation
    import asyncio
    from collections import deque
    from dataclasses import dataclass

    class RateLimitError(Exception):
        """Raised by the CDN client when the purge API throttles (assumed name)."""

    @dataclass
    class PurgeResult:
        total: int
        success: int
        failed: int
        rate_limited: int

    async def bulk_purge(cdn_client, keys, batch_size=1000, rate_limit=100):
        """
        Purge millions of keys with rate limiting.
        """
        results = PurgeResult(total=len(keys), success=0, failed=0, rate_limited=0)

        # Split into batches
        pending = deque(
            keys[i:i + batch_size] for i in range(0, len(keys), batch_size)
        )

        while pending:
            batch = pending.popleft()

            try:
                result = await cdn_client.purge(batch)
                results.success += result.purged
                results.failed += result.failed

            except RateLimitError:
                results.rate_limited += len(batch)
                await asyncio.sleep(1)  # Backoff
                pending.append(batch)   # Requeue batch (cap retries in production)

            await asyncio.sleep(1 / rate_limit)  # Rate limit

        return results

Conflict Resolution for Distributed Cache
Multiple edges may have different versions of cached content during invalidation propagation.
Version Vector Pattern
    from typing import Dict, List

    class VersionedCache:
        """
        Track content versions across distributed cache.
        """
        def __init__(self):
            self.versions: Dict[str, Dict[str, int]] = {}  # key -> {edge: version}

        def update(self, key: str, edge: str, version: int):
            if key not in self.versions:
                self.versions[key] = {}
            self.versions[key][edge] = version

        def get_latest_version(self, key: str) -> int:
            if key not in self.versions:
                return 0
            return max(self.versions[key].values())

        def is_stale(self, key: str, edge: str, version: int) -> bool:
            latest = self.get_latest_version(key)
            return version < latest

        def resolve_conflict(self, key: str, versions: List[tuple]) -> tuple:
            """
            versions: [(edge, version, content), ...]
            Returns: (winning_edge, winning_content)
            """
            # Highest version wins
            # If tie, use edge priority (lower number = higher priority)
            edge_priority = {'edge-primary': 1, 'edge-secondary': 2}

            edge, _, content = max(
                versions,
                key=lambda x: (x[1], -edge_priority.get(x[0], 999))
            )
            return edge, content

Cost Optimization for Purges
Some CDN providers charge for purges once a free allowance is used up. Optimize the purge strategy to keep those costs down.
Purge Cost Model
    Purge Cost = (Number of objects × Cost per object) + API calls

Example (illustrative pricing):
- First 1,000 purges/month: Free
- Next 99,000: $0.005 per purge
- 100,000+: $0.01 per purge

Cost for 1M purges:
- Free tier: 1,000 × $0 = $0
- Tier 2: 99,000 × $0.005 = $495
- Tier 3: 900,000 × $0.01 = $9,000
- Total: $9,495

Cost Reduction Strategies
| Strategy | Savings | Trade-off |
|---|---|---|
| Use surrogate keys | 10-100x fewer API calls | Requires planning |
| Batch invalidations | 50-80% fewer calls | Delayed propagation |
| TTL-based expiration | 100% purge savings | Stale until TTL |
| Soft purge | 30-50% fewer revalidations | Serves stale briefly |
| Invalidation deduplication | 20-40% fewer calls | Processing overhead |
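To put numbers on these strategies, the tiered pricing from the purge cost model can be expressed as a small function. This is a sketch using the illustrative tiers from that example, not any provider's real price list. Prices are held in integer mills (tenths of a cent) to avoid floating-point drift.

```python
# (tier size in purges, price per purge in mills); None means "unbounded"
TIERS = [
    (1_000, 0),     # first 1,000 purges/month free
    (99_000, 5),    # next 99,000 at $0.005
    (None, 10),     # everything beyond 100,000 at $0.01
]


def purge_cost(purges: int) -> float:
    """Monthly purge cost in dollars under the tiered pricing above."""
    remaining = purges
    total_mills = 0
    for size, price_mills in TIERS:
        used = remaining if size is None else min(remaining, size)
        total_mills += used * price_mills
        remaining -= used
        if remaining <= 0:
            break
    return total_mills / 1000


per_url = purge_cost(1_000_000)      # purge one million URLs individually
with_keys = purge_cost(10_000)       # same content via 100x fewer surrogate-key purges
```

With these tiers, `per_url` is 9495.0 and `with_keys` is 45.0, which shows how much of the bill depends on the number of purge operations rather than the number of objects.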
Batching implementation:
    import asyncio
    from collections import defaultdict

    class PurgeBatcher:
        """
        Batch purges over time window to reduce API calls.
        """
        def __init__(self, window_seconds=5, max_batch_size=1000):
            self.window = window_seconds
            self.max_batch = max_batch_size
            self.pending = defaultdict(list)
            self.timers = {}

        async def queue_purge(self, key: str, priority: str = 'normal'):
            self.pending[priority].append(key)

            # Start timer if not running
            if priority not in self.timers:
                self.timers[priority] = asyncio.create_task(
                    self._process_after_window(priority)
                )

            # Immediate flush if batch full
            if len(self.pending[priority]) >= self.max_batch:
                await self._flush(priority)

        async def _process_after_window(self, priority: str):
            await asyncio.sleep(self.window)
            await self._flush(priority)

        async def _flush(self, priority: str):
            # Cancel the window timer unless this flush is running inside it
            timer = self.timers.pop(priority, None)
            if timer is not None and timer is not asyncio.current_task():
                timer.cancel()

            keys = self.pending.pop(priority, [])
            if keys:
                # _execute_purge wraps the CDN purge call (provider-specific)
                await self._execute_purge(keys, priority)

Enterprise Scale Performance
Capacity Planning Formulas
Cache Storage Calculation
    Storage Required = Σ(Object Size × Popularity Factor × Replication Factor)

Where:
- Object Size: Average size per object
- Popularity Factor (PF): 0.0-1.0 based on access frequency
- Replication Factor: Number of edge locations storing the object

Example:
- 10M objects
- Average size: 50KB
- Hot objects (20%): PF = 1.0, replicated to 50 edges
- Warm objects (30%): PF = 0.5, replicated to 10 edges
- Cold objects (50%): PF = 0.1, replicated to 3 edges

    Storage = (2M × 50KB × 50 × 1.0) + (3M × 50KB × 10 × 0.5) + (5M × 50KB × 3 × 0.1)
            = 5,000GB + 750GB + 75GB
            = 5,825GB ≈ 5.8TB

Origin Load Calculation
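    Origin Requests/sec = Total Requests/sec × (1 - Hit Ratio)

TTL does not appear in the formula directly. It influences origin load through the hit ratio: longer TTLs mean fewer expirations and therefore fewer misses.

Example:
- 100,000 requests/sec peak
- 90% hit ratio
- 300 second average TTL

    Origin load = 100,000 × 0.10 = 10,000 req/sec to origin

Bandwidth Savings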
    Savings = (Total Traffic × Hit Ratio × Edge Cost Premium) - Purge Costs

Where:
- Edge Cost Premium: Cost difference between origin and edge bandwidth

Example:
- 10TB/day traffic (300,000GB/month)
- 90% hit ratio
- Origin: $0.08/GB, Edge: $0.02/GB
- Purge costs: $500/month

    Savings = (10,000GB × 30 × 0.90 × ($0.08 - $0.02)) - $500
            = $16,200 - $500
            = $15,700/month

Cache Partitioning Strategies
Partition cache by access patterns: hot, warm, cold.
    ┌─────────────────────────────────────────────────────────────────┐
    │                           Total Cache                           │
    │                         (100TB Example)                         │
    ├─────────────────┬─────────────────┬─────────────────────────────┤
    │       HOT       │      WARM       │            COLD             │
    │     (20TB)      │     (30TB)      │           (50TB)            │
    │                 │                 │                             │
    │ • 5% of objects │ • 15% objects   │ • 80% of objects            │
    │ • 80% of hits   │ • 15% of hits   │ • 5% of hits                │
    │ • TTL: 1 day    │ • TTL: 1 hour   │ • TTL: 5 min                │
    │ • Memory/SSD    │ • SSD           │ • HDD/Object store          │
    │ • All edges     │ • Regional      │ • On-demand load            │
    └─────────────────┴─────────────────┴─────────────────────────────┘

Partitioning logic:
    from collections import defaultdict
    import time

    class CachePartitioner:
        def __init__(self, thresholds):
            self.thresholds = thresholds  # {'hot': 1000, 'warm': 100}
            self.access_counts = defaultdict(int)
            self.last_access = {}

        def classify(self, key: str) -> str:
            """
            Classify object based on access patterns.
            """
            accesses = self.access_counts.get(key, 0)

            if accesses >= self.thresholds['hot']:
                return 'hot'
            elif accesses >= self.thresholds['warm']:
                return 'warm'
            else:
                return 'cold'

        def record_access(self, key: str):
            self.access_counts[key] += 1
            self.last_access[key] = time.time()

        def promote(self, key: str, from_tier: str, to_tier: str):
            """
            Move object to higher tier.
            """
            # Pre-fetch to all edges if promoting to hot
            if to_tier == 'hot':
                prefetch_to_edges(key)

        def demote(self, key: str, from_tier: str, to_tier: str):
            """
            Move object to lower tier.
            """
            # Evict from edge caches if demoting from hot
            if from_tier == 'hot':
                evict_from_edges(key)

Handling Traffic Spikes
Black Friday, product launches, and viral events create 10-100x traffic spikes.
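A quick calculation shows why spikes of this size threaten the origin even when the hit ratio holds steady. The sketch assumes origin load is simply the request rate times the miss ratio, as in the origin load example earlier (100,000 × 0.10 = 10,000 req/sec); the function names are illustrative.

```python
def origin_rps(total_rps: float, hit_ratio: float) -> float:
    """Requests per second that reach the origin."""
    return total_rps * (1.0 - hit_ratio)


def required_hit_ratio(total_rps: float, origin_capacity_rps: float) -> float:
    """Minimum hit ratio that keeps origin load within its capacity."""
    return max(0.0, 1.0 - origin_capacity_rps / total_rps)


baseline = origin_rps(100_000, 0.90)            # normal day
spike = origin_rps(1_000_000, 0.90)             # 10x spike, same hit ratio
needed = required_hit_ratio(1_000_000, 10_000)  # origin sized for the normal day
```

At a constant 90% hit ratio a 10x spike means 10x the origin load. An origin sized for the normal day needs roughly a 99% hit ratio to survive it, which is what pre-warming and TTL extension aim to deliver.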
Pre-warming Strategy
    class CacheWarmer:
        def __init__(self, cdn_client, origin_client):
            self.cdn = cdn_client
            self.origin = origin_client

        async def warm_cache(self, urls: list, priority: str = 'normal'):
            """
            Pre-populate cache before expected spike.
            """
            results = []

            for url in urls:
                try:
                    # Fetch from origin (not from cache)
                    content = await self.origin.fetch(url)

                    # Push to CDN cache
                    await self.cdn.store(url, content)
                    results.append({'url': url, 'status': 'warmed'})

                except Exception as e:
                    results.append({'url': url, 'status': 'failed', 'error': str(e)})

            return results

        async def warm_by_pattern(self, patterns: list):
            """
            Warm all URLs matching patterns.
            """
            for pattern in patterns:
                urls = await self.get_urls_matching(pattern)
                await self.warm_cache(urls)

Spike Detection and Auto-scaling
    class TrafficSpikeHandler:
        def __init__(self, thresholds):
            self.thresholds = thresholds
            self.baseline = None

        def detect_spike(self, current_rps: float) -> dict:
            if not self.baseline:
                self.baseline = current_rps
                return {'spike': False}

            multiplier = current_rps / self.baseline

            if multiplier > self.thresholds['spike']:
                return {
                    'spike': True,
                    'severity': 'high' if multiplier > 10 else 'medium',
                    'multiplier': multiplier
                }

            return {'spike': False}

        async def handle_spike(self, severity: str):
            """
            Automatic actions during spike.
            """
            if severity == 'high':
                # Extend TTLs
                await self.extend_ttls(multiplier=2)
                # Enable stale-while-revalidate for all
                await self.enable_stale_handlers()
                # Scale origin capacity
                await self.scale_origin(replicas=3)

            elif severity == 'medium':
                # Moderate TTL extension
                await self.extend_ttls(multiplier=1.5)

Cache Performance Under DDoS
DDoS attacks often reach the origin by forcing cache misses, for example with randomized query strings or cache-bypass headers.
Cache-aware DDoS mitigation
    from collections import defaultdict

    class CacheAwareDDoSMitigation:
        def __init__(self, rate_limits):
            self.limits = rate_limits
            self.request_counts = defaultdict(int)

        def analyze_request(self, request) -> dict:
            """
            Analyze request for DDoS patterns.
            """
            key = request.path

            # Check if cached
            if self.is_cache_hit(key):
                # Cached content - low risk
                return {'action': 'serve', 'risk': 'low'}

            # Check cache bypass attempts
            if self.is_cache_bypass(request):
                return {'action': 'block', 'reason': 'cache_bypass_attempt'}

            # Check for cache poisoning attempt
            if self.has_malicious_headers(request):
                return {'action': 'sanitize', 'headers': self.sanitize(request)}

            # Rate limit cache misses
            self.request_counts[key] += 1
            if self.request_counts[key] > self.limits['miss_per_path']:
                return {'action': 'rate_limit', 'ttl': 60}

            return {'action': 'forward', 'risk': 'medium'}

        def is_cache_bypass(self, request) -> bool:
            """
            Detect attempts to bypass cache.
            """
            # Browsers also send no-cache headers on a hard refresh, so in
            # production combine these indicators with request rate.
            bypass_indicators = [
                request.headers.get('Pragma') == 'no-cache',
                request.headers.get('Cache-Control') == 'no-cache',
                '?' in request.path and random_query_string(request),
            ]
            return any(bypass_indicators)

Cost Modeling
Total Cost of Ownership
    TCO = (Origin Bandwidth × Origin Cost)
        + (Edge Bandwidth × Edge Cost)
        + (Purge Operations × Purge Cost)
        + (Storage × Storage Cost)
        - (Savings from Reduced Origin Load)

Break-even Hit Ratio: the hit ratio at which edge costs equal origin cost savings.

With all traffic billed at the edge and only misses billed at the origin, the cost per GB with a CDN is Edge Cost + (1 - Hit Ratio) × Origin Cost, versus Origin Cost without one. Setting the two equal gives:

    Break-even = Edge Cost / Origin Cost

Example:
- Origin: $0.08/GB
- Edge: $0.02/GB
- At a 90% hit ratio, traffic to origin is reduced by a factor of 10, well past break-even

    Break-even = $0.02 / $0.08 = 25% hit ratio

Cost Optimization Decision Matrix
| Hit Ratio | Action | Rationale |
|---|---|---|
| < 60% | Investigate cacheability | Cost exceeding value |
| 60-80% | Optimize TTLs | Moderate savings |
| 80-95% | Monitor and tune | Good performance |
| > 95% | Consider TTL reduction | May be over-caching |
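The matrix translates directly into a lookup, and the cost side can be checked with a per-GB delivery cost function. The sketch below assumes every byte is billed at the edge and misses are billed again at the origin. The function names are illustrative, and the boundary handling (for example, treating exactly 80% as "Monitor and tune") is a choice the table leaves open.

```python
def hit_ratio_action(hit_ratio: float) -> str:
    """Map a hit ratio (0.0-1.0) to the recommended action from the matrix."""
    if hit_ratio < 0.60:
        return "Investigate cacheability"
    if hit_ratio < 0.80:
        return "Optimize TTLs"
    if hit_ratio <= 0.95:
        return "Monitor and tune"
    return "Consider TTL reduction"


def delivery_cost_per_gb(hit_ratio: float, origin_cost: float, edge_cost: float) -> float:
    """Cost to deliver 1 GB through the CDN.

    Assumption: all traffic pays edge_cost; the miss fraction also pays origin_cost.
    """
    return edge_cost + (1.0 - hit_ratio) * origin_cost


# With origin at $0.08/GB and edge at $0.02/GB, a 25% hit ratio costs the
# same as serving everything from origin; anything above that is a saving.
at_break_even = delivery_cost_per_gb(0.25, origin_cost=0.08, edge_cost=0.02)
at_ninety = delivery_cost_per_gb(0.90, origin_cost=0.08, edge_cost=0.02)
```

Under these prices the 60% floor in the matrix sits well above the pure bandwidth break-even, which leaves headroom for purge and storage costs.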
Cache for Specific Workloads
Video Streaming Cache Patterns
HLS/DASH Segment Caching
    HLS Manifest Structure:
    ├── master.m3u8      → TTL: 10s (frequent updates)
    ├── playlist.m3u8    → TTL: 5s (segment list changes)
    └── segments/
        ├── seg001.ts    → TTL: 1 year (immutable)
        ├── seg002.ts    → TTL: 1 year (immutable)
        └── ...

    DASH Manifest Structure:
    ├── manifest.mpd     → TTL: 10s
    └── segments/
        ├── init.mp4     → TTL: 1 year
        └── seg_001.m4s  → TTL: 1 year

Configuration:
    # Cache rules for streaming
    rules:
      - match: "*.m3u8"
        ttl: 10
        stale_if_error: 300

      - match: "*.mpd"
        ttl: 10
        stale_if_error: 300

      - match: "*.ts"
        ttl: 31536000  # 1 year
        immutable: true

      - match: "*.m4s"
        ttl: 31536000
        immutable: true

Live Streaming Considerations
    class LiveStreamCacheManager:
        def __init__(self, segment_window=10):
            self.window = segment_window  # Keep last N segments

        def configure_live_cache(self, stream_id: str):
            """
            Live streaming needs short TTL + stale handlers.
            """
            return {
                'manifest_ttl': 2,        # 2 seconds
                'segment_ttl': 300,       # 5 minutes (live segments expire)
                'stale_if_error': 10,     # Brief fallback
                'purge_on_stream_end': True
            }

        async def purge_expired_segments(self, stream_id: str):
            """
            Remove old segments from live stream.
            """
            segments = await self.get_segment_list(stream_id)

            if len(segments) > self.window:
                old_segments = segments[:-self.window]
                await self.purge(old_segments)

API Response Caching at Scale
Public API Caching
    from functools import wraps
    import hashlib

    # `cache` is an async cache client (get/set) provided by the application

    def cache_response(ttl=60, vary_on=None):
        """
        Decorator for API response caching.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(request, *args, **kwargs):
                # Build cache key
                key = build_cache_key(request, vary_on)

                # Check cache (compare to None so empty responses still count as hits)
                cached = await cache.get(key)
                if cached is not None:
                    return cached

                # Execute function
                response = await func(request, *args, **kwargs)

                # Store in cache
                await cache.set(key, response, ttl=ttl)

                return response
            return wrapper
        return decorator

    def build_cache_key(request, vary_on):
        """
        Build cache key from request and vary headers.
        """
        key_parts = [request.path]

        if vary_on:
            for header in vary_on:
                key_parts.append(request.headers.get(header, ''))

        if request.query_string:
            # Normalize query string (e.g. sort parameters)
            normalized = normalize_query_string(request.query_string)
            key_parts.append(normalized)

        return hashlib.sha256('|'.join(key_parts).encode()).hexdigest()

Authenticated API Caching
    class AuthenticatedAPICache:
        """
        Cache responses per-user with privacy guarantees.
        """
        def __init__(self, cache):
            # Async cache client with get/set/delete_many; a plain dict
            # would not work with the awaits below
            self.cache = cache

        async def get_user_response(self, user_id: str, endpoint: str):
            key = f"user:{user_id}:{endpoint}"
            return await self.cache.get(key)

        async def set_user_response(self, user_id: str, endpoint: str, response: dict, ttl: int):
            key = f"user:{user_id}:{endpoint}"
            await self.cache.set(key, response, ttl)

            # Track for invalidation
            await self.track_user_key(user_id, key)

        async def invalidate_user_cache(self, user_id: str):
            """
            Invalidate all cache for a user.
            """
            keys = await self.get_user_keys(user_id)
            await self.cache.delete_many(keys)

GraphQL Query Caching
Persisted Queries
    import hashlib

    class PersistedQueryCache:
        """
        Cache GraphQL queries by hash.
        """
        def __init__(self):
            self.query_registry = {}  # hash -> query

        async def register_query(self, query: str) -> str:
            """
            Register query and return hash.
            """
            query_hash = hashlib.sha256(query.encode()).hexdigest()
            self.query_registry[query_hash] = query
            return query_hash

        async def execute_persisted(self, query_hash: str, variables: dict):
            """
            Execute persisted query with caching.
            """
            if query_hash not in self.query_registry:
                raise QueryNotFoundError(query_hash)

            query = self.query_registry[query_hash]

            # Cache key includes query hash + variables hash
            cache_key = f"gql:{query_hash}:{hash_variables(variables)}"

            cached = await cache.get(cache_key)
            if cached is not None:
                return cached

            result = await self.execute(query, variables)
            await cache.set(cache_key, result, ttl=self.get_query_ttl(query))

            return result

        def get_query_ttl(self, query: str) -> int:
            """
            Determine TTL based on query type.
            """
            if 'getUser(' in query:
                return 60
            elif 'getProduct(' in query:
                return 300
            elif 'search(' in query:
                return 30
            return 60

Automatic Persisted Queries (APQ)
    import hashlib

    class AutomaticPersistedQueries:
        """
        Client sends hash first, server responds with query if not found.
        """
        async def handle_request(self, request):
            query_hash = (
                request.get('extensions', {})
                .get('persistedQuery', {})
                .get('sha256Hash')
            )
            query = request.get('query')

            if query_hash:
                # Try to get from cache
                result = await self.try_persisted(query_hash, request.get('variables'))
                if result is not None:
                    return result

            # Query not found - client must send full query
            if not query:
                return {'errors': [{'message': 'PersistedQueryNotFound'}]}

            if query_hash:
                # Verify the hash so a client cannot register a query
                # under someone else's hash
                if hashlib.sha256(query.encode()).hexdigest() != query_hash:
                    return {'errors': [{'message': 'provided sha does not match query'}]}

                # Register for future
                await self.register(query_hash, query)

            # Execute normally
            return await self.execute(query, request.get('variables'))

Real-time Data Caching Challenges
Real-time data (stock prices, sports scores) has conflicting requirements: freshness vs cacheability.
    import asyncio
    import time

    class RealtimeCacheStrategy:
        """
        Cache real-time data with strict invalidation.
        """
        def __init__(self, max_staleness_ms=100):
            self.max_staleness = max_staleness_ms
            self.last_update = {}

        async def get_with_freshness_check(self, key: str):
            """
            Return cached data, flagged as stale if it is too old.
            """
            cached = await cache.get(key)

            if not cached:
                return None

            # Entries with no recorded update time are treated as stale
            last = self.last_update.get(key, 0.0)
            staleness_ms = (time.time() - last) * 1000

            if staleness_ms > self.max_staleness:
                # Stale - trigger async refresh
                asyncio.create_task(self.refresh(key))

                # Return stale with warning
                cached['_stale'] = True
                cached['_staleness_ms'] = staleness_ms

            return cached

        async def refresh(self, key: str):
            """
            Background refresh from source.
            """
            fresh_data = await self.fetch_from_source(key)
            await cache.set(key, fresh_data)
            self.last_update[key] = time.time()

Edge Compute + Cache Patterns
Execute logic at edge before/after cache lookup.
    // Edge function: Pre-cache logic
    async function handleRequest(event) {
      const request = event.request;

      // Pre-cache: Check authentication
      const user = await authenticate(request);
      if (!user) {
        return new Response('Unauthorized', { status: 401 });
      }

      // Personalized cache key. The Cache API keys on a URL or Request,
      // so the user ID is encoded into a synthetic URL ('__user' is an
      // illustrative parameter name).
      const keyUrl = new URL(request.url);
      keyUrl.searchParams.set('__user', user.id);
      const cacheKey = new Request(keyUrl.toString(), { method: 'GET' });

      // Check cache
      const cache = caches.default;
      let response = await cache.match(cacheKey);

      if (response) {
        // Post-cache: Add personalization header
        response = new Response(response.body, response);
        response.headers.set('X-User-ID', user.id);
        return response;
      }

      // Cache miss - fetch from origin
      response = await fetch(request);

      // Post-cache: Transform response
      response = await transformResponse(response, user);

      // Cache with user-specific key. Clone first: a body can be read only once.
      const toCache = new Response(response.clone().body, response);
      toCache.headers.set('Cache-Control', 'private, max-age=300');

      event.waitUntil(cache.put(cacheKey, toCache));

      return response;
    }

Advanced Observability
Distributed Tracing Across Cache Layers
    from opentelemetry import trace
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

    tracer = trace.get_tracer(__name__)
    propagator = TraceContextTextMapPropagator()

    class TracedCacheLookup:
        async def lookup(self, request):
            """
            Distributed trace through cache hierarchy.
            """
            with tracer.start_as_current_span("cache.lookup") as span:
                span.set_attribute("cache.key", request.cache_key)
                span.set_attribute("cache.tier", "edge")

                # Local edge lookup
                with tracer.start_span("edge_cache.get"):
                    result = await self.edge_cache.get(request.cache_key)
                    span.set_attribute("cache.hit", result is not None)

                if result:
                    span.set_attribute("cache.result", "hit")
                    return result

                # Regional edge lookup
                with tracer.start_span("regional_cache.get"):
                    result = await self.regional_cache.get(request.cache_key)

                if result:
                    span.set_attribute("cache.result", "regional_hit")
                    return result

                # Origin fetch
                with tracer.start_span("origin.fetch") as origin_span:
                    # Propagate trace context to origin
                    headers = {}
                    propagator.inject(headers)

                    result = await self.origin.fetch(
                        request,
                        headers={'traceparent': headers.get('traceparent')}
                    )
                    origin_span.set_attribute("http.status_code", result.status)

                span.set_attribute("cache.result", "miss")
                return result

ML-Based Cache Prediction
    import numpy as np
    from sklearn.ensemble import RandomForestClassifier

    class CachePredictor:
        """
        Predict cache hits to optimize routing.
        """
        def __init__(self):
            self.model = RandomForestClassifier(n_estimators=100)
            self.feature_history = []
            self.label_history = []

        def extract_features(self, request):
            """
            Features for prediction.
            """
            return [
                len(request.path),
                request.path.count('/'),
                1 if '?' in request.path else 0,
                self.get_path_popularity(request.path),
                self.get_recent_hit_rate(request.path),
                request.headers.get('User-Agent', '').count('bot'),
            ]

        def train(self, requests, outcomes):
            """
            Train on historical data.
            outcomes: 1 for hit, 0 for miss
            """
            features = [self.extract_features(r) for r in requests]
            self.model.fit(features, outcomes)

        def predict_hit_probability(self, request):
            """
            Predict probability of cache hit.
            """
            features = np.array([self.extract_features(request)])
            return self.model.predict_proba(features)[0][1]

        async def route_with_prediction(self, request, cdn_options):
            """
            Route to best CDN based on hit prediction.
            """
            predictions = {}

            for cdn in cdn_options:
                # CDN-specific prediction
                features = self.extract_features_for_cdn(request, cdn)
                predictions[cdn] = self.model.predict_proba([features])[0][1]

            return max(predictions, key=predictions.get)

Anomaly Detection in Cache Patterns
    from collections import deque
    import statistics

    import numpy as np  # needed for np.percentile below

    class CacheAnomalyDetector:
        """
        Detect anomalies in cache behavior.
        """
        def __init__(self, window_size=1000, z_threshold=3):
            self.window_size = window_size
            self.z_threshold = z_threshold
            self.hit_ratios = deque(maxlen=window_size)
            self.latencies = deque(maxlen=window_size)
            self.error_rates = deque(maxlen=window_size)

        def check_hit_ratio_anomaly(self, current_ratio):
            """
            Detect sudden hit ratio changes.
            """
            self.hit_ratios.append(current_ratio)

            # Wait for enough samples before judging
            if len(self.hit_ratios) < 100:
                return None

            mean = statistics.mean(self.hit_ratios)
            stdev = statistics.stdev(self.hit_ratios)

            if stdev == 0:
                return None

            z_score = (current_ratio - mean) / stdev

            if abs(z_score) > self.z_threshold:
                return {
                    'type': 'hit_ratio_anomaly',
                    'current': current_ratio,
                    'expected': mean,
                    'z_score': z_score,
                    'direction': 'drop' if z_score < 0 else 'spike'
                }

            return None

        def check_latency_anomaly(self, current_latency):
            """
            Detect latency spikes.
            """
            self.latencies.append(current_latency)

            if len(self.latencies) < 100:
                return None

            p50 = statistics.median(self.latencies)
            p95 = np.percentile(list(self.latencies), 95)

            if current_latency > p95 * 2:
                return {
                    'type': 'latency_anomaly',
                    'current': current_latency,
                    'p50': p50,
                    'p95': p95,
                    'severity': 'high' if current_latency > p95 * 5 else 'medium'
                }

            return None

        def detect_cache_bypass_attack(self, request_pattern):
            """
            Detect systematic cache bypass attempts.
            """
            indicators = [
                request_pattern.get('unique_paths_per_ip', 0) > 100,
                request_pattern.get('cache_bypass_headers_rate', 0) > 0.5,
                request_pattern.get('random_query_strings_rate', 0) > 0.7,
            ]

            # Require at least two of three indicators
            if sum(indicators) >= 2:
                return {
                    'type': 'cache_bypass_attack',
                    'confidence': sum(indicators) / 3,
                    'indicators': indicators
                }

            return None

Capacity Forecasting Models
```python
import numpy as np  # needed for np.arange below
import pandas as pd
from scipy import stats


class CacheCapacityForecaster:
    """
    Forecast cache capacity needs.
    """

    def __init__(self, history_days=90):
        self.history_days = history_days
        self.data = []

    def add_daily_metrics(self, date, metrics):
        """
        Add daily metrics for forecasting.
        """
        self.data.append({
            'date': date,
            'traffic_gb': metrics['traffic_gb'],
            'hit_ratio': metrics['hit_ratio'],
            'unique_objects': metrics['unique_objects'],
            'storage_gb': metrics['storage_gb'],
            'origin_requests': metrics['origin_requests'],
        })

    def forecast_traffic(self, days_ahead=30):
        """
        Linear regression forecast for traffic.
        """
        df = pd.DataFrame(self.data)
        df['day'] = (pd.to_datetime(df['date']) - pd.to_datetime(df['date'].min())).dt.days

        slope, intercept, r_value, p_value, std_err = stats.linregress(
            df['day'], df['traffic_gb']
        )

        future_days = df['day'].max() + np.arange(1, days_ahead + 1)
        forecast = slope * future_days + intercept

        return {
            'forecast_gb': forecast.tolist(),
            # calculate_ci is not defined in this excerpt; it is assumed to
            # return an interval around the forecast.
            'confidence_interval': self.calculate_ci(forecast, std_err),
            'r_squared': r_value ** 2,
            'slope': slope,
        }

    def forecast_storage_needs(self, days_ahead=30):
        """
        Forecast storage capacity needs.
        """
        df = pd.DataFrame(self.data)

        # Factor in: traffic growth, hit ratio changes, object size distribution
        traffic_forecast = self.forecast_traffic(days_ahead)['forecast_gb']
        avg_hit_ratio = df['hit_ratio'].mean()
        avg_object_size = df['storage_gb'].sum() / df['unique_objects'].sum()

        # Storage grows with unique objects
        slope, intercept, _, _, _ = stats.linregress(
            range(len(df)), df['unique_objects']
        )

        # The last observed index is len(df) - 1, so project days_ahead past it
        future_objects = intercept + slope * (len(df) - 1 + days_ahead)
        storage_needed = future_objects * avg_object_size

        return {
            'current_storage_gb': df['storage_gb'].iloc[-1],
            'forecast_storage_gb': storage_needed,
            'additional_storage_gb': storage_needed - df['storage_gb'].iloc[-1],
            'recommendation': self.get_capacity_recommendation(storage_needed),
        }

    def get_capacity_recommendation(self, forecast_gb):
        """
        Get actionable recommendation.
        """
        current = self.data[-1]['storage_gb']
        utilization = forecast_gb / (current * 1.5)  # Assume 50% headroom

        if utilization > 0.9:
            return 'CRITICAL: Provision additional storage immediately'
        elif utilization > 0.75:
            return 'WARNING: Plan storage expansion within 30 days'
        elif utilization > 0.6:
            return 'Monitor: Consider expansion planning'
        else:
            return 'OK: Sufficient capacity'
```

Architecture Decision Records
ADR-001: When to Use Multiple CDNs
Status: Accepted
Context: A single CDN creates vendor lock-in and a single point of failure. Multi-CDN adds complexity but improves resilience and can reduce cost.
Decision:
| Traffic Level | SLA Requirement | Recommendation |
|---|---|---|
| < 10TB/month | 99% | Single CDN sufficient |
| 10-100TB/month | 99.5% | Two CDNs, failover |
| > 100TB/month | 99.9% | Two+ CDNs, active-active |
| Global audience | 99.99% | Three CDNs, geo-routing |
Consequences:
- Increased operational complexity (monitoring, configuration sync)
- Negotiating leverage with vendors
- Potential for inconsistent cache states
- Higher engineering overhead
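The decision table above can be encoded as a small lookup for use in review checklists or planning scripts. This is a minimal sketch: the function name `recommend_cdn_setup`, the units (TB per month, SLA as a percentage), and the rule that the strictest input wins are illustrative assumptions, not part of the ADR.

```python
def recommend_cdn_setup(traffic_tb_per_month: float,
                        sla_percent: float,
                        global_audience: bool = False) -> str:
    """Map ADR-001 inputs to a recommendation (strictest input wins: assumption)."""
    tiers = [
        "Single CDN sufficient",
        "Two CDNs, failover",
        "Two+ CDNs, active-active",
        "Three CDNs, geo-routing",
    ]

    # Tier implied by monthly traffic volume
    if traffic_tb_per_month < 10:
        traffic_tier = 0
    elif traffic_tb_per_month <= 100:
        traffic_tier = 1
    else:
        traffic_tier = 2

    # Tier implied by the SLA target
    if sla_percent >= 99.99:
        sla_tier = 3
    elif sla_percent >= 99.9:
        sla_tier = 2
    elif sla_percent >= 99.5:
        sla_tier = 1
    else:
        sla_tier = 0

    # A global audience maps to the last row of the table
    audience_tier = 3 if global_audience else 0

    return tiers[max(traffic_tier, sla_tier, audience_tier)]


print(recommend_cdn_setup(50, 99.5))
print(recommend_cdn_setup(5, 99.0, global_audience=True))
```

Taking the maximum tier means a small site with a strict SLA still gets the multi-CDN recommendation, which matches the intent of treating the SLA as a hard requirement.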
ADR-002: TTL Strategy by Business Criticality
Status: Accepted
Context: TTL directly trades freshness against performance. Different content types tolerate different amounts of staleness.
Decision:
| Content Type | Business Impact | Max Staleness | TTL (seconds) | Stale Handler |
|---|---|---|---|---|
| Pricing/Payment | Critical | 0s | 0 | stale-if-error=60 |
| Inventory | High | 30s | 30 | stale-while-revalidate=300 |
| Product details | Medium | 5min | 300 | stale-while-revalidate=3600 |
| Static assets | Low | ∞ | 31536000 | N/A |
| User content | Variable | 0-60s | Per-user | private |
Consequences:
- Higher origin load for critical content
- More complex cache rules
- Need for proactive invalidation on critical updates
- Reduced stale risk for important business flows
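One way to apply the table is to generate `Cache-Control` header values from it. The sketch below assumes the TTLs and stale handlers map directly onto the standard `max-age`, `stale-while-revalidate`, `stale-if-error`, and `private` directives; the content-type keys and the function name `cache_control_for` are hypothetical.

```python
from typing import Optional

# TTL and stale-handler values copied from the ADR-002 table (seconds).
# The dictionary keys are illustrative names, not identifiers from the ADR.
TTL_POLICY = {
    "pricing": {"max_age": 0, "stale_if_error": 60},
    "inventory": {"max_age": 30, "stale_while_revalidate": 300},
    "product_details": {"max_age": 300, "stale_while_revalidate": 3600},
    "static_assets": {"max_age": 31536000},
}


def cache_control_for(content_type: str, user_ttl: Optional[int] = None) -> str:
    """Build a Cache-Control value for one row of the ADR-002 table."""
    if content_type == "user_content":
        # Per-user TTL, clamped to the 0-60s range from the table and
        # marked private so shared caches do not store it.
        ttl = min(max(user_ttl or 0, 0), 60)
        return f"private, max-age={ttl}"

    policy = TTL_POLICY[content_type]
    parts = [f"max-age={policy['max_age']}"]
    if "stale_while_revalidate" in policy:
        parts.append(f"stale-while-revalidate={policy['stale_while_revalidate']}")
    if "stale_if_error" in policy:
        parts.append(f"stale-if-error={policy['stale_if_error']}")
    return ", ".join(parts)


print(cache_control_for("inventory"))
print(cache_control_for("user_content", user_ttl=45))
```

Keeping the policy in one data structure makes it easier to review TTL changes alongside the ADR itself.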
ADR-003: Cache Warming vs On-Demand Caching
Status: Accepted
Context: Cache warming pre-populates content before user requests. On-demand caching populates on first request. Each has costs and benefits.
Decision:
| Scenario | Strategy | Rationale |
|---|---|---|
| Product launch | Cache warming | Predictable spike, known URLs |
| Daily news | On-demand + warm top 100 | Unpredictable content, warm high-traffic |
| E-commerce catalog | Warm top 10K products | Predictable traffic patterns |
| User-generated content | On-demand | Cannot predict access |
| Video on demand | Warm manifest, on-demand segments | Hybrid approach |
Cache warming decision tree:
```text
Is traffic predictable?
├── Yes → Can you identify top content?
│   ├── Yes → Warm top N items
│   └── No → Monitor and warm on access pattern
└── No → Is first-hit latency critical?
    ├── Yes → Speculative warming
    └── No → On-demand caching
```

Consequences:
- Warming requires origin capacity for bulk fetches
- Need to track warming success/failure
- May warm content never accessed (waste)
- First user experience improved for warmed content
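The first two consequences (protecting origin capacity and tracking warming results) can be sketched with bounded concurrency. This is a minimal illustration, not a production warmer: `warm_cache` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for a real HTTP GET issued through the CDN.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, List


async def warm_cache(urls: Iterable[str],
                     fetch: Callable[[str], Awaitable[int]],
                     max_concurrency: int = 10) -> Dict[str, List[str]]:
    """Request each URL with bounded concurrency and record success or failure."""
    semaphore = asyncio.Semaphore(max_concurrency)
    results: Dict[str, List[str]] = {"warmed": [], "failed": []}

    async def warm_one(url: str) -> None:
        async with semaphore:  # cap parallel fetches to protect the origin
            try:
                status = await fetch(url)
            except Exception:
                status = None
        ok = status is not None and 200 <= status < 300
        results["warmed" if ok else "failed"].append(url)

    await asyncio.gather(*(warm_one(url) for url in urls))
    return results


async def fake_fetch(url: str) -> int:
    """Stand-in for a real request through the CDN (hypothetical)."""
    await asyncio.sleep(0)
    return 500 if url.endswith("/broken") else 200


report = asyncio.run(
    warm_cache(["/p/1", "/p/2", "/broken"], fake_fetch, max_concurrency=2)
)
print(report)
```

Passing the fetch function in as a parameter keeps the concurrency and bookkeeping logic testable without network access.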
ADR-004: Edge Compute vs Origin Processing
Status: Accepted
Context: Some processing (authentication, personalization, A/B testing) can run at the edge instead of the origin. Edge compute reduces latency but adds complexity.
Decision:
| Processing Type | Location | Factors |
|---|---|---|
| Authentication | Edge | Latency, offload origin |
| A/B test routing | Edge | Consistency, speed |
| Personalization | Edge | Latency, cache per-user |
| Complex business logic | Origin | Code complexity, data access |
| Database writes | Origin | Consistency, transactions |
| Image resize | Edge | Compute cost vs bandwidth |
Decision matrix:
```python
def decide_processing_location(requirements):
    """
    Determine if processing should be edge or origin.
    """
    score = 0

    # Latency sensitivity
    if requirements.latency_sensitivity == 'high':
        score += 3
    elif requirements.latency_sensitivity == 'medium':
        score += 1

    # Data access needs
    if requirements.needs_database_write:
        score -= 5
    if requirements.needs_complex_query:
        score -= 3

    # Code complexity
    if requirements.code_complexity == 'high':
        score -= 2

    # Cacheability
    if requirements.response_cacheable:
        score += 2

    return 'edge' if score > 0 else 'origin'
```

Consequences:
- Edge compute requires different deployment model
- Debugging more complex across locations
- Reduced origin load for edge processing
- Need to manage state at edge
Frequently Asked Questions
How do I estimate cache storage for a new product?
Use the formula Storage = Expected Objects × Avg Size × Replication Factor. For a product with 100K pages averaging 50KB, replicated to 20 edges: 100K × 50KB × 20 = 100GB. Adding 20% for metadata and indexes brings the estimate to 120GB.
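The same arithmetic as a small helper, for plugging in other products. This is a sketch that assumes decimal units (1 GB = 1,000,000 KB) and treats the 20% overhead as a multiplier on the base figure; the function name is illustrative.

```python
def estimate_cache_storage_gb(objects: int,
                              avg_size_kb: float,
                              replication_factor: int,
                              overhead: float = 0.20) -> dict:
    """Storage = objects x average size x replication factor, plus overhead."""
    # Convert KB to GB using decimal units (assumption)
    base_gb = objects * avg_size_kb * replication_factor / 1_000_000
    return {
        "base_gb": base_gb,
        "with_overhead_gb": base_gb * (1 + overhead),
    }


estimate = estimate_cache_storage_gb(100_000, 50, 20)
print(estimate["base_gb"])                     # 100.0
print(round(estimate["with_overhead_gb"], 1))  # 120.0
```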
What hit ratio should I target for APIs?
Public APIs: 80-90% with 1-5 minute TTLs. Authenticated APIs: 30-60% with user-specific cache keys. Real-time APIs: below 20% is acceptable when freshness is critical.
How do I handle cache invalidation during deployments?
Use content hashes in URLs for static assets (auto-invalidation). For dynamic content, implement deployment webhooks that trigger cache purges. Version your API endpoints (v1, v2) to allow parallel caching.
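Content-hashed URLs work because any change to the file changes its URL, so old cache entries are simply never requested again. A minimal sketch, assuming a SHA-256 digest truncated to 8 hex characters and a `name.hash.ext` naming scheme (both are illustrative choices, and `hashed_asset_path` is a hypothetical helper):

```python
import hashlib
from pathlib import PurePosixPath


def hashed_asset_path(path: str, content: bytes, digest_len: int = 8) -> str:
    """Insert a short content digest before the file extension."""
    digest = hashlib.sha256(content).hexdigest()[:digest_len]
    p = PurePosixPath(path)
    return str(p.with_name(f"{p.stem}.{digest}{p.suffix}"))


v1 = hashed_asset_path("/static/app.js", b"console.log('v1');")
v2 = hashed_asset_path("/static/app.js", b"console.log('v2');")
print(v1)
print(v2)
```

Because the two builds produce different URLs, the hashed assets can be served with a one-year TTL and need no purge at deploy time.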
What’s the cost of invalidating 1M objects?
At typical per-object pricing ($0.005-0.01), purging 1M objects individually costs $5,000-10,000. Use surrogate keys to group objects and reduce individual purges, and batch invalidations over time windows.
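The effect of grouping can be estimated with a small calculation. This sketch assumes that a surrogate-key purge is billed like a single purge call regardless of how many objects it covers; real pricing models vary by provider, so treat both the function and that billing assumption as hypothetical.

```python
def purge_cost(num_objects: int,
               price_per_purge: float,
               objects_per_key: int = 1) -> float:
    """Estimated cost when each purge call is billed at price_per_purge."""
    # Ceiling division: number of purge calls needed to cover all objects
    purge_calls = -(-num_objects // objects_per_key)
    return purge_calls * price_per_purge


individual = purge_cost(1_000_000, 0.005)
grouped = purge_cost(1_000_000, 0.005, objects_per_key=1_000)
print(f"individual purges: ${individual:,.2f}")
print(f"surrogate keys:    ${grouped:,.2f}")
```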
How do multi-CDN architectures handle cache inconsistency?
Implement event-sourced invalidation with confirmation from all CDNs. Use version vectors for conflict resolution. Accept eventual consistency for non-critical content.
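The confirmation step can be sketched as an append-only log of purge events that tracks which CDNs have acknowledged each one. This is an in-memory illustration only: the class and method names are hypothetical, version vectors are out of scope, and a real system would persist the log and retry unconfirmed purges.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Set


@dataclass
class InvalidationEvent:
    event_id: str
    surrogate_key: str
    pending: Set[str]  # CDNs that have not confirmed the purge yet
    confirmed: Set[str] = field(default_factory=set)


class InvalidationLog:
    """Append-only log of purge events with per-CDN confirmation tracking."""

    def __init__(self, cdns: Iterable[str]):
        self.cdns = set(cdns)
        self.events: Dict[str, InvalidationEvent] = {}
        self._sequence = 0

    def publish(self, surrogate_key: str) -> str:
        """Record a purge event that every CDN must confirm."""
        self._sequence += 1
        event_id = f"inv-{self._sequence}"
        self.events[event_id] = InvalidationEvent(
            event_id, surrogate_key, pending=set(self.cdns)
        )
        return event_id

    def confirm(self, event_id: str, cdn: str) -> None:
        """Mark one CDN as having applied the purge."""
        event = self.events[event_id]
        if cdn in event.pending:
            event.pending.remove(cdn)
            event.confirmed.add(cdn)

    def is_complete(self, event_id: str) -> bool:
        return not self.events[event_id].pending

    def unconfirmed(self) -> Dict[str, Set[str]]:
        """Events still waiting on at least one CDN, for retries or alerts."""
        return {eid: set(e.pending) for eid, e in self.events.items() if e.pending}


log = InvalidationLog(["cdn_a", "cdn_b"])
event_id = log.publish("products")
log.confirm(event_id, "cdn_a")
print(log.unconfirmed())
```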
Should I use stale-while-revalidate for all content?
No. Use it for content where brief staleness is acceptable (product descriptions, blog posts). Don’t use it for critical real-time data (stock prices, inventory counts) where staleness has business impact.
How do I plan capacity for Black Friday?
Forecast 5-10x normal traffic. Pre-warm cache with expected hot content. Extend TTLs temporarily. Enable aggressive stale handlers. Scale origin capacity 3x. Test with simulated load.
What metrics indicate cache problems?
Watch for a sudden hit ratio drop of more than 5%, origin traffic spiking without a matching rise in user traffic, rising TTFB P95, a growing purge queue, and cache storage above 85% of capacity.
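These signals can be checked by comparing two metric snapshots. The sketch below is illustrative: the dictionary keys are hypothetical, the 5% drop is read as 5 percentage points, and the growth factors used for "spiking" and "increasing" (1.5x origin, 1.1x users, 1.2x TTFB) are assumptions to tune per workload.

```python
def cache_health_alerts(previous: dict, current: dict) -> list:
    """Compare two metric snapshots and return the names of triggered alerts."""
    alerts = []

    # Hit ratio fell by more than 5 percentage points (assumed interpretation)
    if previous["hit_ratio"] - current["hit_ratio"] > 0.05:
        alerts.append("hit_ratio_drop")

    # Origin traffic grew sharply while user traffic stayed roughly flat
    origin_spike = current["origin_rps"] > previous["origin_rps"] * 1.5
    user_flat = current["user_rps"] < previous["user_rps"] * 1.1
    if origin_spike and user_flat:
        alerts.append("origin_spike_without_user_spike")

    # TTFB P95 rose noticeably
    if current["ttfb_p95_ms"] > previous["ttfb_p95_ms"] * 1.2:
        alerts.append("ttfb_p95_increase")

    # Purge queue is growing rather than draining
    if current["purge_queue_depth"] > previous["purge_queue_depth"]:
        alerts.append("purge_queue_backing_up")

    # Storage above 85% of capacity
    if current["storage_utilization"] > 0.85:
        alerts.append("storage_near_capacity")

    return alerts


before = {"hit_ratio": 0.92, "origin_rps": 800, "user_rps": 10_000,
          "ttfb_p95_ms": 180, "purge_queue_depth": 0, "storage_utilization": 0.70}
after = {"hit_ratio": 0.81, "origin_rps": 1_900, "user_rps": 10_200,
         "ttfb_p95_ms": 190, "purge_queue_depth": 0, "storage_utilization": 0.72}
print(cache_health_alerts(before, after))
```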
How This Applies in Practice
Enterprise cache architecture balances performance, cost, and consistency. Multi-CDN provides resilience but requires synchronization. Event-sourced invalidation ensures reliable propagation. Capacity planning prevents outages during spikes.
Key metrics to track: hit ratio by content type, origin load ratio, invalidation latency, storage utilization, cost per GB served. Set alerts for anomalies. Review ADRs quarterly as traffic patterns evolve.
How to Implement with Azion
Azion provides enterprise cache capabilities:
- Edge Cache: Hierarchical caching with configurable TTLs and stale handlers
- Functions: JavaScript/Wasm for custom cache logic at edge
- Cache API: Bulk invalidation with surrogate keys
- Real-Time Metrics: Per-path hit ratio, TTFB distribution, storage usage
- Multi-CDN Integration: Configure origin shields and cache-aware routing
Example enterprise configuration:
```text
Rules Engine:
- If: Path matches /api/v1/products/*
  Then: Cache TTL = 300
        Stale-While-Revalidate = 3600
        Surrogate-Key = products

- If: Path matches /static/*
  Then: Cache TTL = 31536000
        Immutable = true

- If: Path matches /live/*
  Then: Cache TTL = 2
        Stale-If-Error = 10

Functions:
- Pre-cache: Authentication check, personalized cache keys
- Post-cache: Header modification, response transformation

Cache API:
- Purge by surrogate key: POST /cache/purge {"keys": ["products"]}
- Bulk purge: POST /cache/purge {"urls": ["..."], "soft": true}
```

Learn more in the Azion Serverless Applications documentation.
Related Resources
- Edge Computing vs CDN
- What is JWT?
- Serverless Applications
- MDN: HTTP Caching
- RFC 7234: HTTP Caching
- OpenTelemetry: Distributed Tracing
Sources:
- RFC 7234. “Hypertext Transfer Protocol (HTTP/1.1): Caching.” IETF. 2014.
- CDN Planet. “Multi-CDN Architecture Patterns.” 2024.
- Netflix. “Scaling Video Cache Globally.” 2024.
- Shopify. “Black Friday Cache Strategy.” 2024.
- HTTP Archive. “Web Almanac 2024: Caching.” 2024.