第 13 章:客户端开发
第 13 章:客户端开发
13.1 客户端连接概览
AgensGraph 基于 PostgreSQL 协议,因此可以使用 PostgreSQL 的标准驱动连接。各语言的连接方式如下:
| 语言 | 驱动 | Cypher 支持 | 安装 |
|---|
| Python | psycopg2 / agensgraph-driver | ✅ | pip install psycopg2-binary |
| Java | JDBC (PostgreSQL JDBC) | ✅ | Maven/Gradle 依赖 |
| JavaScript | node-postgres (pg) | ✅ | npm install pg |
| Go | lib/pq / pgx | ✅ | go get |
| C/C++ | libpq | ✅ | 系统自带 |
| Ruby | pg | ✅ | gem install pg |
13.2 Python 客户端
13.2.1 psycopg2 连接
# 安装依赖
# pip install psycopg2-binary
import psycopg2
from psycopg2.extras import RealDictCursor
class AgensGraphClient:
"""AgensGraph Python 客户端"""
def __init__(self, host='localhost', port=5432,
dbname='agens', user='agens', password='agens123'):
self.conn_params = {
'host': host,
'port': port,
'dbname': dbname,
'user': user,
'password': password
}
self.conn = None
self.cursor = None
def connect(self):
"""建立连接"""
self.conn = psycopg2.connect(**self.conn_params)
self.conn.autocommit = True
self.cursor = self.conn.cursor()
print(f"Connected to AgensGraph at {self.conn_params['host']}")
def set_graph_path(self, graph_name):
"""设置当前图路径"""
self.cursor.execute(f"SET graph_path = {graph_name}")
def execute_cypher(self, query, params=None):
"""执行 Cypher 查询"""
self.cursor.execute(query, params)
if self.cursor.description:
return self.cursor.fetchall()
return None
def close(self):
"""关闭连接"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("Connection closed")
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
13.2.2 基本 CRUD 操作
# 使用示例
with AgensGraphClient(password='agens123') as client:
client.set_graph_path('social_network')
# 创建顶点
client.execute_cypher("""
CREATE (p:Person {name: %s, age: %s, city: %s})
""", ('Alice', 30, '北京'))
# 查询顶点
results = client.execute_cypher("""
MATCH (p:Person) WHERE p.name = %s
RETURN p.name, p.age, p.city
""", ('Alice',))
for row in results:
print(f"Name: {row[0]}, Age: {row[1]}, City: {row[2]}")
# 更新属性
client.execute_cypher("""
MATCH (p:Person {name: %s})
SET p.age = %s
""", ('Alice', 31))
# 创建关系
client.execute_cypher("""
MATCH (a:Person {name: %s}), (b:Person {name: %s})
CREATE (a)-[:KNOWS {since: %s}]->(b)
""", ('Alice', 'Bob', 2020))
# 删除
client.execute_cypher("""
MATCH (p:Person {name: %s})
DETACH DELETE p
""", ('Alice',))
13.2.3 使用 ORM 风格封装
class GraphModel:
"""图数据模型基类"""
def __init__(self, client, label):
self.client = client
self.label = label
def create(self, **props):
"""创建顶点"""
prop_str = ', '.join(f'{k}: %s' for k in props.keys())
query = f"CREATE (n:{self.label} {{{prop_str}}}) RETURN n"
return self.client.execute_cypher(query, list(props.values()))
def find(self, **conditions):
"""查找顶点"""
where = ' AND '.join(f'n.{k} = %s' for k in conditions.keys())
query = f"MATCH (n:{self.label}) WHERE {where} RETURN n"
return self.client.execute_cypher(query, list(conditions.values()))
def find_all(self, limit=100):
"""查找所有"""
query = f"MATCH (n:{self.label}) RETURN n LIMIT %s"
return self.client.execute_cypher(query, (limit,))
def update(self, match_props, **set_props):
"""更新顶点"""
where = ' AND '.join(f'n.{k} = %s' for k in match_props.keys())
set_clause = ', '.join(f'n.{k} = %s' for k in set_props.keys())
query = f"MATCH (n:{self.label}) WHERE {where} SET {set_clause}"
params = list(match_props.values()) + list(set_props.values())
return self.client.execute_cypher(query, params)
def delete(self, **conditions):
"""删除顶点"""
where = ' AND '.join(f'n.{k} = %s' for k in conditions.keys())
query = f"MATCH (n:{self.label}) WHERE {where} DETACH DELETE n"
return self.client.execute_cypher(query, list(conditions.values()))
# 使用示例
with AgensGraphClient(password='agens123') as client:
client.set_graph_path('social_network')
person = GraphModel(client, 'Person')
person.create(name='张三', age=30, city='北京')
person.create(name='李四', age=28, city='上海')
results = person.find(city='北京')
print(results)
person.update({'name': '张三'}, age=31)
person.delete(name='李四')
13.2.4 连接池管理
from psycopg2 import pool
class AgensGraphPool:
"""连接池管理"""
def __init__(self, min_conn=5, max_conn=20, **conn_params):
self.pool = pool.ThreadedConnectionPool(
min_conn, max_conn,
**conn_params
)
def get_connection(self):
return self.pool.getconn()
def release_connection(self, conn):
self.pool.putconn(conn)
def close_all(self):
self.pool.closeall()
# 使用连接池
pool = AgensGraphPool(
min_conn=5, max_conn=20,
host='localhost', port=5432,
dbname='agens', user='agens', password='agens123'
)
conn = pool.get_connection()
try:
conn.autocommit = True
cur = conn.cursor()
cur.execute("SET graph_path = social_network")
cur.execute("MATCH (p:Person) RETURN p.name")
results = cur.fetchall()
for row in results:
print(row)
cur.close()
finally:
pool.release_connection(conn)
13.3 Java 客户端
13.3.1 Maven 依赖
<!-- pom.xml -->
<dependencies>
<!-- PostgreSQL JDBC 驱动 -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.1</version>
</dependency>
<!-- HikariCP 连接池 -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
</dependency>
<!-- SLF4J 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
13.3.2 基本连接与查询
import java.sql.*;
public class AgensGraphClient {
private static final String URL = "jdbc:postgresql://localhost:5432/agens";
private static final String USER = "agens";
private static final String PASSWORD = "agens123";
public static void main(String[] args) {
try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
conn.setAutoCommit(true);
// 设置图路径
try (Statement stmt = conn.createStatement()) {
stmt.execute("SET graph_path = social_network");
}
// 创建顶点
String createCypher = "CREATE (p:Person {name: $1, age: $2})";
try (PreparedStatement pstmt = conn.prepareStatement(createCypher)) {
pstmt.setString(1, "Alice");
pstmt.setInt(2, 30);
pstmt.execute();
}
// 查询
String queryCypher = "MATCH (p:Person) RETURN p.name, p.age";
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(queryCypher)) {
while (rs.next()) {
String name = rs.getString(1);
int age = rs.getInt(2);
System.out.printf("Name: %s, Age: %d%n", name, age);
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
13.3.3 HikariCP 连接池
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.*;
public class AgensGraphPool {
private static HikariDataSource dataSource;
static {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/agens");
config.setUsername("agens");
config.setPassword("agens123");
config.setMaximumPoolSize(20);
config.setMinimumIdle(5);
config.setConnectionTimeout(30000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);
// PostgreSQL 特定配置
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
dataSource = new HikariDataSource(config);
}
public static Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
public static void close() {
if (dataSource != null) {
dataSource.close();
}
}
// 使用示例
public static List<String> getFriends(String personName) throws SQLException {
List<String> friends = new ArrayList<>();
String cypher = "MATCH (p:Person {name: ?})-[:KNOWS]->(f:Person) RETURN f.name";
try (Connection conn = getConnection();
PreparedStatement ps = conn.prepareStatement(cypher)) {
ps.setString(1, personName);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
friends.add(rs.getString(1));
}
}
}
return friends;
}
}
13.3.4 事务管理
public class TransactionExample {
public static void transferCredits(String from, String to, int amount)
throws SQLException {
try (Connection conn = AgensGraphPool.getConnection()) {
conn.setAutoCommit(false);
try {
// 设置图路径
try (Statement stmt = conn.createStatement()) {
stmt.execute("SET graph_path = social_network");
}
// 扣除发送方积分
String debit = "MATCH (p:Person {name: ?}) SET p.credits = p.credits - ?";
try (PreparedStatement ps = conn.prepareStatement(debit)) {
ps.setString(1, from);
ps.setInt(2, amount);
ps.executeUpdate();
}
// 增加接收方积分
String credit = "MATCH (p:Person {name: ?}) SET p.credits = p.credits + ?";
try (PreparedStatement ps = conn.prepareStatement(credit)) {
ps.setString(1, to);
ps.setInt(2, amount);
ps.executeUpdate();
}
conn.commit();
System.out.println("Transfer successful");
} catch (SQLException e) {
conn.rollback();
throw new RuntimeException("Transfer failed", e);
}
}
}
}
13.4 JavaScript / Node.js 客户端
13.4.1 安装依赖
# 安装 node-postgres
npm install pg
# 安装 TypeScript 类型(可选)
npm install -D @types/pg
13.4.2 基本连接
// agensgraph-client.js
const { Pool, Client } = require('pg');
// 连接池配置
const pool = new Pool({
host: 'localhost',
port: 5432,
database: 'agens',
user: 'agens',
password: 'agens123',
max: 20, // 最大连接数
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
// 基本查询
async function executeCypher(query, params = []) {
const client = await pool.connect();
try {
// 设置图路径
await client.query('SET graph_path = social_network');
const result = await client.query(query, params);
return result.rows;
} finally {
client.release();
}
}
// 创建顶点
async function createPerson(name, age, city) {
const cypher = `
CREATE (p:Person {name: $1, age: $2, city: $3})
RETURN p.name AS name, p.age AS age
`;
const rows = await executeCypher(cypher, [name, age, city]);
return rows[0];
}
// 查询顶点
async function findPerson(name) {
const cypher = `
MATCH (p:Person {name: $1})
RETURN p.name AS name, p.age AS age, p.city AS city
`;
const rows = await executeCypher(cypher, [name]);
return rows[0] || null;
}
// 查询朋友
async function getFriends(name) {
const cypher = `
MATCH (p:Person {name: $1})-[r:KNOWS]->(f:Person)
RETURN f.name AS name, r.since AS since
ORDER BY r.since DESC
`;
return executeCypher(cypher, [name]);
}
// 导出模块
module.exports = {
pool,
executeCypher,
createPerson,
findPerson,
getFriends
};
13.4.3 Express.js 集成示例
// app.js
const express = require('express');
const { createPerson, findPerson, getFriends } = require('./agensgraph-client');
const app = express();
app.use(express.json());
// 创建人物
app.post('/api/persons', async (req, res) => {
try {
const { name, age, city } = req.body;
const person = await createPerson(name, age, city);
res.status(201).json(person);
} catch (err) {
console.error(err);
res.status(500).json({ error: 'Failed to create person' });
}
});
// 查询人物
app.get('/api/persons/:name', async (req, res) => {
try {
const person = await findPerson(req.params.name);
if (!person) {
return res.status(404).json({ error: 'Person not found' });
}
res.json(person);
} catch (err) {
console.error(err);
res.status(500).json({ error: 'Query failed' });
}
});
// 查询社交关系
app.get('/api/persons/:name/friends', async (req, res) => {
try {
const friends = await getFriends(req.params.name);
res.json(friends);
} catch (err) {
console.error(err);
res.status(500).json({ error: 'Query failed' });
}
});
app.listen(3000, () => {
console.log('Server running on port 3000');
});
13.5 连接池最佳实践
13.5.1 连接池参数对照
| 参数 | Python (psycopg2) | Java (HikariCP) | Node.js (pg) |
|---|
| 最小连接 | minconn | minimumIdle | min (通过 pool 模式) |
| 最大连接 | maxconn | maximumPoolSize | max |
| 连接超时 | - | connectionTimeout | connectionTimeoutMillis |
| 空闲超时 | - | idleTimeout | idleTimeoutMillis |
| 最大存活 | - | maxLifetime | - |
13.5.2 连接池大小计算
推荐连接池大小:
连接数 = (CPU 核心数 × 2) + 有效磁盘数
示例:
8 核 CPU, 1 块 SSD
连接数 = (8 × 2) + 1 = 17
一般建议:
小型应用: 5-10
中型应用: 10-30
大型应用: 30-50 (配合 PgBouncer)
13.5.3 连接健康检查
# Python 连接健康检查
def check_connection(pool):
"""检查连接池健康状态"""
try:
conn = pool.getconn()
cur = conn.cursor()
cur.execute("SELECT 1")
result = cur.fetchone()
cur.close()
pool.putconn(conn)
return result[0] == 1
except Exception as e:
print(f"Connection check failed: {e}")
return False
// Java HikariCP 健康检查
HikariConfig config = new HikariConfig();
config.setConnectionTestQuery("SELECT 1");
config.setValidationTimeout(5000);
13.6 批量操作优化
13.6.1 Python 批量导入
import psycopg2
from psycopg2.extras import execute_values
def bulk_create_persons(persons):
"""批量创建人物节点"""
conn = psycopg2.connect(host='localhost', dbname='agens',
user='agens', password='agens123')
conn.autocommit = True
cur = conn.cursor()
cur.execute("SET graph_path = social_network")
# 使用 UNWIND 批量创建
cypher = """
UNWIND $persons AS person
CREATE (p:Person {
name: person.name,
age: person.age,
city: person.city
})
RETURN count(p) AS created
"""
# 使用参数化批量插入
persons_data = [
{'name': f'Person_{i}', 'age': 20 + (i % 50), 'city': '北京'}
for i in range(10000)
]
cur.execute(cypher, {'persons': persons_data})
result = cur.fetchone()
print(f"Created {result[0]} persons")
cur.close()
conn.close()
# 调用
bulk_create_persons([])
13.7 错误处理与重试
13.7.1 Python 错误处理
import psycopg2
import time
from functools import wraps
def retry_on_failure(max_retries=3, delay=1):
"""重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except psycopg2.OperationalError as e:
if attempt < max_retries - 1:
print(f"Attempt {attempt + 1} failed: {e}")
time.sleep(delay * (attempt + 1))
else:
raise
except psycopg2.Error as e:
print(f"Database error: {e}")
raise
return wrapper
return decorator
@retry_on_failure(max_retries=3, delay=2)
def safe_query(query, params=None):
"""带重试的安全查询"""
conn = psycopg2.connect(host='localhost', dbname='agens',
user='agens', password='agens123')
try:
conn.autocommit = True
cur = conn.cursor()
cur.execute("SET graph_path = social_network")
cur.execute(query, params)
if cur.description:
return cur.fetchall()
return None
finally:
conn.close()
13.7.2 常见错误代码
| 错误代码 | 说明 | 处理策略 |
|---|
08001 | 连接失败 | 检查网络和服务状态 |
08003 | 连接不存在 | 重新建立连接 |
08006 | 连接关闭 | 重新连接 |
23505 | 唯一约束违反 | 使用 MERGE 或更新 |
42601 | SQL 语法错误 | 检查查询语句 |
42P01 | 关系不存在 | 检查表/图名称 |
57014 | 查询取消 | 查询超时 |
13.8 本章小结
| 要点 | 说明 |
|---|
| Python | psycopg2,支持连接池和批量操作 |
| Java | JDBC + HikariCP,适合企业应用 |
| JavaScript | node-postgres (pg),适合 Web 应用 |
| 连接池 | 大小 = (CPU × 2) + 磁盘数 |
| 错误处理 | 重试机制 + 异常捕获 |
| 批量操作 | UNWIND + 参数化查询 |
13.9 练习
- 使用 Python 实现一个简单的图数据库 CRUD 封装类。
- 为 Node.js Express 应用添加完整的 AgensGraph 中间件。
- 实现一个带有重试逻辑和连接池的 Java 服务类。
- 编写一个批量数据导入脚本,比较逐条插入与批量插入的性能差异。
13.10 扩展阅读