强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

19 - Python 集成

Python 集成

19.1 redis-py 概述

redis-py 是 Python 最流行的 Redis 客户端库,支持同步和异步操作。

安装

# Basic installation
pip install redis

# With the optional hiredis extra (hiredis is a faster protocol parser;
# async support is built into redis-py itself since 4.2)
pip install redis[hiredis]

# hiredis is a response parser implemented in C; parsing is roughly 2-3x faster

版本特性

| 版本 | Python | 特性 |
| --- | --- | --- |
| 4.x | 3.7+ | 异步支持、JSON、Search |
| 5.x | 3.8+ | 改进连接池、更好的 Cluster 支持 |

19.2 基础连接

基本连接

import redis

# Minimal connection (defaults shown explicitly: localhost:6379, database 0)
r = redis.Redis(host='localhost', port=6379, db=0)
r.ping()  # True

# Connection with authentication, timeouts and automatic decoding
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    password='yourpassword',
    decode_responses=True,  # decode replies from bytes to str automatically
    socket_timeout=5,
    socket_connect_timeout=5,
    retry_on_timeout=True,
)

连接池

import redis

# Create a shared connection pool (connections are reused across clients)
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    password='yourpassword',
    db=0,
    max_connections=100,
    decode_responses=True,
    socket_timeout=5,
    socket_connect_timeout=5,
)

# Build a client on top of the pool
r = redis.Redis(connection_pool=pool)

# Usage
r.set('key', 'value')
value = r.get('key')

Sentinel 连接

from redis.sentinel import Sentinel

sentinel = Sentinel(
    [('192.168.1.1', 26379),
     ('192.168.1.2', 26379),
     ('192.168.1.3', 26379)],
    socket_timeout=0.5,
    password='sentinel_password'  # password of the Sentinel nodes themselves
)

# Resolve and connect to the current master (for writes)
master = sentinel.master_for('mymaster', password='redis_password')
master.set('key', 'value')

# Resolve and connect to a replica (for reads)
slave = sentinel.slave_for('mymaster', password='redis_password')
value = slave.get('key')

Cluster 连接

from redis.cluster import RedisCluster, ClusterNode

# redis-py's RedisCluster expects ClusterNode objects in startup_nodes;
# plain dicts were the convention of the old redis-py-cluster package and
# are rejected by redis-py 4.x/5.x.
rc = RedisCluster(
    startup_nodes=[
        ClusterNode("192.168.1.1", 6379),
        ClusterNode("192.168.1.2", 6379),
        ClusterNode("192.168.1.3", 6379),
    ],
    password='cluster_password',
    decode_responses=True,
)

rc.set('key', 'value')
value = rc.get('key')

19.3 数据类型操作

String 操作

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Basic get/set
r.set('name', '张三')
r.get('name')                    # '张三'

# Set with expiry
r.setex('session:abc', 3600, 'user_data')   # TTL in seconds
r.psetex('token:xyz', 60000, 'jwt_token')    # TTL in milliseconds

# Set only if the key does not already exist
r.setnx('lock:order', 'owner-uuid')         # True/False

# Value + TTL + NX in one atomic call (recommended for distributed locks)
r.set('lock:order', 'owner-uuid', nx=True, ex=30)

# Bulk operations
r.mset({'k1': 'v1', 'k2': 'v2', 'k3': 'v3'})
r.mget('k1', 'k2', 'k3')        # ['v1', 'v2', 'v3']

# Numeric operations
r.set('counter', 0)
r.incr('counter')                # 1
r.incr('counter', 5)             # 6
r.decr('counter')                # 5
r.incrbyfloat('counter', 1.5)    # 6.5

# Append
r.append('greeting', 'Hello')
r.append('greeting', ' World')
r.get('greeting')                # 'Hello World'

# Storing structured data as serialized JSON
user = {'name': '张三', 'age': 25, 'city': '北京'}
r.set('user:1001', json.dumps(user, ensure_ascii=False))
user_data = json.loads(r.get('user:1001'))

Hash 操作

# Set fields (single field, then several at once via mapping)
r.hset('user:1001', 'name', '张三')
r.hset('user:1001', mapping={'age': 25, 'city': '北京'})

# Read fields (decode_responses=True means values come back as strings)
r.hget('user:1001', 'name')     # '张三'
r.hmget('user:1001', 'name', 'age')  # ['张三', '25']
r.hgetall('user:1001')          # {'name': '张三', 'age': '25', 'city': '北京'}

# Numeric field operations
r.hincrby('user:1001', 'age', 1)    # 26
r.hincrbyfloat('user:1001', 'score', 0.5)   # creates the 'score' field -> 0.5

# Field existence
r.hexists('user:1001', 'name')      # True
r.hexists('user:1001', 'phone')     # False

