强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

10-Socket API 基础

10 - Socket API 基础

10.1 Socket 概述

Socket 是网络编程的 API 接口,提供端到端的通信抽象。

Socket 类型:
• SOCK_STREAM (TCP) - 可靠、有序、字节流
• SOCK_DGRAM (UDP) - 不可靠、无序、数据报
• SOCK_RAW (RAW) - 直接访问 IP 层

地址族:
• AF_INET - IPv4
• AF_INET6 - IPv6
• AF_UNIX - 本地通信

10.2 Socket 创建与基本操作

import socket

# 创建 TCP Socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 创建 UDP Socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 常用选项
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 地址重用
sock.setblocking(False)  # 非阻塞模式
sock.settimeout(10)      # 超时时间

# 关闭
sock.close()

# 推荐使用 with 语句
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('example.com', 80))
    sock.sendall(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
    data = sock.recv(4096)

10.3 bind 绑定地址

"""绑定到指定地址和端口"""
import socket

# 创建服务器 Socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定地址
# 格式:(host, port)
server_sock.bind(('0.0.0.0', 8080))   # 所有网卡,端口 8080
server_sock.bind(('127.0.0.1', 8080)) # 仅本地
server_sock.bind(('192.168.1.100', 8080)) # 特定网卡

# 绑定到随机端口
server_sock.bind(('0.0.0.0', 0))
print(f"绑定到: {server_sock.getsockname()}")  # ('0.0.0.0', 随机端口)
特殊地址:
• '0.0.0.0' - 监听所有接口
• '127.0.0.1' - 仅本地
• '' - 等同于 '0.0.0.0'

端口 0:由系统分配随机端口

10.4 listen 监听连接

"""TCP 服务器监听"""
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind(('0.0.0.0', 8080))

# 开始监听
# backlog: 等待连接的队列长度
server_sock.listen(128)

print("服务器启动,等待连接...")
backlog 参数:

               ┌─────────────────┐
    SYN ───→  │   半连接队列      │ (SYN Queue)
               └────────┬────────┘
                        │ 三次握手完成
                        ▼
               ┌─────────────────┐
    accept()← │   全连接队列      │ (Accept Queue)
               └─────────────────┘

• 半连接队列:SYN_RECV 状态的连接
• 全连接队列:ESTABLISHED 等待 accept()

backlog 参数控制全连接队列大小
实际值 = min(backlog, somaxconn)

10.5 accept 接受连接

"""阻塞式接受连接"""
import socket
import threading

def handle_client(client_sock, addr):
    """处理客户端连接"""
    print(f"新连接: {addr}")
    try:
        while True:
            data = client_sock.recv(1024)
            if not data:
                break
            client_sock.sendall(data)  # 回显
    except ConnectionResetError:
        pass
    finally:
        client_sock.close()
        print(f"连接关闭: {addr}")

def tcp_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 8080))
    server.listen(128)
    
    while True:
        client_sock, addr = server.accept()  # 阻塞等待
        
        # 多线程处理
        t = threading.Thread(target=handle_client, args=(client_sock, addr))
        t.daemon = True
        t.start()

if __name__ == '__main__':
    tcp_server()

10.6 connect 连接服务器

"""TCP 客户端连接"""
import socket

def tcp_client():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(10)
        
        # 连接服务器
        try:
            sock.connect(('127.0.0.1', 8080))
        except ConnectionRefusedError:
            print("连接被拒绝")
            return
        except socket.timeout:
            print("连接超时")
            return
        
        # 发送数据
        sock.sendall(b"Hello Server")
        
        # 接收响应
        data = sock.recv(1024)
        print(f"收到: {data.decode()}")

tcp_client()

10.7 send 和 recv

"""
TCP 发送注意事项:
• send() 不保证发送所有数据
• sendall() 保证发送所有数据
• 返回实际发送的字节数

TCP 接收注意事项:
• recv() 返回实际接收到的数据
• 返回空 bytes 表示连接关闭
• 需要循环接收直到收到完整消息
"""

def send_all(sock, data):
    """确保发送所有数据"""
    total_sent = 0
    while total_sent < len(data):
        sent = sock.send(data[total_sent:])
        if sent == 0:
            raise ConnectionError("发送失败")
        total_sent += sent
    return total_sent

def recv_all(sock, size):
    """确保接收所有数据"""
    data = bytearray()
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            raise ConnectionError("连接关闭")
        data.extend(chunk)
    return bytes(data)

10.8 UDP 收发

"""UDP 发送和接收"""
import socket

def udp_sender():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    # 发送数据报
    sock.sendto(b"Hello UDP", ('127.0.0.1', 9999))
    
    # 接收响应
    data, addr = sock.recvfrom(1024)
    print(f"来自 {addr}: {data.decode()}")
    
    sock.close()

