强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

Redis 完全指南 / 22 - 实战场景

实战场景

本章将前面学到的所有知识综合运用,实现生产级别的 Redis 实战场景。

22.1 缓存系统

多级缓存架构

请求 → 本地缓存 → Redis 缓存 → 数据库
              ↓ miss    ↓ miss     ↓
            Redis    数据库      返回结果
              ↓ hit     ↓ hit      ↓
            返回结果  写回缓存   写回缓存

完整实现(Python)

import redis
import json
import time
import hashlib
from functools import wraps
from typing import Optional, Any

class MultiLevelCache:
    """Two-tier cache: an in-process dict (L1) in front of Redis (L2).

    Reads check the local dict first, then Redis; a Redis hit is written
    back into the local tier.  Redis errors are swallowed deliberately so
    the cache degrades to local-only instead of failing the request.
    """

    def __init__(self, redis_client, local_ttl: int = 30, redis_ttl: int = 300):
        self.redis = redis_client
        self.local_cache = {}        # key -> (value, insert_timestamp); unbounded — fine for a demo
        self.local_ttl = local_ttl   # seconds an entry stays valid in-process
        self.redis_ttl = redis_ttl   # default TTL (seconds) for the Redis tier

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None on a full miss."""
        # 1. Local tier: serve if present and not yet expired.
        if key in self.local_cache:
            value, timestamp = self.local_cache[key]
            if time.time() - timestamp < self.local_ttl:
                return value
            del self.local_cache[key]  # expired: drop so it cannot be served again

        # 2. Redis tier: on hit, repopulate the local tier.
        try:
            redis_value = self.redis.get(key)
            # `is not None` (not truthiness): an empty payload is still a hit.
            if redis_value is not None:
                value = json.loads(redis_value)
                self.local_cache[key] = (value, time.time())
                return value
        except redis.RedisError:
            pass  # best-effort: treat a Redis outage as a miss

        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Write *value* to both tiers; *ttl* overrides the Redis default."""
        self.local_cache[key] = (value, time.time())

        try:
            self.redis.setex(
                key,
                ttl or self.redis_ttl,
                json.dumps(value, ensure_ascii=False),
            )
        except redis.RedisError:
            pass  # local tier still holds the value

    def delete(self, key: str):
        """Invalidate *key* in both tiers."""
        self.local_cache.pop(key, None)
        try:
            self.redis.delete(key)
        except redis.RedisError:
            pass


