强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第 6 章:集群搭建与管理

第 6 章:集群搭建与管理

本章介绍如何搭建 dqlite 多节点集群,包括节点管理、Leader 选举、故障转移和成员变更。


6.1 集群架构总览

dqlite 集群由多个节点组成,通过 Raft 协议保证数据一致性:

┌───────────────────────────────────────────────────┐
│                dqlite Cluster                      │
│                                                    │
│   ┌──────────┐     ┌──────────┐     ┌──────────┐ │
│   │  Node 1  │◀───▶│  Node 2  │◀───▶│  Node 3  │ │
│   │ (Leader) │     │(Follower)│     │(Follower) │ │
│   │  :9001   │     │  :9002   │     │  :9003   │ │
│   └──────────┘     └──────────┘     └──────────┘ │
│        │                                      │    │
│        │         Raft Consensus               │    │
│        │     ┌──────────────────┐             │    │
│        └────▶│  Log Replication │◀────────────┘    │
│              └──────────────────┘                  │
│                                                    │
│   Client ──────▶ Leader (写)                       │
│   Client ──────▶ Any Node (读,可配置)              │
└───────────────────────────────────────────────────┘

6.1.1 节点角色

角色说明数量
Leader处理所有写请求,协调日志复制1
Follower接收并复制 Leader 的日志N-1
Candidate正在竞选 Leader 的临时状态0-1

6.1.2 集群规模建议

场景推荐节点数可用性
开发测试1无冗余
小型生产3可容忍 1 节点故障
中型生产5可容忍 2 节点故障
高可用7可容忍 3 节点故障

6.2 三节点集群搭建

6.2.1 本地三节点(开发测试)

在同一台机器上启动三个节点:

#!/bin/bash
# start-cluster.sh - 本地三节点集群

DATA_BASE="/tmp/dqlite-cluster"
mkdir -p ${DATA_BASE}/node{1,2,3}

# 启动 Node 1
echo "Starting Node 1..."
./dqlite-node 1 "127.0.0.1:9001" "${DATA_BASE}/node1" &
PID1=$!

sleep 2

# 启动 Node 2
echo "Starting Node 2..."
./dqlite-node 2 "127.0.0.1:9002" "${DATA_BASE}/node2" &
PID2=$!

sleep 2

# 启动 Node 3
echo "Starting Node 3..."
./dqlite-node 3 "127.0.0.1:9003" "${DATA_BASE}/node3" &
PID3=$!

sleep 2

echo "Cluster started:"
echo "  Node 1: PID=$PID1, address=127.0.0.1:9001"
echo "  Node 2: PID=$PID2, address=127.0.0.1:9002"
echo "  Node 3: PID=$PID3, address=127.0.0.1:9003"

# 将节点添加到集群(由第一个节点执行)
# 具体方式取决于 go-dqlite 或 C API

停止集群:

#!/bin/bash
# stop-cluster.sh
kill $PID1 $PID2 $PID3
wait
echo "Cluster stopped"

6.2.2 Go 实现:集群初始化

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    dqlite "github.com/canonical/go-dqlite/v2"
    "github.com/canonical/go-dqlite/v2/driver"
)

type ClusterConfig struct {
    Nodes []NodeConfig
}

type NodeConfig struct {
    ID      uint64
    Address string
    DataDir string
}

func startClusterNode(cfg NodeConfig, allNodes []driver.NodeInfo) (*dqlite.Node, *sql.DB, error) {
    // 确保数据目录
    if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
        return nil, nil, fmt.Errorf("mkdir: %w", err)
    }

    // 创建节点
    node, err := dqlite.New(cfg.ID, cfg.Address, cfg.DataDir, nil)
    if err != nil {
        return nil, nil, fmt.Errorf("new node: %w", err)
    }

    // 创建驱动(需要知道所有节点地址)
    store := driver.NewInmemNodeStore()
    store.Set(context.Background(), allNodes)

    drv, err := driver.New(store)
    if err != nil {
        node.Close()
        return nil, nil, fmt.Errorf("new driver: %w", err)
    }

    db := sql.OpenDB(drv)
    db.SetMaxOpenConns(5)

    return node, db, nil
}

