强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

nanomsg / NNG 消息库完全教程 / 第 5 章:NNG 现代 API 详解

5.1 NNG 与 nanomsg 的设计差异

NNG 不仅是 nanomsg 的升级,而是一次架构重写。核心改进包括:

特性nanomsgNNG
I/O 模型同步阻塞异步事件驱动
并发模型一 Socket 一线程共享 Socket + Context
内存管理固定分配器可插拔分配器
TLS不支持原生支持
WebSocket不支持原生支持
零拷贝有限完整 AIO 零拷贝
API 风格POSIX-like现代 C(返回错误码)
错误处理nn_errno()函数返回值 nng_err

5.2 NNG 核心 API

5.2.1 API 函数分类

Socket 管理:

函数用途
nng_pair0_open()创建 PAIR Socket
nng_pub0_open() / nng_sub0_open()创建 PUB/SUB Socket
nng_req0_open() / nng_rep0_open()创建 REQ/REP Socket
nng_push0_open() / nng_pull0_open()创建 PUSH/PULL Socket
nng_bus0_open()创建 BUS Socket
nng_surveyor0_open() / nng_respondent0_open()创建 SURVEY Socket
nng_close()关闭 Socket

连接管理:

函数用途
nng_listen()监听地址
nng_dial()连接地址
nng_listener_create()创建 Listener 对象
nng_dialer_create()创建 Dialer 对象
nng_listener_start()启动 Listener
nng_dialer_start()启动 Dialer
nng_listener_close()关闭 Listener
nng_dialer_close()关闭 Dialer

消息收发:

函数用途
nng_send()发送消息
nng_recv()接收消息
nng_sendmsg()发送 nng_msg 对象
nng_recvmsg()接收 nng_msg 对象
nng_msg_alloc()分配消息
nng_msg_free()释放消息
nng_msg_body()获取消息体指针
nng_msg_len()获取消息长度
nng_msg_append()追加数据到消息
nng_msg_insert()在消息头部插入数据
nng_msg_trim()从消息头部裁剪数据
nng_msg_chop()从消息尾部裁剪数据

选项配置:

函数用途
nng_setopt()设置 Socket 选项
nng_getopt()获取 Socket 选项
nng_setopt_ms()设置毫秒选项
nng_setopt_int()设置整数选项
nng_setopt_string()设置字符串选项
nng_setopt_size()设置 size_t 选项

AIO(异步 I/O):

函数用途
nng_aio_alloc()分配 AIO 对象
nng_aio_free()释放 AIO 对象
nng_aio_begin()开始异步操作
nng_aio_finish()完成异步操作
nng_aio_wait()等待异步操作完成
nng_aio_result()获取异步操作结果
nng_aio_set_timeout()设置超时
nng_aio_set_cb()设置完成回调
nng_recv_aio()异步接收
nng_send_aio()异步发送

5.3 Socket 创建与连接

5.3.1 基本创建

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

int main() {
    nng_socket sock;
    int rv;

    // 创建 Socket
    if ((rv = nng_pair0_open(&sock)) != 0) {
        fprintf(stderr, "nng_pair0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    // 监听
    if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    // 使用 Socket ...
    nng_close(sock);
    return 0;
}

5.3.2 nng_listen() vs nng_dial()

// listen: 监听传入连接(服务端模式)
nng_listener listener;
rv = nng_listen(sock, "tcp://*:5555", &listener, 0);

// dial: 主动连接(客户端模式)
nng_dialer dialer;
rv = nng_dial(sock, "tcp://server:5555", &dialer, 0);

listenerdialer 参数可以传 NULL(不需要引用),也可以传指针以获取对象引用用于后续管理。


5.4 Listener 与 Dialer 对象

5.4.1 Listener 精细控制

nng_listener listener;
nng_socket sock;

nng_rep0_open(&sock);

// 创建 Listener(不立即启动)
nng_listener_create(&listener, sock);

// 配置 Listener 选项
nng_listener_setopt(listener, NNG_OPT_TCP_NODELAY, &(int){1}, sizeof(int));
nng_listener_setopt_string(listener, NNG_OPT_LISTEN_URL, "tcp://0.0.0.0:5555");

// 启动
nng_listener_start(listener, 0);

// 后续可关闭 Listener(不影响已建立的连接)
nng_listener_close(listener);

5.4.2 Dialer 精细控制

nng_dialer dialer;
nng_socket sock;

nng_req0_open(&sock);

// 创建 Dialer(不立即连接)
nng_dialer_create(&dialer, sock);

// 配置重连策略
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMINT, 100);   // 最小重连间隔
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMAXT, 30000); // 最大重连间隔

