强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第10章 最佳实践

第10章 最佳实践

从协议选择到安全加固,汇总 Memcached 在生产环境中的核心最佳实践。


10.1 协议选择指南

决策流程图

开始选择协议
    │
    ├── 需要 telnet 调试?
    │   └── 是 → 文本协议
    │
    ├── 需要 Meta 协议特性(CAS 删除、惰性失效等)?
    │   └── 是 → Meta 协议(1.6+)
    │
    ├── 客户端库已支持二进制协议?
    │   └── 是 → 二进制协议
    │
    └── 默认 → 文本协议(最广泛支持)

协议选择对比表

场景推荐协议理由
Web 应用缓存文本协议简单可靠,客户端成熟
高性能微服务二进制协议解析效率高
需要 CAS 删除Meta 协议原生支持
需要惰性失效Meta 协议原生支持
运维调试文本协议telnet 直接操作
批量元数据查询Meta 协议单次往返获取多项元数据
客户端库开发文本协议 + Meta双协议支持

10.2 客户端开发规范

协议解析要点

# 1. 严格使用 CRLF
LINE_TERMINATOR = b"\r\n"

# 2. 精确读取数据块长度
def read_value(sock, length: int) -> bytes:
    data = b""
    while len(data) < length:
        chunk = sock.recv(length - len(data))
        if not chunk:
            raise ConnectionError("连接断开")
        data += chunk
    # 读取尾部 CRLF
    crlf = sock.recv(2)
    assert crlf == b"\r\n", f"期望 CRLF,收到 {crlf!r}"
    return data

# 3. 正确处理部分读取
def recv_until(sock, marker: bytes, buffer_size: int = 4096) -> bytes:
    buf = b""
    while marker not in buf:
        chunk = sock.recv(buffer_size)
        if not chunk:
            raise ConnectionError("连接断开")
        buf += chunk
    return buf

# 4. 验证响应格式
def validate_response(line: str, expected: list[str]):
    if line not in expected:
        raise ProtocolError(f"期望 {expected},收到: {line!r}")

class ProtocolError(Exception):
    pass

Key 设计规范

规则说明示例
最长 250 字节超长 key 会被截断或报错user:1001
无空白字符空格、\r\n 不允许session:abc123
使用冒号分隔便于阅读和分类product:1001:detail
包含版本号缓存失效时更新版本v2:user:1001
命名空间前缀避免 key 冲突myapp:user:1001

Key 设计示例

class CacheKeyBuilder:
    """缓存 Key 构建器"""

    def __init__(self, app: str, version: str = "v1"):
        self.app = app
        self.version = version

    def build(self, *parts: str) -> str:
        key = f"{self.app}:{self.version}:" + ":".join(str(p) for p in parts)
        if len(key.encode()) > 250:
            raise ValueError(f"Key 过长: {len(key.encode())} bytes")
        return key

    def user(self, user_id: int) -> str:
        return self.build("user", str(user_id))

    def session(self, session_id: str) -> str:
        return self.build("session", session_id)

    def product(self, product_id: int, field: str = "detail") -> str:
        return self.build("product", str(product_id), field)

# 使用
kb = CacheKeyBuilder("myapp", "v2")
print(kb.user(1001))           # myapp:v2:user:1001
print(kb.session("abc123"))    # myapp:v2:session:abc123
print(kb.product(2001))        # myapp:v2:product:2001:detail

10.3 缓存策略

Cache-Aside(旁路缓存)

最常用的缓存模式,读写都不经过缓存,由应用层控制:

class CacheAsidePattern:
    def __init__(self, cache_client, db_client):
        self.cache = cache_client
        self.db = db_client

    def get(self, key: str, db_query_fn, ttl: int = 300):
        """
        1. 先查缓存
        2. 未命中则查数据库
        3. 查询结果写入缓存
        """
        # 1. 查缓存
        cached = self.cache.get(key)
        if cached is not None:
            return json.loads(cached)

        # 2. 查数据库
        data = db_query_fn()
        if data is None:
            return None

        # 3. 写缓存(设置合理的 TTL)
        self.cache.set(key, json.dumps(data).encode(), exptime=ttl)
        return data

    def update(self, key: str, data: dict, db_update_fn, ttl: int = 300):
        """
        1. 先更新数据库
        2. 再更新缓存(或删除缓存)
        """
        # 1. 更新数据库
        db_update_fn(data)

        # 2. 更新缓存(或者删除缓存,让下次读取时重新加载)
        self.cache.set(key, json.dumps(data).encode(), exptime=ttl)
        # 或者: self.cache.delete(key)

缓存穿透防护