def udp_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('0.0.0.0', 9999))
    
    while True:
        data, addr = sock.recvfrom(1024)  # 接收数据报和来源地址
        print(f"来自 {addr}: {data.decode()}")
        
        # 回复
        sock.sendto(b"ACK", addr)

# sendto(data, address) - 发送到指定地址
# recvfrom(bufsize) - 返回 (data, address)

10.9 IO 多路复用

select(全平台)

"""select 模型"""
import socket
import select

def select_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 8080))
    server.listen(128)
    server.setblocking(False)
    
    inputs = [server]   # 监听读事件
    outputs = []        # 监听写事件
    
    while True:
        readable, writable, exceptional = select.select(inputs, outputs, inputs)
        
        for sock in readable:
            if sock is server:
                # 新连接
                client, addr = server.accept()
                client.setblocking(False)
                inputs.append(client)
            else:
                # 数据到达
                data = sock.recv(1024)
                if data:
                    print(f"收到: {data}")
                    if sock not in outputs:
                        outputs.append(sock)
                else:
                    # 连接关闭
                    if sock in outputs:
                        outputs.remove(sock)
                    inputs.remove(sock)
                    sock.close()
        
        for sock in writable:
            # 可以发送数据
            sock.sendall(b"ACK")
            outputs.remove(sock)
        
        for sock in exceptional:
            # 异常
            inputs.remove(sock)
            if sock in outputs:
                outputs.remove(sock)
            sock.close()

poll(Linux/Unix)

"""poll 模型"""
import socket
import select

def poll_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 8080))
    server.listen(128)
    
    poll = select.poll()
    poll.register(server, select.POLLIN)
    
    fd_to_socket = {server.fileno(): server}
    
    while True:
        events = poll.poll()
        
        for fd, event in events:
            sock = fd_to_socket[fd]
            
            if sock is server:
                client, addr = server.accept()
                poll.register(client, select.POLLIN)
                fd_to_socket[client.fileno()] = client
            elif event & select.POLLIN:
                data = sock.recv(1024)
                if data:
                    sock.sendall(data)
                else:
                    poll.unregister(sock)
                    sock.close()
                    del fd_to_socket[fd]

epoll(Linux 推荐)

"""epoll 模型"""
import socket
import select

def epoll_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 8080))
    server.listen(128)
    server.setblocking(False)
    
    epoll = select.epoll()
    epoll.register(server.fileno(), select.EPOLLIN)
    
    fd_to_socket = {server.fileno(): server}
    
    try:
        while True:
            events = epoll.poll()
            
            for fd, event in events:
                sock = fd_to_socket[fd]
                
                if sock is server:
                    client, addr = server.accept()
                    client.setblocking(False)
                    epoll.register(client.fileno(), select.EPOLLIN)
                    fd_to_socket[client.fileno()] = client
                
                elif event & select.EPOLLIN:
                    data = sock.recv(1024)
                    if data:
                        # 修改为监听写事件
                        epoll.modify(fd, select.EPOLLOUT)
                    else:
                        epoll.unregister(fd)
                        sock.close()
                        del fd_to_socket[fd]
                
                elif event & select.EPOLLOUT:
                    sock.sendall(b"ACK")
                    epoll.modify(fd, select.EPOLLIN)
    finally:
        epoll.unregister(server.fileno())
        epoll.close()
        server.close()

IO 多路复用对比

方法最大连接数时间复杂度平台特点
select1024O(n)全平台简单,有数量限制
poll无限O(n)Unix无数量限制
epoll无限O(1)Linux高性能,边缘触发
kqueue无限O(1)BSD/macOS类似 epoll

10.10 非阻塞 IO

"""非阻塞 Socket"""
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)

try:
    sock.connect(('example.com', 80))
except BlockingIOError:
    pass  # 连接正在进行中

# 使用 select 等待连接完成
import select
_, writable, _ = select.select([], [sock], [], 5)

if writable:
    # 连接已建立
    sock.sendall(b"GET / HTTP/1.0\r\n\r\n")

10.11 注意事项

⚠️ SO_REUSEADDR:服务器必须设置,否则重启时可能绑定失败

⚠️ 阻塞 vs 非阻塞:阻塞模式简单但不支持并发,需要多线程或多路复用

⚠️ epoll 事件处理:边缘触发必须一次性读完所有数据,否则可能丢失事件

⚠️ Socket 关闭顺序:先 shutdown 再 close,确保数据发送完毕

10.12 扩展阅读


下一章11 - TCP 编程实战 - 客户端/服务器、并发模型、粘包处理