def cached(cache: "MultiLevelCache", prefix: str, ttl: int = 300):
    """Decorator that memoizes a function's result in *cache*.

    The key is *prefix* plus an MD5 digest of the positional AND keyword
    arguments.  (The original hashed ``args`` only, so calls differing
    just in kwargs collided on the same cache entry.)  ``None`` results
    are not cached, so transient failures can be retried.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Sorted kwargs keep the key deterministic regardless of
            # keyword order, and distinct from a positional-only call.
            raw = f"{args}:{sorted(kwargs.items())}"
            cache_key = f"{prefix}:{hashlib.md5(raw.encode()).hexdigest()}"

            # Try the cache first.
            result = cache.get(cache_key)
            if result is not None:
                return result

            # Miss: call through to the real function.
            result = func(*args, **kwargs)

            # Cache only real results.
            if result is not None:
                cache.set(cache_key, result, ttl)

            return result
        return wrapper
    return decorator


# Usage example (requires a local Redis server)
r = redis.Redis(host='localhost', decode_responses=True)
cache = MultiLevelCache(r, local_ttl=30, redis_ttl=300)

@cached(cache, prefix='user', ttl=3600)
def get_user(user_id: int):
    """Fetch user info (with multi-level caching)."""
    # Simulate a slow database query
    time.sleep(0.1)
    return {"id": user_id, "name": f"User_{user_id}", "email": f"user{user_id}@example.com"}

# Quick check
user = get_user(1001)  # first call: hits the "database"
user = get_user(1001)  # second call: served from the local cache

22.2 排行榜系统

完整排行榜实现

import redis
import json
import time
from typing import List, Dict, Optional

class Leaderboard:
    """Real-time leaderboard backed by a Redis sorted set.

    Scores live under ``leaderboard:<name>``; a higher score means a
    better position, and all ranks reported to callers are 1-based.
    """

    def __init__(self, redis_client, name: str):
        self.redis = redis_client
        self.key = f"leaderboard:{name}"
        self.info_key = f"leaderboard:{name}:info"

    def add_score(self, user_id: str, score: float) -> bool:
        """Insert or overwrite a user's score; True if the member was new."""
        return bool(self.redis.zadd(self.key, {user_id: score}))

    def increment_score(self, user_id: str, delta: float) -> float:
        """Atomically add *delta* to the user's score; returns the new score."""
        return self.redis.zincrby(self.key, delta, user_id)

    def get_score(self, user_id: str) -> Optional[float]:
        """Current score for *user_id*, or None if absent."""
        raw = self.redis.zscore(self.key, user_id)
        return None if raw is None else float(raw)

    def get_rank(self, user_id: str) -> Optional[int]:
        """1-based rank (highest score is rank 1), or None if absent."""
        position = self.redis.zrevrank(self.key, user_id)
        return None if position is None else position + 1

    def get_top_n(self, n: int = 10) -> List[Dict]:
        """Best *n* entries as ``{'rank', 'user_id', 'score'}`` dicts."""
        rows = self.redis.zrevrange(self.key, 0, n - 1, withscores=True)
        entries = []
        for offset, (member, member_score) in enumerate(rows):
            entries.append({"rank": offset + 1, "user_id": member, "score": member_score})
        return entries

    def get_user_rank_range(self, user_id: str, range_n: int = 5) -> List[Dict]:
        """Entries within ±*range_n* places of *user_id*; [] if absent."""
        position = self.redis.zrevrank(self.key, user_id)
        if position is None:
            return []

        lo = max(0, position - range_n)
        hi = position + range_n

        rows = self.redis.zrevrange(self.key, lo, hi, withscores=True)
        entries = []
        for offset, (member, member_score) in enumerate(rows):
            entries.append({"rank": lo + offset + 1, "user_id": member, "score": member_score})
        return entries

    def get_total(self) -> int:
        """Number of members currently on the board."""
        return self.redis.zcard(self.key)

    def remove(self, user_id: str) -> bool:
        """Drop a user from the board; True if something was removed."""
        return bool(self.redis.zrem(self.key, user_id))

    def get_score_range(self, min_score: float, max_score: float) -> List[Dict]:
        """Members scoring within [min_score, max_score], best first."""
        rows = self.redis.zrevrangebyscore(
            self.key, max_score, min_score, withscores=True
        )
        entries = []
        for member, member_score in rows:
            entries.append({"user_id": member, "score": member_score})
        return entries


# Usage example (requires a local Redis server)
r = redis.Redis(host='localhost', decode_responses=True)
lb = Leaderboard(r, "game:daily")

# Record player scores
lb.add_score("player:1001", 1500)
lb.add_score("player:1002", 1200)
lb.add_score("player:1003", 1800)
lb.add_score("player:1004", 950)
lb.add_score("player:1005", 1650)

# Increase a score
lb.increment_score("player:1001", 100)

# Look up a player's rank
print(f"Player 1001 rank: {lb.get_rank('player:1001')}")

# Fetch the Top 3
top3 = lb.get_top_n(3)
for entry in top3:
    print(f"#{entry['rank']} {entry['user_id']}: {entry['score']}")

# Fetch the entries around a player
nearby = lb.get_user_rank_range("player:1001", range_n=2)
for entry in nearby:
    print(f"#{entry['rank']} {entry['user_id']}: {entry['score']}")

多维度排行榜

class MultiDimensionLeaderboard:
    """Leaderboards partitioned by a dimension (e.g. daily/weekly/monthly).

    Each dimension gets its own sorted set, refreshed to a 7-day TTL on
    every write.
    """

    def __init__(self, redis_client, name: str):
        self.redis = redis_client
        self.name = name

    def _board_key(self, dimension: str) -> str:
        """Redis key of the sorted set backing one dimension."""
        return f"leaderboard:{self.name}:{dimension}"

    def add_score(self, user_id: str, score: float, dimension: str = "default"):
        """Record *score* for *user_id* on the given dimension's board."""
        board = self._board_key(dimension)
        self.redis.zadd(board, {user_id: score})
        self.redis.expire(board, 86400 * 7)  # keep each board for 7 days

    def get_top_n(self, dimension: str = "default", n: int = 10):
        """Best *n* (member, score) pairs for the dimension."""
        return self.redis.zrevrange(self._board_key(dimension), 0, n - 1, withscores=True)

    def get_rank(self, user_id: str, dimension: str = "default"):
        """1-based rank of *user_id* on the dimension, or None if absent."""
        position = self.redis.zrevrank(self._board_key(dimension), user_id)
        return position + 1 if position is not None else None


