强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Redis 完全指南 / 21 - 最佳实践

最佳实践

21.1 Key 设计规范

命名约定

# ✅ 推荐格式:业务:对象:ID:属性
user:1001:profile
user:1001:session
order:20260510:1001
product:sku:001:stock
cache:hot:homepage

# ❌ 不推荐
UserProfile_1001          # 混合大小写和下划线
user-profile-1001          # 连字符风格
u                          # 太短,含义不清
user:very:long:key:name:with:many:segments  # 层级太深

Key 命名规范表

规则 说明 示例
使用冒号分隔 清晰的层级关系 user:1001:name
全小写 统一风格 order:detail:1001
有意义的缩写 控制 Key 长度 usr:1001 (user)
避免特殊字符 除冒号外不使用其他特殊字符 user:1001user#1001!
长度控制 尽量简短 < 100 字符
使用业务前缀 方便管理和扫描 app:user:1001

Hash Tag 设计(集群模式)

# ✅ 使用 Hash Tag 确保相关 Key 在同一 Slot
{user:1001}:name
{user:1001}:email
{user:1001}:cart

# 这样可以进行批量操作
MGET {user:1001}:name {user:1001}:email

# ❌ 不使用 Hash Tag 导致无法批量操作
user:1001:name    # Slot A
user:1001:email   # Slot B(可能不同)

21.2 Value 设计规范

序列化策略对比

序列化方式 优点 缺点 适用场景
JSON 可读性好、通用性强 体积较大 跨语言、调试方便
MessagePack 紧凑二进制、速度快 不可读 高性能场景
Protobuf 极致紧凑、速度快 需要定义 Schema 高性能、固定结构
JDK 序列化 Java 原生支持 仅限 Java、体积大 不推荐

JSON 序列化示例

import json
import redis

r = redis.Redis()

# 存储
user = {
    "id": 1001,
    "name": "张三",
    "age": 25,
    "created_at": "2026-05-10T10:00:00"
}
r.set("user:1001", json.dumps(user, ensure_ascii=False))

# 读取
data = json.loads(r.get("user:1001"))

MessagePack 示例

import msgpack
import redis

r = redis.Redis()

# 存储(比 JSON 小 30-50%)
data = {"id": 1001, "name": "张三", "scores": [90, 85, 92]}
r.set("user:1001", msgpack.packb(data))

# 读取
data = msgpack.unpackb(r.get("user:1001"), raw=False)

Value 大小控制

推荐 Value 大小:
├── String: < 10 KB(最优 < 1 KB)
├── Hash: < 5000 字段
├── List: < 10000 元素
├── Set: < 10000 元素
└── ZSet: < 10000 元素

⚠️ 注意:大 Value 会导致网络传输慢、内存碎片、主从同步延迟。超过 100 KB 的 Value 考虑压缩或拆分。

21.3 过期策略设计

TTL 设置原则

数据类型 推荐 TTL 说明
Session 30 min - 2 hours 用户会话
Token 15 min - 24 hours JWT/Access Token
验证码 5 min 短信/邮件验证码
缓存 5 min - 2 hours 热点数据缓存
临时数据 1 min - 30 min 限流、防重
持久数据 不设置 用户信息、配置

随机化 TTL(防止缓存雪崩)

import random

def set_with_random_ttl(r, key, value, base_ttl):
    """随机化 TTL,防止大量 Key 同时过期"""
    # 在 base_ttl 基础上随机 ±10%
    jitter = random.randint(-base_ttl // 10, base_ttl // 10)
    ttl = base_ttl + jitter
    r.set(key, value, ex=ttl)

21.4 缓存策略

Cache Aside Pattern(旁路缓存)

def get_user(user_id):
    cache_key = f"user:{user_id}"
    
    # 1. 先查缓存
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # 2. 缓存未命中,查数据库
    user = db.query_user(user_id)
    if user:
        # 3. 写入缓存
        r.setex(cache_key, 3600, json.dumps(user))
    
    return user

def update_user(user_id, data):
    # 1. 先更新数据库
    db.update_user(user_id, data)
    
    # 2. 再删除缓存(不是更新!)
    r.delete(f"user:{user_id}")

Read/Write Through Pattern

class CacheManager:
    def __init__(self, redis_client, db):
        self.cache = redis_client
        self.db = db
    
    def get(self, key):
        value = self.cache.get(key)
        if value is None:
            value = self.db.get(key)
            if value:
                self.cache.setex(key, 3600, json.dumps(value))
        else:
            value = json.loads(value)
        return value
    
    def set(self, key, value, ttl=3600):
        self.db.set(key, value)
        self.cache.setex(key, ttl, json.dumps(value))
    
    def delete(self, key):
        self.db.delete(key)
        self.cache.delete(key)

缓存穿透防护

import redis

r = redis.Redis()

# 方案一:布隆过滤器
def get_with_bloom(key, bf_name="bf:keys"):
    # 先检查布隆过滤器
    if not r.execute_command('BF.EXISTS', bf_name, key):
        return None  # Key 一定不存在
    
    # 查询缓存
    value = r.get(key)
    if value:
        return json.loads(value)
    
    # 查询数据库
    value = db.get(key)
    if value:
        r.setex(key, 3600, json.dumps(value))
    
    return value

# 方案二:缓存空值
def get_with_null_cache(key):
    value = r.get(key)
    if value == "NULL":
        return None
    if value:
        return json.loads(value)
    
    value = db.get(key)
    if value is None:
        r.setex(key, 300, "NULL")  # 缓存空值 5 分钟
    else:
        r.setex(key, 3600, json.dumps(value))
    
    return value

缓存击穿防护

import threading

local_lock = threading.local()

def get_with_mutex(key, fetch_func):
    """使用互斥锁防止缓存击穿"""
    value = r.get(key)
    if value:
        return json.loads(value)
    
    lock_key = f"lock:{key}"
    if r.set(lock_key, "1", nx=True, ex=30):
        try:
            value = fetch_func()
            r.setex(key, 3600, json.dumps(value))
            return value
        finally:
            r.delete(lock_key)
    else:
        # 等待其他线程加载完缓存
        import time
        for _ in range(100):
            time.sleep(0.1)
            value = r.get(key)
            if value:
                return json.loads(value)
        return None

缓存雪崩防护

import random

def get_with_fallback(key, fetch_func):
    """带降级的缓存读取"""
    try:
        value = r.get(key)
        if value:
            return json.loads(value)
    except redis.RedisError:
        # Redis 不可用,直接查数据库
        pass
    
    value = fetch_func()
    
    try:
        # 随机化 TTL 防止雪崩
        ttl = 3600 + random.randint(-300, 300)
        r.setex(key, ttl, json.dumps(value))
    except redis.RedisError:
        pass
    
    return value

21.5 连接管理

连接池配置

import redis

# 生产环境连接池
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    password='yourpassword',
    db=0,
    max_connections=200,           # 最大连接数
    socket_timeout=5,              # 读写超时
    socket_connect_timeout=5,      # 连接超时
    retry_on_timeout=True,         # 超时重试
    health_check_interval=30,      # 健康检查间隔
    decode_responses=True,
)

