强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

10 - 复制协议

第 10 章:复制协议

10.1 复制协议概述

MySQL 复制协议(Replication Protocol)是主从复制(Master-Slave Replication)的底层通信协议。从库(Slave/Replica)作为特殊的"客户端"连接主库(Master/Source),请求并接收二进制日志事件。

复制架构

┌──────────────┐       复制协议        ┌──────────────┐
│   Master     │ ───────────────────→ │   Slave      │
│  (Source)    │   Binlog Events      │  (Replica)   │
│              │                       │              │
│  - Binlog    │   COM_BINLOG_DUMP    │  - SQL Thread │
│  - IO Thread │ ←───────────────── │  - IO Thread  │
└──────────────┘                      └──────────────┘

复制流程

Slave                                      Master
  │                                          │
  │ ──── TCP 连接 (端口 3306) ─────────────→ │
  │                                          │
  │ ←─── HandshakeV10 ──────────────────── │
  │ ──── HandshakeResponse (特殊用户) ───→ │
  │ ←─── OK ───────────────────────────── │
  │                                          │
  │ ──── COM_REGISTER_SLAVE ─────────────→ │  注册从库
  │ ←─── OK ───────────────────────────── │
  │                                          │
  │ ──── COM_BINLOG_DUMP ────────────────→ │  请求 Binlog
  │      binlog_file = "mysql-bin.000001"   │
  │      binlog_pos = 4                     │
  │                                          │
  │ ←─── Binlog Event ─────────────────── │  事件流
  │ ←─── Binlog Event ─────────────────── │
  │ ←─── Binlog Event ─────────────────── │
  │       ... (持续接收) ...                │

10.2 COM_REGISTER_SLAVE(0x15)

从库向主库注册自己。

命令格式

偏移量   大小      字段                  说明
────────────────────────────────────────────────
0        1 字节    0x15                 命令类型
1        4 字节    server_id            从库的唯一服务器 ID
5        变长      slaves_hostname      从库主机名 (长度编码)
         变长      slaves_user          用户名 (长度编码)
         变长      slaves_password      密码 (长度编码)
         2 字节    slaves_port          从库端口
         4 字节    replication_rank     复制优先级
         4 字节    master_id            主库 ID (通常为 0)

Python 实现

"""
mysql_replication.py
MySQL 复制协议实现
"""
import socket
import struct
import hashlib


def encode_packet(payload: bytes, seq: int) -> bytes:
    return struct.pack('<I', len(payload))[:3] + struct.pack('B', seq) + payload


def encode_length_string(s: str) -> bytes:
    encoded = s.encode('utf-8')
    if len(encoded) < 0xFB:
        return struct.pack('B', len(encoded)) + encoded
    elif len(encoded) < 0x10000:
        return b'\xFC' + struct.pack('<H', len(encoded)) + encoded


def send_register_slave(sock, server_id, hostname='', user='',
                        password='', port=3306, rank=0, master_id=0):
    """发送 COM_REGISTER_SLAVE"""
    payload = bytearray()
    payload.append(0x15)
    payload.extend(struct.pack('<I', server_id))
    payload.extend(encode_length_string(hostname))
    payload.extend(encode_length_string(user))
    payload.extend(encode_length_string(password))
    payload.extend(struct.pack('<H', port))
    payload.extend(struct.pack('<I', rank))
    payload.extend(struct.pack('<I', master_id))

    sock.send(encode_packet(bytes(payload), 0))
    response = read_response(sock)
    return response


def read_response(sock):
    """读取简单响应"""
    header = b''
    while len(header) < 4:
        header += sock.recv(4 - len(header))
    pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
    payload = sock.recv(pkt_len)
    return payload

10.3 COM_BINLOG_DUMP(0x12)

从库请求主库发送二进制日志事件。

命令格式

偏移量   大小      字段                  说明
────────────────────────────────────────────────
0        1 字节    0x12                 命令类型
1        4 字节    binlog_pos           起始位置
5        2 字节    flags                标志 (0x0001=非阻塞)
7        4 字节    server_id            从库服务器 ID
11       变长      binlog_filename      Binlog 文件名

Python 实现

def send_binlog_dump(sock, binlog_pos=4, server_id=2,
                     binlog_filename='mysql-bin.000001', flags=0):
    """发送 COM_BINLOG_DUMP"""
    payload = bytearray()
    payload.append(0x12)
    payload.extend(struct.pack('<I', binlog_pos))
    payload.extend(struct.pack('<H', flags))
    payload.extend(struct.pack('<I', server_id))
    payload.extend(binlog_filename.encode('utf-8'))

    sock.send(encode_packet(bytes(payload), 0))
    print(f"[+] 发送 COM_BINLOG_DUMP: file={binlog_filename}, pos={binlog_pos}")