class PenetrationProtection:
    """防止缓存穿透"""

    def __init__(self, cache_client):
        self.cache = cache_client

    def get_or_set_default(self, key: str, db_query_fn,
                           ttl: int = 300, null_ttl: int = 60):
        """
        查询数据库,如果结果为空则缓存空值(短 TTL)
        防止大量不存在的 key 穿透到数据库
        """
        cached = self.cache.get(key)
        if cached is not None:
            if cached == b"__NULL__":
                return None
            return json.loads(cached)

        data = db_query_fn()
        if data is None:
            # 缓存空值,短 TTL
            self.cache.set(key, b"__NULL__", exptime=null_ttl)
            return None

        self.cache.set(key, json.dumps(data).encode(), exptime=ttl)
        return data

缓存击穿防护(互斥锁)

import time
import threading

class BreakdownProtection:
    """防止缓存击穿(热点 key 过期时大量请求穿透)"""

    def __init__(self, cache_client):
        self.cache = cache_client
        self.local_locks = {}
        self.lock = threading.Lock()

    def get_with_lock(self, key: str, db_query_fn,
                      ttl: int = 300, lock_ttl: int = 10):
        cached = self.cache.get(key)
        if cached is not None:
            return json.loads(cached)

        # 使用本地锁 + 分布式锁防止击穿
        with self.lock:
            if key not in self.local_locks:
                self.local_locks[key] = threading.Lock()

        local_lock = self.local_locks[key]

        # 尝试获取分布式锁
        lock_key = f"lock:{key}"
        lock_data = str(time.time()).encode()

        # 使用 add 实现分布式锁
        cmd = f"add {lock_key} 0 {lock_ttl} {len(lock_data)}\r\n"
        self.cache.set(lock_key, lock_data, exptime=lock_ttl)  # 简化版

        with local_lock:
            # 双重检查
            cached = self.cache.get(key)
            if cached is not None:
                return json.loads(cached)

            # 查询数据库
            data = db_query_fn()
            if data is not None:
                self.cache.set(key, json.dumps(data).encode(), exptime=ttl)

            return data

缓存雪崩防护

import random

class AvalancheProtection:
    """防止缓存雪崩(大量 key 同时过期)"""

    def set_with_jitter(self, cache, key: str, value: bytes,
                        base_ttl: int = 300, jitter: int = 60):
        """
        给 TTL 添加随机抖动,避免大量 key 同时过期
        """
        actual_ttl = base_ttl + random.randint(0, jitter)
        cache.set(key, value, exptime=actual_ttl)

10.4 性能优化

1. 批量操作

# ❌ 低效:逐个获取
for key in keys:
    value = client.get(key)

# ✅ 高效:批量获取
values = client.get_multi(keys)

2. Noreply 模式

# ✅ 批量写入使用 noreply
for key, value in items.items():
    client.set(key, value, noreply=True)

3. 管道化

# ✅ 管道化发送
sock.sendall(b"set k1 0 0 5\r\nhello\r\n")
sock.sendall(b"set k2 0 0 5\r\nworld\r\n")
sock.sendall(b"set k3 0 0 5\r\ntest!\r\n")
# 然后一次性读取所有响应

4. 压缩大 Value

import gzip

COMPRESS_THRESHOLD = 1024  # 超过 1KB 压缩

def set_with_compress(client, key: str, value: bytes, ttl: int = 300):
    if len(value) > COMPRESS_THRESHOLD:
        compressed = gzip.compress(value)
        if len(compressed) < len(value):
            # flags 标记压缩
            client.set(key, compressed, flags=0x01, exptime=ttl)
            return
    client.set(key, value, exptime=ttl)

def get_with_decompress(client, key: str) -> bytes | None:
    result = client.gets(key)
    if result is None:
        return None
    value, flags, cas = result
    if flags & 0x01:  # 压缩标志
        return gzip.decompress(value)
    return value

5. 连接复用

# ❌ 低效:每次操作新建连接
def bad_get(key):
    sock = socket.socket(...)
    sock.connect(...)
    sock.sendall(...)
    sock.close()

