强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

MySQL 传输协议精讲 / 03 - 认证机制

第 03 章:认证机制

3.1 MySQL 认证架构

MySQL 使用可插拔认证架构(Pluggable Authentication),允许不同的用户使用不同的认证方式。认证插件在握手阶段由服务器指定,客户端根据指定的插件计算认证响应。

认证流程总览

Client                                      Server
  │                                            │
  │ ←── HandshakeV10 ────────────────────── │
  │      auth_plugin: "caching_sha2_password"  │
  │      scramble: [32 字节随机数]              │
  │                                            │
  │ ──── HandshakeResponse41 ──────────────→ │
  │      auth_data: [加密后的认证数据]          │
  │                                            │
  │  ┌─────── 认证缓存命中? ────────────┐     │
  │  │  是 → OK_Packet                   │     │
  │  │  否 → AuthMoreData + RSA 公钥交换 │     │
  │  └───────────────────────────────────┘     │
  │                                            │
  │ ←── OK_Packet 或 ERR_Packet ────────── │

支持的认证插件

插件名称MySQL 版本默认版本安全性
mysql_native_password4.1+5.7 及之前中等(SHA1)
sha256_password5.6+-高(SHA256 + RSA)
caching_sha2_password8.0+8.0高(SHA256 + 缓存 + RSA)
auth_socket5.5+-高(OS 用户验证)
authentication_ldap_simple5.5+-取决于 LDAP
authentication_windows5.5+-高(Windows 域认证)
mysql_clear_password5.5+-低(明文,需 TLS)
-- 查看当前支持的认证插件
SELECT PLUGIN_NAME, PLUGIN_STATUS
FROM INFORMATION_SCHEMA.PLUGINS
WHERE PLUGIN_TYPE = 'AUTHENTICATION';

-- 查看用户的认证方式
SELECT user, host, plugin FROM mysql.user;

3.2 mysql_native_password

这是 MySQL 最经典的认证插件,从 4.1 版本开始引入。

认证算法

客户端计算:
  hash1 = SHA1(password)
  hash2 = SHA1(scramble + SHA1(hash1))
  response = hash1 XOR hash2

服务端验证:
  stored_hash = SHA1(SHA1(password))  -- 存储在 mysql.user 表中
  expected = SHA1(scramble + stored_hash)
  验证: SHA1(response) == expected

详细步骤

  1. 服务器存储mysql.user 表的 authentication_string 字段):
存储值 = HEX(SHA1(SHA1("password")))
示例: 对于密码 "root"
  SHA1("root") = dc76e9f0c0006e8f919e0c51efc6f2030ffd32ec
  SHA1(SHA1("root")) = 66cd9b2dfba670265aa8760ca0972aba44173971
  存储值 = "66CD9B2DFBA670265AA8760CA0972ABA44173971"
  1. 认证过程
"""
mysql_native_password 完整认证实现
"""
import hashlib
import struct

def sha1_binary(data: bytes) -> bytes:
    """计算 SHA1 哈希,返回 20 字节二进制"""
    return hashlib.sha1(data).digest()

def sha1_hex(data: bytes) -> str:
    """计算 SHA1 哈希,返回 40 字符十六进制"""
    return hashlib.sha1(data).hexdigest().upper()

def mysql_native_password_auth(password: str, scramble: bytes) -> bytes:
    """
    mysql_native_password 客户端认证计算
    输入:
      password  - 明文密码
      scramble  - 服务器发送的 20 字节随机数 (取前 20 字节)
    输出:
      20 字节的认证响应
    """
    if not password:
        return b''

    # Step 1: SHA1(password)
    stage1 = sha1_binary(password.encode('utf-8'))

    # Step 2: SHA1(SHA1(password))
    stage2 = sha1_binary(stage1)

    # Step 3: SHA1(scramble + SHA1(SHA1(password)))
    stage3 = sha1_binary(scramble + stage2)

    # Step 4: XOR(SHA1(password), SHA1(scramble + SHA1(SHA1(password))))
    result = bytes(a ^ b for a, b in zip(stage1, stage3))

    return result  # 20 字节


