强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

21 - 最佳实践

最佳实践

21.1 Key 设计规范

命名约定

# ✅ 推荐格式:业务:对象:ID:属性
user:1001:profile
user:1001:session
order:20260510:1001
product:sku:001:stock
cache:hot:homepage

# ❌ 不推荐
UserProfile_1001          # 混合大小写和下划线
user-profile-1001          # 连字符风格
u                          # 太短,含义不清
user:very:long:key:name:with:many:segments  # 层级太深

Key 命名规范表

规则说明示例
使用冒号分隔清晰的层级关系user:1001:name
全小写统一风格order:detail:1001
有意义的缩写控制 Key 长度usr:1001 (user)
避免特殊字符除冒号外不使用其他特殊字符user:1001user#1001!
长度控制尽量简短< 100 字符
使用业务前缀方便管理和扫描app:user:1001

Hash Tag 设计(集群模式)

# ✅ 使用 Hash Tag 确保相关 Key 在同一 Slot
{user:1001}:name
{user:1001}:email
{user:1001}:cart

# 这样可以进行批量操作
MGET {user:1001}:name {user:1001}:email

# ❌ 不使用 Hash Tag 导致无法批量操作
user:1001:name    # Slot A
user:1001:email   # Slot B(可能不同)

21.2 Value 设计规范

序列化策略对比

序列化方式优点缺点适用场景
JSON可读性好、通用性强体积较大跨语言、调试方便
MessagePack紧凑二进制、速度快不可读高性能场景
Protobuf极致紧凑、速度快需要定义 Schema高性能、固定结构
JDK 序列化Java 原生支持仅限 Java、体积大不推荐

JSON 序列化示例

import json
import redis

r = redis.Redis()

# 存储
user = {
    "id": 1001,
    "name": "张三",
    "age": 25,
    "created_at": "2026-05-10T10:00:00"
}
r.set("user:1001", json.dumps(user, ensure_ascii=False))

# 读取
data = json.loads(r.get("user:1001"))

MessagePack 示例

import msgpack
import redis

r = redis.Redis()

# 存储(比 JSON 小 30-50%)
data = {"id": 1001, "name": "张三", "scores": [90, 85, 92]}
r.set("user:1001", msgpack.packb(data))

# 读取
data = msgpack.unpackb(r.get("user:1001"), raw=False)

Value 大小控制

推荐 Value 大小:
├── String: < 10 KB(最优 < 1 KB)
├── Hash: < 5000 字段
├── List: < 10000 元素
├── Set: < 10000 元素
└── ZSet: < 10000 元素

⚠️ 注意:大 Value 会导致网络传输慢、内存碎片、主从同步延迟。超过 100 KB 的 Value 考虑压缩或拆分。

21.3 过期策略设计

TTL 设置原则

数据类型推荐 TTL说明
Session30 min - 2 hours用户会话
Token15 min - 24 hoursJWT/Access Token
验证码5 min短信/邮件验证码
缓存5 min - 2 hours热点数据缓存
临时数据1 min - 30 min限流、防重
持久数据不设置用户信息、配置

随机化 TTL(防止缓存雪崩)

import random

def set_with_random_ttl(r, key, value, base_ttl):
    """随机化 TTL,防止大量 Key 同时过期"""
    # 在 base_ttl 基础上随机 ±10%
    jitter = random.randint(-base_ttl // 10, base_ttl // 10)
    ttl = base_ttl + jitter
    r.set(key, value, ex=ttl)

21.4 缓存策略

Cache Aside Pattern(旁路缓存)

def get_user(user_id):
    cache_key = f"user:{user_id}"
    
    # 1. 先查缓存
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # 2. 缓存未命中,查数据库
    user = db.query_user(user_id)
    if user:
        # 3. 写入缓存
        r.setex(cache_key, 3600, json.dumps(user))
    
    return user

def update_user(user_id, data):
    # 1. 先更新数据库
    db.update_user(user_id, data)
    
    # 2. 再删除缓存(不是更新!)
    r.delete(f"user:{user_id}")

Read/Write Through Pattern

class CacheManager:
    def __init__(self, redis_client, db):
        self.cache = redis_client
        self.db = db
    
    def get(self, key):
        value = self.cache.get(key)
        if value is None:
            value = self.db.get(key)
            if value:
                self.cache.setex(key, 3600, json.dumps(value))
        else:
            value = json.loads(value)
        return value
    
    def set(self, key, value, ttl=3600):
        self.db.set(key, value)
        self.cache.setex(key, ttl, json.dumps(value))
    
    def delete(self, key):
        self.db.delete(key)
        self.cache.delete(key)

缓存穿透防护

import redis

r = redis.Redis()

# 方案一:布隆过滤器
def get_with_bloom(key, bf_name="bf:keys"):
    # 先检查布隆过滤器
    if not r.execute_command('BF.EXISTS', bf_name, key):
        return None  # Key 一定不存在
    
    # 查询缓存
    value = r.get(key)
    if value:
        return json.loads(value)
    
    # 查询数据库
    value = db.get(key)
    if value:
        r.setex(key, 3600, json.dumps(value))
    
    return value

# 方案二:缓存空值
def get_with_null_cache(key):
    value = r.get(key)
    if value == "NULL":
        return None
    if value:
        return json.loads(value)
    
    value = db.get(key)
    if value is None:
        r.setex(key, 300, "NULL")  # 缓存空值 5 分钟
    else:
        r.setex(key, 3600, json.dumps(value))
    
    return value

缓存击穿防护

import threading

local_lock = threading.local()

def get_with_mutex(key, fetch_func):
    """使用互斥锁防止缓存击穿"""
    value = r.get(key)
    if value:
        return json.loads(value)
    
    lock_key = f"lock:{key}"
    if r.set(lock_key, "1", nx=True, ex=30):
        try:
            value = fetch_func()
            r.setex(key, 3600, json.dumps(value))
            return value
        finally:
            r.delete(lock_key)
    else:
        # 等待其他线程加载完缓存
        import time
        for _ in range(100):
            time.sleep(0.1)
            value = r.get(key)
            if value:
                return json.loads(value)
        return None

缓存雪崩防护

import random

def get_with_fallback(key, fetch_func):
    """带降级的缓存读取"""
    try:
        value = r.get(key)
        if value:
            return json.loads(value)
    except redis.RedisError:
        # Redis 不可用,直接查数据库
        pass
    
    value = fetch_func()
    
    try:
        # 随机化 TTL 防止雪崩
        ttl = 3600 + random.randint(-300, 300)
        r.setex(key, ttl, json.dumps(value))
    except redis.RedisError:
        pass
    
    return value

21.5 连接管理

连接池配置

import redis

# 生产环境连接池
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    password='yourpassword',
    db=0,
    max_connections=200,           # 最大连接数
    socket_timeout=5,              # 读写超时
    socket_connect_timeout=5,      # 连接超时
    retry_on_timeout=True,         # 超时重试
    health_check_interval=30,      # 健康检查间隔
    decode_responses=True,
)

