强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

第 15 章:多语言客户端

第 15 章:多语言客户端

15.1 客户端库总览

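| 语言 | 主流库 | 协议 | 一致性哈希 | 批量操作 | 推荐 |
| --- | --- | --- | --- | --- | --- |
| PHP | php-memcached | 文本+二进制 | ✅ Ketama | | ★★★★★ |
| Java | SpyMemcached | 文本+二进制 | ✅ Ketama | | ★★★★★ |
| Go | gomemcache | 文本 | | 并发 | ★★★★ |
| Python | pymemcache | 文本 | | | ★★★★★ |
| Python | python-memcached | 文本 | | | ★★★ |
| Node.js | memjs | 文本 | | | ★★★★ |
| Ruby | dalli | 文本 | | | ★★★★ |
| C/C++ | libmemcached | 文本+二进制 | ✅ Ketama | | ★★★★★ |
| .NET | EnyimMemcached | 文本+二进制 | | | ★★★★ |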

15.2 PHP

安装

# Install the extension from PECL
sudo pecl install memcached

# Enable the extension. On the Debian/Ubuntu layout used by phpenmod, the
# mods-available directory lives under the PHP version directory
# (e.g. /etc/php/8.2/mods-available), so derive the version from the CLI.
PHP_VERSION="$(php -r 'echo PHP_MAJOR_VERSION . "." . PHP_MINOR_VERSION;')"
echo "extension=memcached.so" | sudo tee "/etc/php/${PHP_VERSION}/mods-available/memcached.ini"
sudo phpenmod memcached

# Verify that the module is loaded
php -m | grep memcached

基本使用

<?php
// Create the client. Passing a persistent ID ('pool_name') keeps the
// connections alive across requests, so the server list must only be
// populated once; otherwise every request appends a duplicate server.
$mc = new Memcached('pool_name');
if (count($mc->getServerList()) === 0) {
    $mc->addServer('localhost', 11211);
}

// SET
$mc->set('user:1001', ['name' => 'Alice', 'age' => 30], 3600);

// GET
$user = $mc->get('user:1001');
if ($user === false) {
    // false means a miss or an error; the result code tells them apart
    $code = $mc->getResultCode();
    if ($code === Memcached::RES_NOTFOUND) {
        echo "Key 不存在";
    }
}

// DELETE
$mc->delete('user:1001');

// INCR / DECR
$mc->set('counter', 0, 0);
$mc->increment('counter', 5);    // 5
$mc->decrement('counter', 2);    // 3

// CAS: read the value together with its CAS token, then write only if the
// item is unchanged. php-memcached 3.x returns the token through
// Memcached::GET_EXTENDED (the old by-reference $cas_token argument is gone).
$item = $mc->get('balance', null, Memcached::GET_EXTENDED);
if ($item !== false) {
    $swapped = $mc->cas($item['cas'], 'balance', 200, 0);
    // $swapped === false with result code Memcached::RES_DATA_EXISTS means
    // another client changed the key first: re-read and retry.
}

// Batch operations
$keys = ['user:1001', 'user:1002', 'user:1003'];
$users = $mc->getMulti($keys);
// Only the keys that were found appear in the result,
// e.g. $users = ['user:1001' => ..., 'user:1003' => ...]

$data = [
    'user:1001' => ['name' => 'Alice'],
    'user:1002' => ['name' => 'Bob'],
];
$mc->setMulti($data, 3600);
$mc->deleteMulti(['user:1001', 'user:1002']);

PHP 高级配置

<?php
$mc = new Memcached('pool_name');

// A persistent client keeps its options and server list between requests.
// Configure it only while the pool is still empty, otherwise each request
// would add the same servers again.
if (count($mc->getServerList()) === 0) {
    // Consistent hashing
    $mc->setOption(Memcached::OPT_LIBKETAMA_COMPATIBLE, true);
    // NOTE(review): libketama-compatible mode selects MD5 key hashing; overriding
    // OPT_HASH afterwards likely means keys no longer map to the same servers as
    // other Ketama clients. Drop this line if cross-language placement matters.
    $mc->setOption(Memcached::OPT_HASH, Memcached::HASH_MURMUR);

    // Compression: values larger than 2 KB are compressed automatically.
    // The threshold is an INI setting, not a Memcached::OPT_* constant.
    $mc->setOption(Memcached::OPT_COMPRESSION_TYPE, Memcached::COMPRESSION_FASTLZ);
    ini_set('memcached.compression_threshold', '2048');

    // Serializer (igbinary recommended); only available when the extension
    // was built with igbinary support.
    if (Memcached::HAVE_IGBINARY) {
        $mc->setOption(Memcached::OPT_SERIALIZER, Memcached::SERIALIZER_IGBINARY);
    }

    // Timeouts
    $mc->setOption(Memcached::OPT_CONNECT_TIMEOUT, 1000);     // connect timeout: 1 s (milliseconds)
    $mc->setOption(Memcached::OPT_SEND_TIMEOUT, 100000);       // send timeout: 100 ms (microseconds)
    $mc->setOption(Memcached::OPT_RECV_TIMEOUT, 100000);       // receive timeout: 100 ms (microseconds)
    $mc->setOption(Memcached::OPT_SERVER_FAILURE_LIMIT, 3);    // failures before a server is marked dead

    // Binary protocol
    $mc->setOption(Memcached::OPT_BINARY_PROTOCOL, true);

    // Add servers (with weights)
    $mc->addServers([
        ['mc1', 11211, 100],    // [host, port, weight]
        ['mc2', 11211, 100],
        ['mc3', 11211, 50],
    ]);
}

PHP Session Handler

<?php
// Keep PHP sessions in Memcached: apply each session directive in turn.
$sessionSettings = [
    'session.save_handler'   => 'memcached',
    'session.save_path'      => 'mc1:11211,mc2:11211',
    'session.gc_maxlifetime' => 1800,  // 30 minutes
];
foreach ($sessionSettings as $directive => $setting) {
    ini_set($directive, $setting);
}

15.3 Java

依赖

<!-- Maven: SpyMemcached client dependency (net.spy:spymemcached, version 2.12.3) -->
<dependency>
    <groupId>net.spy</groupId>
    <artifactId>spymemcached</artifactId>
    <version>2.12.3</version>
</dependency>

基本使用

import net.spy.memcached.AddrUtil;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Walks through the basic SpyMemcached operations: set, get, delete,
 * incr/decr, CAS and bulk get.
 */
public class MemcachedExample {
    private final MemcachedClient mc;

    /**
     * Connects to the three example servers.
     *
     * @throws Exception if the client cannot be created
     */
    public MemcachedExample() throws Exception {
        mc = new MemcachedClient(
            AddrUtil.getAddresses("mc1:11211 mc2:11211 mc3:11211")
        );
    }

    /**
     * Runs each basic operation once against the cluster.
     *
     * @throws Exception if an operation fails or does not finish within 5 seconds
     */
    public void basicOps() throws Exception {
        // SET (asynchronous): block until the server has acknowledged the write
        Future<Boolean> f = mc.set("user:1001", 3600, "{\"name\":\"Alice\"}");
        f.get(5, TimeUnit.SECONDS);

        // GET
        Object value = mc.get("user:1001");
        System.out.println(value);  // {"name":"Alice"}

        // DELETE
        mc.delete("user:1001");

        // INCR / DECR
        mc.set("counter", 0, "0");
        mc.incr("counter", 5);   // 5
        mc.decr("counter", 2);   // 3

        // CAS: gets() returns null on a miss, so guard before using the token.
        // Argument order is (key, casId, value).
        CASValue<Object> casValue = mc.gets("balance");
        if (casValue != null) {
            CASResponse response = mc.cas("balance", casValue.getCas(), "200");
            if (response != CASResponse.OK) {
                // EXISTS means another client changed the value first: re-read and retry
                System.out.println("CAS failed: " + response);
            }
        }

        // Bulk get
        Future<Map<String, Object>> bulkFuture = mc.asyncGetBulk(
            "user:1001", "user:1002", "user:1003"
        );
        Map<String, Object> results;
        try {
            results = bulkFuture.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // Do not leave the operation queued once the caller has given up
            bulkFuture.cancel(false);
            throw e;
        }
        System.out.println(results);
    }

    /** Shuts the client down and releases its connections. */
    public void shutdown() {
        mc.shutdown();
    }
}

Java 高级配置

import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationTimeoutException;

public class MemcachedConfig {
    public static MemcachedClient createClient() throws Exception {
        ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
            .setHashAlg(HashAlgorithm.KETAMA_HASH)
            .setLocatorType(ConnectionFactoryBuilder.LocatorType.CONSISTENT)
            .setFailureMode(FailureMode.Redistribute)
            .setOpTimeout(1000)                     // 操作超时 1s
            .setShouldOptimize(true)                // 启用优化
            .setReadBufferSize(16384)               // 读缓冲区
            .setReadOpQueueFactory(
                new LinkedOperationQueueFactory(16384)
            )
            .setWriteOpQueueFactory(
                new LinkedOperationQueueFactory(16384)
            )
            .setAuthDescriptor(null)                // SASL 认证(如需要)
            .setMaxReconnectDelay(30)               // 最大重连延迟
            .setEnableHeuristicQueue(false)         // 禁用启发式队列
            .setShouldOptimize(true);

        return new MemcachedClient(
            builder.build(),
            AddrUtil.getAddresses("mc1:11211 mc2:11211 mc3:11211")
        );
    }
}

15.4 Go

安装

# Fetch the gomemcache client package
go get github.com/bradfitz/gomemcache/memcache

基本使用

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/bradfitz/gomemcache/memcache"
)

// User is the sample record that main encodes as JSON before caching it.
type User struct {
    ID    int      `json:"id"`    // encoded under the JSON key "id"
    Name  string   `json:"name"`  // encoded under the JSON key "name"
    Roles []string `json:"roles"` // encoded under the JSON key "roles"
}

// main walks through the basic gomemcache operations against a three-node
// cluster: set, get, delete, increment/decrement and a batch read.
func main() {
    // Create the client; the library picks a server for each key from this list.
    mc := memcache.New("mc1:11211", "mc2:11211", "mc3:11211")
    mc.Timeout = 100 * time.Millisecond
    mc.MaxIdleConns = 50

    // SET
    user := User{ID: 1001, Name: "Alice", Roles: []string{"admin"}}
    data, err := json.Marshal(user)
    if err != nil {
        log.Fatal(err)
    }
    if err := mc.Set(&memcache.Item{
        Key:        "user:1001",
        Value:      data,
        Expiration: 3600,
    }); err != nil {
        log.Fatal(err)
    }

    // GET: a cache miss is an expected outcome, not a fatal error.
    item, err := mc.Get("user:1001")
    switch {
    case err == memcache.ErrCacheMiss:
        fmt.Println("Key 不存在")
    case err != nil:
        log.Fatal(err)
    default:
        var u User
        if err := json.Unmarshal(item.Value, &u); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("User: %+v\n", u)
    }

    // DELETE: deleting a key that is already gone is not an error here.
    if err := mc.Delete("user:1001"); err != nil && err != memcache.ErrCacheMiss {
        log.Fatal(err)
    }

    // INCR
    if err := mc.Set(&memcache.Item{Key: "counter", Value: []byte("0")}); err != nil {
        log.Fatal(err)
    }
    newVal, err := mc.Increment("counter", 5)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(newVal) // 5

    // DECR
    if _, err := mc.Decrement("counter", 2); err != nil { // 3
        log.Fatal(err)
    }

    // Batch read through the batchGet helper
    keys := []string{"user:1001", "user:1002", "user:1003"}
    results := batchGet(mc, keys)
    for k, v := range results {
        fmt.Printf("%s: %s\n", k, v)
    }
}

// batchGet fetches several keys in one call and returns the values of the
// keys that were found, as strings keyed by cache key. Missing keys are
// simply absent from the result; a key stored with an empty value is kept.
//
// It uses the client's GetMulti instead of one goroutine per key. If GetMulti
// reports an error, the error is logged and whatever items were returned are
// still passed back (best-effort, matching the original helper's contract of
// never failing the caller).
func batchGet(mc *memcache.Client, keys []string) map[string]string {
    result := make(map[string]string, len(keys))
    if len(keys) == 0 {
        return result
    }

    items, err := mc.GetMulti(keys)
    if err != nil {
        log.Printf("batchGet: %v", err)
    }
    // Ranging over a nil map is a no-op, so this is safe even after an error.
    for key, item := range items {
        result[key] = string(item.Value)
    }
    return result
}

15.5 Python

安装

# pymemcache (recommended)
pip install pymemcache

# python-memcached (legacy)
pip install python-memcached

# bmemcached (supports SASL)
pip install python-binary-memcached

pymemcache(推荐)

from pymemcache.client.base import Client
from pymemcache.client.hash import HashClient
import json

# Single-node client
mc = Client(('localhost', 11211))

# Cluster client (built-in consistent hashing); replaces the single-node client above
mc = HashClient(
    [('mc1', 11211), ('mc2', 11211), ('mc3', 11211)],
    timeout=1.0,
    connect_timeout=1.0,
)

# SET: values are stored as JSON text
mc.set('user:1001', json.dumps({'name': 'Alice'}), expire=3600)

# GET: a falsy result means there is nothing to decode
data = mc.get('user:1001')
if data:
    user = json.loads(data)
    print(user['name'])

# DELETE
mc.delete('user:1001')

# INCR / DECR
mc.set('counter', 0)
mc.incr('counter', 5)    # 5
mc.decr('counter', 2)    # 3

# Batch operations
data = {
    'user:1001': json.dumps({'name': 'Alice'}),
    'user:1002': json.dumps({'name': 'Bob'}),
}
mc.set_many(data, expire=3600)

results = mc.get_many(['user:1001', 'user:1002', 'user:1003'])
for key in results:
    print(key, json.loads(results[key]))

mc.delete_many(['user:1001', 'user:1002'])

# CAS (check-and-set). pymemcache exposes this as gets()/cas() calls, not as a
# decorator, so the read-modify-write loop is written out explicitly.
def update_balance(current):
    """Return the new balance: '100' when the key is absent, else current + 10."""
    if current is None:
        return '100'
    return str(int(current) + 10)


def cas_update(client, key, updater, expire=0, max_retries=5):
    """Atomically replace the value at key with updater(current value).

    Returns True once the new value has been stored, or False if every
    attempt lost the race to another writer.
    """
    for _ in range(max_retries):
        current, token = client.gets(key)
        new_value = updater(current)
        if current is None:
            # Key absent: add() only succeeds if nobody created it meanwhile.
            stored = client.add(key, new_value, expire=expire, noreply=False)
        else:
            # cas() is truthy only when the value was unchanged since gets().
            stored = client.cas(key, new_value, token, expire=expire, noreply=False)
        if stored:
            return True
    return False


cas_update(mc, 'balance', update_balance, expire=60)

Python 完整封装

#!/usr/bin/env python3
"""Memcached 客户端封装(生产级)"""

import json
import logging
from typing import Any, Optional, List, Dict
from pymemcache.client.hash import HashClient
from pymemcache.serde import pickle_serde, compressed_serde

logger = logging.getLogger(__name__)


class CacheClient:
    """Namespaced JSON cache on top of pymemcache's HashClient.

    Every key is prefixed with ``<namespace>:`` and every value is encoded as
    JSON text before being stored. Cache failures are logged and reported
    through the return value (None / False / {} / default) instead of being
    raised, so a cache outage degrades to cache misses.
    """

    DEFAULT_PORT = 11211

    def __init__(self, servers: List[str], namespace: str = 'app',
                 default_ttl: int = 3600, compress_threshold: int = 1024):
        """Connect to ``servers``.

        Args:
            servers: addresses as 'host', 'host:port' or '[ipv6]:port'.
            namespace: prefix added to every key.
            default_ttl: expiry in seconds used when a call passes no ttl.
            compress_threshold: serialized values longer than this many bytes
                are compressed.
        """
        # Local import: compressed_serde is a ready-made instance and cannot be
        # called with a threshold, so the configurable class is needed instead.
        from pymemcache.serde import CompressedSerde

        self.namespace = namespace
        self.default_ttl = default_ttl

        self.mc = HashClient(
            [self._parse_server(s) for s in servers],
            timeout=1.0,
            connect_timeout=1.0,
            retry_attempts=2,
            retry_timeout=1,
            dead_timeout=5,
            serde=CompressedSerde(min_compress_len=compress_threshold),
        )

    @staticmethod
    def _parse_server(address: str):
        """Split a server address into a (host, port) tuple."""
        if address.startswith('['):
            # Bracketed IPv6 literal, optionally followed by ':port'
            host, _, rest = address[1:].partition(']')
            port = rest.lstrip(':')
            return host, int(port) if port else CacheClient.DEFAULT_PORT
        if address.count(':') == 1:
            host, port = address.split(':')
            return host, int(port)
        # Bare hostname, or a bare IPv6 literal without a port
        return address, CacheClient.DEFAULT_PORT

    def _key(self, key: str) -> str:
        """Return ``key`` with the namespace prefix applied."""
        return f"{self.namespace}:{key}"

    def _ttl(self, ttl: Optional[int]) -> int:
        """Resolve the expiry: only None falls back to the default, 0 is kept."""
        return self.default_ttl if ttl is None else ttl

    def get(self, key: str) -> Optional[Any]:
        """Return the decoded value for ``key``, or None on a miss or error."""
        try:
            data = self.mc.get(self._key(key))
            if data is not None:
                return json.loads(data)
            return None
        except Exception as e:
            logger.error("Cache get error: %s, %s", key, e)
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store ``value`` as JSON under ``key``; return False on error."""
        try:
            data = json.dumps(value, ensure_ascii=False)
            return self.mc.set(self._key(key), data, expire=self._ttl(ttl))
        except Exception as e:
            logger.error("Cache set error: %s, %s", key, e)
            return False

    def delete(self, key: str) -> bool:
        """Delete ``key``; return False on error."""
        try:
            return self.mc.delete(self._key(key))
        except Exception as e:
            logger.error("Cache delete error: %s, %s", key, e)
            return False

    def get_multi(self, keys: List[str]) -> Dict[str, Any]:
        """Return {key: decoded value} for the keys that were found."""
        try:
            # Map each namespaced key back to the caller's key. Stripping the
            # prefix with str.replace would also mangle keys that contain the
            # namespace text somewhere in the middle.
            original = {self._key(k): k for k in keys}
            results = self.mc.get_many(list(original))
            return {
                original.get(k, k): json.loads(v)
                for k, v in results.items()
            }
        except Exception as e:
            logger.error("Cache get_multi error: %s", e)
            return {}

    def set_multi(self, data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
        """Store every item of ``data``; return True only if none failed."""
        try:
            prefixed = {
                self._key(k): json.dumps(v, ensure_ascii=False)
                for k, v in data.items()
            }
            # set_many reports the keys that FAILED, so an empty result is success.
            failed = self.mc.set_many(prefixed, expire=self._ttl(ttl))
            return not failed
        except Exception as e:
            logger.error("Cache set_multi error: %s", e)
            return False

    def incr(self, key: str, delta: int = 1, default: int = 0, ttl: Optional[int] = None) -> int:
        """Atomically add ``delta`` to the counter at ``key``.

        A missing counter is created as ``default + delta``. Returns the new
        value, or ``default`` if the cache is unavailable.
        """
        try:
            full_key = self._key(key)
            result = self.mc.incr(full_key, delta)
            if result is None:
                initial = default + delta
                # add() stores only if the key is still absent, so two callers
                # creating the counter at once cannot overwrite each other.
                if self.mc.add(full_key, str(initial), expire=self._ttl(ttl), noreply=False):
                    return initial
                # Another caller created it first: increment their value instead.
                result = self.mc.incr(full_key, delta)
                if result is None:
                    return default
            return result
        except Exception as e:
            logger.error("Cache incr error: %s, %s", key, e)
            return default

    def cache_aside(self, key: str, loader, ttl: Optional[int] = None) -> Any:
        """Return the cached value, or call ``loader()`` and cache its result.

        A ``None`` result from ``loader`` is returned but not cached.
        """
        data = self.get(key)
        if data is not None:
            return data
        data = loader()
        if data is not None:
            self.set(key, data, ttl)
        return data

    def stats(self) -> Dict[str, Any]:
        """Return per-server statistics keyed by server.

        HashClient has no stats() of its own, so each underlying client is
        queried separately; servers that fail are logged and left out.
        """
        collected: Dict[str, Any] = {}
        for server, client in self.mc.clients.items():
            try:
                collected[server] = client.stats()
            except Exception as e:
                logger.error("Cache stats error: %s", e)
        return collected


# Usage example
cache = CacheClient(
    namespace='myapp',
    default_ttl=3600,
    servers=['mc1:11211', 'mc2:11211', 'mc3:11211'],
)


# Cache-aside pattern
def get_user(user_id):
    """Return the user from the cache, loading it from the database on a miss."""
    def load_from_db():
        return db.query_user(user_id)

    cache_key = f"user:{user_id}"
    return cache.cache_aside(cache_key, load_from_db, ttl=1800)


# Set / get
cache.set("config:debug", True, ttl=300)
debug = cache.get("config:debug")

# Batch
cache.set_multi({"a": 1, "b": 2}, ttl=60)
results = cache.get_multi(["a", "b", "c"])

15.6 Node.js

安装

npm install memjs
# or
npm install memcached

memjs 示例

const memjs = require('memjs');

// Create the client
const mc = memjs.Client.create('mc1:11211,mc2:11211', {
    timeout: 1,
    retries: 2,
    failover: true,
});

// SET: store the user as JSON text with a one-hour expiry
async function setKey() {
    await mc.set('user:1001', JSON.stringify({ name: 'Alice' }), { expires: 3600 });
}

// GET: value is falsy on a miss, otherwise it is decoded from JSON
async function getKey() {
    const { value } = await mc.get('user:1001');
    if (value) {
        const user = JSON.parse(value.toString());
        console.log(user.name);
    }
}

// DELETE
async function deleteKey() {
    await mc.delete('user:1001');
}

// Run the operations in order and close the connections only after they have
// finished (or failed); closing at load time would cut them off.
async function main() {
    try {
        await setKey();
        await getKey();
        await deleteKey();
    } finally {
        mc.close();
    }
}

main().catch((err) => {
    console.error(err);
    process.exitCode = 1;
});

15.7 客户端选型指南

选择客户端:

PHP 项目?
  → php-memcached(PECL)✓

Java 项目?
  → Spymemcached ✓ 或 Xmemcached(NIO)

Go 项目?
  → gomemcache(官方推荐)✓

Python 项目?
  → pymemcache(推荐)✓
  → bmemcached(需要 SASL)

Node.js 项目?
  → memjs ✓

需要 SASL 认证?
  → PHP: php-memcached
  → Python: bmemcached
  → Java: Spymemcached
  → C/C++: libmemcached --enable-sasl

需要压缩?
  → PHP: php-memcached(内置)
  → Python: pymemcache serde
  → Java: Spymemcached(自定义 Transcoder)

扩展阅读

小结

| 语言 | 推荐库 | 关键特性 |
| --- | --- | --- |
| PHP | php-memcached | 内置一致性哈希、压缩、二进制协议 |
| Java | Spymemcached | 异步操作、Ketama 一致性哈希 |
| Go | gomemcache | 轻量、内置一致性哈希 |
| Python | pymemcache | HashClient、serde、生产级 |
| Node.js | memjs | 简单易用、failover 支持 |