# ✅ 高效:复用长连接
class PersistentClient:
    def __init__(self, host, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def __del__(self):
        self.sock.close()

6. 合理设置 TTL

# 根据数据更新频率设置 TTL
TTL_CONFIG = {
    'user_profile': 3600,      # 用户资料,1小时
    'session': 1800,           # 会话,30分钟
    'product_detail': 600,     # 商品详情,10分钟
    'hot_ranking': 60,         # 热榜,1分钟
    'config': 86400,           # 配置,1天
    'captcha': 300,            # 验证码,5分钟
}

10.5 监控与告警

关键监控指标

指标来源告警阈值说明
缓存命中率stats< 80%影响性能的关键指标
当前连接数stats> 80% 最大连接数连接数不足
内存使用率stats> 90% 最大内存内存不足
淘汰次数stats持续增长内存不足导致淘汰
命令延迟客户端> 5ms响应变慢
线程利用率stats> 80%Worker 线程繁忙

监控脚本

#!/usr/bin/env python3
"""monitor.py — Memcached 监控脚本"""

import socket
import time
import json

class MemcachedMonitor:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))
        self.prev_stats = {}

    def get_stats(self) -> dict:
        self.sock.sendall(b"stats\r\n")
        buffer = b""
        while True:
            chunk = self.sock.recv(8192)
            buffer += chunk
            if b"END\r\n" in buffer:
                break

        stats = {}
        for line in buffer.decode().split("\r\n"):
            if line.startswith("STAT"):
                parts = line.split()
                stats[parts[1]] = parts[2]
        return stats

    def check_health(self) -> dict:
        stats = self.get_stats()
        current = {k: int(v) for k, v in stats.items() if v.isdigit()}

        result = {
            'timestamp': time.time(),
            'curr_items': current.get('curr_items', 0),
            'bytes': current.get('bytes', 0),
            'max_bytes': current.get('max_bytes', 0),
            'curr_connections': current.get('curr_connections', 0),
            'cmd_get': current.get('cmd_get', 0),
            'cmd_set': current.get('cmd_set', 0),
            'get_hits': current.get('get_hits', 0),
            'get_misses': current.get('get_misses', 0),
            'evictions': current.get('evictions', 0),
        }

        # 计算命中率
        total = result['get_hits'] + result['get_misses']
        result['hit_rate'] = (
            result['get_hits'] / total * 100 if total > 0 else 0
        )

        # 计算增量
        if self.prev_stats:
            result['cmd_get_rate'] = (
                result['cmd_get'] - self.prev_stats.get('cmd_get', 0)
            )
            result['cmd_set_rate'] = (
                result['cmd_set'] - self.prev_stats.get('cmd_set', 0)
            )

        self.prev_stats = result
        return result

    def alert(self, stats: dict):
        """检查告警条件"""
        alerts = []

        if stats['hit_rate'] < 80:
            alerts.append(f"⚠️ 缓存命中率低: {stats['hit_rate']:.1f}%")

        if stats['bytes'] > 0 and stats['max_bytes'] > 0:
            usage = stats['bytes'] / stats['max_bytes'] * 100
            if usage > 90:
                alerts.append(f"⚠️ 内存使用率高: {usage:.1f}%")

        if stats['evictions'] > self.prev_stats.get('evictions', 0):
            alerts.append(f"⚠️ 发生淘汰: {stats['evictions']}")

        return alerts


# 使用
monitor = MemcachedMonitor()
for _ in range(10):
    stats = monitor.check_health()
    print(json.dumps(stats, indent=2))

    alerts = monitor.alert(stats)
    for alert in alerts:
        print(alert)

    time.sleep(1)

10.6 安全加固

1. 网络层安全

# 仅监听内网地址
memcached -l 127.0.0.1 -p 11211

# 或监听特定内网 IP
memcached -l 10.0.0.1 -p 11211

# 使用 iptables 限制访问
sudo iptables -A INPUT -p tcp --dport 11211 -s 10.0.0.0/24 -j ACCEPT
sudo iptables -A INPUT -p tcp --dport 11211 -j DROP

2. SASL 认证

# 启动时启用 SASL
memcached -S -l 127.0.0.1

# 配置 SASL
echo "mech_list: plain" > /etc/sasl2/memcached.conf
saslpasswd2 -c -a memcached -f /etc/sasldb2 memcached_user
# 客户端 SASL 认证
def sasl_auth(sock, username: str, password: str) -> bool:
    """SASL PLAIN 认证"""
    # 二进制协议 SASL 认证
    challenge = b"\0" + username.encode() + b"\0" + password.encode()
    mechanism = b"PLAIN"

    # ... 发送 SASL Auth 命令
    return True

3. 命令禁用

# 禁用危险命令(通过启动参数或 ACL)
memcached -l 127.0.0.1 --disable-evictions

# 或通过代理层过滤命令

4. 访问控制列表(ACL)

# 在代理层实现 ACL
ALLOWED_COMMANDS = {
    'get', 'gets', 'set', 'add', 'replace',
    'delete', 'incr', 'decr', 'flush_all',
    'stats', 'version', 'quit'
}

BLOCKED_COMMANDS = {'flush_all'}  # 生产环境禁用