def mysql_native_password_verify(password: str, scramble: bytes, response: bytes) -> bool:
    """
    服务器端验证
    输入:
      password  - 明文密码 (用于演示; 实际服务器存储的是 hash)
      scramble  - 发送给客户端的随机数
      response  - 客户端返回的认证响应
    """
    # 服务器端的验证逻辑 (使用存储的 hash)
    stored_hash = sha1_binary(sha1_binary(password.encode('utf-8')))

    # 预期的客户端响应
    expected_stage3 = sha1_binary(scramble + stored_hash)
    expected_stage1 = sha1_binary(password.encode('utf-8'))
    expected_response = bytes(a ^ b for a, b in zip(expected_stage1, expected_stage3))

    # 验证方式: SHA1(response) 应该等于 SHA1(expected_response)
    # 实际上服务器更高效地比较: SHA1(response) == SHA1(scramble + stored_hash)
    return sha1_binary(response) == expected_stage3


# 完整演示
def demo():
    password = "MySecurePass123!"
    scramble = bytes.fromhex("4a5b6c7d8e9fa0b1c2d3e4f5a6b7c8d9e0f1a2b3")

    print(f"密码: {password}")
    print(f"Scramble: {scramble.hex()}")
    print()

    # 客户端计算
    response = mysql_native_password_auth(password, scramble)
    print(f"认证响应: {response.hex()}")

    # 服务器验证
    is_valid = mysql_native_password_verify(password, scramble, response)
    print(f"验证结果: {'✓ 通过' if is_valid else '✗ 失败'}")

    # 错误密码测试
    wrong_response = mysql_native_password_auth("WrongPassword", scramble)
    is_valid_wrong = mysql_native_password_verify("WrongPassword", scramble, wrong_response)
    print(f"错误密码验证: {'✓ 通过' if is_valid_wrong else '✗ 失败'}")

    # 交叉验证(错误密码的响应不应该通过正确密码的验证)
    is_cross = mysql_native_password_verify(password, scramble, wrong_response)
    print(f"交叉验证: {'✓ 通过' if is_cross else '✗ 失败(符合预期)'}")


if __name__ == '__main__':
    demo()

安全性分析

方面评估
密码传输不传输明文,使用挑战-应答
哈希强度SHA1,已被认为不够安全
中间人攻击不防中间人(无双向认证)
彩虹表攻击存储的是 double-SHA1,略有防护
暴力破解如果数据库被拖库,SHA1 容易被暴力破解

3.3 caching_sha2_password

MySQL 8.0 的默认认证插件,在安全性与性能之间取得了良好平衡。

认证算法

客户端计算:
  hash1 = SHA256(password)
  hash2 = SHA256(SHA256(hash1))
  response = SHA256(hash1) XOR SHA256(scramble + hash2)

服务端验证:
  stored_hash = SHA256(SHA256(password))  -- 存储在 mysql.user 表
  验证: 对比计算结果

两种认证模式

caching_sha2_password 有两种工作模式,取决于服务器是否缓存了认证结果:

模式一:快速认证(Fast Auth)— 缓存命中

Client → Server:  认证响应 (SHA256 挑战-应答)
Server → Client:  OK_Packet  (缓存命中, 1 个 RTT)

模式二:完整认证(Full Auth)— 缓存未命中

Client → Server:  认证响应 (SHA256 挑战-应答)
Server → Client:  AuthMoreData + 快速认证失败标志
Client → Server:  请求 RSA 公钥 (或使用本地公钥)
Server → Client:  RSA 公钥
Client → Server:  RSA(password XOR SHA256(hash), 公钥加密)
Server → Client:  OK_Packet 或 ERR_Packet

Python 完整实现

"""
caching_sha2_password 完整认证实现
"""
import hashlib
import os
import struct

try:
    from cryptography.hazmat.primitives.asymmetric import rsa, padding
    from cryptography.hazmat.primitives import hashes, serialization
    HAS_CRYPTO = True
except ImportError:
    HAS_CRYPTO = False
    print("[!] 请安装 cryptography: pip install cryptography")