# Usage: one logical board, three time dimensions
mlb = MultiDimensionLeaderboard(r, "game")
mlb.add_score("player:1", 1000, "daily")
mlb.add_score("player:1", 5000, "weekly")
mlb.add_score("player:1", 20000, "monthly")

22.3 分布式锁

Redlock 算法实现

import redis
import uuid
import time
import threading

class Redlock:
    """Redlock distributed lock across multiple independent Redis masters.

    A lock counts as held only when acquired on a majority (quorum) of
    the instances within its TTL, minus an allowance for clock drift.
    Release uses a Lua compare-and-delete so one client can never free a
    lock owned by another.
    """

    # Delete the key only if it still holds our token (runs atomically in Redis).
    UNLOCK_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_instances: list, ttl: int = 30000):
        """
        :param redis_instances: independent Redis clients (typically 3 or 5)
        :param ttl: default lock TTL in milliseconds
        """
        self.instances = redis_instances
        self.ttl = ttl
        # Majority needed for the lock to be considered held.
        self.quorum = len(redis_instances) // 2 + 1
        # Safety margin for clock drift between servers (1% of elapsed time).
        self.clock_drift_factor = 0.01

    def lock(self, resource: str, ttl: "Optional[int]" = None) -> "Optional[dict]":
        """Try to acquire the lock on *resource* (up to 3 attempts).

        Returns ``{'resource', 'token', 'validity'}`` on success, where
        ``validity`` is the remaining guaranteed hold time in ms, or
        ``None`` when no attempt reached quorum.  (The original annotated
        the return as ``dict`` even though None is returned on failure.)
        """
        ttl = ttl or self.ttl
        token = str(uuid.uuid4())  # unique owner token enables safe release
        retry_count = 3

        for _ in range(retry_count):
            acquired = 0
            start_time = time.time() * 1000

            # Try to take the lock on every instance.
            for instance in self.instances:
                try:
                    if self._acquire_instance(instance, resource, token, ttl):
                        acquired += 1
                except redis.RedisError:
                    pass  # an unreachable instance simply doesn't count

            # Remaining validity = TTL - time spent acquiring - drift margin.
            elapsed = time.time() * 1000 - start_time
            validity = ttl - elapsed - (elapsed * self.clock_drift_factor)

            if acquired >= self.quorum and validity > 0:
                return {
                    'resource': resource,
                    'token': token,
                    'validity': validity,
                }

            # No quorum (or acquisition took too long): undo partial locks.
            for instance in self.instances:
                try:
                    self._release_instance(instance, resource, token)
                except redis.RedisError:
                    pass

            time.sleep(0.1)  # brief backoff before retrying

        return None

    def unlock(self, lock_info: "Optional[dict]"):
        """Release the lock on every instance (no-op if *lock_info* is falsy)."""
        if lock_info:
            for instance in self.instances:
                try:
                    self._release_instance(
                        instance,
                        lock_info['resource'],
                        lock_info['token']
                    )
                except redis.RedisError:
                    pass

    def _acquire_instance(self, instance, resource, token, ttl):
        """SET NX PX on one instance; truthy only if we created the key."""
        return instance.set(
            f"lock:{resource}",
            token,
            nx=True,
            px=ttl
        )

    def _release_instance(self, instance, resource, token):
        """Run the compare-and-delete script on one instance."""
        script = instance.register_script(self.UNLOCK_SCRIPT)
        script(keys=[f"lock:{resource}"], args=[token])


# Usage example (requires three independent Redis masters)
instances = [
    redis.Redis(host='redis1', port=6379, decode_responses=True),
    redis.Redis(host='redis2', port=6379, decode_responses=True),
    redis.Redis(host='redis3', port=6379, decode_responses=True),
]

redlock = Redlock(instances, ttl=30000)

# Acquire the lock
lock = redlock.lock("order:create:1001")
if lock:
    try:
        print(f"Lock acquired, validity: {lock['validity']}ms")
        # Run the critical-section business logic here
        time.sleep(1)
    finally:
        redlock.unlock(lock)