COM_BINLOG_DUMP_GTID(0x1E)

MySQL 5.6 引入基于 GTID 的复制:

偏移量   大小      字段
────────────────────────────────────────
0        1 字节    0x1E
1        2 字节    flags
3        4 字节    server_id
7        4 字节    binlog_name_len
11       变长      binlog_filename
         8 字节    binlog_pos
         4 字节    data_size
         变长      UUID_SET (GTID 集合)

10.4 二进制日志事件格式

主库通过复制连接发送的每个**二进制日志事件(Binlog Event)**都有统一的格式。

事件头格式(19 字节,MySQL 5.0+)

偏移量   大小      字段                    说明
──────────────────────────────────────────────────────
0        4 字节    timestamp              事件时间戳 (Unix 时间)
4        1 字节    type_code              事件类型代码
5        4 字节    server_id              产生事件的服务器 ID
9        4 字节    event_length           事件总长度(含头和数据)
13       4 字节    next_position          下一个事件的位置
17       2 字节    flags                  事件标志

常见事件类型

类型码事件名称说明
0UNKNOWN未知事件
1START_EVENT_V3Binlog 开始 (旧版本)
2QUERY_EVENTSQL 语句事件
3STOP_EVENTBinlog 停止
4ROTATE_EVENTBinlog 文件切换
5INTVAR_EVENT自增值
7APPEND_BLOCK_EVENT数据块追加
8DELETE_FILE_EVENT文件删除
9RAND_EVENTRAND() 种子
10XID_EVENT事务提交
11TABLE_MAP_EVENT表定义映射
13WRITE_ROWS_EVENTv1行写入 (v1)
14UPDATE_ROWS_EVENTv1行更新 (v1)
15DELETE_ROWS_EVENTv1行删除 (v1)
16INCIDENT_EVENT异常事件
17HEARTBEAT_EVENT心跳事件
19WRITE_ROWS_EVENTv2行写入 (v2)
20UPDATE_ROWS_EVENTv2行更新 (v2)
21DELETE_ROWS_EVENTv2行删除 (v2)
33GTID_LOG_EVENTGTID 事件
34ANONYMOUS_GTID_LOG_EVENT匿名 GTID
35PREVIOUS_GTIDS_LOG_EVENT前序 GTID 集合

Python 事件解析

from dataclasses import dataclass
from typing import Optional
from enum import IntEnum


class BinlogEventType(IntEnum):
    UNKNOWN = 0
    START_EVENT_V3 = 1
    QUERY_EVENT = 2
    STOP_EVENT = 3
    ROTATE_EVENT = 4
    INTVAR_EVENT = 5
    TABLE_MAP_EVENT = 19  # 实际为 11, 此处用 v2
    WRITE_ROWS_EVENT = 23  # 实际为 19
    UPDATE_ROWS_EVENT = 24  # 实际为 20
    DELETE_ROWS_EVENT = 25  # 实际为 21
    XID_EVENT = 16  # 实际为 10
    GTID_LOG_EVENT = 33
    HEARTBEAT_EVENT = 27  # 实际为 17


@dataclass
class BinlogEventHeader:
    timestamp: int
    type_code: int
    server_id: int
    event_length: int
    next_position: int
    flags: int


@dataclass
class BinlogEvent:
    header: BinlogEventHeader
    data: bytes


def parse_event_header(data: bytes) -> BinlogEventHeader:
    """解析事件头(19 字节)"""
    if len(data) < 19:
        raise ValueError(f"事件头数据不足: {len(data)} < 19")

    return BinlogEventHeader(
        timestamp=struct.unpack('<I', data[0:4])[0],
        type_code=data[4],
        server_id=struct.unpack('<I', data[5:9])[0],
        event_length=struct.unpack('<I', data[9:13])[0],
        next_position=struct.unpack('<I', data[13:17])[0],
        flags=struct.unpack('<H', data[17:19])[0],
    )


def read_binlog_event(sock) -> Optional[BinlogEvent]:
    """从复制连接读取一个 Binlog 事件"""
    # 复制事件以普通 MySQL 数据包格式传输
    header = b''
    while len(header) < 4:
        chunk = sock.recv(4 - len(header))
        if not chunk:
            return None
        header += chunk

    pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
    seq = header[3]

    payload = b''
    while len(payload) < pkt_len:
        chunk = sock.recv(pkt_len - len(payload))
        if not chunk:
            return None
        payload += chunk

    # 检查是否是 ERR 包
    if payload[0] == 0xFF:
        error_code = struct.unpack('<H', payload[1:3])[0]
        message = payload[9:].decode('utf-8', errors='replace')
        print(f"[!] 错误: [{error_code}] {message}")
        return None

    # 解析事件头
    event_header = parse_event_header(payload)

    return BinlogEvent(
        header=event_header,
        data=payload,
    )