def check_command_allowed(command: str) -> bool:
    cmd = command.strip().split()[0].lower()
    if cmd in BLOCKED_COMMANDS:
        return False
    return cmd in ALLOWED_COMMANDS

5. 值大小限制

# 设置最大 value 大小
memcached -I 512k  # 默认 1MB,调整为 512KB

10.7 运维最佳实践

启动参数推荐

# 生产环境推荐配置
memcached \
  -l 10.0.0.1 \         # 监听内网 IP
  -p 11211 \             # 端口
  -m 4096 \              # 4GB 内存
  -c 10000 \             # 最大 10000 连接
  -t 8 \                 # 8 个工作线程
  -U 0 \                 # 禁用 UDP
  -o modern \            # 启用现代特性(1.6+)
  -o slab_automove=1 \   # 自动调整 Slab 分配
  -I 1m \                # 最大 Value 1MB
  -v \                   # 基本日志
  -f 1.25 \              # Slab 增长因子
  -n 48                  # 最小 item 空间

systemd 服务配置

# /etc/systemd/system/memcached.service
[Unit]
Description=Memcached
Documentation=https://memcached.org/
After=network.target

[Service]
Type=simple
User=memcache
Group=memcache
ExecStart=/usr/bin/memcached \
  -l 127.0.0.1 \
  -p 11211 \
  -m 4096 \
  -c 10000 \
  -t 4 \
  -o modern \
  -U 0
Restart=always
RestartSec=5
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

日志分析

# 查看 Memcached 日志
journalctl -u memcached -f

# 统计连接信息
echo "stats" | nc 127.0.0.1 11211 | grep -E "(curr_items|bytes|cmd_|get_)" 

# 监控 slab 分配
watch -n 1 'echo "stats slabs" | nc 127.0.0.1 11211 | head -30'

10.8 常见问题排查

问题一:缓存命中率低

# 诊断步骤
echo "stats" | nc 127.0.0.1 11211 | grep -E "(get_hits|get_misses|evictions)"

# 可能原因:
# 1. TTL 过短 → 增加 TTL
# 2. 内存不足导致淘汰 → 增加内存或优化 key
# 3. Key 分布不均 → 检查哈希分布
# 4. 缓存穿透 → 使用空值缓存

问题二:内存使用过高

# 查看内存分配
echo "stats slabs" | nc 127.0.0.1 11211

# 查看 item 分布
echo "stats sizes" | nc 127.0.0.1 11211

# 可能原因:
# 1. 大量大 Value → 考虑压缩或拆分
# 2. Slab 碎片 → 调整增长因子 -f
# 3. 内存泄漏(item 未过期) → 检查 TTL 设置

问题三:连接数过多

# 查看连接信息
echo "stats" | nc 127.0.0.1 11211 | grep connection

# 可能原因:
# 1. 连接池配置不当 → 调整连接池大小
# 2. 连接未正确关闭 → 检查客户端代码
# 3. 慢查询阻塞 → 检查大 Value 操作

问题四:性能下降

# 检查淘汰率
echo "stats" | nc 127.0.0.1 11211 | grep evictions

# 检查 Slab 状态
echo "stats slabs" | nc 127.0.0.1 11211

# 可能原因:
# 1. Slab 不均衡 → 启用 slab_automove
# 2. 大量 CAS 操作 → 减少并发冲突
# 3. 网络延迟 → 检查网络状况

10.9 生产环境检查清单

检查项状态说明
仅监听内网地址-l 参数
禁用 UDP-U 0
设置最大内存-m 参数
设置最大连接数-c 参数
配置防火墙iptables / 安全组
启用 SASL 认证-S 参数
配置监控告警命中率、内存、连接数
配置日志journalctl
测试故障转移模拟节点故障
测试数据迁移节点增减测试
配置备份策略定期 dump
文档化架构架构图、key 设计

10.10 扩展阅读


全书总结

本教程系统讲解了 Memcached 的三大协议(文本协议、二进制协议、Meta 协议)及其完整命令集,深入探讨了分布式架构、一致性哈希、连接池、故障转移等核心主题,最后给出了生产环境的最佳实践。

核心要点回顾

主题关键知识
协议选择文本协议可读性好,二进制协议效率高,Meta 协议功能丰富
缓存策略Cache-Aside 模式最常用,注意穿透/击穿/雪崩防护
分布式设计一致性哈希保证节点增减时最小化数据迁移
性能优化批量操作、管道化、连接复用、压缩大 Value
安全加固网络隔离、SASL 认证、命令禁用、值大小限制

上一章: 第09章 代理与分布式 返回: 目录