# Delete a field
r.hdel('user:1001', 'city')

# All field names / values / field count
# (note: 'score' was added by hincrbyfloat above, so 3 fields remain)
r.hkeys('user:1001')   # ['name', 'age', 'score']
r.hvals('user:1001')   # ['张三', '26', '0.5']
r.hlen('user:1001')    # 3

List 操作

# Push (LPUSH prepends one by one, so the list becomes [task3, task2, task1])
r.lpush('queue:tasks', 'task1', 'task2', 'task3')
r.rpush('queue:tasks', 'task4')

# Pop
r.lpop('queue:tasks')       # 'task3'
r.rpop('queue:tasks')       # 'task4'

# Blocking pop (waits up to `timeout` seconds for an element)
r.blpop('queue:tasks', timeout=30)    # ('queue:tasks', 'task2')
r.brpop('queue:tasks', timeout=30)

# Range queries
r.lrange('queue:tasks', 0, -1)       # all elements
r.lrange('queue:tasks', 0, 9)        # first 10

# Length
r.llen('queue:tasks')

# Element by index
r.lindex('queue:tasks', 0)

# Trim: keep only the first 100 elements
r.ltrim('queue:tasks', 0, 99)

Set 操作

# Add members
r.sadd('tags:article:1', 'python', 'redis', 'tutorial')

# Membership test
r.sismember('tags:article:1', 'python')     # True

# All members
r.smembers('tags:article:1')                # {'python', 'redis', 'tutorial'}

# Cardinality
r.scard('tags:article:1')                   # 3

# Set algebra
r.sadd('set_a', 'a', 'b', 'c', 'd')
r.sadd('set_b', 'b', 'c', 'e', 'f')

r.sinter('set_a', 'set_b')     # {'b', 'c'}       intersection
r.sunion('set_a', 'set_b')     # {'a','b','c','d','e','f'} union
r.sdiff('set_a', 'set_b')      # {'a', 'd'}       difference

# Store the intersection result server-side under 'result'
r.sinterstore('result', 'set_a', 'set_b')

ZSet 操作

# Add members with scores
r.zadd('leaderboard', {'Alice': 100, 'Bob': 85, 'Charlie': 92})

# Score lookup
r.zscore('leaderboard', 'Alice')      # 100.0

# Increment a member's score
r.zincrby('leaderboard', 5, 'Bob')    # 90.0

# Rank (descending order, 0-based)
r.zrevrank('leaderboard', 'Alice')     # 0

# Top N (descending)
r.zrevrange('leaderboard', 0, 2, withscores=True)
# [('Alice', 100.0), ('Charlie', 92.0), ('Bob', 90.0)]

# Query by score range
r.zrangebyscore('leaderboard', 80, 100, withscores=True)

# Cardinality
r.zcard('leaderboard')                # 3

# Count members within a score range
r.zcount('leaderboard', 80, 100)      # 3

# Remove a member
r.zrem('leaderboard', 'Bob')

# Remove by rank range (rank 0 = lowest score)
r.zremrangebyrank('leaderboard', 0, 0)   # remove the lowest-ranked member

19.4 Pipeline 批量操作

import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Without a pipeline: one network round trip per command
start = time.time()
for i in range(10000):
    r.set(f'key:{i}', f'value:{i}')
print(f"No Pipeline: {time.time() - start:.2f}s")   # ~8s

# With a pipeline: commands are buffered and sent in a single round trip
start = time.time()
pipe = r.pipeline(transaction=False)
for i in range(10000):
    pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()
print(f"Pipeline: {time.time() - start:.2f}s")        # ~0.5s

# Transactional pipeline (MULTI/EXEC)
pipe = r.pipeline(transaction=True)
pipe.multi()  # NOTE: redundant — transaction=True already wraps in MULTI/EXEC
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.execute()  # executed atomically

# 分批 Pipeline(避免一次发送过大)
def batch_pipeline(r, items, batch_size=1000):
    """Write ``(key, value)`` pairs via pipelined SET commands.

    The pipeline is flushed every ``batch_size`` commands so that no single
    request payload grows too large.  Returns the concatenated list of
    per-command replies.
    """
    results = []
    pipe = r.pipeline(transaction=False)
    pending = 0
    for key, value in items:
        pipe.set(key, value)
        pending += 1
        if pending == batch_size:
            results.extend(pipe.execute())
            # start a fresh buffer for the next batch
            pipe = r.pipeline(transaction=False)
            pending = 0
    # flush the remainder (a no-op when the buffer is empty)
    results.extend(pipe.execute())
    return results

19.5 分布式锁

import redis
import uuid
import time