r = redis.Redis(connection_pool=pool)

连接泄漏检测

# 监控连接数
def monitor_connections():
    info = r.info('clients')
    print(f"Connected: {info['connected_clients']}")
    print(f"Blocked: {info['blocked_clients']}")
    print(f"Tracking: {info.get('tracking_clients', 0)}")

# 查看客户端列表
def list_clients():
    clients = r.client_list()
    for c in clients:
        print(f"ID={c['id']} addr={c['addr']} idle={c['idle']} cmd={c.get('cmd', 'N/A')}")

21.6 命令使用规范

避免的危险命令

命令 替代方案 原因
KEYS * SCAN 阻塞主线程
FLUSHDB FLUSHDB ASYNC 阻塞主线程
FLUSHALL 禁用或重命名 极度危险
DEL (大 Key) UNLINK 异步删除
HGETALL (大 Hash) HSCAN 大数据量阻塞
SMEMBERS (大 Set) SSCAN 大数据量阻塞
LRANGE 0 -1 (大 List) 分页查询 大数据量阻塞
DEBUG 禁用 危险命令
SAVE BGSAVE 同步阻塞

使用 Pipeline 减少网络往返

# ❌ 逐条执行
for i in range(10000):
    r.set(f'key:{i}', f'value:{i}')

# ✅ Pipeline 批量执行
pipe = r.pipeline()
for i in range(10000):
    pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()

21.7 生产环境 Checklist

部署前检查

□ Redis 版本 >= 6.0
□ 设置了 requirepass
□ bind 配置正确(非 0.0.0.0)
□ protected-mode 开启
□ 禁用了危险命令(FLUSHALL, FLUSHDB, DEBUG, CONFIG)
□ 配置了 maxmemory
□ 选择了合适的 maxmemory-policy
□ 开启了持久化(RDB + AOF 或混合持久化)
□ 配置了慢查询日志
□ 配置了连接数限制
□ 配置了 TCP keepalive
□ 防火墙规则正确
□ 内核参数优化(overcommit_memory, somaxconn, THP)

运维检查

□ 监控配置(Prometheus + Grafana)
□ 告警规则(内存、连接、复制、QPS)
□ 备份策略(定期 RDB 备份)
□ 日志轮转配置
□ 主从复制正常
□ 故障转移测试通过
□ 容量规划(内存增长趋势)
□ 变更流程文档

代码检查

□ 使用连接池
□ 设置了超时时间
□ Key 有 TTL(临时数据)
□ 避免了大 Key 操作
□ 使用 Pipeline 减少 RTT
□ 分布式锁有超时和释放逻辑
□ 缓存策略正确(先删缓存再更新 DB)
□ 异常处理(Redis 不可用时的降级方案)

📌 业务场景

场景一:新项目 Redis 接入

# 1. 设计 Key 命名规范
# 2. 选择序列化方式
# 3. 配置连接池
# 4. 实现缓存层
# 5. 添加监控告警

场景二:老项目 Redis 优化

# 1. 扫描大 Key
redis-cli --bigkeys

# 2. 检查慢查询
redis-cli SLOWLOG GET 10

# 3. 检查内存碎片
redis-cli INFO memory | grep mem_fragmentation_ratio

# 4. 优化命令使用(KEYS → SCAN, DEL → UNLINK)

🔗 扩展阅读