第 16 章:生产最佳实践
第 16 章:生产最佳实践
16.1 缓存策略选型
Cache Aside Pattern(旁路缓存)
最广泛使用的缓存模式,由应用层管理缓存逻辑。
读流程:
1. 读缓存 → 命中 → 返回
2. 未命中 → 读数据库 → 写缓存 → 返回
写流程:
1. 更新数据库
2. 删除缓存(而非更新缓存)
def get_user(user_id):
"""Cache Aside 读取"""
# 1. 查缓存
cache_key = f"user:{user_id}"
data = mc.get(cache_key)
if data is not None:
return json.loads(data) if data != "NULL" else None
# 2. 查数据库
user = db.query_user(user_id)
# 3. 写缓存(空值也缓存,防穿透)
if user:
mc.set(cache_key, json.dumps(user), time=3600)
else:
mc.set(cache_key, "NULL", time=60)
return user
def update_user(user_id, updates):
"""Cache Aside 更新"""
# 1. 先更新数据库
db.update_user(user_id, updates)
# 2. 再删除缓存(不是更新!)
mc.delete(f"user:{user_id}")
为什么删除而不是更新缓存?
# ❌ 更新缓存:存在并发问题
def update_user_bad(user_id, updates):
db.update_user(user_id, updates)
mc.set(f"user:{user_id}", json.dumps(db.query_user(user_id)))
# 问题:如果另一个请求在 db.update 和 mc.set 之间读取了旧数据并写入缓存
# 结果:缓存中存的是旧数据
# ✅ 删除缓存:最终一致性
def update_user_good(user_id, updates):
db.update_user(user_id, updates)
mc.delete(f"user:{user_id}")
# 优势:下次读取时自动从 DB 加载最新数据
Read/Write Through
由缓存层统一管理读写,应用层不直接访问数据库。
class UserService:
def __init__(self, cache, db):
self.cache = cache
self.db = db
def get_user(self, user_id):
"""Read Through: 缓存层自动加载"""
return self.cache.get_or_load(
f"user:{user_id}",
lambda: self.db.query_user(user_id),
ttl=3600,
)
def update_user(self, user_id, updates):
"""Write Through: 缓存层自动同步"""
self.db.update_user(user_id, updates)
self.cache.delete(f"user:{user_id}")
Write Behind(异步写回)
import queue
import threading
class WriteBehindCache:
def __init__(self, cache, db, batch_size=100, flush_interval=5):
self.cache = cache
self.db = db
self.write_queue = queue.Queue()
self.batch_size = batch_size
self.flush_interval = flush_interval
# 启动后台写入线程
self._start_writer()
def _start_writer(self):
def writer():
while True:
batch = []
deadline = time.time() + self.flush_interval
while len(batch) < self.batch_size and time.time() < deadline:
try:
timeout = max(0.1, deadline - time.time())
item = self.write_queue.get(timeout=timeout)
batch.append(item)
except queue.Empty:
break
if batch:
self.db.batch_update(batch)
t = threading.Thread(target=writer, daemon=True)
t.start()
def set(self, key, data, ttl=3600):
self.cache.set(key, json.dumps(data), time=ttl)
self.write_queue.put({"key": key, "data": data})
def get(self, key):
raw = self.cache.get(key)
return json.loads(raw) if raw else None
16.2 缓存策略选型指南
| 策略 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Cache Aside | 通用场景 | 简单、灵活 | 应用层逻辑多 |
| Read/Write Through | 数据访问层封装 | 应用层简洁 | 需要封装层 |
| Write Behind | 高写入场景 | 写入延迟低 | 可能丢失数据 |
| Refresh Ahead | 热点数据 | 无冷启动 | 实现复杂 |
选择建议:
不确定 → Cache Aside ✓
需要统一数据层 → Read/Write Through ✓
写入量大 → Write Behind ✓
热点数据 → Cache Aside + 预热 ✓
16.3 Key 设计规范
命名规范
格式: {业务}:{类型}:{标识}[:{子标识}]
示例:
user:1001 # 用户信息
user:1001:profile # 用户资料
product:5001 # 商品信息
product:5001:price # 商品价格
session:abc123 # 会话
config:db # 数据库配置
rank:product:daily # 排行榜
rate:api:user:1001:20240101 # 限流计数器
lock:order:12345 # 分布式锁
Key 长度控制
# ❌ 不好的 Key
key = f"user_data_cache_for_user_id_{user_id}_with_full_profile_information"
# ✅ 好的 Key
key = f"user:{user_id}:profile"
# ❌ Key 太短,容易冲突
key = str(user_id) # "1001" —— 可能和其他业务冲突
# ❌ Key 包含特殊字符
key = f"user@{user_id}#info" # 避免特殊字符
# ✅ 只使用字母、数字、冒号、下划线、短横线
key = f"user:{user_id}:info"
Key 版本管理
# 当数据结构变更时,使用版本号
CACHE_VERSION = 3
def user_key(user_id):
return f"v{CACHE_VERSION}:user:{user_id}"
# 升级时只需修改 CACHE_VERSION,旧缓存自动失效
Key 分布优化
# ❌ 可能导致热点的 Key 设计
key = f"product:hot" # 所有请求打到同一节点
key = f"config:global" # 全局配置,单一 Key
# ✅ 分散热点
key = f"product:hot:{random.randint(0, 9)}" # 10 个副本分散到不同节点
key = f"config:global:{shard_id}" # 按分片存储
16.4 容量规划
内存需求计算
#!/usr/bin/env python3
"""Memcached 容量规划计算器"""
def calc_memcached_memory(
num_items: int,
avg_key_size: int,
avg_value_size: int,
growth_factor: float = 1.25,
overhead_ratio: float = 1.2,
) -> dict:
"""
计算 Memcached 内存需求
参数:
num_items: 预估 Item 数量
avg_key_size: 平均 Key 长度(字节)
avg_value_size: 平均 Value 长度(字节)
growth_factor: Slab 增长因子
overhead_ratio: 冗余比例(碎片 + 元数据)
"""
item_overhead = 48 # Item 头部开销
# 每个 Item 实际占用
item_size = item_overhead + avg_key_size + 8 + avg_value_size
# 选择最接近的 Slab chunk 大小
chunk_size = 96
while chunk_size < item_size:
chunk_size = int(chunk_size * growth_factor)
chunk_size = (chunk_size + 7) & ~7 # 8 字节对齐
# 内部碎片率
fragmentation = (chunk_size - item_size) / chunk_size * 100
# 总内存需求
raw_memory = num_items * chunk_size
total_memory = raw_memory * overhead_ratio
# 转换为 MB
total_mb = total_memory / (1024 * 1024)
# 推荐配置(留 20% 余量)
recommended_mb = int(total_mb * 1.2 / 64 + 1) * 64 # 向上取整到 64MB
return {
"item_size": item_size,
"chunk_size": chunk_size,
"fragmentation_pct": round(fragmentation, 1),
"raw_memory_mb": round(total_mb, 1),
"recommended_mb": recommended_mb,
}
# 示例计算
result = calc_memcached_memory(
num_items=1_000_000,
avg_key_size=30,
avg_value_size=200,
)
print(f"预估 Item 数量: 1,000,000")
print(f"平均 Item 大小: {result['item_size']}B")
print(f"Slab Chunk 大小: {result['chunk_size']}B")
print(f"内部碎片率: {result['fragmentation_pct']}%")
print(f"原始内存需求: {result['raw_memory_mb']}MB")
print(f"推荐配置 (-m): {result['recommended_mb']}MB")
连接数规划
连接数规划:
1. 统计应用实例数: N
2. 每个实例的连接池大小: C
3. 预留 20% 余量
推荐连接数 = N × C × 1.2
示例:
10 个应用实例,每个 50 个连接
推荐: 10 × 50 × 1.2 = 600
配置: -c 1000(留更大余量)
节点数规划
节点数规划:
1. 总内存需求: M MB
2. 单节点内存: m MB(推荐 4-16GB)
3. 节点数: N = ceil(M / m)
示例:
总需求 64GB,单节点 8GB
节点数: 64 / 8 = 8 台
冗余考虑:
如果需要故障时容量仍足够 → 额外 1-2 台
8 + 2 = 10 台
16.5 缓存预热
冷启动问题
场景: 服务重启 / 新增节点
问题: 缓存为空,所有请求都回源数据库
结果: 数据库压力激增,可能被压垮
预热策略
class CacheWarmer:
"""缓存预热器"""
def __init__(self, cache, db):
self.cache = cache
self.db = db
def warmup(self, keys, loader, ttl=3600, batch_size=100):
"""批量预热"""
total = len(keys)
loaded = 0
for i in range(0, total, batch_size):
batch_keys = keys[i:i + batch_size]
# 检查哪些 Key 已在缓存中
existing = self.cache.get_multi(batch_keys)
missing = [k for k in batch_keys if k not in existing]
if missing:
# 从数据库加载
items = loader(missing)
self.cache.set_multi(items, ttl=ttl)
loaded += len(missing)
progress = min(i + batch_size, total)
print(f"预热进度: {progress}/{total}, 新加载: {loaded}")
def warmup_hot_data(self, top_n=10000):
"""预热热点数据"""
# 从数据库加载最热门的数据
hot_items = self.db.query(
"SELECT * FROM products ORDER BY view_count DESC LIMIT %s",
(top_n,)
)
items = {f"product:{p['id']}": json.dumps(p) for p in hot_items}
self.cache.set_multi(items, ttl=7200)
print(f"预热完成: {len(items)} 条热点数据")
# 使用
warmer = CacheWarmer(mc, db)
# 服务启动时预热
if is_startup:
hot_ids = db.query("SELECT id FROM users WHERE is_vip = true")
keys = [f"user:{u['id']}" for u in hot_ids]
warmer.warmup(keys, lambda ids: db.batch_get_users(ids))
16.6 缓存穿透/击穿/雪崩防护
综合防护方案
class ResilientCache:
"""带防护的缓存客户端"""
def __init__(self, mc, db):
self.mc = mc
self.db = db
self._locks = {}
def get_user(self, user_id):
cache_key = f"user:{user_id}"
# 1. 查缓存
data = self.mc.get(cache_key)
if data is not None:
if data == b"NULL":
return None # 空值缓存
return json.loads(data)
# 2. 布隆过滤器检查(如有)
# if not bloom_filter.exists(cache_key):
# return None
# 3. 获取分布式锁(防击穿)
lock_key = f"lock:{cache_key}"
if not self.mc.add(lock_key, "1", time=10):
# 等待其他进程加载
import time
for _ in range(50):
time.sleep(0.1)
data = self.mc.get(cache_key)
if data:
return json.loads(data) if data != b"NULL" else None
# 超时直接查询
return self._load_and_cache(user_id, cache_key)
try:
return self._load_and_cache(user_id, cache_key)
finally:
self.mc.delete(lock_key)
def _load_and_cache(self, user_id, cache_key):
user = self.db.query_user(user_id)
if user:
# 使用随机 TTL 防雪崩
import random
jitter = random.randint(-300, 300)
ttl = max(60, 3600 + jitter)
self.mc.set(cache_key, json.dumps(user), time=ttl)
else:
# 缓存空值(防穿透)
self.mc.set(cache_key, "NULL", time=60)
return user
16.7 故障处理
客户端降级
class DegradeableCache:
"""支持降级的缓存客户端"""
def __init__(self, mc, db):
self.mc = mc
self.db = db
self._healthy = True
self._fail_count = 0
self._fail_threshold = 5
def get_with_fallback(self, key, loader, ttl=3600):
"""缓存读取,失败时降级到数据库"""
if not self._healthy:
return loader()
try:
data = self.mc.get(key)
if data:
return json.loads(data)
except Exception as e:
self._record_failure()
logger.warning(f"Cache read failed: {e}")
# 缓存未命中或失败,回源
data = loader()
# 尝试回写缓存
try:
self.mc.set(key, json.dumps(data), time=ttl)
except Exception:
pass
return data
def _record_failure(self):
self._fail_count += 1
if self._fail_count >= self._fail_threshold:
self._healthy = False
logger.error("Cache marked as unhealthy, falling back to DB")
# 启动恢复检查
threading.Timer(30, self._check_recovery).start()
def _check_recovery(self):
try:
self.mc.set("__health_check__", "1", time=10)
self._healthy = True
self._fail_count = 0
logger.info("Cache recovered")
except Exception:
threading.Timer(30, self._check_recovery).start()
灰度发布
def get_with_shadow_cache(user_id):
"""灰度对比新旧缓存"""
old_key = f"user:{user_id}"
new_key = f"v2:user:{user_id}"
old_data = old_mc.get(old_key)
if feature_flag.is_enabled("use_new_cache", user_id):
new_data = new_mc.get(new_key)
if new_data != old_data:
logger.warning(f"Cache mismatch: user={user_id}")
return json.loads(new_data) if new_data else json.loads(old_data)
return json.loads(old_data)
16.8 团队协作规范
缓存 Key 注册表
# cache_keys.py — 集中管理所有缓存 Key
class CacheKeys:
"""缓存 Key 定义"""
# 用户相关
USER = "user:{user_id}" # TTL: 1h
USER_PROFILE = "user:{user_id}:profile" # TTL: 30min
USER_SESSION = "session:{token}" # TTL: 30min
# 商品相关
PRODUCT = "product:{product_id}" # TTL: 1h
PRODUCT_PRICE = "product:{product_id}:price" # TTL: 5min
PRODUCT_RANK = "rank:product:daily" # TTL: 1min
# 配置相关
CONFIG = "config:{key}" # TTL: 5min
FEATURE_FLAG = "flag:{feature}" # TTL: 1min
# 限流
RATE_LIMIT = "rate:{service}:{user}:{window}" # TTL: 1min
# 分布式锁
LOCK = "lock:{resource}:{id}" # TTL: 10s
@classmethod
def user_key(cls, user_id):
return cls.USER.format(user_id=user_id)
缓存使用规范
# Memcached 使用规范
## 1. Key 设计
- 使用 `{业务}:{类型}:{ID}` 格式
- Key 长度 10-60 字节
- 只使用字母、数字、冒号、下划线
- 所有 Key 必须在 `CacheKeys` 中注册
## 2. TTL 设置
- 默认 TTL: 3600 秒(1 小时)
- 最短 TTL: 60 秒
- 最长 TTL: 86400 秒(1 天)
- 配置类: 300 秒(5 分钟)
- 热点数据: 60 秒(1 分钟)
## 3. Value 大小
- 推荐: < 10KB
- 最大: 500KB
- 超过 100KB 考虑压缩
- 超过 500KB 考虑拆分
## 4. 序列化
- 使用 JSON(可读性好)
- 超过 100 个字段使用 MessagePack
- Value 中包含版本号
## 5. 缓存策略
- 读: Cache Aside
- 写: 先更新 DB,再删除缓存
- 禁止: 缓存和 DB 双写(无事务保证)
## 6. 监控告警
- 命中率 < 80% 告警
- 内存 > 90% 告警
- 淘汰 > 0 告警
16.9 生产检查清单
部署前检查
#!/bin/bash
echo "=== Memcached 生产部署检查 ==="
# 1. 安全检查
echo "1. 安全配置"
echo " - 监听地址: $(echo 'stats settings' | nc localhost 11211 | grep inter)"
echo " - UDP 端口: $(echo 'stats settings' | nc localhost 11211 | grep udpport)"
# 2. 性能配置
echo "2. 性能配置"
echo " - 线程数: $(echo 'stats settings' | nc localhost 11211 | grep num_threads)"
echo " - 最大连接: $(echo 'stats settings' | nc localhost 11211 | grep maxconns)"
echo " - 最大内存: $(echo 'stats settings' | nc localhost 11211 | grep maxbytes)"
# 3. LRU 配置
echo "3. LRU 配置"
echo " - lru_maintainer: $(echo 'stats settings' | nc localhost 11211 | grep lru_maintainer)"
echo " - slab_automove: $(echo 'stats settings' | nc localhost 11211 | grep slab_automove)"
echo " - lru_crawler: $(echo 'stats settings' | nc localhost 11211 | grep lru_crawler)"
# 4. 连接验证
echo "4. 连接验证"
echo " - 当前连接: $(echo 'stats' | nc localhost 11211 | grep curr_connections)"
echo "=== 检查完成 ==="
16.10 全教程总结
Memcached 核心要点回顾:
1. 简单: 只做缓存,做到极致
2. 内存: Slab 分配器,无碎片
3. 快速: O(1) 查找,多线程,事件驱动
4. 分布: 客户端一致性哈希分片
5. 淘汰: 四级 LRU,TEMP 独立淘汰
6. 协议: Meta 协议最高效
7. 安全: 内网隔离 + SASL + TLS
8. 监控: Prometheus + 命中率告警
9. 策略: Cache Aside + 缓存空值 + 随机 TTL
10. Key: 简洁、有结构、版本化
扩展阅读
小结
| 要点 | 内容 |
|---|---|
| 缓存策略 | Cache Aside(通用)、Write Behind(高写入) |
| Key 规范 | {业务}:{类型}:{ID},10-60 字节 |
| 容量规划 | 内存 = Item数 × chunk大小 × 1.2 |
| 防护体系 | 缓存空值 + 分布式锁 + 随机 TTL + 降级 |
| 团队协作 | Key 注册表 + 使用规范 + 告警规则 |