// 启动连接
nng_dialer_start(dialer, 0);

// Dialer 会自动重连

5.4.3 Listener/Dialer 选项

选项类型说明
NNG_OPT_URLstring完整 URL
NNG_OPT_MAXTTLint最大跳数
NNG_OPT_RECVMAXSZsize_t最大接收消息大小
NNG_OPT_TCP_NODELAYboolNagle 算法开关
NNG_OPT_TCP_KEEPALIVEboolTCP Keepalive
NNG_OPT_RECONNMINTms最小重连间隔
NNG_OPT_RECONNMAXTms最大重连间隔

5.5 消息操作

5.5.1 简单发送接收

// 发送
const char *msg = "Hello";
nng_send(sock, (void *)msg, strlen(msg), 0);

// 接收(需分配缓冲区)
char buf[256];
size_t sz = sizeof(buf);
nng_recv(sock, buf, &sz, 0);
printf("Received %zu bytes: %.*s\n", sz, (int)sz, buf);

5.5.2 零拷贝接收

void *buf = NULL;
size_t sz;
int rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
if (rv == 0) {
    printf("Received: %.*s\n", (int)sz, buf);
    nng_free(buf, sz);  // 必须释放
}

5.5.3 nng_msg 对象操作

nng_msg 是 NNG 的消息对象,支持头部追加(header)和消息体操作:

nng_msg *msg;

// 分配消息
nng_msg_alloc(&msg, 0);

// 追加消息体
nng_msg_append(msg, "Hello", 5);

// 追加头部(协议头)
nng_msg_header(msg);  // 获取头部指针

// 发送消息对象
nng_sendmsg(sock, msg, 0);
// 注意:发送后 msg 被接管,不要再操作

// 接收消息对象
nng_msg *rmsg;
nng_recvmsg(sock, &rmsg, 0);

// 获取消息体
void *body = nng_msg_body(rmsg);
size_t len = nng_msg_len(rmsg);
printf("Body: %.*s\n", (int)len, (char *)body);

// 释放
nng_msg_free(rmsg);

5.5.4 消息管道操作(Pipeline)

nng_msg *msg;
nng_msg_alloc(&msg, 0);

// 在消息头部插入路由信息
nng_msg_insert(msg, "ROUTE", 5);

// 追加消息体
nng_msg_append(msg, "Payload", 7);

// 从头部裁剪(消费掉路由信息)
nng_msg_trim(msg, 5);

// 从尾部裁剪
nng_msg_chop(msg, 3);

5.6 异步 I/O(AIO)

NNG 的 AIO 是其最核心的现代特性,允许非阻塞地执行 I/O 操作。

5.6.1 AIO 基本使用

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>

// AIO 完成回调
void recv_callback(void *arg) {
    nng_aio *aio = (nng_aio *)arg;
    int rv = nng_aio_result(aio);

    if (rv != 0) {
        fprintf(stderr, "Async recv failed: %s\n", nng_strerror(rv));
        return;
    }

    // 获取接收到的消息
    nng_msg *msg = nng_aio_get_msg(aio);
    void *body = nng_msg_body(msg);
    size_t len = nng_msg_len(msg);
    printf("Async received: %.*s\n", (int)len, (char *)body);
    nng_msg_free(msg);
}

int main() {
    nng_socket sock;
    nng_aio *aio;
    int rv;

    nng_rep0_open(&sock);
    nng_listen(sock, "tcp://*:5555", NULL, 0);

    // 分配 AIO
    nng_aio_alloc(&aio, recv_callback, NULL);

    // 设置超时
    nng_aio_set_timeout(aio, 5000);  // 5 秒

    // 发起异步接收
    nng_recv_aio(sock, aio);

    // 主线程可以做其他事情...
    // nng_aio_wait(aio);  // 如果需要等待完成

    // 清理
    nng_aio_free(aio);
    nng_close(sock);
    return 0;
}