class RedisLock:
    """Redis-based distributed lock (SET NX EX + Lua-scripted release).

    Each instance holds a unique random token so only the owner can release
    the lock; the release is performed by a Lua script to make the
    compare-and-delete atomic.
    """

    # Delete the key only if it still holds our token (atomic check-and-del).
    UNLOCK_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_client, key, ttl=30):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.ttl = ttl  # seconds before the lock auto-expires
        self.token = str(uuid.uuid4())  # ownership token, unique per instance

    def acquire(self, blocking=True, timeout=None):
        """Try to take the lock.

        With blocking=False, return immediately after one attempt.
        Otherwise retry every 0.1 s until acquired, or until ``timeout``
        seconds have elapsed (timeout=None means wait forever).
        Returns True on success, False otherwise.
        """
        # BUGFIX: compare against None so timeout=0 means "give up almost
        # immediately" instead of being treated (falsy) as "no deadline".
        # A monotonic clock is used so wall-clock jumps can't stretch the wait.
        end_time = time.monotonic() + timeout if timeout is not None else None

        while True:
            # SET key token NX EX ttl — atomic "create if absent with TTL"
            if self.redis.set(self.key, self.token, nx=True, ex=self.ttl):
                return True

            if not blocking:
                return False

            if end_time is not None and time.monotonic() > end_time:
                return False

            time.sleep(0.1)

    def release(self):
        """Release the lock; returns True only if we still owned it."""
        return self.redis.eval(
            self.UNLOCK_SCRIPT, 1, self.key, self.token
        ) == 1

    def __enter__(self):
        # Blocks indefinitely until the lock is acquired.
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


# Usage example
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Option 1: context manager (blocks until acquired, releases on exit)
with RedisLock(r, 'order:create:1001', ttl=30):
    # critical section
    print("Lock acquired, processing order...")
    time.sleep(5)
# lock released automatically

# Option 2: manual acquire/release
lock = RedisLock(r, 'order:create:1001', ttl=30)
if lock.acquire(blocking=False):
    try:
        print("Lock acquired!")
    finally:
        lock.release()

19.6 异步 Redis(redis.asyncio)

import asyncio
import redis.asyncio as aioredis

async def main():
    # Asynchronous client (redis.asyncio replaced the standalone aioredis package)
    r = aioredis.Redis(
        host='localhost',
        port=6379,
        password='yourpassword',
        decode_responses=True,
    )

    # Basic async commands
    await r.set('key', 'value')
    value = await r.get('key')
    print(f"Value: {value}")

    # Async pipeline: buffer 100 SETs, send in one round trip
    pipe = r.pipeline()
    for i in range(100):
        pipe.set(f'async:key:{i}', f'value:{i}')
    await pipe.execute()

    # Async hash operations
    await r.hset('user:1001', mapping={'name': '张三', 'age': 25})
    name = await r.hget('user:1001', 'name')
    print(f"Name: {name}")

    # Async publish/subscribe
    pubsub = r.pubsub()
    await pubsub.subscribe('channel:news')

    async def listener():
        # consume messages as they arrive; skip subscribe confirmations
        async for message in pubsub.listen():
            if message['type'] == 'message':
                print(f"Received: {message['data']}")

    # Run the listener concurrently with the publisher below
    task = asyncio.create_task(listener())

    # Publish a message
    await r.publish('channel:news', 'Hello async Redis!')

    await asyncio.sleep(1)  # give the listener a moment to receive it
    task.cancel()

    await r.aclose()  # close connections cleanly

asyncio.run(main())

FastAPI 集成

from fastapi import FastAPI, Depends
import redis.asyncio as aioredis

app = FastAPI()

# Shared Redis connection pool, created/destroyed with the app lifecycle.
# NOTE(review): @app.on_event is deprecated in recent FastAPI in favour of
# lifespan handlers; kept here as written.
redis_pool = None

@app.on_event("startup")
async def startup():
    global redis_pool
    redis_pool = aioredis.ConnectionPool.from_url(
        "redis://:password@localhost:6379/0",
        max_connections=100,
        decode_responses=True,
    )

@app.on_event("shutdown")
async def shutdown():
    # Close every pooled connection on shutdown
    await redis_pool.aclose()

async def get_redis():
    # Dependency: a client borrowing connections from the shared pool
    return aioredis.Redis(connection_pool=redis_pool)

@app.get("/api/cache/{key}")
async def get_cache(key: str, r: aioredis.Redis = Depends(get_redis)):
    value = await r.get(key)
    return {"key": key, "value": value}

@app.post("/api/cache/{key}")
async def set_cache(key: str, value: str, ttl: int = 3600,
                    r: aioredis.Redis = Depends(get_redis)):
    await r.set(key, value, ex=ttl)
    return {"key": key, "status": "ok"}

19.7 消息队列(Stream)

