第 6 章:集群搭建与管理
第 6 章:集群搭建与管理
本章介绍如何搭建 dqlite 多节点集群,包括节点管理、Leader 选举、故障转移和成员变更。
6.1 集群架构总览
dqlite 集群由多个节点组成,通过 Raft 协议保证数据一致性:
┌───────────────────────────────────────────────────┐
│ dqlite Cluster │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Node 1 │◀───▶│ Node 2 │◀───▶│ Node 3 │ │
│ │ (Leader) │ │(Follower)│ │(Follower) │ │
│ │ :9001 │ │ :9002 │ │ :9003 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │
│ │ Raft Consensus │ │
│ │ ┌──────────────────┐ │ │
│ └────▶│ Log Replication │◀────────────┘ │
│ └──────────────────┘ │
│ │
│ Client ──────▶ Leader (写) │
│ Client ──────▶ Any Node (读,可配置) │
└───────────────────────────────────────────────────┘
6.1.1 节点角色
| 角色 | 说明 | 数量 |
|---|---|---|
| Leader | 处理所有写请求,协调日志复制 | 1 |
| Follower | 接收并复制 Leader 的日志 | N-1 |
| Candidate | 正在竞选 Leader 的临时状态 | 0-1 |
6.1.2 集群规模建议
| 场景 | 推荐节点数 | 可用性 |
|---|---|---|
| 开发测试 | 1 | 无冗余 |
| 小型生产 | 3 | 可容忍 1 节点故障 |
| 中型生产 | 5 | 可容忍 2 节点故障 |
| 高可用 | 7 | 可容忍 3 节点故障 |
6.2 三节点集群搭建
6.2.1 本地三节点(开发测试)
在同一台机器上启动三个节点:
#!/bin/bash
# start-cluster.sh - 本地三节点集群
DATA_BASE="/tmp/dqlite-cluster"
mkdir -p ${DATA_BASE}/node{1,2,3}
# 启动 Node 1
echo "Starting Node 1..."
./dqlite-node 1 "127.0.0.1:9001" "${DATA_BASE}/node1" &
PID1=$!
sleep 2
# 启动 Node 2
echo "Starting Node 2..."
./dqlite-node 2 "127.0.0.1:9002" "${DATA_BASE}/node2" &
PID2=$!
sleep 2
# 启动 Node 3
echo "Starting Node 3..."
./dqlite-node 3 "127.0.0.1:9003" "${DATA_BASE}/node3" &
PID3=$!
sleep 2
echo "Cluster started:"
echo " Node 1: PID=$PID1, address=127.0.0.1:9001"
echo " Node 2: PID=$PID2, address=127.0.0.1:9002"
echo " Node 3: PID=$PID3, address=127.0.0.1:9003"
# 将节点添加到集群(由第一个节点执行)
# 具体方式取决于 go-dqlite 或 C API
停止集群:
#!/bin/bash
# stop-cluster.sh
kill $PID1 $PID2 $PID3
wait
echo "Cluster stopped"
6.2.2 Go 实现:集群初始化
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
dqlite "github.com/canonical/go-dqlite/v2"
"github.com/canonical/go-dqlite/v2/driver"
)
type ClusterConfig struct {
Nodes []NodeConfig
}
type NodeConfig struct {
ID uint64
Address string
DataDir string
}
func startClusterNode(cfg NodeConfig, allNodes []driver.NodeInfo) (*dqlite.Node, *sql.DB, error) {
// 确保数据目录
if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
return nil, nil, fmt.Errorf("mkdir: %w", err)
}
// 创建节点
node, err := dqlite.New(cfg.ID, cfg.Address, cfg.DataDir, nil)
if err != nil {
return nil, nil, fmt.Errorf("new node: %w", err)
}
// 创建驱动(需要知道所有节点地址)
store := driver.NewInmemNodeStore()
store.Set(context.Background(), allNodes)
drv, err := driver.New(store)
if err != nil {
node.Close()
return nil, nil, fmt.Errorf("new driver: %w", err)
}
db := sql.OpenDB(drv)
db.SetMaxOpenConns(5)
return node, db, nil
}
func main() {
// 集群配置
nodes := []NodeConfig{
{ID: 1, Address: "127.0.0.1:9001", DataDir: "/tmp/dqlite-cluster/node1"},
{ID: 2, Address: "127.0.0.1:9002", DataDir: "/tmp/dqlite-cluster/node2"},
{ID: 3, Address: "127.0.0.1:9003", DataDir: "/tmp/dqlite-cluster/node3"},
}
allNodeInfos := []driver.NodeInfo{
{ID: 1, Address: "127.0.0.1:9001"},
{ID: 2, Address: "127.0.0.1:9002"},
{ID: 3, Address: "127.0.0.1:9003"},
}
// 启动所有节点
var (
nodeInstances []*dqlite.Node
databases []*sql.DB
)
for _, cfg := range nodes {
node, db, err := startClusterNode(cfg, allNodeInfos)
if err != nil {
log.Fatalf("Failed to start node %d: %v", cfg.ID, err)
}
nodeInstances = append(nodeInstances, node)
databases = append(databases, db)
fmt.Printf("Node %d started on %s\n", cfg.ID, cfg.Address)
}
defer func() {
for i, node := range nodeInstances {
databases[i].Close()
node.Close()
}
}()
// 等待集群稳定
time.Sleep(3 * time.Second)
// 使用第一个节点的数据库进行操作
db := databases[0]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
log.Fatalf("Ping failed: %v", err)
}
// 创建测试表
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS cluster_test (
id INTEGER PRIMARY KEY,
node_id INTEGER,
created_at TEXT DEFAULT (datetime('now'))
)`)
if err != nil {
log.Fatalf("Create table failed: %v", err)
}
fmt.Println("Cluster is ready!")
// 等待信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
fmt.Println("\nShutting down cluster...")
}
6.2.3 多机器集群部署
在三台机器上分别部署:
机器 1 (192.168.1.101):
Node ID: 1
Address: 192.168.1.101:9001
DataDir: /var/lib/dqlite
机器 2 (192.168.1.102):
Node ID: 2
Address: 192.168.1.102:9001
DataDir: /var/lib/dqlite
机器 3 (192.168.1.103):
Node ID: 3
Address: 192.168.1.103:9001
DataDir: /var/lib/dqlite
每台机器的 systemd 服务文件:
# /etc/systemd/system/dqlite.service
[Unit]
Description=dqlite Node
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=dqlite
Group=dqlite
ExecStart=/usr/local/bin/dqlite-node --id %i --address :9001 --data-dir /var/lib/dqlite
Restart=always
RestartSec=5
LimitNOFILE=65536
# 安全加固
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/var/lib/dqlite
[Install]
WantedBy=multi-user.target
6.3 Leader 选举
6.3.1 选举触发条件
| 触发条件 | 说明 |
|---|---|
| 初始启动 | 集群首次启动时 |
| Leader 崩溃 | Follower 超时未收到心跳 |
| 网络分区 | Leader 在少数派中 |
| 手动转移 | 管理员主动触发 Leader 转移 |
6.3.2 选举时间线
t=0ms Leader 崩溃(或网络断开)
t=150ms Follower A 选举超时,转为 Candidate
t=151ms Candidate A 增加任期,投自己一票
t=152ms Candidate A 向 B、C 发送 RequestVote
t=153ms B 投赞成票(A 的日志最新)
t=154ms C 投赞成票
t=155ms A 获得多数票,成为 Leader
t=156ms A 开始发送心跳
总耗时:约 150-300ms(取决于随机超时)
6.3.3 手动 Leader 转移
在计划性维护前,建议先转移 Leader:
// 通过 SQL 执行 Leader 转移(如果 go-dqlite 支持)
// 注意:具体的 API 可能随版本变化
// 方式 1:停止当前 Leader,集群自动选举新 Leader
func gracefulLeaderTransfer(db *sql.DB) error {
// 1. 停止接受新写入
_, err := db.Exec("PRAGMA dqlite_stop_writes")
if err != nil {
return err
}
// 2. 等待所有写入完成
time.Sleep(2 * time.Second)
// 3. 新的 Leader 会在选举超时后自动产生
return nil
}
// 方式 2:使用 dqlite 客户端的 Transfer 命令
// (需要连接到当前 Leader)
6.3.4 选举问题排查
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 无法选出 Leader | 网络不通或多数节点故障 | 检查网络连通性 |
| 频繁 Leader 切换 | 网络延迟或不稳定 | 调整选举超时参数 |
| 选举时间过长 | 时钟不同步 | 使用 NTP 同步时钟 |
| 脑裂 | 网络分区(Raft 保证不会发生) | 检查是否有误配置 |
6.4 故障转移
6.4.1 自动故障转移
当 Leader 故障时,dqlite 自动处理:
正常状态:
Node 1 (Leader) ──▶ Node 2 (Follower)
│
└──────────▶ Node 3 (Follower)
Node 1 故障后:
✗ Node 1 (Down)
Node 2 (Candidate) ──▶ 选举 ──▶ Node 2 (New Leader)
│
└──────────────▶ Node 3 (Follower)
6.4.2 故障转移时间
| 阶段 | 耗时 | 说明 |
|---|---|---|
| 故障检测 | 150-300ms | 选举超时 |
| 选举 | 1-10ms | 投票过程 |
| 日志同步 | 0-100ms | 新 Leader 确认日志最新 |
| 总计 | 150-400ms | 从故障到新 Leader 就绪 |
6.4.3 客户端处理故障转移
// 客户端需要处理 Leader 切换期间的错误
func executeWithFailover(ctx context.Context, dbs []*sql.DB, query string, args ...interface{}) error {
var lastErr error
for _, db := range dbs {
_, err := db.ExecContext(ctx, query, args...)
if err == nil {
return nil
}
// 如果是 Leader 相关错误,尝试下一个节点
if isLeaderError(err) {
lastErr = err
continue
}
// 其他错误直接返回
return err
}
return fmt.Errorf("all nodes failed: %w", lastErr)
}
func isLeaderError(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "server is not the leader") ||
strings.Contains(msg, "no leader") ||
strings.Contains(msg, "connection refused")
}
6.4.4 健康检查
// 集群健康检查
type ClusterHealth struct {
NodeID uint64
Address string
IsLeader bool
IsHealthy bool
Lag time.Duration
}
func checkClusterHealth(nodeInfos []driver.NodeInfo) []ClusterHealth {
var results []ClusterHealth
for _, info := range nodeInfos {
health := ClusterHealth{
NodeID: info.ID,
Address: info.Address,
}
// 尝试连接
store := driver.NewInmemNodeStore()
store.Set(context.Background(), []driver.NodeInfo{info})
drv, err := driver.New(store)
if err != nil {
health.IsHealthy = false
results = append(results, health)
continue
}
db := sql.OpenDB(drv)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
start := time.Now()
err = db.PingContext(ctx)
health.Lag = time.Since(start)
health.IsHealthy = (err == nil)
db.Close()
drv.Close()
cancel()
results = append(results, health)
}
return results
}
6.5 成员变更
6.5.1 添加节点
// 添加新节点到集群
func addNodeToCluster(ctx context.Context, leaderDB *sql.DB,
newNodeID uint64, newNodeAddr string) error {
// 注意:具体的 SQL 或 API 调用取决于 go-dqlite 版本
// 以下为概念性代码
// 使用 dqlite 客户端的 Add 命令
// 在实际使用中,需要通过 go-dqlite 的 Client API
log.Printf("Adding node %d at %s to cluster...", newNodeID, newNodeAddr)
// 方式 1:通过 go-dqlite Client API
// client.Add(ctx, dqlite.NodeInfo{ID: newNodeID, Address: newNodeAddr})
// 方式 2:通过自定义 SQL 命令
// _, err := leaderDB.Exec("ALTER CLUSTER ADD NODE ?", newNodeID)
return nil
}
6.5.2 移除节点
// 从集群移除节点
func removeNodeFromCluster(ctx context.Context, leaderDB *sql.DB,
nodeID uint64) error {
log.Printf("Removing node %d from cluster...", nodeID)
// 类似添加节点,通过客户端 API 执行
return nil
}
6.5.3 成员变更脚本
#!/bin/bash
# cluster-manage.sh - 集群成员管理脚本
set -euo pipefail
ACTION="$1"
NODE_ID="$2"
NODE_ADDR="${3:-}"
case "$ACTION" in
add)
if [ -z "$NODE_ADDR" ]; then
echo "Usage: $0 add <node-id> <node-address>"
exit 1
fi
echo "Adding node $NODE_ID ($NODE_ADDR) to cluster..."
# 通过 go-dqlite CLI 或 API 添加
# go-dqlite cluster add $NODE_ID $NODE_ADDR
;;
remove)
echo "Removing node $NODE_ID from cluster..."
# go-dqlite cluster remove $NODE_ID
;;
list)
echo "Listing cluster members..."
# go-dqlite cluster list
;;
*)
echo "Usage: $0 {add|remove|list} <node-id> [node-address]"
exit 1
;;
esac
6.5.4 成员变更最佳实践
| 规则 | 说明 |
|---|---|
| 一次只变更一个节点 | 避免 Quorum 计算混乱 |
| 先添加再移除 | 替换节点时,先加入新节点再移除旧节点 |
| 等待同步完成 | 新节点完全同步后再进行下一次变更 |
| 保持奇数节点 | 3→5→3 而不是 3→4→3 |
| 变更前备份 | 关键操作前创建快照 |
6.6 集群运维命令
6.6.1 检查集群状态
// 获取集群信息
func getClusterInfo(db *sql.DB) error {
// 查询集群信息(具体 SQL 取决于版本)
rows, err := db.Query("SELECT * FROM dqlite_cluster")
if err != nil {
return err
}
defer rows.Close()
fmt.Println("=== Cluster Info ===")
// 遍历结果...
return nil
}
6.6.2 查看 Raft 日志状态
// 查看 Raft 状态
type RaftStatus struct {
Term uint64
CommitIndex uint64
LastLogIndex uint64
Role string
}
6.7 常见集群问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 集群无法启动 | 节点配置不一致 | 检查所有节点配置 |
| 写入失败 | Leader 不可达 | 检查集群健康状态 |
| 数据不一致 | 时钟偏差过大 | 使用 NTP 同步时钟 |
| 节点反复加入退出 | 网络不稳定 | 检查网络质量 |
| 日志增长过快 | 快照未触发 | 调整快照阈值 |
| 新节点同步慢 | 数据量大 | 使用快照而非日志重放 |
本章小结
| 要点 | 说明 |
|---|---|
| 集群规模 | 推荐 3、5、7 个节点(奇数) |
| Leader 选举 | 150-300ms 超时后自动选举 |
| 故障转移 | 自动,150-400ms 完成 |
| 成员变更 | 一次一个节点,等待同步完成 |
| 客户端处理 | 需要实现重试和 Leader 发现 |
下一章
→ 第 7 章:性能优化 — 学习批量写入、读优化、同步策略和日志压缩等性能优化技巧。