def parse_query_event(data: bytes) -> dict:
    """
    解析 QUERY_EVENT

    QUERY_EVENT 包含执行的 SQL 语句

    格式:
      4 bytes: thread_id
      4 bytes: exec_time
      1 byte:  db_name_length
      2 bytes: error_code
      2 bytes: status_vars_length
      变长:    status_vars
      变长:    db_name (null terminated)
      变长:    sql_statement
    """
    offset = 19  # 跳过事件头

    thread_id = struct.unpack('<I', data[offset:offset+4])[0]
    offset += 4

    exec_time = struct.unpack('<I', data[offset:offset+4])[0]
    offset += 4

    db_name_length = data[offset]
    offset += 1

    error_code = struct.unpack('<H', data[offset:offset+2])[0]
    offset += 2

    status_vars_length = struct.unpack('<H', data[offset:offset+2])[0]
    offset += 2

    # 跳过状态变量
    offset += status_vars_length

    # 数据库名 (null 结尾)
    db_name_end = data.index(b'\x00', offset)
    db_name = data[offset:db_name_end].decode('utf-8', errors='replace')
    offset = db_name_end + 1

    # SQL 语句 (剩余部分)
    sql = data[offset:].decode('utf-8', errors='replace')

    return {
        'thread_id': thread_id,
        'exec_time': exec_time,
        'db_name': db_name,
        'error_code': error_code,
        'sql': sql,
    }


def parse_rotate_event(data: bytes) -> dict:
    """
    解析 ROTATE_EVENT

    格式:
      8 bytes: position
      变长:    new_binlog_filename
    """
    offset = 19  # 跳过事件头
    position = struct.unpack('<Q', data[offset:offset+8])[0]
    offset += 8
    filename = data[offset:].decode('utf-8', errors='replace').rstrip('\x00')

    return {
        'position': position,
        'filename': filename,
    }


def parse_xid_event(data: bytes) -> dict:
    """
    解析 XID_EVENT (事务提交)

    格式:
      8 bytes: xid (事务 ID)
    """
    offset = 19
    xid = struct.unpack('<Q', data[offset:offset+8])[0]
    return {'xid': xid}


def parse_heartbeat_event(data: bytes) -> dict:
    """
    解析 HEARTBEAT_EVENT

    格式:
      4 bytes: timestamp
      8 bytes: current_binlog_pos
      2 bytes: flags
      变长:    server_uuid (16 bytes)
      变长:    binlog_filename
    """
    offset = 19
    return {'type': 'heartbeat'}


# 事件名称映射
EVENT_NAMES = {
    0: "UNKNOWN", 1: "START_V3", 2: "QUERY", 3: "STOP",
    4: "ROTATE", 5: "INTVAR", 7: "APPEND_BLOCK", 8: "DELETE_FILE",
    9: "RAND", 10: "XID", 11: "TABLE_MAP", 13: "WRITE_ROWS",
    14: "UPDATE_ROWS", 15: "DELETE_ROWS", 16: "INCIDENT",
    17: "HEARTBEAT", 19: "WRITE_ROWS_V2", 20: "UPDATE_ROWS_V2",
    21: "DELETE_ROWS_V2", 33: "GTID", 34: "ANONYMOUS_GTID",
    35: "PREVIOUS_GTIDS",
}

10.5 复制过滤

主库可以通过 binlog-do-dbbinlog-ignore-db 过滤事件,但这在事件产生时生效,而非在复制传输时过滤。


10.6 半同步复制

MySQL 5.5 引入半同步复制(Semi-Synchronous Replication),确保至少一个从库确认收到事件后才返回客户端。

协议差异

普通异步复制:
  Client → Master → OK (立即返回)
  Master → Slave (异步发送 Binlog)

半同步复制:
  Client → Master → 执行事务
  Master → Slave → 发送 Binlog
  Slave → Master → ACK (确认收到)
  Master → Client → OK (收到 ACK 后返回)

半同步复制使用额外的插件协议交互:

Master → Slave:  Binlog Event
Slave → Master:  Semi-sync ACK
  包格式: [1 字节标识 (0xE0)] + [8 字节 binlog_pos] + [binlog_filename]

10.7 GTID 复制

MySQL 5.6 引入全局事务标识符(GTID),每个事务有全局唯一标识。

GTID 格式

source_id:transaction_id
示例: 3E11FA47-71CA-11E1-9E33-C80AA9429562:23

GTID 事件

GTID_LOG_EVENT(类型码 33)格式:

偏移量   大小      字段
────────────────────────────────────────
19       1 字节    flags (commit flag)
20       16 字节   UUID (source_id)
36       8 字节    transaction_id (小端序)
44       4 字节    logical_clock (组提交)

10.8 心跳机制

