第 15 章:多语言客户端
第 15 章:多语言客户端
15.1 客户端库总览
| 语言 | 主流库 | 协议 | 一致性哈希 | 批量操作 | 推荐 |
|---|
| PHP | php-memcached | 文本+二进制 | ✅ Ketama | ✅ | ★★★★★ |
| Java | SpyMemcached | 文本+二进制 | ✅ Ketama | ✅ | ★★★★★ |
| Go | gomemcache | 文本 | ✅ | 并发 | ★★★★ |
| Python | pymemcache | 文本 | ✅ | ✅ | ★★★★★ |
| Python | python-memcached | 文本 | ❌ | ✅ | ★★★ |
| Node.js | memjs | 文本 | ✅ | ✅ | ★★★★ |
| Ruby | dalli | 文本 | ✅ | ✅ | ★★★★ |
| C/C++ | libmemcached | 文本+二进制 | ✅ Ketama | ✅ | ★★★★★ |
| .NET | EnyimMemcached | 文本+二进制 | ✅ | ✅ | ★★★★ |
15.2 PHP
安装
# PECL 安装
sudo pecl install memcached
# 启用扩展
echo "extension=memcached.so" | sudo tee /etc/php/mods-available/memcached.ini
sudo phpenmod memcached
# 验证
php -m | grep memcached
基本使用
<?php
// 创建客户端(持久化连接)
$mc = new Memcached('pool_name');
$mc->addServer('localhost', 11211);
// SET
$mc->set('user:1001', ['name' => 'Alice', 'age' => 30], 3600);
// GET
$user = $mc->get('user:1001');
if ($user === false) {
// 未命中或出错
$code = $mc->getResultCode();
if ($code === Memcached::RES_NOTFOUND) {
echo "Key 不存在";
}
}
// DELETE
$mc->delete('user:1001');
// INCR / DECR
$mc->set('counter', 0, 0);
$mc->increment('counter', 5); // 5
$mc->decrement('counter', 2); // 3
// CAS
$mc->setOption(Memcached::OPT_LIBKETAMA_COMPATIBLE, true);
$mc->get('balance', null, $cas_token);
$mc->cas($cas_token, 'balance', 200, 0);
// 批量操作
$keys = ['user:1001', 'user:1002', 'user:1003'];
$users = $mc->getMulti($keys);
// $users = ['user:1001' => ..., 'user:1003' => ...]
$data = [
'user:1001' => ['name' => 'Alice'],
'user:1002' => ['name' => 'Bob'],
];
$mc->setMulti($data, 3600);
$mc->deleteMulti(['user:1001', 'user:1002']);
PHP 高级配置
<?php
$mc = new Memcached('pool_name');
// 一致性哈希
$mc->setOption(Memcached::OPT_LIBKETAMA_COMPATIBLE, true);
$mc->setOption(Memcached::OPT_HASH, Memcached::HASH_MURMUR);
// 压缩阈值(大于 2KB 自动压缩)
$mc->setOption(Memcached::OPT_COMPRESSION_TYPE, Memcached::COMPRESSION_FASTLZ);
$mc->setOption(Memcached::OPT_COMPRESSION_THRESHOLD, 2048);
// 序列化(推荐 igbinary)
$mc->setOption(Memcached::OPT_SERIALIZER, Memcached::SERIALIZER_IGBINARY);
// 超时设置
$mc->setOption(Memcached::OPT_CONNECT_TIMEOUT, 1000); // 连接超时 1s
$mc->setOption(Memcached::OPT_SEND_TIMEOUT, 100000); // 发送超时 100ms
$mc->setOption(Memcached::OPT_RECV_TIMEOUT, 100000); // 接收超时 100ms
$mc->setOption(Memcached::OPT_SERVER_FAILURE_LIMIT, 3); // 失败阈值
// 二进制协议
$mc->setOption(Memcached::OPT_BINARY_PROTOCOL, true);
// 添加服务器(带权重)
$mc->addServers([
['mc1', 11211, 100], // [host, port, weight]
['mc2', 11211, 100],
['mc3', 11211, 50],
]);
PHP Session Handler
<?php
// 将 Session 存储在 Memcached 中
ini_set('session.save_handler', 'memcached');
ini_set('session.save_path', 'mc1:11211,mc2:11211');
ini_set('session.gc_maxlifetime', 1800); // 30 分钟
15.3 Java
依赖
<!-- Maven -->
<dependency>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<version>2.12.3</version>
</dependency>
基本使用
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.AddrUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class MemcachedExample {
private MemcachedClient mc;
public MemcachedExample() throws Exception {
// 创建客户端
mc = new MemcachedClient(
AddrUtil.getAddresses("mc1:11211 mc2:11211 mc3:11211")
);
}
public void basicOps() throws Exception {
// SET(异步)
Future<Boolean> f = mc.set("user:1001", 3600, "{\"name\":\"Alice\"}");
f.get(5, TimeUnit.SECONDS); // 等待完成
// GET
Object value = mc.get("user:1001");
System.out.println(value); // {"name":"Alice"}
// DELETE
mc.delete("user:1001");
// INCR / DECR
mc.set("counter", 0, "0");
mc.incr("counter", 5); // 5
mc.decr("counter", 2); // 3
// CAS
CASValue<Object> casValue = mc.gets("balance");
mc.cas("balance", 0, "200", casValue.getCas());
// 批量获取
Future<Map<String, Object>> bulkFuture = mc.asyncGetBulk(
"user:1001", "user:1002", "user:1003"
);
Map<String, Object> results = bulkFuture.get(5, TimeUnit.SECONDS);
}
public void shutdown() {
mc.shutdown();
}
}
Java 高级配置
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationTimeoutException;
public class MemcachedConfig {
public static MemcachedClient createClient() throws Exception {
ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setHashAlg(HashAlgorithm.KETAMA_HASH)
.setLocatorType(ConnectionFactoryBuilder.LocatorType.CONSISTENT)
.setFailureMode(FailureMode.Redistribute)
.setOpTimeout(1000) // 操作超时 1s
.setShouldOptimize(true) // 启用优化
.setReadBufferSize(16384) // 读缓冲区
.setReadOpQueueFactory(
new LinkedOperationQueueFactory(16384)
)
.setWriteOpQueueFactory(
new LinkedOperationQueueFactory(16384)
)
.setAuthDescriptor(null) // SASL 认证(如需要)
.setMaxReconnectDelay(30) // 最大重连延迟
.setEnableHeuristicQueue(false) // 禁用启发式队列
.setShouldOptimize(true);
return new MemcachedClient(
builder.build(),
AddrUtil.getAddresses("mc1:11211 mc2:11211 mc3:11211")
);
}
}
15.4 Go
安装
go get github.com/bradfitz/gomemcache/memcache
基本使用
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/bradfitz/gomemcache/memcache"
)
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Roles []string `json:"roles"`
}
func main() {
// 创建客户端(内置一致性哈希)
mc := memcache.New("mc1:11211", "mc2:11211", "mc3:11211")
mc.Timeout = 100 * time.Millisecond
mc.MaxIdleConns = 50
// SET
user := User{ID: 1001, Name: "Alice", Roles: []string{"admin"}}
data, _ := json.Marshal(user)
err := mc.Set(&memcache.Item{
Key: "user:1001",
Value: data,
Expiration: 3600,
})
if err != nil {
log.Fatal(err)
}
// GET
item, err := mc.Get("user:1001")
if err != nil {
if err == memcache.ErrCacheMiss {
fmt.Println("Key 不存在")
}
log.Fatal(err)
}
var u User
json.Unmarshal(item.Value, &u)
fmt.Printf("User: %+v\n", u)
// DELETE
mc.Delete("user:1001")
// INCR
mc.Set(&memcache.Item{Key: "counter", Value: []byte("0")})
newVal, _ := mc.Increment("counter", 5)
fmt.Println(newVal) // 5
// DECR
mc.Decrement("counter", 2) // 3
// 批量获取(gomemcache 不直接支持,需并发)
keys := []string{"user:1001", "user:1002", "user:1003"}
results := batchGet(mc, keys)
for k, v := range results {
fmt.Printf("%s: %s\n", k, v)
}
}
func batchGet(mc *memcache.Client, keys []string) map[string]string {
result := make(map[string]string)
type kv struct {
Key string
Value string
}
ch := make(chan kv, len(keys))
for _, key := range keys {
go func(k string) {
item, err := mc.Get(k)
if err == nil {
ch <- kv{k, string(item.Value)}
} else {
ch <- kv{k, ""}
}
}(key)
}
for range keys {
r := <-ch
if r.Value != "" {
result[r.Key] = r.Value
}
}
return result
}
15.5 Python
安装
# pymemcache(推荐)
pip install pymemcache
# python-memcached(旧版)
pip install python-memcached
# bmemcached(支持 SASL)
pip install python-binary-memcached
pymemcache(推荐)
from pymemcache.client.base import Client
from pymemcache.client.hash import HashClient
import json
# 单机客户端
mc = Client(('localhost', 11211))
# 集群客户端(内置一致性哈希)
mc = HashClient([
('mc1', 11211),
('mc2', 11211),
('mc3', 11211),
], timeout=1.0, connect_timeout=1.0)
# SET
mc.set('user:1001', json.dumps({'name': 'Alice'}), expire=3600)
# GET
data = mc.get('user:1001')
if data:
user = json.loads(data)
print(user['name'])
# DELETE
mc.delete('user:1001')
# INCR / DECR
mc.set('counter', 0)
mc.incr('counter', 5) # 5
mc.decr('counter', 2) # 3
# 批量操作
data = {
'user:1001': json.dumps({'name': 'Alice'}),
'user:1002': json.dumps({'name': 'Bob'}),
}
mc.set_many(data, expire=3600)
results = mc.get_many(['user:1001', 'user:1002', 'user:1003'])
for key, value in results.items():
print(key, json.loads(value))
mc.delete_many(['user:1001', 'user:1002'])
# CAS
@mc.cas('balance', expire=60)
def update_balance(current):
if current is None:
return '100'
return str(int(current) + 10)
Python 完整封装
#!/usr/bin/env python3
"""Memcached 客户端封装(生产级)"""
import json
import logging
from typing import Any, Optional, List, Dict
from pymemcache.client.hash import HashClient
from pymemcache.serde import pickle_serde, compressed_serde
logger = logging.getLogger(__name__)
class CacheClient:
"""Memcached 客户端封装"""
def __init__(self, servers: List[str], namespace: str = 'app',
default_ttl: int = 3600, compress_threshold: int = 1024):
self.namespace = namespace
self.default_ttl = default_ttl
# 解析服务器地址
server_list = []
for s in servers:
if ':' in s:
host, port = s.split(':')
server_list.append((host, int(port)))
else:
server_list.append((s, 11211))
self.mc = HashClient(
server_list,
timeout=1.0,
connect_timeout=1.0,
retry_attempts=2,
retry_timeout=1,
dead_timeout=5,
serde=compressed_serde(compress_threshold),
)
def _key(self, key: str) -> str:
return f"{self.namespace}:{key}"
def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
try:
data = self.mc.get(self._key(key))
if data is not None:
return json.loads(data)
return None
except Exception as e:
logger.error(f"Cache get error: {key}, {e}")
return None
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
"""设置缓存"""
try:
ttl = ttl or self.default_ttl
data = json.dumps(value, ensure_ascii=False)
return self.mc.set(self._key(key), data, expire=ttl)
except Exception as e:
logger.error(f"Cache set error: {key}, {e}")
return False
def delete(self, key: str) -> bool:
"""删除缓存"""
try:
return self.mc.delete(self._key(key))
except Exception as e:
logger.error(f"Cache delete error: {key}, {e}")
return False
def get_multi(self, keys: List[str]) -> Dict[str, Any]:
"""批量获取"""
try:
prefixed_keys = [self._key(k) for k in keys]
results = self.mc.get_many(prefixed_keys)
return {
k.replace(f"{self.namespace}:", ""): json.loads(v)
for k, v in results.items()
}
except Exception as e:
logger.error(f"Cache get_multi error: {e}")
return {}
def set_multi(self, data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
"""批量设置"""
try:
ttl = ttl or self.default_ttl
prefixed = {
self._key(k): json.dumps(v, ensure_ascii=False)
for k, v in data.items()
}
return self.mc.set_many(prefixed, expire=ttl)
except Exception as e:
logger.error(f"Cache set_multi error: {e}")
return False
def incr(self, key: str, delta: int = 1, default: int = 0, ttl: Optional[int] = None) -> int:
"""原子自增"""
try:
full_key = self._key(key)
result = self.mc.incr(full_key, delta)
if result is None:
self.mc.set(full_key, str(default + delta), expire=ttl or self.default_ttl)
return default + delta
return result
except Exception as e:
logger.error(f"Cache incr error: {key}, {e}")
return default
def cache_aside(self, key: str, loader, ttl: Optional[int] = None) -> Any:
"""Cache Aside 模式"""
data = self.get(key)
if data is not None:
return data
data = loader()
if data is not None:
self.set(key, data, ttl)
return data
def stats(self) -> Dict[str, str]:
"""获取统计信息"""
try:
return self.mc.stats()
except Exception as e:
logger.error(f"Cache stats error: {e}")
return {}
# 使用示例
cache = CacheClient(
servers=['mc1:11211', 'mc2:11211', 'mc3:11211'],
namespace='myapp',
default_ttl=3600,
)
# Cache Aside 模式
def get_user(user_id):
return cache.cache_aside(
f"user:{user_id}",
lambda: db.query_user(user_id),
ttl=1800,
)
# 设置/获取
cache.set("config:debug", True, ttl=300)
debug = cache.get("config:debug")
# 批量
cache.set_multi({"a": 1, "b": 2}, ttl=60)
results = cache.get_multi(["a", "b", "c"])
15.6 Node.js
安装
npm install memjs
# 或
npm install memcached
memjs 示例
const memjs = require('memjs');
// 创建客户端
const mc = memjs.Client.create('mc1:11211,mc2:11211', {
timeout: 1,
retries: 2,
failover: true,
});
// SET
async function setKey() {
await mc.set('user:1001', JSON.stringify({ name: 'Alice' }), { expires: 3600 });
}
// GET
async function getKey() {
const { value } = await mc.get('user:1001');
if (value) {
const user = JSON.parse(value.toString());
console.log(user.name);
}
}
// DELETE
async function deleteKey() {
await mc.delete('user:1001');
}
// 关闭
mc.close();
15.7 客户端选型指南
选择客户端:
PHP 项目?
→ php-memcached(PECL)✓
Java 项目?
→ Spymemcached ✓ 或 Xmemcached(NIO)
Go 项目?
→ gomemcache(官方推荐)✓
Python 项目?
→ pymemcache(推荐)✓
→ bmemcached(需要 SASL)
Node.js 项目?
→ memjs ✓
需要 SASL 认证?
→ PHP: php-memcached
→ Python: bmemcached
→ Java: Spymemcached
→ C/C++: libmemcached --enable-sasl
需要压缩?
→ PHP: php-memcached(内置)
→ Python: pymemcache serde
→ Java: Spymemcached(自定义 Transcoder)
扩展阅读
小结
| 语言 | 推荐库 | 关键特性 |
|---|
| PHP | php-memcached | 内置一致性哈希、压缩、二进制协议 |
| Java | Spymemcached | 异步操作、Ketama 一致性哈希 |
| Go | gomemcache | 轻量、内置一致性哈希 |
| Python | pymemcache | HashClient、serde、生产级 |
| Node.js | memjs | 简单易用、failover 支持 |