else:
    print("Failed to acquire lock")

22.4 消息队列

基于 Stream 的可靠消息队列

import redis
import json
import uuid
import time
import threading
from typing import Callable, Dict, Any

class StreamQueue:
    """At-least-once message queue on a Redis Stream + consumer group.

    Payloads are JSON-encoded under the ``data`` field.  A handler
    failure sends the message to ``<stream>:dlq`` and still ACKs it, so
    one poison message cannot stall the group.
    """

    def __init__(self, redis_client, stream_key: str, group_name: str):
        self.redis = redis_client
        self.stream_key = stream_key
        self.group_name = group_name
        self._ensure_group()

    def _ensure_group(self):
        """Create the consumer group (and the stream itself) if missing."""
        try:
            self.redis.xgroup_create(
                self.stream_key, self.group_name, id='0', mkstream=True
            )
        except redis.exceptions.ResponseError as e:
            # Only "group already exists" is expected here; anything else
            # (e.g. WRONGTYPE) should surface instead of being swallowed.
            if 'BUSYGROUP' not in str(e):
                raise

    def publish(self, data: dict, max_len: int = 100000) -> str:
        """Append *data* to the stream; returns the new entry ID."""
        return self.redis.xadd(
            self.stream_key,
            {'data': json.dumps(data, ensure_ascii=False)},
            maxlen=max_len
        )

    def consume(self, consumer_name: str, handler: Callable,
                count: int = 10, block: int = 5000):
        """Blocking consume loop: call ``handler(payload)`` per message.

        Successful messages are ACKed; failing ones are dead-lettered and
        then ACKed.  Runs forever — start it in a dedicated thread.
        """
        while True:
            try:
                messages = self.redis.xreadgroup(
                    self.group_name, consumer_name,
                    {self.stream_key: '>'},  # '>' = only never-delivered entries
                    count=count,
                    block=block,
                )

                if not messages:
                    continue  # blocking read timed out with nothing new

                for stream, entries in messages:
                    for msg_id, data in entries:
                        try:
                            payload = json.loads(data['data'])
                            handler(payload)
                            self.redis.xack(
                                self.stream_key, self.group_name, msg_id
                            )
                        except Exception as e:
                            print(f"Error processing {msg_id}: {e}")
                            # Dead-letter the message, then ACK so the
                            # group can keep making progress.
                            self.redis.xadd(
                                f"{self.stream_key}:dlq",
                                {'original_id': msg_id, 'error': str(e), 'data': data['data']},
                                maxlen=10000
                            )
                            self.redis.xack(
                                self.stream_key, self.group_name, msg_id
                            )
            except redis.RedisError as e:
                print(f"Redis error: {e}")
                time.sleep(1)  # back off before retrying the connection

    def get_pending_count(self) -> int:
        """Number of delivered-but-unACKed messages in the group."""
        pending = self.redis.xpending(self.stream_key, self.group_name)
        return pending['pending'] if pending else 0

    def reclaim_timeout_messages(self, consumer_name: str, min_idle_time: int = 60000):
        """Transfer messages idle > *min_idle_time* ms to *consumer_name*.

        Uses XAUTOCLAIM (Redis >= 6.2) and returns the claimed message
        IDs.  The original called ``xclaim`` without the mandatory
        ``message_ids`` argument, which raises a TypeError in redis-py
        before the command ever reaches Redis.
        """
        # Response is [next_cursor, ids] (a third "deleted" list is added
        # on Redis 7.0+), so slice defensively before unpacking.
        next_id, claimed = self.redis.xautoclaim(
            self.stream_key, self.group_name, consumer_name,
            min_idle_time, start_id='0', justid=True
        )[:2]
        return claimed


# Usage example (requires a local Redis server)
r = redis.Redis(host='localhost', decode_responses=True)

# Create the queue (also creates the stream + consumer group)
queue = StreamQueue(r, 'order_events', 'order_processors')

# Publish a few messages
for i in range(5):
    msg_id = queue.publish({
        'order_id': f'ORD-{1000 + i}',
        'action': 'create',
        'amount': 99.9 + i,
        'user_id': 1001 + i,
    })
    print(f"Published: {msg_id}")