func main() {
    // 集群配置
    nodes := []NodeConfig{
        {ID: 1, Address: "127.0.0.1:9001", DataDir: "/tmp/dqlite-cluster/node1"},
        {ID: 2, Address: "127.0.0.1:9002", DataDir: "/tmp/dqlite-cluster/node2"},
        {ID: 3, Address: "127.0.0.1:9003", DataDir: "/tmp/dqlite-cluster/node3"},
    }

    allNodeInfos := []driver.NodeInfo{
        {ID: 1, Address: "127.0.0.1:9001"},
        {ID: 2, Address: "127.0.0.1:9002"},
        {ID: 3, Address: "127.0.0.1:9003"},
    }

    // 启动所有节点
    var (
        nodeInstances []*dqlite.Node
        databases     []*sql.DB
    )

    for _, cfg := range nodes {
        node, db, err := startClusterNode(cfg, allNodeInfos)
        if err != nil {
            log.Fatalf("Failed to start node %d: %v", cfg.ID, err)
        }
        nodeInstances = append(nodeInstances, node)
        databases = append(databases, db)
        fmt.Printf("Node %d started on %s\n", cfg.ID, cfg.Address)
    }

    defer func() {
        for i, node := range nodeInstances {
            databases[i].Close()
            node.Close()
        }
    }()

    // 等待集群稳定
    time.Sleep(3 * time.Second)

    // 使用第一个节点的数据库进行操作
    db := databases[0]
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := db.PingContext(ctx); err != nil {
        log.Fatalf("Ping failed: %v", err)
    }

    // 创建测试表
    _, err := db.Exec(`CREATE TABLE IF NOT EXISTS cluster_test (
        id INTEGER PRIMARY KEY,
        node_id INTEGER,
        created_at TEXT DEFAULT (datetime('now'))
    )`)
    if err != nil {
        log.Fatalf("Create table failed: %v", err)
    }

    fmt.Println("Cluster is ready!")

    // 等待信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    fmt.Println("\nShutting down cluster...")
}

6.2.3 多机器集群部署

在三台机器上分别部署:

机器 1 (192.168.1.101):
  Node ID: 1
  Address: 192.168.1.101:9001
  DataDir: /var/lib/dqlite

机器 2 (192.168.1.102):
  Node ID: 2
  Address: 192.168.1.102:9001
  DataDir: /var/lib/dqlite

机器 3 (192.168.1.103):
  Node ID: 3
  Address: 192.168.1.103:9001
  DataDir: /var/lib/dqlite

每台机器的 systemd 服务文件:

# /etc/systemd/system/dqlite.service
[Unit]
Description=dqlite Node
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=dqlite
Group=dqlite
ExecStart=/usr/local/bin/dqlite-node --id %i --address :9001 --data-dir /var/lib/dqlite
Restart=always
RestartSec=5
LimitNOFILE=65536

# 安全加固
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/var/lib/dqlite

[Install]
WantedBy=multi-user.target

6.3 Leader 选举

6.3.1 选举触发条件

触发条件说明
初始启动集群首次启动时
Leader 崩溃Follower 超时未收到心跳
网络分区Leader 在少数派中
手动转移管理员主动触发 Leader 转移

6.3.2 选举时间线

t=0ms     Leader 崩溃(或网络断开)
t=150ms   Follower A 选举超时,转为 Candidate
t=151ms   Candidate A 增加任期,投自己一票
t=152ms   Candidate A 向 B、C 发送 RequestVote
t=153ms   B 投赞成票(A 的日志最新)
t=154ms   C 投赞成票
t=155ms   A 获得多数票,成为 Leader
t=156ms   A 开始发送心跳

总耗时:约 150-300ms(取决于随机超时)

6.3.3 手动 Leader 转移

在计划性维护前,建议先转移 Leader:

// 通过 SQL 执行 Leader 转移(如果 go-dqlite 支持)
// 注意:具体的 API 可能随版本变化

// 方式 1:停止当前 Leader,集群自动选举新 Leader
func gracefulLeaderTransfer(db *sql.DB) error {
    // 1. 停止接受新写入
    _, err := db.Exec("PRAGMA dqlite_stop_writes")
    if err != nil {
        return err
    }

    // 2. 等待所有写入完成
    time.Sleep(2 * time.Second)

    // 3. 新的 Leader 会在选举超时后自动产生
    return nil
}

// 方式 2:使用 dqlite 客户端的 Transfer 命令
// (需要连接到当前 Leader)

6.3.4 选举问题排查

问题原因解决方案
无法选出 Leader网络不通或多数节点故障检查网络连通性
频繁 Leader 切换网络延迟或不稳定调整选举超时参数
选举时间过长时钟不同步使用 NTP 同步时钟
脑裂网络分区(Raft 保证不会发生)检查是否有误配置

6.4 故障转移

6.4.1 自动故障转移

当 Leader 故障时,dqlite 自动处理:

正常状态:
  Node 1 (Leader) ──▶ Node 2 (Follower)
         │
         └──────────▶ Node 3 (Follower)

Node 1 故障后:
  ✗ Node 1 (Down)
  
  Node 2 (Candidate) ──▶ 选举 ──▶ Node 2 (New Leader)
         │
         └──────────────▶ Node 3 (Follower)

6.4.2 故障转移时间

阶段耗时说明
故障检测150-300ms选举超时
选举1-10ms投票过程
日志同步0-100ms新 Leader 确认日志最新
总计150-400ms从故障到新 Leader 就绪

6.4.3 客户端处理故障转移

// 客户端需要处理 Leader 切换期间的错误
func executeWithFailover(ctx context.Context, dbs []*sql.DB, query string, args ...interface{}) error {
    var lastErr error

    for _, db := range dbs {
        _, err := db.ExecContext(ctx, query, args...)
        if err == nil {
            return nil
        }

        // 如果是 Leader 相关错误,尝试下一个节点
        if isLeaderError(err) {
            lastErr = err
            continue
        }

        // 其他错误直接返回
        return err
    }

    return fmt.Errorf("all nodes failed: %w", lastErr)
}

