强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

06 - D-Bus 信号机制

第 06 章:D-Bus 信号机制


6.1 信号概述

信号(Signal)是 D-Bus 的 发布/订阅 通信模式。与方法调用不同:

维度方法调用信号
模式请求-响应(1:1)发布-订阅(1:N)
方向客户端 → 服务端 → 客户端发送方 → 总线 → 所有订阅者
返回值必须回复(或 Error)无回复
接收者明确指定(destination所有匹配的订阅者
时序严格匹配 serial松耦合,异步分发

信号在系统中无处不在:

场景信号来源
网络状态变化StateChangedNetworkManager
设备插入/移除DeviceAdded / DeviceRemovedudev / UPower
用户登录/注销SessionNew / SessionRemovedsystemd-logind
属性变更PropertiesChanged任意服务
系统即将关机PrepareForShutdownsystemd-logind
播放器状态变化PlaybackStatusChangedMPRIS
桌面通知关闭NotificationClosedNotificationDaemon

6.2 信号消息格式

信号消息的结构:

┌────────────────────────────────────────────────┐
│ Header                                         │
│   Type:        SIGNAL                          │
│   Path:        /org/freedesktop/NetworkManager │
│   Interface:   org.freedesktop.NetworkManager  │
│   Member:      StateChanged                    │
│   Sender:      :1.5                            │
│   Serial:      42                              │
│   Signature:   u                               │
├────────────────────────────────────────────────┤
│ Body                                             │
│   arg0:    uint32  (70 = NM_STATE_CONNECTED_GLOBAL) │
└────────────────────────────────────────────────┘

关键区别:信号消息 没有 Destination 字段,由总线负责分发给所有匹配的订阅者。


6.3 发布信号

6.3.1 Python 发布信号

#!/usr/bin/env python3
"""D-Bus 信号发布示例 - 自定义服务"""

import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

class MyService(dbus.service.Object):
    """自定义 D-Bus 服务,发布信号"""
    
    def __init__(self, bus_name, object_path):
        super().__init__(bus_name, object_path)
        self._counter = 0
    
    # 声明信号(使用 dbus.service.signal 装饰器)
    @dbus.service.signal(dbus_interface='org.example.Signals', signature='si')
    def StatusChanged(self, status, code):
        """状态变化信号,参数: (string status, int32 code)"""
        print(f"发送信号: StatusChanged({status}, {code})")
    
    @dbus.service.signal(dbus_interface='org.example.Signals', signature='a{sv}')
    def DataUpdated(self, data):
        """数据更新信号,参数: (dict data)"""
        print(f"发送信号: DataUpdated({data})")
    
    @dbus.service.method(dbus_interface='org.example.Signals', in_signature='', out_signature='')
    def TriggerSignal(self):
        """方法:触发信号发送"""
        self._counter += 1
        self.StatusChanged("running", self._counter)
        self.DataUpdated({
            'timestamp': dbus.Int64(1234567890),
            'value': dbus.Double(3.14),
            'name': 'test',
        })

# 设置总线
bus = dbus.SessionBus()
bus_name = dbus.service.BusName('org.example.Signals', bus)
service = MyService(bus_name, '/org/example/Signals')

print("信号服务已启动,等待客户端触发...")
print("在另一个终端运行: busctl call --user org.example.Signals /org/example/Signals org.example.Signals TriggerSignal")

loop = GLib.MainLoop()
loop.run()

6.3.2 GDBus (C) 发布信号

#include <gio/gio.h>

/* 服务端发布信号 */
static void emit_signal(GDBusConnection *conn) {
    GError *error = NULL;
    
    g_dbus_connection_emit_signal(
        conn,
        NULL,                            /* destination (NULL = broadcast) */
        "/org/example/Signals",          /* object path */
        "org.example.Signals",           /* interface */
        "StatusChanged",                 /* signal name */
        g_variant_new("(si)", "running", 42),  /* parameters */
        &error
    );
    
    if (error) {
        g_printerr("发送信号失败: %s\n", error->message);
        g_error_free(error);
    }
}

6.3.3 命令行发送信号

# 使用 dbus-send 发送自定义信号
dbus-send --session \
  --type=signal \
  /org/example/Signals \
  org.example.Signals.StatusChanged \
  string:"running" \
  int32:42

# 注意:dbus-send 发送的信号没有总线名称前缀
# 因此 sender 字段将是发送者的唯一名称(如 :1.99)

6.4 订阅信号

6.4.1 Python 订阅信号

#!/usr/bin/env python3
"""D-Bus 信号订阅示例"""

import dbus
import dbus.mainloop.glib
from gi.repository import GLib

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

bus = dbus.SessionBus()
loop = GLib.MainLoop()

def on_status_changed(status, code):
    """处理 StatusChanged 信号"""
    print(f"收到信号: StatusChanged(status={status}, code={code})")

def on_data_updated(data):
    """处理 DataUpdated 信号"""
    print(f"收到信号: DataUpdated(data={data})")

def on_properties_changed(interface, changed, invalidated):
    """处理 PropertiesChanged 信号(标准接口)"""
    print(f"属性变化: interface={interface}")
    for key, value in changed.items():
        print(f"  {key} = {value}")
    if invalidated:
        print(f"  已失效: {invalidated}")

# 方式 1:通过信号名匹配
bus.add_signal_receiver(
    on_status_changed,
    signal_name='StatusChanged',
    dbus_interface='org.example.Signals',
    bus_name='org.example.Signals',
    path='/org/example/Signals'
)

bus.add_signal_receiver(
    on_data_updated,
    signal_name='DataUpdated',
    dbus_interface='org.example.Signals',
    bus_name='org.example.Signals',
    path='/org/example/Signals'
)

# 方式 2:监听所有 PropertiesChanged 信号
bus.add_signal_receiver(
    on_properties_changed,
    signal_name='PropertiesChanged',
    dbus_interface='org.freedesktop.DBus.Properties',
)

print("正在监听信号,按 Ctrl+C 退出...")
loop.run()

6.4.2 命令行监听

# 方式 1:使用 busctl monitor
busctl monitor --user

# 方式 2:使用 busctl monitor + 匹配规则
busctl monitor --user \
  --match="type='signal',interface='org.example.Signals'"

# 方式 3:使用 dbus-monitor
dbus-monitor --session \
  "type='signal',interface='org.example.Signals'"

# 方式 4:使用 gdbus
gdbus monitor --session --dest org.example.Signals

6.4.3 GDBus (C) 订阅信号

#include <gio/gio.h>

static void on_signal(GDBusConnection *conn,
                      const gchar *sender,
                      const gchar *object_path,
                      const gchar *interface_name,
                      const gchar *signal_name,
                      GVariant *parameters,
                      gpointer user_data) {
    const gchar *status;
    gint32 code;
    g_variant_get(parameters, "(&si)", &status, &code);
    g_print("收到信号: StatusChanged(%s, %d)\n", status, code);
}

int main(void) {
    GMainLoop *loop = g_main_loop_new(NULL, FALSE);
    GError *error = NULL;
    
    GDBusConnection *conn = g_bus_get_sync(G_BUS_TYPE_SESSION, NULL, &error);
    
    /* 订阅信号 */
    guint sub_id = g_dbus_connection_signal_subscribe(
        conn,
        "org.example.Signals",       /* sender */
        "org.example.Signals",       /* interface */
        "StatusChanged",             /* signal name */
        "/org/example/Signals",      /* object path */
        NULL,                        /* arg0 */
        G_DBUS_SIGNAL_FLAGS_NONE,
        on_signal,
        loop,                        /* user_data */
        NULL                         /* GDestroyNotify */
    );
    
    g_print("正在监听信号...\n");
    g_main_loop_run(loop);
    
    g_dbus_connection_signal_unsubscribe(conn, sub_id);
    g_object_unref(conn);
    g_main_loop_unref(loop);
    return 0;
}

6.5 匹配规则详解

匹配规则控制哪些信号会被投递到订阅者。

6.5.1 规则语法

type='signal',sender='...',interface='...',member='...',path='...',arg0='...'

6.5.2 匹配字段

字段说明示例
type消息类型type='signal'
sender发送方名称sender='org.freedesktop.NetworkManager'
interface接口名称interface='org.freedesktop.DBus.Properties'
member信号名称member='StateChanged'
path精确路径匹配path='/org/freedesktop/NetworkManager'
path_namespace路径前缀匹配path_namespace='/org/freedesktop'
arg0 ~ arg63参数值精确匹配arg0='Connected'
arg0namespace参数为总线名时前缀匹配arg0namespace='org.freedesktop'
eavesdrop接收所有消息(需特权)eavesdrop='true'

6.5.3 添加/删除匹配规则

# 通过 D-Bus 方法添加匹配规则
dbus-send --session --dest=org.freedesktop.DBus \
  --type=method_call \
  /org/freedesktop/DBus \
  org.freedesktop.DBus.AddMatch \
  string:"type='signal',sender='org.freedesktop.NetworkManager',member='StateChanged'"

# 删除匹配规则
dbus-send --session --dest=org.freedesktop.DBus \
  --type=method_call \
  /org/freedesktop/DBus \
  org.freedesktop.DBus.RemoveMatch \
  string:"type='signal',sender='org.freedesktop.NetworkManager',member='StateChanged'"

6.5.4 匹配规则示例

# 匹配所有 PropertiesChanged 信号
"type='signal',member='PropertiesChanged'"

# 匹配 NetworkManager 的所有信号
"type='signal',sender='org.freedesktop.NetworkManager'"

# 匹配特定路径下的接口变化
"type='signal',path_namespace='/org/freedesktop/NetworkManager',member='DeviceAdded'"

# 匹配 arg0 为特定值的信号
"type='signal',interface='org.freedesktop.login1.Manager',member='SessionNew',arg0='c2'"

# 匹配某个路径上所有接口的所有信号
"type='signal',path='/org/example/MyObject'"

6.6 信号过滤

6.6.1 总线侧过滤

守护进程在总线侧根据匹配规则进行过滤:

发送方 → [dbus-daemon] → 匹配规则引擎 → 只投递给匹配的订阅者
                                    ↓
                              不匹配的消息被丢弃

6.6.2 客户端侧过滤

在客户端代码中进行二次过滤:

#!/usr/bin/env python3
"""客户端侧信号过滤"""

import dbus
import dbus.mainloop.glib
from gi.repository import GLib

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
loop = GLib.MainLoop()

def on_state_changed(status, code):
    # 客户端侧过滤:只处理 code > 0 的信号
    if code > 0:
        print(f"处理状态变化: {status} (code={code})")
    else:
        print(f"忽略信号: code={code}")

# 订阅时使用路径前缀过滤
bus.add_signal_receiver(
    on_state_changed,
    signal_name='StatusChanged',
    dbus_interface='org.example.Signals',
    path_keyword='path',     # 将路径作为 keyword 参数传入回调
    sender_keyword='sender', # 将发送方作为 keyword 参数传入回调
)

print("监听中(带过滤)...")
loop.run()

6.6.3 使用 eavesdrop 监听

# eavesdrop 允许监听所有消息(包括发给其他客户端的)
# 需要特权(通常是 root 或 dbus-daemon 配置允许)
dbus-monitor --session "eavesdrop='true',type='signal'"

注意dbus-broker 默认禁止 eavesdrop,需要在策略文件中显式授权。


6.7 信号风暴防护

当信号频率过高时(如传感器数据),可能造成性能问题。

6.7.1 信号风暴的症状

症状原因
接收方 CPU 占用飙升每条信号都触发回调函数
消息队列溢出信号发送速度 > 处理速度
dbus-daemon 高内存大量消息在队列中等待
应用响应迟缓主循环被信号处理占满

6.7.2 防护策略

策略 1:节流(Throttle)

#!/usr/bin/env python3
"""信号接收端节流"""

import time
import dbus
import dbus.mainloop.glib
from gi.repository import GLib

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
loop = GLib.MainLoop()

last_time = 0
MIN_INTERVAL = 0.1  # 最小间隔 100ms

def on_data_changed(data):
    global last_time
    now = time.monotonic()
    if now - last_time < MIN_INTERVAL:
        return  # 跳过过于频繁的信号
    last_time = now
    print(f"处理数据: {data}")

bus.add_signal_receiver(
    on_data_changed,
    signal_name='DataUpdated',
    dbus_interface='org.example.Signals',
)

print("节流监听中...")
loop.run()

策略 2:批量合并(Debounce)

#!/usr/bin/env python3
"""信号防抖(只处理最后一次)"""

import dbus
import dbus.mainloop.glib
from gi.repository import GLib

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
loop = GLib.MainLoop()

pending_data = None
timer_id = None

def process_batch():
    """定时处理批量数据"""
    global pending_data, timer_id
    if pending_data is not None:
        print(f"批量处理: {pending_data}")
        pending_data = None
    timer_id = None
    return False  # 不重复定时器

def on_data_changed(data):
    global pending_data, timer_id
    pending_data = data
    # 重置定时器
    if timer_id is not None:
        GLib.source_remove(timer_id)
    timer_id = GLib.timeout_add(200, process_batch)  # 200ms 后处理

bus.add_signal_receiver(
    on_data_changed,
    signal_name='DataUpdated',
    dbus_interface='org.example.Signals',
)

print("防抖监听中...")
loop.run()

策略 3:服务端聚合发送

# 服务端:定期批量发送,而不是每次变化都发送
class BatchService(dbus.service.Object):
    def __init__(self, bus_name, object_path):
        super().__init__(bus_name, object_path)
        self._pending_updates = {}
        GLib.timeout_add(500, self._flush_updates)  # 每 500ms 发送一次
    
    @dbus.service.signal(dbus_interface='org.example.Signals', signature='a{sv}')
    def BatchUpdated(self, data):
        pass
    
    def update(self, key, value):
        self._pending_updates[key] = value
    
    def _flush_updates(self):
        if self._pending_updates:
            self.BatchUpdated(self._pending_updates)
            self._pending_updates = {}
        return True  # 继续定时器

6.8 实战场景

场景 1:监控 USB 设备插拔

#!/usr/bin/env python3
"""监控 USB 设备插拔"""

import dbus
import dbus.mainloop.glib
from gi.repository import GLib

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
loop = GLib.MainLoop()

def interfaces_added(path, interfaces):
    for iface_name, props in interfaces.items():
        if 'org.freedesktop.login1.Seat' in iface_name:
            continue
        print(f"设备添加: {path}")
        for k, v in props.items():
            print(f"  {k} = {v}")

def interfaces_removed(path, interfaces):
    print(f"设备移除: {path}")
    print(f"  接口: {interfaces}")

# 使用 ObjectManager 的信号
bus.add_signal_receiver(
    interfaces_added,
    signal_name='InterfacesAdded',
    dbus_interface='org.freedesktop.DBus.ObjectManager',
    bus_name='org.freedesktop.login1',
)

bus.add_signal_receiver(
    interfaces_removed,
    signal_name='InterfacesRemoved',
    dbus_interface='org.freedesktop.DBus.ObjectManager',
    bus_name='org.freedesktop.login1',
)

print("监控设备插拔...")
loop.run()

场景 2:监控系统关机/挂起

#!/usr/bin/env python3
"""监控系统电源事件"""

import dbus
import dbus.mainloop.glib
from gi.repository import GLib

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
loop = GLib.MainLoop()

def on_prepare_for_shutdown(starting):
    if starting:
        print("⚠️ 系统即将关机!正在执行清理...")
        # 在这里执行清理逻辑
    else:
        print("关机准备取消")

def on_prepare_for_sleep(starting):
    if starting:
        print("💤 系统即将休眠!正在保存状态...")
        # 在这里保存状态
    else:
        print("系统唤醒")

bus.add_signal_receiver(
    on_prepare_for_shutdown,
    signal_name='PrepareForShutdown',
    dbus_interface='org.freedesktop.login1.Manager',
)

bus.add_signal_receiver(
    on_prepare_for_sleep,
    signal_name='PrepareForSleep',
    dbus_interface='org.freedesktop.login1.Manager',
)

print("监控电源事件...")
loop.run()

本章小结

概念说明
信号1:N 发布/订阅模型,无回复
匹配规则控制信号分发范围的过滤表达式
AddMatch / RemoveMatch运行时管理匹配规则
PropertiesChanged最常用的信号,监听属性变化
eavesdrop监听所有消息(需特权)
信号风暴高频信号的性能问题,需要节流/防抖

扩展阅读