5.6.2 异步发送

void send_callback(void *arg) {
    nng_aio *aio = (nng_aio *)arg;
    int rv = nng_aio_result(aio);
    if (rv == 0) {
        printf("Message sent successfully\n");
    } else {
        fprintf(stderr, "Send failed: %s\n", nng_strerror(rv));
    }
}

int main() {
    nng_socket sock;
    nng_aio *aio;

    nng_req0_open(&sock);
    nng_dial(sock, "tcp://server:5555", NULL, 0);

    nng_aio_alloc(&aio, send_callback, NULL);
    nng_aio_set_timeout(aio, 3000);

    // 准备消息
    nng_msg *msg;
    nng_msg_alloc(&msg, 0);
    nng_msg_append(msg, "Hello async!", 12);
    nng_aio_set_msg(aio, msg);

    // 异步发送
    nng_send_aio(sock, aio);

    nng_aio_wait(aio);
    nng_aio_free(aio);
    nng_close(sock);
    return 0;
}

5.6.3 AIO 状态机模式

实现一个简单的异步消息处理循环:

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>

typedef struct {
    nng_socket sock;
    nng_aio   *aio;
} server_ctx;

void server_recv_cb(void *arg);

void server_send_cb(void *arg) {
    server_ctx *ctx = (server_ctx *)arg;
    int rv = nng_aio_result(ctx->aio);
    if (rv != 0) {
        fprintf(stderr, "Send error: %s\n", nng_strerror(rv));
        return;
    }
    // 发送完成,继续接收下一个
    nng_recv_aio(ctx->sock, ctx->aio);
}

void server_recv_cb(void *arg) {
    server_ctx *ctx = (server_ctx *)arg;
    int rv = nng_aio_result(ctx->aio);

    if (rv != 0) {
        fprintf(stderr, "Recv error: %s\n", nng_strerror(rv));
        return;
    }

    // 处理消息
    nng_msg *msg = nng_aio_get_msg(ctx->aio);
    void *body = nng_msg_body(msg);
    size_t len = nng_msg_len(msg);
    printf("Received: %.*s\n", (int)len, (char *)body);

    // 构造响应(复用消息对象)
    nng_msg_clear(msg);
    nng_msg_append(msg, "OK", 2);
    nng_aio_set_msg(ctx->aio, msg);

    // 设置发送回调
    nng_aio_set_cb(ctx->aio, server_send_cb, ctx);
    nng_send_aio(ctx->sock, ctx->aio);
}

