rqlite 完全指南 / 第 10 章:客户端开发
第 10 章:客户端开发
使用 Python、Go、Java 和 CLI 工具操作 rqlite 集群。
10.1 客户端概览
rqlite 通过 HTTP API 暴露所有功能,因此任何能发送 HTTP 请求的语言都可以作为客户端。官方和社区提供了多种客户端库:
| 语言 | 客户端库 | 维护状态 |
|---|---|---|
| CLI | rqlite(官方) | ✅ 活跃 |
| Python | pyrqlite / requests | ✅ 活跃 |
| Go | rqlite/client(官方) | ✅ 活跃 |
| Java | jqlite / 原生 HTTP | ⚠️ 社区维护 |
| Node.js | 原生 HTTP | — |
| Rust | 原生 HTTP | — |
10.2 CLI 客户端(官方)
10.2.1 安装
# 随 rqlite 一起安装(参见第 2 章)
# rqlite CLI 已包含在发布包中
# 验证
rqlite -h
10.2.2 基本使用
# Connect to the local node (defaults to 127.0.0.1:4001)
rqlite
# Connect to a remote node: host and port are passed as separate flags
rqlite -H 192.168.1.100 -p 4001
# Connect with HTTP basic-auth credentials
rqlite -H 192.168.1.100 -p 4001 -u admin:password
# Connect over TLS and trust a self-signed CA certificate
rqlite -s https -c /etc/rqlite/certs/ca.crt -H 192.168.1.100 -p 4001
10.2.3 交互式命令
进入交互模式后,可以直接输入 SQL:
192.168.1.100:4001> CREATE TABLE demo (id INTEGER PRIMARY KEY, name TEXT);
0 rows affected
192.168.1.100:4001> INSERT INTO demo (name) VALUES ("hello");
1 rows affected
192.168.1.100:4001> SELECT * FROM demo;
+----+-------+
| id | name |
+----+-------+
| 1 | hello |
+----+-------+
CLI 支持的特殊命令:
| 命令 | 说明 |
|---|---|
| `.help` | 显示帮助信息 |
| `.tables` | 列出所有表 |
| `.schema` | 显示表结构 |
| `.status` | 显示节点状态 |
| `.nodes` | 显示集群节点 |
| `.timer on/off` | 显示执行时间 |
| `.consistency <level>` | 设置一致性级别 |
| `.quit` | 退出 |
10.3 Python 客户端
10.3.1 使用 requests 库(推荐)
"""
rqlite Python 客户端 — 基于 requests
"""
import json
import requests
from typing import Any, Optional
class RqliteClient:
    """Thin wrapper around the rqlite HTTP API built on ``requests``.

    Statements are sent in rqlite's array form, ``[sql, param1, param2, ...]``,
    which is the same shape ``batch_execute`` callers pass in directly.
    Request options such as ``level`` and ``transaction`` travel in the URL
    query string, not in the JSON body.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 4001,
        scheme: str = "http",
        username: Optional[str] = None,
        password: Optional[str] = None,
        timeout: int = 30,
    ):
        """Create a client bound to one rqlite node.

        Args:
            host: Node host name or IP address.
            port: Node HTTP API port.
            scheme: ``"http"`` or ``"https"``.
            username: Basic-auth user name; auth is skipped when empty.
            password: Basic-auth password (may be an empty string).
            timeout: Per-request timeout in seconds.
        """
        self.base_url = f"{scheme}://{host}:{port}"
        # requests does not honour a ``timeout`` attribute on Session, so the
        # value is kept here and passed explicitly with every request.
        self.timeout = timeout
        self.session = requests.Session()

        if username and password is not None:
            self.session.auth = (username, password)

        # One pooled adapter shared by both schemes.
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=3,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def __enter__(self) -> "RqliteClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def close(self) -> None:
        """Release the pooled HTTP connections held by the session."""
        self.session.close()

    def query(
        self,
        sql: str,
        params: Optional[list] = None,
        level: str = "weak",
        timeout: Optional[str] = None,
    ) -> dict:
        """Run a read-only statement (SELECT).

        Args:
            sql: SQL text, optionally with ``?`` placeholders.
            params: Values bound to the placeholders, in order.
            level: Read-consistency level sent as the ``level`` URL parameter.
            timeout: Optional server-side timeout string (for example
                ``"10s"``) sent as the ``timeout`` URL parameter.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: If the node answers with an error status.
        """
        url_params = {"level": level}
        if timeout:
            url_params["timeout"] = timeout
        resp = self.session.post(
            f"{self.base_url}/db/query",
            params=url_params,
            json=self._build_statements(sql, params),
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def execute(
        self,
        sql: str,
        params: Optional[list] = None,
    ) -> dict:
        """Run one write statement (INSERT/UPDATE/DELETE/DDL).

        Raises:
            requests.HTTPError: If the node answers with an error status.
        """
        resp = self.session.post(
            f"{self.base_url}/db/execute",
            json=self._build_statements(sql, params),
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def batch_execute(self, statements: list, transaction: bool = True) -> dict:
        """Run several write statements in one request.

        Args:
            statements: Statements in array form, e.g.
                ``[["INSERT INTO t VALUES (?)", 1], ...]``.
            transaction: When true (the default) the ``transaction`` URL flag
                is sent so the whole batch is applied atomically.

        Raises:
            requests.HTTPError: If the node answers with an error status.
        """
        url_params = {"transaction": "true"} if transaction else {}
        resp = self.session.post(
            f"{self.base_url}/db/execute",
            params=url_params,
            json=statements,
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def request(self, statements: list) -> dict:
        """Send a mixed batch of reads and writes to the unified endpoint.

        ``statements`` uses the same array form as ``batch_execute``.

        Raises:
            requests.HTTPError: If the node answers with an error status.
        """
        resp = self.session.post(
            f"{self.base_url}/db/request",
            json=statements,
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def status(self) -> dict:
        """Return the node's ``/status`` document."""
        resp = self.session.get(f"{self.base_url}/status", timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()

    def nodes(self) -> dict:
        """Return the cluster membership reported by ``/nodes``."""
        resp = self.session.get(f"{self.base_url}/nodes", timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()

    def backup(self, path: str, fmt: str = "sql") -> None:
        """Download a database backup to ``path``.

        Args:
            path: Destination file.
            fmt: ``"sql"`` for a textual SQL dump, ``"binary"`` for a raw
                SQLite file.
        """
        # The endpoint returns a binary SQLite file unless ``fmt=sql`` is
        # requested explicitly, so the flag is only needed for the SQL dump.
        url_params = {"fmt": "sql"} if fmt == "sql" else {}
        with self.session.get(
            f"{self.base_url}/db/backup",
            params=url_params,
            stream=True,
            timeout=self.timeout,
        ) as resp:
            resp.raise_for_status()
            with open(path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)

    def load(self, path: str) -> dict:
        """Restore the database from a SQL dump file.

        Returns:
            The decoded JSON response, or ``{}`` when the body is empty.
        """
        with open(path, "r", encoding="utf-8") as f:
            sql = f.read()
        resp = self.session.post(
            f"{self.base_url}/db/load",
            data=sql.encode("utf-8"),
            headers={"Content-Type": "text/plain"},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json() if resp.text else {}

    def _build_statements(
        self, sql: str, params: Optional[list]
    ) -> list:
        """Wrap one statement in the array form: ``[[sql, *params]]``."""
        stmt = [sql]
        if params:
            stmt.extend(params)
        return [stmt]
# ---- 使用示例 ----
if __name__ == "__main__":
    # Demo: connect, create a table, bulk insert, query, then back up.
    client = RqliteClient(
        host="localhost", port=4001, username="admin", password="password"
    )

    # Show the Raft role this node currently holds.
    status = client.status()
    print(f"Raft State: {status['store']['raft_state']}")

    # Create the demo table if it is not there yet.
    create_table_sql = """
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL NOT NULL,
stock INTEGER DEFAULT 0
)
"""
    client.execute(create_table_sql)

    # Insert three products with a single batched request.
    insert_sql = "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)"
    seed_products = (
        ("笔记本", 5999.0, 100),
        ("键盘", 399.0, 200),
        ("鼠标", 99.0, 500),
    )
    client.batch_execute(
        [[insert_sql, name, price, stock] for name, price, stock in seed_products]
    )

    # List every product that costs more than 100.
    result = client.query("SELECT * FROM products WHERE price > ?", params=[100])
    matching_rows = result["results"][0].get("values", [])
    for row in matching_rows:
        print(f" {row[1]}: ¥{row[2]}, 库存: {row[3]}")

    # Write a backup to a local file.
    client.backup("/tmp/rqlite_backup.sql")
    print("备份完成")
10.3.2 使用 pyrqlite 库
pip install pyrqlite
import pyrqlite.dbapi2 as dbapi
# 连接
# Open a DB-API connection to the local node; credentials are optional.
connect_kwargs = dict(
    host='localhost',
    port=4001,
    # username='admin',
    # password='password',
)
conn = dbapi.connect(**connect_kwargs)
cur = conn.cursor()

# Create the table and insert one row through parameter binding.
cur.execute('CREATE TABLE IF NOT EXISTS demo (id INTEGER PRIMARY KEY, name TEXT)')
cur.execute('INSERT INTO demo (name) VALUES (?)', ('hello',))
conn.commit()

# Read everything back and print each row.
cur.execute('SELECT * FROM demo')
for record in cur.fetchall():
    print(record)

conn.close()
10.4 Go 客户端
10.4.1 使用官方客户端库
package main
import (
"fmt"
"log"
"time"
rqlite "github.com/rqlite/rqlite-go"
)
// main creates a table, inserts one task and lists the pending tasks using
// the client library imported above.
// NOTE(review): the import path and method set of that library are not
// verifiable from this file; confirm them against the library's own docs.
func main() {
	// Build the client and configure credentials and timeout.
	client := rqlite.NewClient("http://localhost:4001")
	client.SetBasicAuth("admin", "password")
	client.SetTimeout(30 * time.Second)

	// Create the table if it does not exist yet.
	const createTasks = `CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)`
	if _, err := client.Execute(createTasks); err != nil {
		log.Fatal(err)
	}

	// Insert one row and report its generated ID.
	inserted, err := client.Execute(`INSERT INTO tasks (title) VALUES (?)`, "完成项目文档")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("插入成功, ID: %d\n", inserted[0].LastInsertID)

	// Query all pending tasks and print how many came back.
	pending, err := client.Query("SELECT * FROM tasks WHERE status = ?", "pending")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("查询结果: %d 行\n", len(pending[0].Values))
}
10.4.2 使用标准库
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)
// RqliteClient is a minimal rqlite HTTP API client built on net/http.
type RqliteClient struct {
	// BaseURL is the node address, scheme included; endpoint paths are appended to it.
	BaseURL string
	// HTTPClient performs every request issued by this client.
	HTTPClient *http.Client
	// Username enables HTTP basic auth when non-empty.
	Username string
	// Password is sent together with Username.
	Password string
}
// NewRqliteClient returns a client for the node at host:port over plain HTTP.
// The underlying http.Client uses a 30-second timeout and a transport that
// keeps idle connections around for reuse.
func NewRqliteClient(host string, port int) *RqliteClient {
	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 20,
		IdleConnTimeout:     90 * time.Second,
	}
	httpClient := &http.Client{
		Timeout:   30 * time.Second,
		Transport: transport,
	}

	client := new(RqliteClient)
	client.BaseURL = fmt.Sprintf("http://%s:%d", host, port)
	client.HTTPClient = httpClient
	return client
}
// Execute posts stmts as a JSON array to /db/execute and returns the decoded
// JSON response. Each element of stmts is one statement, for example
// []interface{}{"INSERT INTO t VALUES (?)", 1}.
//
// An error is returned when the request cannot be built or sent, when the
// node answers with a non-2xx status, or when the body is not valid JSON.
func (c *RqliteClient) Execute(stmts []interface{}) (map[string]interface{}, error) {
	body, err := json.Marshal(stmts)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, c.BaseURL+"/db/execute", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	if c.Username != "" {
		req.SetBasicAuth(c.Username, c.Password)
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Error responses (bad credentials, malformed request) are not JSON;
	// report the status instead of an opaque decode failure.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("rqlite execute: unexpected HTTP status %s", resp.Status)
	}

	var result map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return result, nil
}
// Query runs a read-only SQL statement through GET /db/query at the given
// consistency level and returns the decoded JSON response.
//
// An error is returned when the request cannot be built or sent, when the
// body cannot be read, when the node answers with a non-2xx status, or when
// the body is not valid JSON.
func (c *RqliteClient) Query(sql string, level string) (map[string]interface{}, error) {
	req, err := http.NewRequest(http.MethodGet, c.BaseURL+"/db/query", nil)
	if err != nil {
		return nil, err
	}

	// Encode the parameters instead of pasting raw SQL into the URL: spaces,
	// '&', '%' and non-ASCII characters would otherwise corrupt the request.
	params := req.URL.Query()
	params.Set("q", sql)
	params.Set("level", level)
	req.URL.RawQuery = params.Encode()

	if c.Username != "" {
		req.SetBasicAuth(c.Username, c.Password)
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("rqlite query: unexpected HTTP status %s: %s", resp.Status, body)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, err
	}
	return result, nil
}
// main inserts one task and then reads the tasks table back, printing both
// raw responses.
func main() {
	client := NewRqliteClient("localhost", 4001)
	client.Username, client.Password = "admin", "password"

	// Write: one parameterized INSERT wrapped in a statement list.
	insert := []interface{}{"INSERT INTO tasks (title) VALUES (?)", "学习 Go"}
	execResult, err := client.Execute([]interface{}{insert})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Execute result: %+v\n", execResult)

	// Read: fetch every task at weak consistency.
	queryResult, err := client.Query("SELECT * FROM tasks", "weak")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Query result: %+v\n", queryResult)
}
10.5 Java 客户端
package com.example.rqlite;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Base64;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
 * Minimal rqlite client built on {@code java.net.http.HttpClient} and Jackson.
 *
 * <p>Reads go through {@code GET /db/query}; writes go through
 * {@code POST /db/execute} using the array statement form
 * {@code [[sql, param1, param2, ...]]}.
 */
public class RqliteClient {
    /** Upper bound for one complete request/response exchange. */
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(30);

    private final String baseUrl;
    private final HttpClient httpClient;
    private final ObjectMapper mapper;
    /** Pre-computed {@code Authorization} header value, or null when auth is off. */
    private final String authHeader;

    /**
     * Creates a client for the node at {@code host:port} over plain HTTP.
     *
     * @param host     node host name or IP address
     * @param port     node HTTP API port
     * @param username basic-auth user name, or null to disable auth
     * @param password basic-auth password, or null to disable auth
     */
    public RqliteClient(String host, int port, String username, String password) {
        this.baseUrl = String.format("http://%s:%d", host, port);
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();
        this.mapper = new ObjectMapper();
        if (username != null && password != null) {
            // Encode with an explicit charset so the header does not depend
            // on the platform default.
            byte[] raw = (username + ":" + password)
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
            this.authHeader = "Basic " + Base64.getEncoder().encodeToString(raw);
        } else {
            this.authHeader = null;
        }
    }

    /**
     * Runs a read-only statement.
     *
     * @param sql   SQL text
     * @param level read-consistency level
     * @return the raw JSON response body
     * @throws Exception if the request fails or the node returns an error status
     */
    public String query(String sql, String level) throws Exception {
        String url = String.format("%s/db/query?q=%s&level=%s",
                baseUrl, urlEncode(sql), urlEncode(level));
        return send(HttpRequest.newBuilder().uri(URI.create(url)).GET());
    }

    /**
     * Runs one write statement with optional bound parameters.
     *
     * @param sql    SQL text with {@code ?} placeholders
     * @param params values bound in order; numbers, booleans and null keep
     *               their JSON type instead of being turned into strings
     * @return the raw JSON response body
     * @throws Exception if the request fails or the node returns an error status
     */
    public String execute(String sql, Object... params) throws Exception {
        ArrayNode stmt = mapper.createArrayNode();
        stmt.add(sql);
        if (params != null) {
            for (Object param : params) {
                // addPOJO serializes by runtime type and maps null to JSON null.
                stmt.addPOJO(param);
            }
        }
        ArrayNode stmts = mapper.createArrayNode();
        stmts.add(stmt);

        String body = mapper.writeValueAsString(stmts);
        return send(HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/db/execute"))
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .header("Content-Type", "application/json"));
    }

    /** URL-encodes one query-string component as UTF-8. */
    private static String urlEncode(String value) {
        return java.net.URLEncoder.encode(value, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Adds the shared timeout and auth header, sends the request and returns
     * the body; an HTTP error status is reported as an exception.
     */
    private String send(HttpRequest.Builder builder) throws Exception {
        builder.timeout(REQUEST_TIMEOUT);
        if (authHeader != null) {
            builder.header("Authorization", authHeader);
        }
        HttpResponse<String> response = httpClient.send(
                builder.build(), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() >= 400) {
            throw new java.io.IOException(
                    "rqlite returned HTTP " + response.statusCode() + ": " + response.body());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        RqliteClient client = new RqliteClient("localhost", 4001, "admin", "password");
        // Create the table.
        client.execute("CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY, name TEXT)");
        // Insert one row.
        client.execute("INSERT INTO items (name) VALUES (?)", "Java Item");
        // Read everything back.
        String result = client.query("SELECT * FROM items", "weak");
        System.out.println(result);
    }
}
10.6 Node.js 客户端
/**
* rqlite Node.js 客户端
*/
const http = require('http');
/**
 * Minimal rqlite client on top of Node's built-in `http` module.
 *
 * Statements use rqlite's array form, `[sql, param1, param2, ...]`; request
 * options such as `level` are sent in the URL query string.
 */
class RqliteClient {
  /**
   * @param {string} host node host name or IP address
   * @param {number} port node HTTP API port
   * @param {{auth?: string, timeout?: number}} options `auth` is
   *   "user:password"; `timeout` is the socket idle timeout in milliseconds
   */
  constructor(host = 'localhost', port = 4001, options = {}) {
    this.host = host;
    this.port = port;
    this.auth = options.auth || null;
    this.timeout = options.timeout || 30000;
  }

  /** Run write statements, e.g. `[['INSERT INTO t VALUES (?)', 1]]`. */
  async execute(statements) {
    return this._post('/db/execute', statements);
  }

  /** Run one read-only statement at the given consistency level. */
  async query(sql, params = [], level = 'weak') {
    const path = `/db/query?level=${encodeURIComponent(level)}`;
    return this._post(path, [[sql, ...params]]);
  }

  /** Send a mixed batch of reads and writes to the unified endpoint. */
  async request(statements) {
    return this._post('/db/request', statements);
  }

  async status() {
    return this._get('/status');
  }

  async nodes() {
    return this._get('/nodes');
  }

  _get(path) {
    return this._send('GET', path);
  }

  _post(path, body) {
    return this._send('POST', path, body);
  }

  /**
   * Issue one HTTP request. Resolves with the parsed JSON body, or the raw
   * text when the body is not JSON. Rejects on network errors, on timeout
   * and on HTTP status codes of 400 and above.
   */
  _send(method, path, body) {
    return new Promise((resolve, reject) => {
      const headers = {};
      let payload = null;
      if (body !== undefined) {
        payload = JSON.stringify(body);
        headers['Content-Type'] = 'application/json';
        headers['Content-Length'] = Buffer.byteLength(payload);
      }
      if (this.auth) {
        headers['Authorization'] = 'Basic ' + Buffer.from(this.auth).toString('base64');
      }

      const options = {
        hostname: this.host,
        port: this.port,
        path,
        method,
        headers,
        timeout: this.timeout,
      };

      const req = http.request(options, (res) => {
        // Collect raw buffers and decode once at the end; decoding chunk by
        // chunk breaks multi-byte characters that straddle a chunk boundary.
        const chunks = [];
        res.on('data', (chunk) => chunks.push(chunk));
        res.on('error', reject);
        res.on('end', () => {
          const text = Buffer.concat(chunks).toString('utf8');
          if (res.statusCode >= 400) {
            reject(new Error(`rqlite returned HTTP ${res.statusCode}: ${text}`));
            return;
          }
          try { resolve(JSON.parse(text)); }
          catch (e) { resolve(text); }
        });
      });

      // The `timeout` option only emits an event; the request has to be
      // aborted explicitly or the promise would never settle.
      req.on('timeout', () => {
        req.destroy(new Error(`rqlite request timed out after ${this.timeout} ms`));
      });
      req.on('error', reject);
      if (payload !== null) req.write(payload);
      req.end();
    });
  }
}
// 使用示例
// Usage example: create a table, add one note, then print all notes.
async function main() {
  const client = new RqliteClient('localhost', 4001, { auth: 'admin:password' });

  // Create the table if it does not exist yet.
  const createNotes = 'CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, content TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)';
  await client.execute([[createNotes]]);

  // Insert one note through parameter binding.
  const insertNote = ['INSERT INTO notes (content) VALUES (?)', '第一条笔记'];
  await client.execute([insertNote]);

  // Read the notes back, newest first, and pretty-print the response.
  const notes = await client.query('SELECT * FROM notes ORDER BY id DESC');
  console.log(JSON.stringify(notes, null, 2));
}

main().catch(console.error);
10.7 客户端对比
| 特性 | CLI | Python (requests) | Go (官方) | Java (原生) | Node.js |
|---|---|---|---|---|---|
| 安装难度 | ⭐ | ⭐ | ⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
| 交互式使用 | ✅ | ❌ | ❌ | ❌ | ❌ |
| 参数绑定 | ✅ | ✅ | ✅ | ✅ | ✅ |
| 连接池 | — | ✅ | ✅ | ✅ | ❌(基础实现) |
| 批量操作 | ✅ | ✅ | ✅ | ✅ | ✅ |
| TLS 支持 | ✅ | ✅ | ✅ | ✅ | ✅ |
| 适用场景 | 运维调试 | 脚本/后端 | 后端服务 | 企业应用 | 全栈应用 |
10.8 业务场景:用户注册服务
"""
用户注册服务示例 — 使用 rqlite 存储用户数据
"""
from flask import Flask, request, jsonify
from rqlite_client import RqliteClient
# Flask application and the shared rqlite client used by every route.
app = Flask(__name__)
db = RqliteClient(
    host="rqlite-host",
    port=4001,
    username="app",
    password="secure_pass",
)

# Bootstrap the schema at import time: the users table, then its email index.
for ddl in (
    """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email ON users(email)",
):
    db.execute(ddl)
@app.route("/register", methods=["POST"])
def register():
    """Create a user from a JSON body with username, email and password.

    Returns:
        201 with the new user's id, username and email on success.
        400 when the body is not a JSON object or a field is missing, empty
        or not a string.
        409 when the username or email is already taken.
        500 when the database reports any other error.
    """
    import hashlib
    import secrets

    # A missing, malformed or non-object body must yield 400, not a crash.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get("username")
    email = data.get("email")
    password = data.get("password")
    fields = (username, email, password)
    if not all(isinstance(value, str) and value for value in fields):
        return jsonify({"error": "缺少必填字段"}), 400

    # Fast path: reject obvious duplicates before hashing the password.
    existing = db.query(
        "SELECT id FROM users WHERE username = ? OR email = ?",
        params=[username, email],
        level="strong",
    )
    if existing["results"][0].get("values"):
        return jsonify({"error": "用户名或邮箱已存在"}), 409

    # Salted, iterated hash. The algorithm, iteration count and salt are
    # stored next to the digest so the value can be verified later.
    iterations = 600_000
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    )
    password_hash = f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"

    result = db.execute(
        "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
        params=[username, email, password_hash],
    )
    outcome = result["results"][0]
    error = outcome.get("error")
    if error:
        # Two concurrent registrations can both pass the check above; the
        # UNIQUE constraints then reject the later insert.
        if "UNIQUE" in error:
            return jsonify({"error": "用户名或邮箱已存在"}), 409
        return jsonify({"error": error}), 500

    user_id = outcome["last_insert_id"]
    return jsonify({"id": user_id, "username": username, "email": email}), 201
@app.route("/users/<int:user_id>", methods=["GET"])
def get_user(user_id):
    """Return one user's public fields as JSON, or 404 when the id is unknown."""
    lookup_sql = "SELECT id, username, email, created_at FROM users WHERE id = ?"
    response = db.query(lookup_sql, params=[user_id])

    matches = response["results"][0].get("values")
    if matches:
        # Column order follows the SELECT list above.
        record = matches[0]
        payload = {
            "id": record[0],
            "username": record[1],
            "email": record[2],
            "created_at": record[3],
        }
        return jsonify(payload)

    return jsonify({"error": "用户不存在"}), 404
# Start Flask's built-in server only when this file is run as a script.
if __name__ == "__main__":
    # Listens on every network interface (0.0.0.0) at port 5000.
    app.run(host="0.0.0.0", port=5000)
10.9 本章小结
| 要点 | 内容 |
|---|---|
| CLI 客户端 | 官方 rqlite 工具,适合运维和调试 |
| Python | requests 库最灵活,pyrqlite 提供 DB-API 2.0 接口 |
| Go | 官方库 rqlite-go,也可使用标准库直接调用 HTTP API |
| Java | 使用 java.net.http 或 Apache HttpClient |
| Node.js | 使用内置 http 模块或 node-fetch |
| 通用原则 | 所有语言都通过 HTTP API 操作 rqlite |
上一章:第 9 章:性能优化 下一章:第 11 章:容器化部署