强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

第 13 章:客户端开发

第 13 章:客户端开发

13.1 客户端连接概览

AgensGraph 基于 PostgreSQL 协议,因此可以使用 PostgreSQL 的标准驱动连接。各语言的连接方式如下:

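| 语言 | 驱动 | Cypher 支持 | 安装 |
| --- | --- | --- | --- |
| Python | psycopg2 / agensgraph-driver | | pip install psycopg2-binary |
| Java | JDBC (PostgreSQL JDBC) | | Maven/Gradle 依赖 |
| JavaScript | node-postgres (pg) | | npm install pg |
| Go | lib/pq / pgx | | go get |
| C/C++ | libpq | | 系统自带 |
| Ruby | pg | | gem install pg |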

13.2 Python 客户端

13.2.1 psycopg2 连接

# 安装依赖
# pip install psycopg2-binary

import psycopg2
from psycopg2.extras import RealDictCursor

class AgensGraphClient:
    """Small psycopg2-based client for AgensGraph.

    The client owns one connection and one cursor. It can be used as a
    context manager: entering connects, leaving closes.
    """

    def __init__(self, host='localhost', port=5432,
                 dbname='agens', user='agens', password='agens123'):
        """Store the connection parameters; no connection is opened yet."""
        self.conn_params = {
            'host': host,
            'port': port,
            'dbname': dbname,
            'user': user,
            'password': password
        }
        self.conn = None
        self.cursor = None

    def connect(self):
        """Open the connection in autocommit mode and create the cursor."""
        self.conn = psycopg2.connect(**self.conn_params)
        self.conn.autocommit = True
        self.cursor = self.conn.cursor()
        print(f"Connected to AgensGraph at {self.conn_params['host']}")

    @staticmethod
    def _is_plain_identifier(name):
        """Return True if *name* is a plain, unquoted identifier.

        Accepted: a letter or underscore followed by letters, digits,
        underscores or dollar signs.
        """
        if not isinstance(name, str) or not name:
            return False
        first = name[0]
        if not (first.isalpha() or first == '_'):
            return False
        return all(ch.isalnum() or ch in '_$' for ch in name[1:])

    def set_graph_path(self, graph_name):
        """Make *graph_name* the current graph for this session.

        Raises:
            ValueError: if *graph_name* is not a plain identifier. The name
                cannot be sent as a bound parameter, so it is validated
                before being placed into the statement text.
        """
        if not self._is_plain_identifier(graph_name):
            raise ValueError(f"Invalid graph name: {graph_name!r}")
        self.cursor.execute(f"SET graph_path = {graph_name}")

    def execute_cypher(self, query, params=None):
        """Run *query* with optional *params*.

        Returns:
            All result rows when the statement produced a result set,
            otherwise None.
        """
        self.cursor.execute(query, params)
        if self.cursor.description:
            return self.cursor.fetchall()
        return None

    def close(self):
        """Close the cursor and the connection, then forget both handles."""
        try:
            if self.cursor:
                self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed, and
            # drop the references so a second close() is a no-op.
            try:
                if self.conn:
                    self.conn.close()
            finally:
                self.cursor = None
                self.conn = None
        print("Connection closed")

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

13.2.2 基本 CRUD 操作

# Usage example: basic CRUD against the social_network graph
with AgensGraphClient(password='agens123') as client:
    client.set_graph_path('social_network')

    # Create a vertex
    client.execute_cypher("""
        CREATE (p:Person {name: %s, age: %s, city: %s})
    """, ('Alice', 30, '北京'))

    # Create the second vertex as well: the relationship step below matches
    # both endpoints, and without Bob it would match nothing and silently
    # create no edge.
    client.execute_cypher("""
        CREATE (p:Person {name: %s})
    """, ('Bob',))

    # Query a vertex
    results = client.execute_cypher("""
        MATCH (p:Person) WHERE p.name = %s
        RETURN p.name, p.age, p.city
    """, ('Alice',))
    for row in results:
        print(f"Name: {row[0]}, Age: {row[1]}, City: {row[2]}")

    # Update a property
    client.execute_cypher("""
        MATCH (p:Person {name: %s})
        SET p.age = %s
    """, ('Alice', 31))

    # Create a relationship between the two existing vertices
    client.execute_cypher("""
        MATCH (a:Person {name: %s}), (b:Person {name: %s})
        CREATE (a)-[:KNOWS {since: %s}]->(b)
    """, ('Alice', 'Bob', 2020))

    # Delete the vertex together with its relationships
    client.execute_cypher("""
        MATCH (p:Person {name: %s})
        DETACH DELETE p
    """, ('Alice',))

13.2.3 使用 ORM 风格封装

class GraphModel:
    """ORM-style helper that builds Cypher statements for one vertex label.

    Property values are always passed as bound parameters. Property names
    become part of the statement text, so they are validated first.
    """

    def __init__(self, client, label):
        """Remember the client used to run queries and the vertex label."""
        self.client = client
        self.label = label

    @staticmethod
    def _checked_keys(props):
        """Return the keys of *props*, rejecting any that is not an identifier."""
        keys = list(props)
        for key in keys:
            if not (isinstance(key, str) and key.isidentifier()):
                raise ValueError(f"Invalid property name: {key!r}")
        return keys

    @classmethod
    def _equality_terms(cls, props):
        """Return one ``n.<key> = %s`` term per key of *props*."""
        return [f'n.{k} = %s' for k in cls._checked_keys(props)]

    def create(self, **props):
        """Create a vertex with the given properties and return the result rows."""
        prop_str = ', '.join(f'{k}: %s' for k in self._checked_keys(props))
        query = f"CREATE (n:{self.label} {{{prop_str}}}) RETURN n"
        return self.client.execute_cypher(query, list(props.values()))

    def find(self, **conditions):
        """Return vertices whose properties equal *conditions*.

        With no conditions the WHERE clause is omitted, so every vertex of
        the label is returned instead of sending a malformed statement.
        """
        terms = self._equality_terms(conditions)
        where = f" WHERE {' AND '.join(terms)}" if terms else ""
        query = f"MATCH (n:{self.label}){where} RETURN n"
        return self.client.execute_cypher(query, list(conditions.values()))

    def find_all(self, limit=100):
        """Return at most *limit* vertices of this label."""
        query = f"MATCH (n:{self.label}) RETURN n LIMIT %s"
        return self.client.execute_cypher(query, (limit,))

    def update(self, match_props, **set_props):
        """Set *set_props* on every vertex matching *match_props*.

        Raises:
            ValueError: if either mapping is empty. An empty mapping would
                produce an incomplete WHERE or SET clause.
        """
        if not match_props:
            raise ValueError("update() requires at least one match property")
        if not set_props:
            raise ValueError("update() requires at least one property to set")
        where = ' AND '.join(self._equality_terms(match_props))
        set_clause = ', '.join(self._equality_terms(set_props))
        query = f"MATCH (n:{self.label}) WHERE {where} SET {set_clause}"
        params = list(match_props.values()) + list(set_props.values())
        return self.client.execute_cypher(query, params)

    def delete(self, **conditions):
        """Detach-delete every vertex matching *conditions*.

        Raises:
            ValueError: if no condition is given, so a call without
                arguments can never turn into "delete everything".
        """
        if not conditions:
            raise ValueError("delete() requires at least one condition")
        where = ' AND '.join(self._equality_terms(conditions))
        query = f"MATCH (n:{self.label}) WHERE {where} DETACH DELETE n"
        return self.client.execute_cypher(query, list(conditions.values()))

# Usage example: drive the Person label through GraphModel
with AgensGraphClient(password='agens123') as client:
    client.set_graph_path('social_network')
    person = GraphModel(client, 'Person')

    # Create the two sample vertices, in the same order as before
    for props in (
        {'name': '张三', 'age': 30, 'city': '北京'},
        {'name': '李四', 'age': 28, 'city': '上海'},
    ):
        person.create(**props)

    # Look up by city and show the raw rows
    results = person.find(city='北京')
    print(results)

    # Change one property, then remove the other vertex
    person.update({'name': '张三'}, age=31)
    person.delete(name='李四')

13.2.4 连接池管理

from psycopg2 import pool

class AgensGraphPool:
    """Wrapper around a psycopg2 ``ThreadedConnectionPool``."""

    def __init__(self, min_conn=5, max_conn=20, **conn_params):
        """Create the underlying pool.

        Args:
            min_conn: minimum connection count passed to the pool.
            max_conn: maximum connection count passed to the pool.
            **conn_params: forwarded unchanged to the pool constructor.
        """
        # Import under a private alias instead of reading the module-global
        # name `pool`: scripts commonly bind their own pool instance to that
        # name, after which the global would no longer be the psycopg2 module.
        from psycopg2 import pool as _pg_pool

        self.pool = _pg_pool.ThreadedConnectionPool(
            min_conn, max_conn,
            **conn_params
        )

    def get_connection(self):
        """Borrow a connection from the pool."""
        return self.pool.getconn()

    def release_connection(self, conn):
        """Hand *conn* back to the pool."""
        self.pool.putconn(conn)

    def close_all(self):
        """Close every connection managed by the pool."""
        self.pool.closeall()

# Using the connection pool.
# The instance is deliberately NOT named `pool`: that name is the imported
# psycopg2 module, and rebinding it would break any later AgensGraphPool(...).
graph_pool = AgensGraphPool(
    min_conn=5, max_conn=20,
    host='localhost', port=5432,
    dbname='agens', user='agens', password='agens123'
)

conn = graph_pool.get_connection()
try:
    conn.autocommit = True
    # The with-block closes the cursor even when a statement raises.
    with conn.cursor() as cur:
        cur.execute("SET graph_path = social_network")
        cur.execute("MATCH (p:Person) RETURN p.name")
        results = cur.fetchall()
        for row in results:
            print(row)
finally:
    # Always hand the connection back, on success or on error.
    graph_pool.release_connection(conn)

13.3 Java 客户端

13.3.1 Maven 依赖

<!-- pom.xml -->
<dependencies>
    <!-- PostgreSQL JDBC driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.7.1</version>
    </dependency>

    <!-- HikariCP connection pool -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>5.0.1</version>
    </dependency>

    <!-- SLF4J logging API -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.9</version>
    </dependency>
</dependencies>

13.3.2 基本连接与查询

import java.sql.*;

/**
 * Minimal JDBC example: connect, select the graph, create one vertex with a
 * prepared statement, then list all Person vertices.
 */
public class AgensGraphClient {

    private static final String URL = "jdbc:postgresql://localhost:5432/agens";
    private static final String USER = "agens";
    private static final String PASSWORD = "agens123";

    public static void main(String[] args) {
        try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
            conn.setAutoCommit(true);

            // Select the graph the following statements operate on
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("SET graph_path = social_network");
            }

            // Create a vertex. JDBC parameter markers are '?', matching the
            // setString/setInt indexes below; '$1'/'$2' are not JDBC markers.
            String createCypher = "CREATE (p:Person {name: ?, age: ?})";
            try (PreparedStatement pstmt = conn.prepareStatement(createCypher)) {
                pstmt.setString(1, "Alice");
                pstmt.setInt(2, 30);
                pstmt.execute();
            }

            // Query and print every Person
            String queryCypher = "MATCH (p:Person) RETURN p.name, p.age";
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(queryCypher)) {
                while (rs.next()) {
                    String name = rs.getString(1);
                    int age = rs.getInt(2);
                    System.out.printf("Name: %s, Age: %d%n", name, age);
                }
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

13.3.3 HikariCP 连接池

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.*;

/**
 * Process-wide HikariCP pool for AgensGraph, configured once in a static
 * initializer.
 */
public class AgensGraphPool {

    // Assigned exactly once by the static initializer below.
    private static final HikariDataSource dataSource;

    static {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/agens");
        config.setUsername("agens");
        config.setPassword("agens123");
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);

        // PostgreSQL JDBC statement-cache settings. The cachePrepStmts /
        // prepStmtCacheSize / prepStmtCacheSqlLimit names belong to MySQL
        // Connector/J, so the PostgreSQL driver's own properties are used.
        config.addDataSourceProperty("preparedStatementCacheQueries", "250");
        config.addDataSourceProperty("preparedStatementCacheSizeMiB", "5");

        dataSource = new HikariDataSource(config);
    }

    /** Borrows a connection; closing it returns it to the pool. */
    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }

    /** Shuts the pool down. */
    public static void close() {
        if (dataSource != null) {
            dataSource.close();
        }
    }

    /**
     * Usage example: names of the people that {@code personName} KNOWS.
     *
     * java.util types are fully qualified because the snippet's import list
     * contains only the HikariCP classes and java.sql.
     */
    public static java.util.List<String> getFriends(String personName) throws SQLException {
        java.util.List<String> friends = new java.util.ArrayList<>();
        String cypher = "MATCH (p:Person {name: ?})-[:KNOWS]->(f:Person) RETURN f.name";

        try (Connection conn = getConnection();
             PreparedStatement ps = conn.prepareStatement(cypher)) {
            ps.setString(1, personName);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    friends.add(rs.getString(1));
                }
            }
        }
        return friends;
    }
}

13.3.4 事务管理

/** Transaction example: move credits between two Person vertices atomically. */
public class TransactionExample {

    /**
     * Debits {@code amount} credits from {@code from} and credits them to
     * {@code to} inside a single transaction.
     *
     * @throws RuntimeException wrapping the SQLException when a statement or
     *         the commit fails; the transaction is rolled back first
     * @throws SQLException when obtaining or configuring the connection fails
     */
    public static void transferCredits(String from, String to, int amount)
            throws SQLException {
        try (Connection conn = AgensGraphPool.getConnection()) {
            conn.setAutoCommit(false);
            try {
                // Select the graph
                try (Statement stmt = conn.createStatement()) {
                    stmt.execute("SET graph_path = social_network");
                }

                // Debit the sender
                String debit = "MATCH (p:Person {name: ?}) SET p.credits = p.credits - ?";
                try (PreparedStatement ps = conn.prepareStatement(debit)) {
                    ps.setString(1, from);
                    ps.setInt(2, amount);
                    ps.executeUpdate();
                }

                // Credit the receiver
                String credit = "MATCH (p:Person {name: ?}) SET p.credits = p.credits + ?";
                try (PreparedStatement ps = conn.prepareStatement(credit)) {
                    ps.setString(1, to);
                    ps.setInt(2, amount);
                    ps.executeUpdate();
                }

                conn.commit();
                System.out.println("Transfer successful");

            } catch (SQLException e) {
                // A failing rollback must not replace the original error:
                // attach it as suppressed and still report the real cause.
                try {
                    conn.rollback();
                } catch (SQLException rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
                throw new RuntimeException("Transfer failed", e);
            }
        }
    }
}

13.4 JavaScript / Node.js 客户端

13.4.1 安装依赖

# Install node-postgres
npm install pg

# Install the TypeScript type definitions (optional)
npm install -D @types/pg

13.4.2 基本连接

// agensgraph-client.js
const { Pool, Client } = require('pg');

// Connection pool configuration
const pool = new Pool({
  host: 'localhost',
  port: 5432,
  database: 'agens',
  user: 'agens',
  password: 'agens123',
  max: 20,          // maximum number of connections
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000,
});

// Run one Cypher statement on a pooled connection and resolve to its rows.
// The graph path is set on the borrowed connection before the statement runs,
// and the connection is released whether or not the statement succeeds.
async function executeCypher(query, params = []) {
  const connection = await pool.connect();
  try {
    await connection.query('SET graph_path = social_network');
    const { rows } = await connection.query(query, params);
    return rows;
  } finally {
    connection.release();
  }
}

// Create a Person vertex and resolve to the first returned row
// (undefined when the statement returned no rows).
async function createPerson(name, age, city) {
  const [created] = await executeCypher(
    `
    CREATE (p:Person {name: $1, age: $2, city: $3})
    RETURN p.name AS name, p.age AS age
  `,
    [name, age, city],
  );
  return created;
}

// Look up a Person by name; resolves to the first row, or null when the
// first row is missing.
async function findPerson(name) {
  const lookup = `
    MATCH (p:Person {name: $1})
    RETURN p.name AS name, p.age AS age, p.city AS city
  `;
  const [match] = await executeCypher(lookup, [name]);
  return match || null;
}

// List the people the named Person KNOWS, ordered by r.since descending.
async function getFriends(name) {
  const friendsQuery = `
    MATCH (p:Person {name: $1})-[r:KNOWS]->(f:Person)
    RETURN f.name AS name, r.since AS since
    ORDER BY r.since DESC
  `;
  const friendRows = await executeCypher(friendsQuery, [name]);
  return friendRows;
}

// Export the pool and the query helpers
module.exports = {
  pool,
  executeCypher,
  createPerson,
  findPerson,
  getFriends
};

13.4.3 Express.js 集成示例

// app.js
const express = require('express');
const { createPerson, findPerson, getFriends } = require('./agensgraph-client');

const app = express();
app.use(express.json());

// Create a person.
// The request body is untrusted: a missing body or a missing/blank name is
// rejected with 400 instead of reaching the database or surfacing as a 500.
app.post('/api/persons', async (req, res) => {
  try {
    const { name, age, city } = req.body || {};
    if (typeof name !== 'string' || name.trim() === '') {
      return res.status(400).json({ error: 'name is required' });
    }
    const person = await createPerson(name, age, city);
    res.status(201).json(person);
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: 'Failed to create person' });
  }
});

// Look up a person by name; 404 when no such person exists
app.get('/api/persons/:name', async (req, res) => {
  try {
    const person = await findPerson(req.params.name);
    if (!person) {
      return res.status(404).json({ error: 'Person not found' });
    }
    res.json(person);
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: 'Query failed' });
  }
});

// List a person's friends
app.get('/api/persons/:name/friends', async (req, res) => {
  try {
    const friends = await getFriends(req.params.name);
    res.json(friends);
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: 'Query failed' });
  }
});

app.listen(3000, () => {
  console.log('Server running on port 3000');
});

13.5 连接池最佳实践

13.5.1 连接池参数对照

| 参数 | Python (psycopg2) | Java (HikariCP) | Node.js (pg) |
| --- | --- | --- | --- |
| 最小连接 | minconn | minimumIdle | min (通过 pool 模式) |
| 最大连接 | maxconn | maximumPoolSize | max |
| 连接超时 | - | connectionTimeout | connectionTimeoutMillis |
| 空闲超时 | - | idleTimeout | idleTimeoutMillis |
| 最大存活 | - | maxLifetime | - |

13.5.2 连接池大小计算

推荐连接池大小:

  连接数 = (CPU 核心数 × 2) + 有效磁盘数

  示例:
    8 核 CPU, 1 块 SSD
    连接数 = (8 × 2) + 1 = 17

  一般建议:
    小型应用: 5-10
    中型应用: 10-30
    大型应用: 30-50 (配合 PgBouncer)

13.5.3 连接健康检查

# Python connection health check
def check_connection(pool):
    """Probe the pool by running ``SELECT 1`` on a borrowed connection.

    Args:
        pool: object exposing ``getconn()`` and ``putconn(conn)``.

    Returns:
        True when the probe returned 1; False when it returned anything else
        or when any step raised (the error is printed).
    """
    conn = None
    healthy = False
    probe_failed = False
    try:
        conn = pool.getconn()
        cur = conn.cursor()
        try:
            cur.execute("SELECT 1")
            row = cur.fetchone()
        finally:
            cur.close()
        healthy = row is not None and row[0] == 1
    except Exception as e:
        probe_failed = True
        print(f"Connection check failed: {e}")

    # Always give the connection back; previously a failing probe skipped
    # putconn() and the borrowed connection was never returned.
    if conn is not None:
        try:
            if probe_failed:
                # The connection raised during the probe, so ask the pool
                # to discard it rather than hand it to the next caller.
                pool.putconn(conn, close=True)
            else:
                pool.putconn(conn)
        except Exception as e:
            print(f"Connection check failed: {e}")
            return False
    return healthy
// Java HikariCP health check
// NOTE(review): HikariCP's documentation advises leaving connectionTestQuery
// unset for JDBC4-capable drivers — confirm it is needed for this driver.
HikariConfig config = new HikariConfig();
config.setConnectionTestQuery("SELECT 1");
config.setValidationTimeout(5000);

13.6 批量操作优化

13.6.1 Python 批量导入

import psycopg2
from psycopg2.extras import execute_values

def bulk_create_persons(persons, batch_size=1000):
    """Create Person vertices in batches with UNWIND.

    Args:
        persons: iterable of dicts, each with 'name', 'age' and 'city' keys.
        batch_size: number of vertices sent per UNWIND statement.

    Returns:
        Total number of vertices created (0 for empty input, in which case
        no connection is opened).
    """
    rows = list(persons or [])
    if not rows:
        print("Created 0 persons")
        return 0

    # psycopg2 substitutes %s placeholders on the client side, so each batch
    # is sent as a list literal of maps with every value bound as a parameter.
    row_literal = "{name: %s, age: %s, city: %s}"

    conn = psycopg2.connect(host='localhost', dbname='agens',
                            user='agens', password='agens123')
    created = 0
    try:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SET graph_path = social_network")

            for start in range(0, len(rows), batch_size):
                batch = rows[start:start + batch_size]
                cypher = (
                    "UNWIND [" + ", ".join([row_literal] * len(batch)) + "] AS person "
                    "CREATE (p:Person {"
                    "name: person.name, age: person.age, city: person.city"
                    "}) "
                    "RETURN count(p) AS created"
                )
                params = [
                    value
                    for row in batch
                    for value in (row['name'], row['age'], row['city'])
                ]
                cur.execute(cypher, params)
                created += int(cur.fetchone()[0])
    finally:
        # Close the connection on error paths as well.
        conn.close()

    print(f"Created {created} persons")
    return created


# Example invocation with generated sample data
if __name__ == "__main__":
    sample_persons = [
        {'name': f'Person_{i}', 'age': 20 + (i % 50), 'city': '北京'}
        for i in range(10000)
    ]
    bulk_create_persons(sample_persons)

13.7 错误处理与重试

13.7.1 Python 错误处理

import psycopg2
import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    """Build a decorator that retries a call on ``psycopg2.OperationalError``.

    Args:
        max_retries: total number of attempts. Values below 1 are treated
            as 1, so the wrapped function is always called at least once.
        delay: base sleep in seconds; the wait after attempt k (1-based)
            is ``delay * k``.

    Any other ``psycopg2.Error`` is printed and re-raised without a retry.
    The last ``OperationalError`` is re-raised once attempts are exhausted.
    """
    # Previously max_retries <= 0 skipped the loop entirely and the wrapper
    # returned None without ever calling the function.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except psycopg2.OperationalError as e:
                    if attempt < attempts - 1:
                        print(f"Attempt {attempt + 1} failed: {e}")
                        # Back off a little longer after each failure.
                        time.sleep(delay * (attempt + 1))
                    else:
                        raise
                except psycopg2.Error as e:
                    print(f"Database error: {e}")
                    raise
        return wrapper
    return decorator

@retry_on_failure(max_retries=3, delay=2)
def safe_query(query, params=None):
    """Run *query* on a fresh connection, under the retry decorator.

    Opens a connection, switches it to autocommit, selects the
    social_network graph and executes the statement. Returns all rows when
    the statement produced a result set, otherwise None. The connection is
    closed in every case.
    """
    connection = psycopg2.connect(host='localhost', dbname='agens',
                                  user='agens', password='agens123')
    try:
        connection.autocommit = True
        cursor = connection.cursor()
        cursor.execute("SET graph_path = social_network")
        cursor.execute(query, params)
        return cursor.fetchall() if cursor.description else None
    finally:
        connection.close()

13.7.2 常见错误代码

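| 错误代码 | 说明 | 处理策略 |
| --- | --- | --- |
| 08001 | 连接失败 | 检查网络和服务状态 |
| 08003 | 连接不存在 | 重新建立连接 |
| 08006 | 连接关闭 | 重新连接 |
| 23505 | 唯一约束违反 | 使用 MERGE 或更新 |
| 42601 | SQL 语法错误 | 检查查询语句 |
| 42P01 | 关系不存在 | 检查表/图名称 |
| 57014 | 查询取消 | 查询超时 |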

13.8 本章小结

| 要点 | 说明 |
| --- | --- |
| Python | psycopg2,支持连接池和批量操作 |
| Java | JDBC + HikariCP,适合企业应用 |
| JavaScript | node-postgres (pg),适合 Web 应用 |
| 连接池 | 大小 = (CPU × 2) + 磁盘数 |
| 错误处理 | 重试机制 + 异常捕获 |
| 批量操作 | UNWIND + 参数化查询 |

13.9 练习

  1. 使用 Python 实现一个简单的图数据库 CRUD 封装类。
  2. 为 Node.js Express 应用添加完整的 AgensGraph 中间件。
  3. 实现一个带有重试逻辑和连接池的 Java 服务类。
  4. 编写一个批量数据导入脚本,比较逐条插入与批量插入的性能差异。

13.10 扩展阅读