int main() {
    server_ctx ctx;
    int rv;

    if ((rv = nng_rep0_open(&ctx.sock)) != 0) {
        fprintf(stderr, "open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_listen(ctx.sock, "tcp://*:5555", NULL, 0)) != 0) {
        fprintf(stderr, "listen: %s\n", nng_strerror(rv));
        return 1;
    }

    nng_aio_alloc(&ctx.aio, server_recv_cb, &ctx);
    nng_aio_set_timeout(ctx.aio, 60000);

    // 启动异步接收循环
    nng_recv_aio(ctx.sock, ctx.aio);

    printf("Async REP server running on tcp://*:5555\n");

    // 主线程阻塞(或做其他工作)
    nng_aio_wait(ctx.aio);

    nng_aio_free(ctx.aio);
    nng_close(ctx.sock);
    return 0;
}

5.7 Context(上下文)

Context 是 NNG 的重要创新,允许在单个 Socket 上创建多个独立的会话上下文,常用于多线程共享 Socket。

5.7.1 为什么需要 Context

nanomsg: 每个线程一个 Socket(资源开销大)

NNG:     多个线程共享一个 Socket,每个线程有自己的 Context
         ┌─────────┐
         │  Socket  │  (单个,线程安全)
         ├─────────┤
         │ Context1 │  (线程 A 的会话)
         │ Context2 │  (线程 B 的会话)
         │ Context3 │  (线程 C 的会话)
         └─────────┘

5.7.2 Context API

函数用途
nng_ctx_open()在 Socket 上创建 Context
nng_ctx_close()关闭 Context
nng_ctx_send()通过 Context 发送
nng_ctx_recv()通过 Context 接收
nng_ctx_sendmsg()通过 Context 发送消息对象
nng_ctx_recvmsg()通过 Context 接收消息对象
nng_ctx_get()获取 Context 选项
nng_ctx_set()设置 Context 选项
nng_ctx_id()获取 Context ID

5.7.3 Context 示例(多线程 REP 服务)

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

typedef struct {
    nng_ctx ctx;
    int     id;
} worker_args;

void *worker_thread(void *arg) {
    worker_args *wa = (worker_args *)arg;
    int id = wa->id;

    printf("Worker %d started\n", id);

    while (1) {
        nng_msg *msg;
        int rv;

        // 通过 Context 接收(独立的接收队列)
        rv = nng_ctx_recvmsg(wa->ctx, &msg, 0);
        if (rv != 0) {
            fprintf(stderr, "Worker %d recv error: %s\n", id, nng_strerror(rv));
            break;
        }

        void *body = nng_msg_body(msg);
        size_t len = nng_msg_len(msg);
        printf("Worker %d received: %.*s\n", id, (int)len, (char *)body);

        // 处理请求并准备响应
        nng_msg_clear(msg);
        char reply[64];
        snprintf(reply, sizeof(reply), "Response from worker %d", id);
        nng_msg_append(msg, reply, strlen(reply));

        // 通过 Context 发送
        rv = nng_ctx_sendmsg(wa->ctx, msg, 0);
        if (rv != 0) {
            fprintf(stderr, "Worker %d send error: %s\n", id, nng_strerror(rv));
            nng_msg_free(msg);
            break;
        }
    }

    return NULL;
}

int main() {
    nng_socket sock;
    int rv;
    int num_workers = 4;
    pthread_t threads[4];
    worker_args args[4];

    if ((rv = nng_rep0_open(&sock)) != 0) {
        fprintf(stderr, "nng_rep0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    // 创建 Context 并启动工作线程
    for (int i = 0; i < num_workers; i++) {
        rv = nng_ctx_open(&args[i].ctx, sock);
        if (rv != 0) {
            fprintf(stderr, "nng_ctx_open: %s\n", nng_strerror(rv));
            return 1;
        }
        args[i].id = i;
        pthread_create(&threads[i], NULL, worker_thread, &args[i]);
    }

    printf("REP server with %d workers on tcp://*:5555\n", num_workers);

    // 等待所有线程
    for (int i = 0; i < num_workers; i++) {
        pthread_join(threads[i], NULL);
    }

    nng_close(sock);
    return 0;
}

重要:Context 只在 REQ/REP 协议中工作,因为 REQ/REP 是有状态的协议(需要匹配请求和响应)。PAIR、PUB、PUSH 等无状态协议不需要 Context。


5.8 常用 Socket 选项

5.8.1 选项一览

选项类型说明默认值
NNG_OPT_RECVTIMEOms接收超时无限
NNG_OPT_SENDTIMEOms发送超时无限
NNG_OPT_RECONNMINTms最小重连间隔100ms
NNG_OPT_RECONNMAXTms最大重连间隔0
NNG_OPT_RECVBUFint接收队列大小128
NNG_OPT_SENDBUFint发送队列大小128
NNG_OPT_RECVMAXSZsize_t最大接收消息大小1MB
NNG_OPT_TCP_NODELAYbool禁用 Nagletrue
NNG_OPT_TCP_KEEPALIVEboolTCP Keepalivefalse
NNG_OPT_URLstring当前 URL(自动)
NNG_OPT_PEERNAMEstring对端名称(自动)
NNG_OPT_SUB_SUBSCRIBEbytesSUB 订阅(无)
NNG_OPT_SUB_UNSUBSCRIBEbytesSUB 取消订阅(无)

5.8.2 设置示例

// 设置接收超时 5 秒
nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 5000);

// 设置发送超时 3 秒
nng_setopt_ms(sock, NNG_OPT_SENDTIMEO, 3000);

// 设置最大接收消息 10MB
size_t maxsz = 10 * 1024 * 1024;
nng_setopt(sock, NNG_OPT_RECVMAXSZ, &maxsz, sizeof(maxsz));

// 设置 TCP NoDelay
int nodelay = 1;
nng_setopt(sock, NNG_OPT_TCP_NODELAY, &nodelay, sizeof(nodelay));

// 设置接收队列大小
int bufsize = 256;
nng_setopt(sock, NNG_OPT_RECVBUF, &bufsize, sizeof(bufsize));

// SUB 订阅
const char *topic = "weather";
nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, topic, strlen(topic));

5.9 错误处理模式

5.9.1 错误码

NNG 的函数返回 int 类型的错误码(nng_err 枚举),成功返回 0。

常见错误码:

错误码说明常见原因
0成功
NNG_ECLOSEDSocket 已关闭在关闭的 Socket 上操作
NNG_ETIMEDOUT超时recv/send 超时
NNG_ECONNREFUSED连接被拒目标服务未启动
NNG_ECONNRESET连接重置对端断开
NNG_ENOMEM内存不足系统资源不足
NNG_ESTATE状态错误协议状态不匹配(如连续 send)
NNG_EINVAL无效参数参数错误
NNG_ENOTSUP不支持操作不被当前协议支持
NNG_EADDRINUSE地址被占用端口已被绑定
NNG_EAGAIN资源暂不可用非阻塞模式下无数据

5.9.2 错误处理宏

推荐的错误处理模式:

#define CHECK(rv, msg) do {         \
    if ((rv) != 0) {                \
        fprintf(stderr, "%s: %s\n", \
                msg, nng_strerror(rv)); \
        exit(1);                    \
    }                               \
} while (0)