# Message handler
def process_order(data: dict):
    """Handle a single order event."""
    print(f"Processing order: {data['order_id']}, action: {data['action']}")

# Start a consumer (daemon thread — consume() loops forever)
t = threading.Thread(
    target=queue.consume,
    args=('consumer-1', process_order),
    daemon=True
)
t.start()

22.5 限流系统

多种限流算法实现

import redis
import time
from functools import wraps

class RateLimiter:
    """Redis-backed rate limiters: fixed/sliding windows, token & leaky bucket.

    Every method answers "is this request allowed right now?" for *key*.
    The bucket algorithms run as Lua scripts so their read-modify-write
    cycles execute atomically inside Redis.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def fixed_window(self, key: str, limit: int, window: int) -> bool:
        """Fixed-window counter.

        :param key: rate-limit key
        :param limit: allowed requests per window
        :param window: window size in seconds
        :return: True if allowed, False if throttled

        NOTE(review): INCR and EXPIRE are separate round trips; if the
        client dies between them, the key never expires.  Acceptable for
        a tutorial — use a Lua script when that guarantee matters.
        """
        current = self.redis.incr(key)
        if current == 1:
            # First request of the window starts the expiry clock.
            self.redis.expire(key, window)
        return current <= limit

    def sliding_window_log(self, key: str, limit: int, window: int) -> bool:
        """Sliding-window log: one sorted-set member per request.

        Exact, at the cost of O(limit) memory per key.
        """
        import uuid  # local import: this sample's header block doesn't import uuid

        now = time.time()
        pipe = self.redis.pipeline()

        # Evict requests that slid out of the window.
        pipe.zremrangebyscore(key, 0, now - window)
        # Count the requests still inside the window.
        pipe.zcard(key)
        # Record this request.  Members must be unique: the original used
        # str(now) alone, so two requests on the same timestamp collapsed
        # into one member and were undercounted.
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        # Let an idle key clean itself up.
        pipe.expire(key, window)

        results = pipe.execute()
        count = results[1]  # ZCARD result, taken before this request was added

        return count < limit

    def sliding_window_counter(self, key: str, limit: int, window: int) -> bool:
        """Sliding-window counter (approximate, two fixed buckets).

        Estimates the rolling count as a weighted blend of the previous
        and current fixed windows.
        """
        now = time.time()
        current_window = int(now // window) * window
        previous_window = current_window - window

        pipe = self.redis.pipeline()
        pipe.get(f"{key}:{previous_window}")
        pipe.get(f"{key}:{current_window}")
        pipe.incr(f"{key}:{current_window}")
        pipe.expire(f"{key}:{current_window}", window * 2)
        results = pipe.execute()

        previous_count = int(results[0] or 0)
        current_count = int(results[1] or 0)

        # Weight the previous window by how much of it still overlaps the
        # sliding window.
        elapsed_ratio = (now - current_window) / window
        estimated_count = previous_count * (1 - elapsed_ratio) + current_count

        return estimated_count < limit

    def token_bucket(self, key: str, rate: float, capacity: int) -> bool:
        """Token bucket (atomic via Lua).

        :param key: rate-limit key
        :param rate: tokens refilled per second
        :param capacity: bucket capacity (maximum burst size)
        :return: True if a token was consumed, False if throttled
        """
        now = time.time()

        lua_script = """
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local requested = tonumber(ARGV[4])
        
        local data = redis.call('HMGET', key, 'tokens', 'last_time')
        local tokens = tonumber(data[1]) or capacity
        local last_time = tonumber(data[2]) or now
        
        -- 计算新增令牌
        local elapsed = now - last_time
        local new_tokens = math.min(capacity, tokens + elapsed * rate)
        
        if new_tokens >= requested then
            new_tokens = new_tokens - requested
            redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', now)
            redis.call('EXPIRE', key, 3600)
            return 0
        end
        """

        result = self.redis.eval(lua_script, 1, key, rate, capacity, now, 1)
        return bool(result)

    def leaky_bucket(self, key: str, rate: float, capacity: int) -> bool:
        """Leaky bucket (atomic via Lua).

        Water drains at *rate* units/second; each request adds one unit;
        a request that would overflow *capacity* is rejected.
        """
        now = time.time()

        lua_script = """
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        
        local data = redis.call('HMGET', key, 'water', 'last_time')
        local water = tonumber(data[1]) or 0
        local last_time = tonumber(data[2]) or now
        
        -- 计算漏出的水量
        local elapsed = now - last_time
        local leaked = elapsed * rate
        water = math.max(0, water - leaked)
        
        if water < capacity then
            water = water + 1
            redis.call('HMSET', key, 'water', water, 'last_time', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            redis.call('HMSET', key, 'water', water, 'last_time', now)
            redis.call('EXPIRE', key, 3600)
            return 0
        end
        """

        result = self.redis.eval(lua_script, 1, key, rate, capacity, now)
        return bool(result)


# Usage example (requires a local Redis server)
r = redis.Redis(host='localhost', decode_responses=True)
limiter = RateLimiter(r)

# API rate-limiting decorator (uses the module-level `limiter` above)
def rate_limit(key_func, limit=100, window=60):
    """Reject calls once *key_func*'s bucket exceeds *limit* per *window* s."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            bucket = key_func(*args, **kwargs)
            allowed = limiter.sliding_window_log(bucket, limit, window)
            if allowed:
                return func(*args, **kwargs)
            raise Exception("Rate limit exceeded")
        return wrapper
    return decorator

# Usage: at most 100 calls per user per 60-second window
@rate_limit(lambda user_id, **kw: f"rate:user:{user_id}", limit=100, window=60)
def get_user_data(user_id, **kwargs):
    """Example rate-limited endpoint; raises once the per-user limit is hit."""
    return {"user_id": user_id, "data": "..."}

22.6 分布式 Session

import redis
import json
import uuid
import time
from typing import Dict, Any, Optional

class RedisSession:
    """Server-side session store on Redis with sliding expiration.

    Each session is a JSON blob stored under ``<prefix>:<session_id>``;
    every read refreshes the TTL so active sessions stay alive.
    """

    def __init__(self, redis_client, prefix: str = "session", ttl: int = 1800):
        self.redis = redis_client
        self.prefix = prefix
        self.ttl = ttl

    def _key_for(self, session_id: str) -> str:
        """Redis key that stores the given session."""
        return f"{self.prefix}:{session_id}"

    def create(self, user_id: str, data: dict = None) -> str:
        """Open a new session for *user_id*; returns the session ID."""
        session_id = str(uuid.uuid4())
        payload = json.dumps(
            {
                'user_id': user_id,
                'created_at': time.time(),
                'data': data or {},
            },
            ensure_ascii=False,
        )
        self.redis.setex(self._key_for(session_id), self.ttl, payload)
        return session_id

    def get(self, session_id: str) -> Optional[Dict]:
        """Load a session, refreshing its TTL; None if missing/expired."""
        key = self._key_for(session_id)
        raw = self.redis.get(key)
        if not raw:
            return None
        self.redis.expire(key, self.ttl)  # sliding expiration on every read
        return json.loads(raw)

    def update(self, session_id: str, data: dict) -> bool:
        """Merge *data* into the session's data dict; False if missing."""
        current = self.get(session_id)
        if current is None:
            return False
        current['data'].update(data)
        self.redis.setex(
            self._key_for(session_id),
            self.ttl,
            json.dumps(current, ensure_ascii=False),
        )
        return True

    def delete(self, session_id: str) -> bool:
        """Destroy the session (logout); True if it existed."""
        return bool(self.redis.delete(self._key_for(session_id)))

    def touch(self, session_id: str) -> bool:
        """Extend the session's TTL without reading it; False if missing."""
        return bool(self.redis.expire(self._key_for(session_id), self.ttl))


# Usage example (requires a local Redis server)
r = redis.Redis(host='localhost', decode_responses=True)
session = RedisSession(r, ttl=3600)

# Create a session
sid = session.create("user:1001", {"role": "admin", "permissions": ["read", "write"]})
print(f"Session ID: {sid}")

# Read the session (also refreshes its TTL)
data = session.get(sid)
print(f"Session data: {data}")

# Update the session
session.update(sid, {"last_page": "/dashboard"})

# Delete the session (logout)
session.delete(sid)

22.7 布隆过滤器防穿透

import redis

class BloomFilter:
    """Thin wrapper over the RedisBloom module's BF.* commands.

    Requires a Redis server with the RedisBloom module loaded.
    """

    def __init__(self, redis_client, name: str, capacity: int = 1000000, error_rate: float = 0.001):
        self.redis = redis_client
        self.name = name
        self.capacity = capacity      # expected number of distinct items
        self.error_rate = error_rate  # acceptable false-positive probability

    def create(self):
        """Reserve the filter; silently no-op if it already exists."""
        try:
            self.redis.execute_command(
                'BF.RESERVE', self.name, self.error_rate, self.capacity
            )
        except redis.exceptions.ResponseError:
            pass  # filter was already reserved

    def add(self, item: str) -> bool:
        """Insert *item*; True if it was (probably) not present before."""
        reply = self.redis.execute_command('BF.ADD', self.name, item)
        return bool(reply)

    def exists(self, item: str) -> bool:
        """Membership test: False is definitive, True may be a false positive."""
        reply = self.redis.execute_command('BF.EXISTS', self.name, item)
        return bool(reply)

    def batch_add(self, items: list) -> list:
        """BF.MADD: insert many items in one round trip."""
        return self.redis.execute_command('BF.MADD', self.name, *items)

    def batch_exists(self, items: list) -> list:
        """BF.MEXISTS: check many items in one round trip."""
        return self.redis.execute_command('BF.MEXISTS', self.name, *items)


# Cache-penetration protection
class CacheWithBloomFilter:
    """Cache guarded by a Bloom filter plus short-lived null caching.

    Keys must be registered in the filter (see :meth:`register`) when the
    underlying record is created; lookups for unregistered keys are
    rejected without touching the cache or the database.
    """

    def __init__(self, redis_client, bloom_name: str = "bf:cache"):
        self.redis = redis_client
        self.bloom = BloomFilter(redis_client, bloom_name)
        self.bloom.create()

    def get(self, key: str, fetch_func=None):
        """Look up *key*: Bloom filter → Redis cache → *fetch_func* (DB)."""
        # 1. Bloom filter: a negative answer is definitive — bail out.
        if not self.bloom.exists(key):
            return None

        # 2. Cache layer.  "NULL" marks a cached miss; this comparison
        #    assumes the client was created with decode_responses=True.
        value = self.redis.get(key)
        if value:
            if value == "NULL":
                return None
            return json.loads(value)

        # 3. Fall through to the database.
        if fetch_func:
            value = fetch_func(key)
            if value is None:
                # Briefly cache the miss so repeated probes for a
                # registered-but-deleted key don't hammer the database.
                self.redis.setex(key, 300, "NULL")
            else:
                # ensure_ascii=False for consistency with the other
                # caches in this guide (keeps CJK text readable in Redis).
                self.redis.setex(key, 3600, json.dumps(value, ensure_ascii=False))
            return value

        return None

    def register(self, key: str):
        """Add *key* to the Bloom filter (call when the record is created)."""
        self.bloom.add(key)


# Usage (requires Redis with the RedisBloom module)
r = redis.Redis(host='localhost', decode_responses=True)
cache = CacheWithBloomFilter(r)

# Register the keys that legitimately exist
cache.register("user:1001")
cache.register("user:1002")

# Queries
result = cache.get("user:9999")  # unregistered key: Bloom filter short-circuits to None
result = cache.get("user:1001", lambda k: {"id": 1001, "name": "张三"})

📌 业务场景总结

| 场景 | 数据结构 | 关键命令 | 注意事项 |
| --- | --- | --- | --- |
| 缓存 | String/Hash | GET/SET/SETEX | TTL 随机化、缓存穿透 |
| 排行榜 | ZSet | ZADD/ZREVRANGE | 分数设计、范围查询 |
| 分布式锁 | String | SET NX EX | 安全释放(Lua)、续期 |
| 消息队列 | Stream | XADD/XREADGROUP | ACK 机制、死信队列 |
| 限流 | String/ZSet | INCR/ZADD | 算法选择、Lua 原子性 |
| Session | String/Hash | GET/SET/HSET | 过期时间、序列化 |
| 计数器 | String | INCR/DECRBY | 原子操作、精度 |
| 标签系统 | Set | SADD/SINTER | 去重、集合运算 |
| 位图统计 | Bitmap | SETBIT/BITCOUNT | 节省内存、批量统计 |
| 地理位置 | GeoSpatial | GEOADD/GEOSEARCH | 经纬度精度、范围查询 |

🔗 扩展阅读