21 - 最佳实践
最佳实践
21.1 Key 设计规范
命名约定
# ✅ 推荐格式:业务:对象:ID:属性
user:1001:profile
user:1001:session
order:20260510:1001
product:sku:001:stock
cache:hot:homepage
# ❌ 不推荐
UserProfile_1001 # 混合大小写和下划线
user-profile-1001 # 连字符风格
u # 太短,含义不清
user:very:long:key:name:with:many:segments # 层级太深
Key 命名规范表
| 规则 | 说明 | 示例 |
|---|
| 使用冒号分隔 | 清晰的层级关系 | user:1001:name |
| 全小写 | 统一风格 | order:detail:1001 |
| 有意义的缩写 | 控制 Key 长度 | usr:1001 (user) |
| 避免特殊字符 | 除冒号外不使用其他特殊字符 | ✅ user:1001 ❌ user#1001! |
| 长度控制 | 尽量简短 | < 100 字符 |
| 使用业务前缀 | 方便管理和扫描 | app:user:1001 |
Hash Tag 设计(集群模式)
# ✅ 使用 Hash Tag 确保相关 Key 在同一 Slot
{user:1001}:name
{user:1001}:email
{user:1001}:cart
# 这样可以进行批量操作
MGET {user:1001}:name {user:1001}:email
# ❌ 不使用 Hash Tag 导致无法批量操作
user:1001:name # Slot A
user:1001:email # Slot B(可能不同)
21.2 Value 设计规范
序列化策略对比
| 序列化方式 | 优点 | 缺点 | 适用场景 |
|---|
| JSON | 可读性好、通用性强 | 体积较大 | 跨语言、调试方便 |
| MessagePack | 紧凑二进制、速度快 | 不可读 | 高性能场景 |
| Protobuf | 极致紧凑、速度快 | 需要定义 Schema | 高性能、固定结构 |
| JDK 序列化 | Java 原生支持 | 仅限 Java、体积大 | 不推荐 |
JSON 序列化示例
import json
import redis
r = redis.Redis()
# 存储
user = {
"id": 1001,
"name": "张三",
"age": 25,
"created_at": "2026-05-10T10:00:00"
}
r.set("user:1001", json.dumps(user, ensure_ascii=False))
# 读取
data = json.loads(r.get("user:1001"))
MessagePack 示例
import msgpack
import redis
r = redis.Redis()
# 存储(比 JSON 小 30-50%)
data = {"id": 1001, "name": "张三", "scores": [90, 85, 92]}
r.set("user:1001", msgpack.packb(data))
# 读取
data = msgpack.unpackb(r.get("user:1001"), raw=False)
Value 大小控制
推荐 Value 大小:
├── String: < 10 KB(最优 < 1 KB)
├── Hash: < 5000 字段
├── List: < 10000 元素
├── Set: < 10000 元素
└── ZSet: < 10000 元素
⚠️ 注意:大 Value 会导致网络传输慢、内存碎片、主从同步延迟。超过 100 KB 的 Value 考虑压缩或拆分。
21.3 过期策略设计
TTL 设置原则
| 数据类型 | 推荐 TTL | 说明 |
|---|
| Session | 30 min - 2 hours | 用户会话 |
| Token | 15 min - 24 hours | JWT/Access Token |
| 验证码 | 5 min | 短信/邮件验证码 |
| 缓存 | 5 min - 2 hours | 热点数据缓存 |
| 临时数据 | 1 min - 30 min | 限流、防重 |
| 持久数据 | 不设置 | 用户信息、配置 |
随机化 TTL(防止缓存雪崩)
import random
def set_with_random_ttl(r, key, value, base_ttl):
"""随机化 TTL,防止大量 Key 同时过期"""
# 在 base_ttl 基础上随机 ±10%
jitter = random.randint(-base_ttl // 10, base_ttl // 10)
ttl = base_ttl + jitter
r.set(key, value, ex=ttl)
21.4 缓存策略
Cache Aside Pattern(旁路缓存)
def get_user(user_id):
cache_key = f"user:{user_id}"
# 1. 先查缓存
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,查数据库
user = db.query_user(user_id)
if user:
# 3. 写入缓存
r.setex(cache_key, 3600, json.dumps(user))
return user
def update_user(user_id, data):
# 1. 先更新数据库
db.update_user(user_id, data)
# 2. 再删除缓存(不是更新!)
r.delete(f"user:{user_id}")
Read/Write Through Pattern
class CacheManager:
def __init__(self, redis_client, db):
self.cache = redis_client
self.db = db
def get(self, key):
value = self.cache.get(key)
if value is None:
value = self.db.get(key)
if value:
self.cache.setex(key, 3600, json.dumps(value))
else:
value = json.loads(value)
return value
def set(self, key, value, ttl=3600):
self.db.set(key, value)
self.cache.setex(key, ttl, json.dumps(value))
def delete(self, key):
self.db.delete(key)
self.cache.delete(key)
缓存穿透防护
import redis
r = redis.Redis()
# 方案一:布隆过滤器
def get_with_bloom(key, bf_name="bf:keys"):
# 先检查布隆过滤器
if not r.execute_command('BF.EXISTS', bf_name, key):
return None # Key 一定不存在
# 查询缓存
value = r.get(key)
if value:
return json.loads(value)
# 查询数据库
value = db.get(key)
if value:
r.setex(key, 3600, json.dumps(value))
return value
# 方案二:缓存空值
def get_with_null_cache(key):
value = r.get(key)
if value == "NULL":
return None
if value:
return json.loads(value)
value = db.get(key)
if value is None:
r.setex(key, 300, "NULL") # 缓存空值 5 分钟
else:
r.setex(key, 3600, json.dumps(value))
return value
缓存击穿防护
import threading
local_lock = threading.local()
def get_with_mutex(key, fetch_func):
"""使用互斥锁防止缓存击穿"""
value = r.get(key)
if value:
return json.loads(value)
lock_key = f"lock:{key}"
if r.set(lock_key, "1", nx=True, ex=30):
try:
value = fetch_func()
r.setex(key, 3600, json.dumps(value))
return value
finally:
r.delete(lock_key)
else:
# 等待其他线程加载完缓存
import time
for _ in range(100):
time.sleep(0.1)
value = r.get(key)
if value:
return json.loads(value)
return None
缓存雪崩防护
import random
def get_with_fallback(key, fetch_func):
"""带降级的缓存读取"""
try:
value = r.get(key)
if value:
return json.loads(value)
except redis.RedisError:
# Redis 不可用,直接查数据库
pass
value = fetch_func()
try:
# 随机化 TTL 防止雪崩
ttl = 3600 + random.randint(-300, 300)
r.setex(key, ttl, json.dumps(value))
except redis.RedisError:
pass
return value
21.5 连接管理
连接池配置
import redis
# 生产环境连接池
pool = redis.ConnectionPool(
host='localhost',
port=6379,
password='yourpassword',
db=0,
max_connections=200, # 最大连接数
socket_timeout=5, # 读写超时
socket_connect_timeout=5, # 连接超时
retry_on_timeout=True, # 超时重试
health_check_interval=30, # 健康检查间隔
decode_responses=True,
)
r = redis.Redis(connection_pool=pool)
连接泄漏检测
# 监控连接数
def monitor_connections():
info = r.info('clients')
print(f"Connected: {info['connected_clients']}")
print(f"Blocked: {info['blocked_clients']}")
print(f"Tracking: {info.get('tracking_clients', 0)}")
# 查看客户端列表
def list_clients():
clients = r.client_list()
for c in clients:
print(f"ID={c['id']} addr={c['addr']} idle={c['idle']} cmd={c.get('cmd', 'N/A')}")
21.6 命令使用规范
避免的危险命令
| 命令 | 替代方案 | 原因 |
|---|
KEYS * | SCAN | 阻塞主线程 |
FLUSHDB | FLUSHDB ASYNC | 阻塞主线程 |
FLUSHALL | 禁用或重命名 | 极度危险 |
DEL (大 Key) | UNLINK | 异步删除 |
HGETALL (大 Hash) | HSCAN | 大数据量阻塞 |
SMEMBERS (大 Set) | SSCAN | 大数据量阻塞 |
LRANGE 0 -1 (大 List) | 分页查询 | 大数据量阻塞 |
DEBUG | 禁用 | 危险命令 |
SAVE | BGSAVE | 同步阻塞 |
使用 Pipeline 减少网络往返
# ❌ 逐条执行
for i in range(10000):
r.set(f'key:{i}', f'value:{i}')
# ✅ Pipeline 批量执行
pipe = r.pipeline()
for i in range(10000):
pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()
21.7 生产环境 Checklist
部署前检查
□ Redis 版本 >= 6.0
□ 设置了 requirepass
□ bind 配置正确(非 0.0.0.0)
□ protected-mode 开启
□ 禁用了危险命令(FLUSHALL, FLUSHDB, DEBUG, CONFIG)
□ 配置了 maxmemory
□ 选择了合适的 maxmemory-policy
□ 开启了持久化(RDB + AOF 或混合持久化)
□ 配置了慢查询日志
□ 配置了连接数限制
□ 配置了 TCP keepalive
□ 防火墙规则正确
□ 内核参数优化(overcommit_memory, somaxconn, THP)
运维检查
□ 监控配置(Prometheus + Grafana)
□ 告警规则(内存、连接、复制、QPS)
□ 备份策略(定期 RDB 备份)
□ 日志轮转配置
□ 主从复制正常
□ 故障转移测试通过
□ 容量规划(内存增长趋势)
□ 变更流程文档
代码检查
□ 使用连接池
□ 设置了超时时间
□ Key 有 TTL(临时数据)
□ 避免了大 Key 操作
□ 使用 Pipeline 减少 RTT
□ 分布式锁有超时和释放逻辑
□ 缓存策略正确(先删缓存再更新 DB)
□ 异常处理(Redis 不可用时的降级方案)
📌 业务场景
场景一:新项目 Redis 接入
# 1. 设计 Key 命名规范
# 2. 选择序列化方式
# 3. 配置连接池
# 4. 实现缓存层
# 5. 添加监控告警
场景二:老项目 Redis 优化
# 1. 扫描大 Key
redis-cli --bigkeys
# 2. 检查慢查询
redis-cli SLOWLOG GET 10
# 3. 检查内存碎片
redis-cli INFO memory | grep mem_fragmentation_ratio
# 4. 优化命令使用(KEYS → SCAN, DEL → UNLINK)
🔗 扩展阅读