int main() {
    nng_socket sock;
    int rv;

    rv = nng_req0_open(&sock);
    CHECK(rv, "nng_req0_open");

    rv = nng_dial(sock, "tcp://localhost:5555", NULL, 0);
    CHECK(rv, "nng_dial");

    // ... 使用 Socket ...

    nng_close(sock);
    return 0;
}

5.10 完整 NNG 服务器示例

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>

static volatile int running = 1;

void handle_signal(int sig) {
    running = 0;
}

int main() {
    nng_socket sock;
    int rv;

    signal(SIGINT, handle_signal);
    signal(SIGTERM, handle_signal);

    // 创建 REP Socket
    if ((rv = nng_rep0_open(&sock)) != 0) {
        fprintf(stderr, "Socket: %s\n", nng_strerror(rv));
        return 1;
    }

    // 设置选项
    nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 1000);

    // 监听
    if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
        fprintf(stderr, "Listen: %s\n", nng_strerror(rv));
        nng_close(sock);
        return 1;
    }

    printf("NNG REP server running. Ctrl+C to stop.\n");

    while (running) {
        char *buf = NULL;
        size_t sz;

        rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
        if (rv == NNG_ETIMEDOUT) {
            continue;  // 超时,检查 running 标志
        }
        if (rv != 0) {
            fprintf(stderr, "Recv: %s\n", nng_strerror(rv));
            break;
        }

        printf("Request: %.*s\n", (int)sz, buf);
        nng_free(buf, sz);

        const char *reply = "OK";
        if ((rv = nng_send(sock, (void *)reply, strlen(reply), 0)) != 0) {
            fprintf(stderr, "Send: %s\n", nng_strerror(rv));
            break;
        }
    }

    printf("\nShutting down...\n");
    nng_close(sock);
    return 0;
}

5.11 注意事项

Context 限制:Context 仅支持 REQ/REP 和 SURVEYOR/RESPONDENT 协议,不支持 PAIR/PUB/SUB/PUSH/PULL。

AIO 回调线程:AIO 回调在 NNG 内部线程池执行,不要在回调中做耗时操作。

Socket 线程安全:NNG Socket 是线程安全的,但 Context 的收发操作是独立的。

消息所有权nng_sendmsg()nng_aio_set_msg() 会接管消息所有权,不要再调用 nng_msg_free()


5.12 扩展阅读


上一章第 4 章:nanomsg C API 详解 | 下一章第 6 章:可扩展性与性能