r = redis.Redis(connection_pool=pool)

连接泄漏检测

# 监控连接数
def monitor_connections():
    info = r.info('clients')
    print(f"Connected: {info['connected_clients']}")
    print(f"Blocked: {info['blocked_clients']}")
    print(f"Tracking: {info.get('tracking_clients', 0)}")

# 查看客户端列表
def list_clients():
    clients = r.client_list()
    for c in clients:
        print(f"ID={c['id']} addr={c['addr']} idle={c['idle']} cmd={c.get('cmd', 'N/A')}")

21.6 命令使用规范

避免的危险命令

命令替代方案原因
KEYS *SCAN阻塞主线程
FLUSHDBFLUSHDB ASYNC阻塞主线程
FLUSHALL禁用或重命名极度危险
DEL (大 Key)UNLINK异步删除
HGETALL (大 Hash)HSCAN大数据量阻塞
SMEMBERS (大 Set)SSCAN大数据量阻塞
LRANGE 0 -1 (大 List)分页查询大数据量阻塞
DEBUG禁用危险命令
SAVEBGSAVE同步阻塞

使用 Pipeline 减少网络往返

# ❌ 逐条执行
for i in range(10000):
    r.set(f'key:{i}', f'value:{i}')

# ✅ Pipeline 批量执行
pipe = r.pipeline()
for i in range(10000):
    pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()

21.7 生产环境 Checklist

部署前检查

□ Redis 版本 >= 6.0
□ 设置了 requirepass
□ bind 配置正确(非 0.0.0.0)
□ protected-mode 开启
□ 禁用了危险命令(FLUSHALL, FLUSHDB, DEBUG, CONFIG)
□ 配置了 maxmemory
□ 选择了合适的 maxmemory-policy
□ 开启了持久化(RDB + AOF 或混合持久化)
□ 配置了慢查询日志
□ 配置了连接数限制
□ 配置了 TCP keepalive
□ 防火墙规则正确
□ 内核参数优化(overcommit_memory, somaxconn, THP)

运维检查

□ 监控配置(Prometheus + Grafana)
□ 告警规则(内存、连接、复制、QPS)
□ 备份策略(定期 RDB 备份)
□ 日志轮转配置
□ 主从复制正常
□ 故障转移测试通过
□ 容量规划(内存增长趋势)
□ 变更流程文档

代码检查

□ 使用连接池
□ 设置了超时时间
□ Key 有 TTL(临时数据)
□ 避免了大 Key 操作
□ 使用 Pipeline 减少 RTT
□ 分布式锁有超时和释放逻辑
□ 缓存策略正确(先删缓存再更新 DB)
□ 异常处理(Redis 不可用时的降级方案)

📌 业务场景

场景一:新项目 Redis 接入

# 1. 设计 Key 命名规范
# 2. 选择序列化方式
# 3. 配置连接池
# 4. 实现缓存层
# 5. 添加监控告警

场景二:老项目 Redis 优化

# 1. 扫描大 Key
redis-cli --bigkeys

# 2. 检查慢查询
redis-cli SLOWLOG GET 10

# 3. 检查内存碎片
redis-cli INFO memory | grep mem_fragmentation_ratio

# 4. 优化命令使用(KEYS → SCAN, DEL → UNLINK)

🔗 扩展阅读