def sha256_binary(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def xor_bytes(a: bytes, b: bytes) -> bytes:
    """两个等长字节序列的异或"""
    return bytes(x ^ y for x, y in zip(a, b))


def caching_sha2_password_fast_auth(password: str, scramble: bytes) -> bytes:
    """
    caching_sha2_password 快速认证(挑战-应答模式)
    这是客户端发送的第一轮认证数据

    算法:
      hash1 = SHA256(password)
      hash2 = SHA256(hash1)
      hash3 = SHA256(hash2 + scramble)
      response = hash1 XOR hash3

    注意: 响应长度为 32 字节 (SHA256 输出长度)
    """
    if not password:
        return b''

    hash1 = sha256_binary(password.encode('utf-8'))       # SHA256(password)
    hash2 = sha256_binary(hash1)                           # SHA256(SHA256(password))
    hash3 = sha256_binary(hash2 + scramble[:])             # SHA256(SHA256(SHA256(password)) + scramble)
    response = xor_bytes(hash1, hash3)                     # XOR

    return response  # 32 字节


def caching_sha2_password_full_auth(password: str, rsa_public_key_pem: bytes) -> bytes:
    """
    caching_sha2_password 完整认证(RSA 加密模式)
    当快速认证失败(缓存未命中)时使用

    算法:
      xor_key = SHA256(password)
      encrypted = RSA_OAEP_Encrypt(xor_key XOR password_bytes, rsa_public_key)
    """
    if not HAS_CRYPTO:
        raise RuntimeError("需要 cryptography 库")

    # XOR 密码与 SHA256(password)
    password_bytes = password.encode('utf-8')
    xor_key = sha256_binary(password_bytes)

    # 对齐到 8 字节边界
    padded = password_bytes + b'\x00' * (8 - len(password_bytes) % 8) if len(password_bytes) % 8 != 0 else password_bytes

    # XOR 操作(循环使用 xor_key)
    xor_result = bytearray(len(padded))
    for i in range(len(padded)):
        xor_result[i] = padded[i] ^ xor_key[i % len(xor_key)]

    # RSA OAEP 加密
    public_key = serialization.load_pem_public_key(rsa_public_key_pem)
    encrypted = public_key.encrypt(
        bytes(xor_result),
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

    return encrypted


def simulate_full_auth_flow():
    """模拟完整的认证流程(含缓存未命中场景)"""
    print("=" * 60)
    print("模拟 caching_sha2_password 完整认证流程")
    print("=" * 60)

    password = "MySecurePass123!"
    scramble = os.urandom(20)  # 20 字节随机数

    print(f"\n密码: {password}")
    print(f"Scramble: {scramble.hex()}")

    # === 第一步:快速认证尝试 ===
    print("\n--- 第一步:快速认证 ---")
    fast_response = caching_sha2_password_fast_auth(password, scramble)
    print(f"快速认证响应: {fast_response.hex()}")
    print(f"响应长度: {len(fast_response)} 字节")

    # 假设缓存未命中
    print("\n[服务器] 缓存未命中, 请求完整认证...")

    # === 第二步:完整认证 ===
    print("\n--- 第二步:RSA 公钥交换 ---")

    if HAS_CRYPTO:
        # 生成 RSA 密钥对(演示用)
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        public_key = private_key.public_key()

        # 导出公钥 PEM
        public_key_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        print(f"RSA 公钥 (PEM, {len(public_key_pem)} 字节):")
        print(public_key_pem.decode()[:200] + "...")

        # 客户端使用公钥加密
        encrypted = caching_sha2_password_full_auth(password, public_key_pem)
        print(f"加密后的认证数据: {encrypted[:32].hex()}...")
        print(f"加密数据长度: {len(encrypted)} 字节")

        # 服务端使用私钥解密
        decrypted = private_key.decrypt(
            encrypted,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

        # 提取密码(去掉 padding)
        decrypted_password = decrypted.rstrip(b'\x00')
        print(f"\n[服务器] 解密得到: {decrypted_password}")
        print(f"验证: {'✓ 成功' if decrypted_password == password.encode() else '✗ 失败'}")
    else:
        print("[!] 跳过 RSA 演示 (需要 cryptography 库)")


def compare_auth_methods():
    """对比两种认证方法"""
    print("\n" + "=" * 60)
    print("认证方法对比")
    print("=" * 60)

    password = "TestPassword!"
    scramble = os.urandom(20)

    # mysql_native_password
    import hashlib
    h1 = hashlib.sha1(password.encode()).digest()
    h2 = hashlib.sha1(h1).digest()
    h3 = hashlib.sha1(scramble + h2).digest()
    native_response = bytes(a ^ b for a, b in zip(h1, h3))

    # caching_sha2_password
    sha2_response = caching_sha2_password_fast_auth(password, scramble)

    print(f"\n密码: {password}")
    print(f"Scramble: {scramble.hex()}")
    print(f"\nmysql_native_password:")
    print(f"  响应长度: {len(native_response)} 字节")
    print(f"  哈希算法: SHA1")
    print(f"  响应值: {native_response.hex()}")
    print(f"\ncaching_sha2_password:")
    print(f"  响应长度: {len(sha2_response)} 字节")
    print(f"  哈希算法: SHA256")
    print(f"  响应值: {sha2_response.hex()}")


if __name__ == '__main__':
    compare_auth_methods()
    simulate_full_auth_flow()

认证缓存机制

caching_sha2_password 的缓存存储在内存中,由以下参数控制:

-- 查看缓存大小
SHOW VARIABLES LIKE 'caching_sha2_password%';
-- caching_sha2_password_auto_generate_rsa_keys  ON
-- caching_sha2_password_private_key_path        private_key.pem
-- caching_sha2_password_public_key_path         public_key.pem
-- caching_sha2_password_rsa_key_size            2048

-- 手动刷新缓存(通常在密码变更后)
FLUSH PRIVILEGES;

缓存何时被清除:

  • 用户密码被修改
  • 用户被删除
  • FLUSH PRIVILEGES 执行
  • 服务器重启

3.4 sha256_password

MySQL 5.6 引入的 SHA256 认证插件,是 caching_sha2_password 的前身。

与 caching_sha2_password 的区别

特性sha256_passwordcaching_sha2_password
引入版本MySQL 5.6MySQL 8.0
快速认证不支持支持(缓存)
每次连接都需要 RSA仅缓存未命中时
性能较慢较快
存储格式SHA256(SHA256(password)) + saltSHA256(SHA256(password))

认证流程

Client → Server:  初始认证响应 (可能为空或 "mysql_native_password" 格式)
Server → Client:  AuthMoreData + 0x04 (请求公钥) 或 RSA 公钥
Client → Server:  RSA 加密的密码
Server → Client:  OK / ERR

3.5 auth_socket

auth_socket 插件通过操作系统用户身份进行认证,不使用密码

工作原理

1. 客户端通过 Unix socket 连接 MySQL
2. MySQL 获取 socket 连接的 UID
3. 将 UID 映射为操作系统用户名
4. 验证该用户名是否匹配 MySQL 用户名

配置示例

-- 创建 socket 认证用户
CREATE USER 'dbadmin'@'localhost' IDENTIFIED WITH auth_socket;

-- 允许 dbadmin 用户以 root 身份连接
CREATE USER 'root'@'localhost' IDENTIFIED WITH auth_socket AS 'root';
# 只有 OS root 用户可以以 root 身份连接 MySQL
sudo mysql -u root

# OS 用户 dbadmin 可以连接
mysql -u dbadmin   # 使用 dbadmin 的 OS 身份

适用场景

  • 本地系统管理脚本
  • 无需暴露密码的自动化运维
  • 安全敏感的特权账户管理

3.6 其他认证插件

LDAP 认证

-- 安装 LDAP 认证插件
INSTALL PLUGIN authentication_ldap_simple SONAME 'authentication_ldap_simple.so';

-- 配置 LDAP 服务器
SET GLOBAL authentication_ldap_simple_server_host = 'ldap.example.com';
SET GLOBAL authentication_ldap_simple_server_port = 389;
SET GLOBAL authentication_ldap_simple_bind_base_dn = 'ou=users,dc=example,dc=com';

-- 创建 LDAP 认证用户
CREATE USER 'ldapuser'@'%'
  IDENTIFIED WITH authentication_ldap_simple
  AS 'uid=ldapuser,ou=users,dc=example,dc=com';

PAM 认证

-- 安装 PAM 认证插件
INSTALL PLUGIN authentication_pam SONAME 'authentication_pam.so';

-- 创建 PAM 认证用户
CREATE USER 'pamuser'@'%'
  IDENTIFIED WITH authentication_pam
  AS 'mysql';  -- 'mysql' 是 PAM 服务名

3.7 认证协议消息格式

AuthMoreData 包

当服务器需要发送额外的认证数据时使用:

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0xFE (标识 AuthMoreData)
1          变长      认证数据

不同认证插件使用不同的 AuthMoreData 格式:

插件数据内容
caching_sha2_password0x01 (快速认证失败标志) 或 RSA 公钥
sha256_password0x04 (请求公钥) 或 RSA 公钥
mysql_native_password不使用

RSA 公钥传输

当客户端没有预配置 RSA 公钥时,可以从服务器获取:

def handle_auth_more_data(data, password, scramble):
    """处理 AuthMoreData 响应"""
    status = data[0]

    if status == 0x01:
        # caching_sha2_password: 快速认证失败
        # 需要进行 RSA 完整认证
        print("[*] 快速认证失败,需要 RSA 公钥")

        # 发送公钥请求
        request_public_key = b'\x02'  # 特殊命令
        return request_public_key

    elif status == 0x04:
        # sha256_password: 服务器要求 RSA 加密
        print("[*] 需要 RSA 加密密码")

    else:
        # 后续字节是 RSA 公钥
        public_key_pem = data  # 整个数据是公钥
        print(f"[*] 收到 RSA 公钥 ({len(public_key_pem)} 字节)")
        # 使用公钥加密密码...

3.8 认证方式切换与兼容性

切换认证插件

-- 将用户切换到 mysql_native_password(兼容旧客户端)
ALTER USER 'app_user'@'%'
  IDENTIFIED WITH mysql_native_password BY 'password123';

-- 将用户切换到 caching_sha2_password(推荐)
ALTER USER 'app_user'@'%'
  IDENTIFIED WITH caching_sha2_password BY 'password123';

-- 修改默认认证插件(全局)
-- 在 my.cnf 中设置:
-- [mysqld]
-- default_authentication_plugin = mysql_native_password

客户端兼容性矩阵

客户端驱动mysql_native_passwordcaching_sha2_passwordsha256_password
MySQL Connector/Python 8.0+
PyMySQL 1.0+
mysql-connector-java 8.0+
Go go-sql-driver 1.5+
Node.js mysql2
PHP mysqlnd (PHP 7.4+)
PHP mysqlnd (PHP < 7.4)
旧版客户端/驱动

迁移建议:升级 MySQL 到 8.0 之前,先升级所有客户端驱动以支持 caching_sha2_password,或在服务器端为特定用户保留 mysql_native_password


3.9 安全最佳实践

密码策略配置

-- 安装密码验证组件
INSTALL COMPONENT 'file://component_validate_password';

-- 配置密码策略
SET GLOBAL validate_password.policy = MEDIUM;
SET GLOBAL validate_password.length = 12;
SET GLOBAL validate_password.mixed_case_count = 1;
SET GLOBAL validate_password.number_count = 1;
SET GLOBAL validate_password.special_char_count = 1;

TLS 强制

-- 要求用户使用 TLS 连接
ALTER USER 'app_user'@'%' REQUIRE SSL;

-- 要求 TLS 版本
ALTER USER 'app_user'@'%' REQUIRE X509;

-- 全局 TLS 配置
-- my.cnf:
-- [mysqld]
-- require_secure_transport = ON
-- tls_version = TLSv1.2,TLSv1.3

认证安全清单

检查项建议
认证插件生产环境使用 caching_sha2_password
密码强度启用 validate_password 组件
密码轮换设置 default_password_lifetime
连接加密强制 SSL/TLS
公钥交换预配置 RSA 公钥(禁用 allowPublicKeyRetrieval
权限最小化按需授权,避免使用 GRANT ALL
远程 root禁止 root 远程登录
匿名用户删除所有匿名用户
-- 安全加固脚本
DELETE FROM mysql.user WHERE User = '';
DELETE FROM mysql.user WHERE User = 'root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
FLUSH PRIVILEGES;

3.10 业务场景

场景一:应用升级后连接失败

问题:MySQL 升级到 8.0 后,应用报错 Authentication plugin 'caching_sha2_password' cannot be loaded

分析:旧的客户端驱动不支持 caching_sha2_password 插件。

解决方案

-- 临时方案:将用户切换回旧的认证方式
ALTER USER 'app'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

-- 长期方案:升级客户端驱动

场景二:密码安全性审计

通过查询 mysql.user 表审计所有用户的认证配置:

SELECT
    User,
    Host,
    plugin AS '认证插件',
    authentication_string AS '认证数据',
    password_expired AS '密码过期',
    password_lifetime AS '密码有效期',
    account_locked AS '账户锁定',
    CASE
        WHEN plugin = 'mysql_native_password' THEN '⚠ 建议升级到 caching_sha2_password'
        WHEN plugin = 'caching_sha2_password' THEN '✓ 推荐'
        WHEN plugin = 'auth_socket' THEN '✓ 本地安全'
        ELSE '?'
    END AS '安全评估'
FROM mysql.user
WHERE User NOT LIKE 'mysql.%'
ORDER BY User, Host;

场景三:多因素认证

MySQL Enterprise 支持多因素认证(MFA),协议层面通过多次认证交换实现:

Client → Server:  HandshakeResponse (第一因素)
Server → Client:  AuthMoreData (要求第二因素)
Client → Server:  AuthResponse (第二因素)
Server → Client:  OK / ERR

3.11 扩展阅读


上一章02 - 握手过程 下一章04 - 数据包格式 —— 深入理解 MySQL 数据包的字节级结构。