import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# ---- Producer ----
def produce():
    # Append 10 order events to the 'orders' stream; XADD returns the entry id
    for i in range(10):
        msg_id = r.xadd('orders', {
            'order_id': f'ORD-{1000 + i}',
            'action': 'create',
            'amount': str(99.9 + i),
        })
        print(f"Produced: {msg_id}")

# ---- Consumer ----
GROUP_NAME = 'order-processors'
CONSUMER_NAME = 'consumer-1'

# Create the consumer group (mkstream creates the stream if it is missing);
# a ResponseError means the group already exists, which is fine.
try:
    r.xgroup_create('orders', GROUP_NAME, id='0', mkstream=True)
except redis.exceptions.ResponseError:
    pass

def consume():
    # Endless consume loop: read new entries ('>') for this group/consumer,
    # process each one, then XACK it off the pending-entries list.
    while True:
        messages = r.xreadgroup(
            GROUP_NAME, CONSUMER_NAME,
            {'orders': '>'},
            count=10,
            block=2000,  # block up to 2 s waiting for new entries
        )

        if not messages:
            continue

        for stream, entries in messages:
            for msg_id, data in entries:
                print(f"Processing: {msg_id} -> {data}")
                r.xack('orders', GROUP_NAME, msg_id)

# Run the consumer in a background daemon thread
t = threading.Thread(target=consume, daemon=True)
t.start()

# Produce messages
produce()

📌 业务场景

场景一:Flask/Django 缓存

from functools import wraps
import redis
import json

r = redis.Redis(host='localhost', decode_responses=True)

def cached(prefix, ttl=300):
    """Decorator caching a function's JSON-serializable result in Redis.

    The cache key is built from ``prefix`` plus the call's positional AND
    keyword arguments (kwargs sorted for a stable key); entries expire
    after ``ttl`` seconds.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # BUGFIX: include kwargs in the key — previously calls differing
            # only in keyword arguments collided on the same cache entry.
            parts = [str(a) for a in args]
            parts += [f"{k}={v}" for k, v in sorted(kwargs.items())]
            cache_key = f"{prefix}:{':'.join(parts)}"
            cached_value = r.get(cache_key)

            # A missing key returns None; stored values are always non-empty
            # JSON strings, so compare against None explicitly.
            if cached_value is not None:
                return json.loads(cached_value)

            result = func(*args, **kwargs)
            r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
            return result
        return wrapper
    return decorator

@cached('user', ttl=3600)
def get_user(user_id):
    # Placeholder: query the database (``db`` is assumed defined elsewhere)
    return db.query_user(user_id)

场景二:限流器

class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set.

    Each request is recorded as a sorted-set member scored by its
    timestamp; members older than the window are pruned on every check.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, key, limit, window):
        """Return True if the request fits inside the sliding window."""
        timestamp = time.time()
        window_start = timestamp - window

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)   # drop expired entries
        pipe.zcard(key)                               # count what remains
        pipe.zadd(key, {str(timestamp): timestamp})   # record this request
        pipe.expire(key, window)                      # auto-clean idle keys
        replies = pipe.execute()

        current = replies[1]
        return current < limit

limiter = RateLimiter(r)
# Allow at most 100 requests per user within a rolling 60-second window
if limiter.is_allowed('rate:user:1001', limit=100, window=60):
    print("Request allowed")
else:
    print("Rate limit exceeded")

场景三:分布式任务队列

import redis
import json
import uuid

class TaskQueue:
    """Distributed task queue built on a Redis Stream + consumer group."""

    def __init__(self, name, redis_client):
        self.name = name
        self.redis = redis_client

    def enqueue(self, task_type, payload):
        """Serialize a task and append it to the stream; return the task id."""
        task_id = str(uuid.uuid4())
        envelope = {
            'id': task_id,
            'type': task_type,
            'payload': payload,
        }
        self.redis.xadd(f'queue:{self.name}', {'data': json.dumps(envelope)})
        return task_id

    def create_consumer(self, group, consumer):
        """Generator yielding ``(msg_id, task)`` pairs from the group.

        Blocks up to 5 s per read; the caller is responsible for acking.
        """
        stream_key = f'queue:{self.name}'
        try:
            self.redis.xgroup_create(stream_key, group, id='0', mkstream=True)
        except redis.exceptions.ResponseError:
            # group already exists — nothing to do
            pass

        while True:
            batch = self.redis.xreadgroup(
                group, consumer,
                {stream_key: '>'},
                count=1,
                block=5000,
            )
            if not batch:
                continue

            for _stream, entries in batch:
                for msg_id, fields in entries:
                    yield msg_id, json.loads(fields['data'])

# Usage: enqueue a welcome-email task (``r`` is the shared client above)
queue = TaskQueue('email', r)
queue.enqueue('send_welcome', {'user_id': 1001, 'email': '[email protected]'})

🔗 扩展阅读