05 - 命令与请求
第 05 章:命令与请求
5.1 命令概述
握手认证完成后,客户端与服务器进入命令阶段(Command Phase)。客户端发送命令包,服务器返回响应包。
命令包格式
┌────────── 3 字节包长度 ──────────┐
│ 1 字节序列号 │
├──────────────────────────────────┤
│ 1 字节命令类型 (COM_xxx) │
│ 变长命令参数 │
└──────────────────────────────────┘
第一个字节标识命令类型,后续字节是命令的参数。
完整命令列表
| 命令 | 值 | 说明 | 引入版本 |
|---|---|---|---|
COM_SLEEP | 0x00 | 无操作(内部使用) | 3.x |
COM_QUIT | 0x01 | 关闭连接 | 3.x |
COM_INIT_DB | 0x02 | 切换数据库 | 3.x |
COM_QUERY | 0x03 | 执行 SQL 查询 | 3.x |
COM_FIELD_LIST | 0x04 | 获取字段信息(已废弃) | 3.x |
COM_REFRESH | 0x07 | 刷新操作 | 3.x |
COM_STATISTICS | 0x08 | 获取服务器统计信息 | 3.x |
COM_PROCESS_INFO | 0x0A | 获取进程列表(已废弃) | 3.x |
COM_PROCESS_KILL | 0x0C | 终止连接(已废弃,用 KILL SQL) | 3.x |
COM_DEBUG | 0x0D | 触发服务器 dump | 3.x |
COM_PING | 0x0E | 心跳检测 | 4.0 |
COM_CHANGE_USER | 0x11 | 更改用户 | 4.1 |
COM_RESET_CONNECTION | 0x1F | 重置连接状态 | 5.7 |
COM_SET_OPTION | 0x1B | 设置选项 | 4.1 |
COM_STMT_PREPARE | 0x16 | 预处理 SQL 语句 | 4.1 |
COM_STMT_EXECUTE | 0x17 | 执行预处理语句 | 4.1 |
COM_STMT_SEND_LONG_DATA | 0x18 | 发送长参数数据 | 4.1 |
COM_STMT_CLOSE | 0x19 | 关闭预处理语句 | 4.1 |
COM_STMT_RESET | 0x1A | 重置预处理语句 | 4.1 |
COM_STMT_FETCH | 0x1C | 获取预处理语句游标数据 | 5.0 |
5.2 COM_QUERY(0x03)
COM_QUERY 是最常用的命令,用于执行任意 SQL 语句。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x03 (命令类型)
1 变长 SQL 语句 (UTF-8 编码)
Python 实现
"""
mysql_com_query.py
发送 COM_QUERY 命令并解析响应
"""
import socket
import struct
def encode_packet(payload: bytes, seq: int) -> bytes:
return struct.pack('<I', len(payload))[:3] + struct.pack('B', seq) + payload
def decode_packet(data: bytes):
if len(data) < 4:
return None, None, data
pkt_len = struct.unpack('<I', data[0:3] + b'\x00')[0]
seq = data[3]
if len(data) < 4 + pkt_len:
return None, None, data
payload = data[4:4 + pkt_len]
return payload, seq, data[4 + pkt_len:]
def send_com_query(sock: socket.socket, sql: str, seq: int = 0):
"""发送 COM_QUERY 命令"""
payload = b'\x03' + sql.encode('utf-8')
packet = encode_packet(payload, seq)
sock.send(packet)
def read_response(sock: socket.socket) -> dict:
"""读取并解析服务器响应"""
data = b''
while True:
chunk = sock.recv(4096)
if not chunk:
break
data += chunk
# 尝试解析第一个包来确定类型
if len(data) >= 4:
pkt_len = struct.unpack('<I', data[0:3] + b'\x00')[0]
if len(data) >= 4 + pkt_len:
break
payload, seq, remaining = decode_packet(data)
if payload is None:
return {'type': 'INCOMPLETE'}
first_byte = payload[0]
if first_byte == 0x00:
return parse_ok_packet(payload)
elif first_byte == 0xFF:
return parse_err_packet(payload)
elif first_byte == 0xFB:
return {'type': 'LOCAL_INFILE'}
else:
return parse_result_set(data)
def parse_ok_packet(payload: bytes) -> dict:
"""解析 OK 包"""
offset = 1 # 跳过 0x00 标识
affected_rows, offset = read_length_encoded_int(payload, offset)
last_insert_id, offset = read_length_encoded_int(payload, offset)
status_flags = struct.unpack('<H', payload[offset:offset+2])[0]
offset += 2
warnings = struct.unpack('<H', payload[offset:offset+2])[0]
return {
'type': 'OK',
'affected_rows': affected_rows,
'last_insert_id': last_insert_id,
'status_flags': status_flags,
'warnings': warnings,
}
def parse_err_packet(payload: bytes) -> dict:
"""解析 ERR 包"""
error_code = struct.unpack('<H', payload[1:3])[0]
# 检查 SQL state 标记
sql_state_marker = ''
sql_state = ''
message = ''
if len(payload) > 3:
if payload[3:4] == b'#':
sql_state = payload[4:9].decode('ascii', errors='replace')
message = payload[9:].decode('utf-8', errors='replace')
else:
message = payload[3:].decode('utf-8', errors='replace')
return {
'type': 'ERR',
'error_code': error_code,
'sql_state': sql_state,
'message': message,
}
def parse_result_set(data: bytes) -> dict:
"""解析结果集(简化版)"""
# 后续章节详解
return {'type': 'RESULT_SET', 'raw': data[:100].hex()}
def read_length_encoded_int(data: bytes, offset: int) -> tuple:
first = data[offset]
if first < 0xFB:
return first, offset + 1
elif first == 0xFC:
return struct.unpack('<H', data[offset+1:offset+3])[0], offset + 3
elif first == 0xFD:
return struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0], offset + 4
elif first == 0xFE:
return struct.unpack('<Q', data[offset+1:offset+9])[0], offset + 9
# 完整示例:连接、认证、执行查询
def full_example():
"""完整的连接-认证-查询流程"""
# 注意:此处省略了完整的认证流程
# 实际使用请参考 PyMySQL 或其他驱动
print("COM_QUERY 命令格式演示:")
print()
# 模拟 COM_QUERY 数据包
sql = "SELECT id, name FROM users WHERE age > 18"
payload = b'\x03' + sql.encode('utf-8')
packet = encode_packet(payload, seq=0)
print(f"SQL: {sql}")
print(f"命令字节: 0x{payload[0]:02X}")
print(f"完整 payload: {payload.hex()}")
print(f"数据包大小: {len(packet)} 字节")
print(f"数据包头: {packet[:4].hex()}")
print()
# 解析
print("解析 COM_QUERY 数据包:")
pkt_payload, pkt_seq, _ = decode_packet(packet)
command = pkt_payload[0]
sql_text = pkt_payload[1:].decode('utf-8')
print(f" 命令类型: COM_QUERY (0x{command:02X})")
print(f" 序列号: {pkt_seq}")
print(f" SQL: {sql_text}")
if __name__ == '__main__':
full_example()
COM_QUERY 的响应类型
| 响应 | 首字节 | 说明 |
|---|---|---|
| OK | 0x00 | DDL/DML 成功(INSERT、UPDATE、DELETE 等) |
| ERR | 0xFF | 执行出错 |
| Result Set | 列数 | SELECT、SHOW、DESCRIBE 等返回结果集的语句 |
| LOCAL_INFILE | 0xFB | LOAD DATA LOCAL 请求 |
5.3 COM_INIT_DB(0x02)
切换当前数据库,等价于 SQL 语句 USE database_name。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x02
1 变长 数据库名 (不带 null 终止符)
Python 实现
def send_com_init_db(sock, database_name, seq=0):
"""发送 COM_INIT_DB 命令"""
payload = b'\x02' + database_name.encode('utf-8')
packet = encode_packet(payload, seq)
sock.send(packet)
return read_response(sock)
5.4 COM_PING(0x0E)
心跳检测,不带任何参数。服务器返回 OK 包。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x0E
Python 实现
def send_com_ping(sock, seq=0):
"""发送 COM_PING 命令"""
payload = b'\x0E'
packet = encode_packet(payload, seq)
sock.send(packet)
return read_response(sock)
# 连接池心跳检测示例
def check_connection_alive(sock, timeout=2):
"""检查连接是否存活"""
import select
sock.setblocking(False)
try:
send_com_ping(sock)
ready = select.select([sock], [], [], timeout)
if ready[0]:
response = read_response(sock)
return response.get('type') == 'OK'
return False
except Exception:
return False
finally:
sock.setblocking(True)
5.5 COM_QUIT(0x01)
关闭连接,服务器收到后关闭连接。不返回响应。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x01
def send_com_quit(sock, seq=0):
"""发送 COM_QUIT 命令"""
payload = b'\x01'
packet = encode_packet(payload, seq)
sock.send(packet)
# COM_QUIT 不需要等待响应
5.6 COM_STMT_PREPARE(0x16)
预处理 SQL 语句,返回语句句柄和参数/列元数据。详细内容见第 08 章。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x16
1 变长 SQL 语句 (含 ? 占位符)
响应格式
成功时返回 COM_STMT_PREPARE Response:
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x00 (OK 标识)
1 4 字节 statement_id (语句句柄)
5 2 字节 num_columns (列数)
7 2 字节 num_params (参数数)
9 1 字节 0x00 (填充)
10 2 字节 warnings (如果有)
后续 变长 参数定义 + 列定义
def send_com_stmt_prepare(sock, sql, seq=0):
"""发送 COM_STMT_PREPARE 命令"""
payload = b'\x16' + sql.encode('utf-8')
packet = encode_packet(payload, seq)
sock.send(packet)
return read_response(sock)
# 示例
# response = send_com_stmt_prepare(sock, "SELECT * FROM users WHERE id = ? AND name = ?")
# statement_id = response['statement_id'] # 4 字节句柄
# num_params = 2 # 2 个参数
# num_columns = ... # 返回的列数
5.7 COM_STMT_EXECUTE(0x17)
执行已预处理的语句,绑定参数值。详细内容见第 07、08 章。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x17
1 4 字节 statement_id
5 1 字节 flags (cursor type 等)
6 4 字节 iteration_count (固定为 1)
10 变长 null_bitmap
变长 new_params_bound_flag
变长 参数类型 + 参数值
def send_com_stmt_execute(sock, statement_id, params=None, seq=0):
"""发送 COM_STMT_EXECUTE 命令"""
payload = bytearray()
payload.append(0x17) # 命令类型
payload.extend(struct.pack('<I', statement_id)) # 语句 ID
payload.append(0x00) # flags: CURSOR_TYPE_NO_CURSOR
payload.extend(struct.pack('<I', 1)) # iteration_count
if params:
num_params = len(params)
# null_bitmap: 每个参数占 1 bit
null_bitmap_size = (num_params + 7) // 8
null_bitmap = bytearray(null_bitmap_size)
for i, param in enumerate(params):
if param is None:
null_bitmap[i // 8] |= (1 << (i % 8))
payload.extend(null_bitmap)
payload.append(0x01) # new_params_bound_flag: 发送新参数
# 参数类型和值
for param in params:
if param is None:
continue
if isinstance(param, int):
payload.extend(struct.pack('<B', 0x08)) # MYSQL_TYPE_LONGLONG
payload.extend(struct.pack('<q', param))
elif isinstance(param, str):
encoded = param.encode('utf-8')
payload.extend(struct.pack('<B', 0x0F)) # MYSQL_TYPE_VAR_STRING
payload.extend(write_length_encoded_int(len(encoded)))
payload.extend(encoded)
elif isinstance(param, float):
payload.extend(struct.pack('<B', 0x05)) # MYSQL_TYPE_DOUBLE
payload.extend(struct.pack('<d', param))
else:
payload.extend(b'\x00' * 1) # 空 null_bitmap
payload.append(0x00) # new_params_bound_flag: 0
packet = encode_packet(bytes(payload), seq)
sock.send(packet)
return read_response(sock)
5.8 COM_STMT_FETCH(0x1C)
从预处理语句的游标中获取指定数量的行。需要在 COM_STMT_EXECUTE 时启用游标。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x1C
1 4 字节 statement_id
5 4 字节 num_rows (要获取的行数)
5.9 COM_STMT_CLOSE(0x19)
关闭预处理语句,释放服务器资源。不返回响应。
def send_com_stmt_close(sock, statement_id, seq=0):
"""关闭预处理语句"""
payload = b'\x19' + struct.pack('<I', statement_id)
packet = encode_packet(payload, seq)
sock.send(packet)
# COM_STMT_CLOSE 不返回响应
5.10 COM_RESET_CONNECTION(0x1F)
MySQL 5.7.3 引入,重置连接状态但不关闭连接:
def send_com_reset_connection(sock, seq=0):
"""重置连接状态"""
payload = b'\x1F'
packet = encode_packet(payload, seq)
sock.send(packet)
return read_response(sock)
重置内容
| 重置项 | 说明 |
|---|---|
| 事务状态 | 回滚未提交的事务 |
| 自动提交 | 恢复为服务器默认值 |
| 临时表 | 删除所有临时表 |
| 会话变量 | 恢复为全局默认值 |
| 预处理语句 | 关闭所有预处理语句 |
| 用户变量 | 清除所有用户变量 |
5.11 COM_CHANGE_USER(0x11)
在已建立的连接上切换用户:
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x11
1 变长 username (null 结尾)
变长 auth_response (长度编码)
变长 database (null 结尾)
1 字节 character_set
变长 auth_plugin_name (null 结尾)
5.12 COM_STATISTICS(0x08)
获取服务器运行统计信息,返回一个文本字符串:
def send_com_statistics(sock, seq=0):
"""获取服务器统计"""
payload = b'\x08'
packet = encode_packet(payload, seq)
sock.send(packet)
data = sock.recv(4096)
payload, _, _ = decode_packet(data)
return payload.decode('utf-8')
# 返回类似:
# Uptime: 12345 Threads: 2 Questions: 100 Slow queries: 0
# Opens: 50 Flush tables: 1 Open tables: 20 Queries per second avg: 0.008
5.13 完整的命令交互示例
"""
mysql_command_session.py
完整的 MySQL 命令会话示例
"""
import socket
import struct
class MySQLSession:
"""简化的 MySQL 会话(需要已认证的连接)"""
def __init__(self, sock):
self.sock = sock
self.seq = 0
def _send(self, payload: bytes):
"""发送一个数据包"""
header = struct.pack('<I', len(payload))[:3] + struct.pack('B', self.seq & 0xFF)
self.sock.send(header + payload)
self.seq = (self.seq + 1) & 0xFF
def _recv(self) -> bytes:
"""接收一个响应"""
data = b''
while True:
chunk = self.sock.recv(65536)
if not chunk:
break
data += chunk
if len(data) >= 4:
pkt_len = struct.unpack('<I', data[0:3] + b'\x00')[0]
if len(data) >= 4 + pkt_len:
break
self.seq = (self.seq + 1) & 0xFF
return data[4:] # 去掉包头
def query(self, sql: str) -> dict:
"""执行 SQL 查询"""
self.seq = 0
self._send(b'\x03' + sql.encode('utf-8'))
response = self._recv()
first_byte = response[0]
if first_byte == 0x00:
return {'type': 'OK', 'data': response}
elif first_byte == 0xFF:
error_code = struct.unpack('<H', response[1:3])[0]
message = response[9:].decode('utf-8', errors='replace')
return {'type': 'ERR', 'code': error_code, 'message': message}
else:
return {'type': 'RESULT', 'data': response}
def init_db(self, database: str):
"""切换数据库"""
self.seq = 0
self._send(b'\x02' + database.encode('utf-8'))
return self._recv()
def ping(self):
"""心跳检测"""
self.seq = 0
self._send(b'\x0E')
response = self._recv()
return response[0] == 0x00
def quit(self):
"""关闭连接"""
self.seq = 0
self._send(b'\x01')
self.sock.close()
def statistics(self):
"""获取统计信息"""
self.seq = 0
self._send(b'\x08')
return self._recv().decode('utf-8')
# 使用示例
def demo():
print("MySQL 命令会话演示")
print("请确保 MySQL 服务器已启动并完成认证")
print()
print("典型命令交互:")
print(" COM_QUERY → SELECT * FROM users")
print(" COM_INIT_DB → USE mydb")
print(" COM_PING → 心跳检测")
print(" COM_STAT → 获取统计信息")
print(" COM_QUIT → 关闭连接")
if __name__ == '__main__':
demo()
5.14 注意事项
重要提醒
COM_QUERY 的字符集:SQL 语句使用连接握手时协商的字符集编码。推荐 UTF-8。
COM_QUERY 不能多条语句:除非客户端在握手时设置了
CLIENT_MULTI_STATEMENTS能力标志,否则一次只能发送一条 SQL。COM_PING 的用途:连接池通常使用
COM_PING检测连接存活,比SELECT 1更轻量。COM_RESET_CONNECTION vs COM_CHANGE_USER:前者重置所有状态;后者只切换用户,保留数据库等。
废弃命令:
COM_FIELD_LIST、COM_PROCESS_INFO、COM_PROCESS_KILL在 MySQL 8.0 中已废弃,应使用对应的 SQL 语句替代。预处理语句资源泄漏:使用
COM_STMT_PREPARE后必须用COM_STMT_CLOSE释放,否则会导致服务器资源泄漏。
5.15 业务场景
场景一:连接池健康检查
连接池(如 HikariCP)在借出连接前先执行 COM_PING 检测:
def borrow_connection(pool):
conn = pool.get()
if not conn.ping():
conn = pool.replace(conn)
return conn
场景二:批量操作优化
使用 CLIENT_MULTI_STATEMENTS 在一次请求中执行多条语句:
# 启用多语句模式后
sql = "INSERT INTO t1 VALUES(1); INSERT INTO t1 VALUES(2); INSERT INTO t1 VALUES(3)"
sock.send(b'\x03' + sql.encode('utf-8'))
# 接收多个结果集...
场景三:连接状态重置
Web 应用的请求处理完成后,使用 COM_RESET_CONNECTION 将连接归还到干净状态,避免会话变量泄漏。
5.16 扩展阅读
上一章:04 - 数据包格式 下一章:06 - 文本协议 —— 深入理解 COM_QUERY 的文本结果集格式。