func isLeaderError(err error) bool {
    if err == nil {
        return false
    }
    msg := err.Error()
    return strings.Contains(msg, "server is not the leader") ||
           strings.Contains(msg, "no leader") ||
           strings.Contains(msg, "connection refused")
}

6.4.4 健康检查

// 集群健康检查
type ClusterHealth struct {
    NodeID    uint64
    Address   string
    IsLeader  bool
    IsHealthy bool
    Lag       time.Duration
}

func checkClusterHealth(nodeInfos []driver.NodeInfo) []ClusterHealth {
    var results []ClusterHealth

    for _, info := range nodeInfos {
        health := ClusterHealth{
            NodeID:  info.ID,
            Address: info.Address,
        }

        // 尝试连接
        store := driver.NewInmemNodeStore()
        store.Set(context.Background(), []driver.NodeInfo{info})

        drv, err := driver.New(store)
        if err != nil {
            health.IsHealthy = false
            results = append(results, health)
            continue
        }

        db := sql.OpenDB(drv)
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

        start := time.Now()
        err = db.PingContext(ctx)
        health.Lag = time.Since(start)

        health.IsHealthy = (err == nil)

        db.Close()
        drv.Close()
        cancel()

        results = append(results, health)
    }

    return results
}

6.5 成员变更

6.5.1 添加节点

// 添加新节点到集群
func addNodeToCluster(ctx context.Context, leaderDB *sql.DB,
    newNodeID uint64, newNodeAddr string) error {

    // 注意:具体的 SQL 或 API 调用取决于 go-dqlite 版本
    // 以下为概念性代码

    // 使用 dqlite 客户端的 Add 命令
    // 在实际使用中,需要通过 go-dqlite 的 Client API

    log.Printf("Adding node %d at %s to cluster...", newNodeID, newNodeAddr)

    // 方式 1:通过 go-dqlite Client API
    // client.Add(ctx, dqlite.NodeInfo{ID: newNodeID, Address: newNodeAddr})

    // 方式 2:通过自定义 SQL 命令
    // _, err := leaderDB.Exec("ALTER CLUSTER ADD NODE ?", newNodeID)

    return nil
}

6.5.2 移除节点

// 从集群移除节点
func removeNodeFromCluster(ctx context.Context, leaderDB *sql.DB,
    nodeID uint64) error {

    log.Printf("Removing node %d from cluster...", nodeID)

    // 类似添加节点,通过客户端 API 执行

    return nil
}

6.5.3 成员变更脚本

#!/bin/bash
# cluster-manage.sh - 集群成员管理脚本

set -euo pipefail

ACTION="$1"
NODE_ID="$2"
NODE_ADDR="${3:-}"

case "$ACTION" in
    add)
        if [ -z "$NODE_ADDR" ]; then
            echo "Usage: $0 add <node-id> <node-address>"
            exit 1
        fi
        echo "Adding node $NODE_ID ($NODE_ADDR) to cluster..."
        # 通过 go-dqlite CLI 或 API 添加
        # go-dqlite cluster add $NODE_ID $NODE_ADDR
        ;;
    remove)
        echo "Removing node $NODE_ID from cluster..."
        # go-dqlite cluster remove $NODE_ID
        ;;
    list)
        echo "Listing cluster members..."
        # go-dqlite cluster list
        ;;
    *)
        echo "Usage: $0 {add|remove|list} <node-id> [node-address]"
        exit 1
        ;;
esac

6.5.4 成员变更最佳实践

规则说明
一次只变更一个节点避免 Quorum 计算混乱
先添加再移除替换节点时,先加入新节点再移除旧节点
等待同步完成新节点完全同步后再进行下一次变更
保持奇数节点3→5→3 而不是 3→4→3
变更前备份关键操作前创建快照

6.6 集群运维命令

6.6.1 检查集群状态

// 获取集群信息
func getClusterInfo(db *sql.DB) error {
    // 查询集群信息(具体 SQL 取决于版本)
    rows, err := db.Query("SELECT * FROM dqlite_cluster")
    if err != nil {
        return err
    }
    defer rows.Close()

    fmt.Println("=== Cluster Info ===")
    // 遍历结果...
    return nil
}

6.6.2 查看 Raft 日志状态

// 查看 Raft 状态
type RaftStatus struct {
    Term         uint64
    CommitIndex  uint64
    LastLogIndex uint64
    Role         string
}

6.7 常见集群问题

问题原因解决方案
集群无法启动节点配置不一致检查所有节点配置
写入失败Leader 不可达检查集群健康状态
数据不一致时钟偏差过大使用 NTP 同步时钟
节点反复加入退出网络不稳定检查网络质量
日志增长过快快照未触发调整快照阈值
新节点同步慢数据量大使用快照而非日志重放

本章小结

要点说明
集群规模推荐 3、5、7 个节点(奇数)
Leader 选举150-300ms 超时后自动选举
故障转移自动,150-400ms 完成
成员变更一次一个节点,等待同步完成
客户端处理需要实现重试和 Leader 发现

下一章

第 7 章:性能优化 — 学习批量写入、读优化、同步策略和日志压缩等性能优化技巧。