主库定期发送 HEARTBEAT_EVENT(类型码 17)以保持连接活跃并通知从库当前位置。

def handle_heartbeat(event: BinlogEvent):
    """处理心跳事件"""
    offset = 19
    timestamp = struct.unpack('<I', event.data[offset:offset+4])[0]
    binlog_pos = struct.unpack('<Q', event.data[offset+4:offset+12])[0]
    print(f"[心跳] timestamp={timestamp}, pos={binlog_pos}")

10.9 完整的从库复制客户端

"""
mysql_replica_client.py
简化版 MySQL 从库客户端
"""


def connect_as_replica(host, port, user, password, server_id,
                       binlog_file, binlog_pos):
    """连接到主库并接收 Binlog 事件"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))

    # 握手认证
    handshake = read_response(sock)
    print(f"[+] 收到 HandshakeV10")

    # 发送认证(需要有 REPLICATION SLAVE 权限的用户)
    auth_response = build_auth_response(handshake, user, password)
    sock.send(encode_packet(auth_response, 1))
    auth_result = read_response(sock)

    if auth_result[0] != 0x00:
        print("[!] 认证失败")
        return
    print("[+] 认证成功")

    # 注册从库
    send_register_slave(sock, server_id)
    print("[+] 注册从库成功")

    # 请求 Binlog
    send_binlog_dump(sock, binlog_pos, server_id, binlog_file)
    print(f"[+] 请求 Binlog: {binlog_file}:{binlog_pos}")

    # 接收事件
    event_count = 0
    while True:
        event = read_binlog_event(sock)
        if event is None:
            print("[!] 连接断开")
            break

        event_count += 1
        event_name = EVENT_NAMES.get(event.header.type_code,
                                      f"UNKNOWN({event.header.type_code})")

        print(f"  事件 #{event_count}: {event_name} "
              f"ts={event.header.timestamp} "
              f"len={event.header.event_length} "
              f"pos={event.header.next_position}")

        # 解析特定事件类型
        if event.header.type_code == 2:  # QUERY_EVENT
            info = parse_query_event(event.data)
            print(f"    SQL: {info['sql'][:80]}")
        elif event.header.type_code == 4:  # ROTATE_EVENT
            info = parse_rotate_event(event.data)
            print(f"    轮转到: {info['filename']}:{info['position']}")
        elif event.header.type_code == 10:  # XID_EVENT
            info = parse_xid_event(event.data)
            print(f"    事务提交: XID={info['xid']}")
        elif event.header.type_code == 17:  # HEARTBEAT_EVENT
            print("    [心跳]")

    sock.close()


if __name__ == '__main__':
    connect_as_replica(
        host='127.0.0.1',
        port=3306,
        user='repl_user',
        password='repl_pass',
        server_id=100,
        binlog_file='mysql-bin.000001',
        binlog_pos=4
    )

10.10 注意事项

重要提醒

  1. 复制用户权限:从库连接需要 REPLICATION SLAVEREPLICATION CLIENT 权限。

  2. server_id 唯一性:复制拓扑中的每个服务器必须有唯一的 server_id

  3. Binlog 格式:推荐使用 ROW 格式(binlog_format=ROW),避免 STATEMENT 格式的不确定性。

  4. 网络稳定性:复制对网络稳定性敏感,短暂的网络中断可能导致复制延迟或中断。

  5. GTID 限制:GTID 不支持 CREATE TABLE ... SELECTCREATE TEMPORARY TABLE 在事务中的操作。

  6. 半同步降级:半同步复制在超时后会自动降级为异步复制,可能导致数据丢失。


10.11 业务场景

场景一:数据库迁移

使用复制协议将数据从旧主库迁移到新主库,实现零停机迁移。

场景二:数据变更捕获(CDC)

通过解析 Binlog 事件实现实时数据变更捕获,常见的 CDC 工具包括:

  • Debezium:基于 Kafka Connect 的 CDC 平台
  • Canal:阿里巴巴开源的 Binlog 增量订阅组件
  • Maxwell:将 Binlog 转为 JSON 的守护进程

场景二的代码示例

# 使用复制协议实现简单的 CDC
def capture_changes(host, port, user, password, tables):
    """捕获指定表的数据变更"""
    connect_as_replica(host, port, user, password,
                       server_id=999,
                       binlog_file='mysql-bin.000001',
                       binlog_pos=4)
    # 在事件循环中过滤 TABLE_MAP_EVENT 匹配目标表
    # 解析 WRITE/UPDATE/DELETE_ROWS_EVENT 获取变更数据

场景三:延迟从库

通过设置 relay_log_recovery 和延迟 SQL 线程,实现数据回滚的安全网。


10.12 扩展阅读


上一章09 - 结果集详解 下一章11 - 代理与中间件 —— 深入了解如何实现 MySQL 代理和中间件。