11 - 函数与过程
第 11 章 · 函数与过程
PostgreSQL 支持多种过程语言编写存储过程和函数。PL/pgSQL 是最常用的过程语言。
11.1 SQL 函数 vs 过程语言函数
-- SQL 函数(简单)
CREATE FUNCTION add(a INT, b INT) RETURNS INT AS $$
SELECT a + b;
$$ LANGUAGE SQL;
SELECT add(3, 5); -- 8
-- PL/pgSQL 函数(复杂逻辑)
CREATE FUNCTION get_employee(p_id INT) RETURNS TEXT AS $$
DECLARE
v_name TEXT;
BEGIN
SELECT name INTO v_name FROM employees WHERE id = p_id;
IF NOT FOUND THEN
RAISE EXCEPTION 'Employee % not found', p_id;
END IF;
RETURN v_name;
END;
$$ LANGUAGE plpgsql;
11.2 PL/pgSQL 基础
函数结构
CREATE FUNCTION function_name(params) RETURNS return_type AS $$
DECLARE
-- 变量声明
variable_name type := default_value;
BEGIN
-- 函数体
RETURN value;
EXCEPTION
-- 异常处理
WHEN condition THEN
-- 处理代码
END;
$$ LANGUAGE plpgsql;
变量和参数
CREATE FUNCTION transfer_funds(
p_from_id INT,
p_to_id INT,
p_amount NUMERIC
) RETURNS VOID AS $$
DECLARE
v_from_balance NUMERIC;
BEGIN
-- 获取源账户余额
SELECT balance INTO STRICT v_from_balance
FROM accounts WHERE id = p_from_id FOR UPDATE;
IF v_from_balance < p_amount THEN
RAISE EXCEPTION 'Insufficient funds: have %, need %', v_from_balance, p_amount;
END IF;
UPDATE accounts SET balance = balance - p_amount WHERE id = p_from_id;
UPDATE accounts SET balance = balance + p_amount WHERE id = p_to_id;
END;
$$ LANGUAGE plpgsql;
-- 调用
SELECT transfer_funds(1, 2, 100.00);
控制流
-- IF/ELSIF/ELSE
CREATE FUNCTION classify_salary(p_salary NUMERIC) RETURNS TEXT AS $$
BEGIN
IF p_salary >= 20000 THEN
RETURN 'Senior';
ELSIF p_salary >= 15000 THEN
RETURN 'Middle';
ELSIF p_salary >= 10000 THEN
RETURN 'Junior';
ELSE
RETURN 'Intern';
END IF;
END;
$$ LANGUAGE plpgsql;
-- CASE
CREATE FUNCTION status_text(p_status INT) RETURNS TEXT AS $$
BEGIN
RETURN CASE p_status
WHEN 0 THEN 'Pending'
WHEN 1 THEN 'Paid'
WHEN 2 THEN 'Shipped'
WHEN 3 THEN 'Completed'
ELSE 'Unknown'
END;
END;
$$ LANGUAGE plpgsql;
-- LOOP
CREATE FUNCTION factorial(n INT) RETURNS BIGINT AS $$
DECLARE
result BIGINT := 1;
i INT := 1;
BEGIN
LOOP
EXIT WHEN i > n;
result := result * i;
i := i + 1;
END LOOP;
RETURN result;
END;
$$ LANGUAGE plpgsql;
-- FOR 循环
CREATE FUNCTION sum_range(p_from INT, p_to INT) RETURNS INT AS $$
DECLARE
total INT := 0;
BEGIN
FOR i IN p_from..p_to LOOP
total := total + i;
END LOOP;
RETURN total;
END;
$$ LANGUAGE plpgsql;
-- FOR ... IN QUERY
CREATE FUNCTION count_by_dept() RETURNS TABLE(dept TEXT, cnt BIGINT) AS $$
DECLARE
rec RECORD;
BEGIN
FOR rec IN SELECT department, COUNT(*) AS c FROM employees GROUP BY department
LOOP
dept := rec.department;
cnt := rec.c;
RETURN NEXT;
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- WHILE
CREATE FUNCTION fibonacci(n INT) RETURNS TABLE(idx INT, val INT) AS $$
DECLARE
a INT := 0;
b INT := 1;
temp INT;
BEGIN
FOR i IN 1..n LOOP
idx := i;
val := a;
RETURN NEXT;
temp := a + b;
a := b;
b := temp;
END LOOP;
END;
$$ LANGUAGE plpgsql;
游标(Cursor)
CREATE FUNCTION process_employees() RETURNS VOID AS $$
DECLARE
cur CURSOR FOR SELECT id, name, salary FROM employees WHERE is_active;
rec RECORD;
BEGIN
OPEN cur;
LOOP
FETCH cur INTO rec;
EXIT WHEN NOT FOUND;
-- 处理每行
RAISE NOTICE 'Processing: % - % ($%)', rec.id, rec.name, rec.salary;
END LOOP;
CLOSE cur;
END;
$$ LANGUAGE plpgsql;
异常处理
CREATE FUNCTION safe_insert(p_name TEXT, p_email TEXT) RETURNS TEXT AS $$
BEGIN
INSERT INTO employees (name, email, age, salary, department, hire_date)
VALUES (p_name, p_email, 25, 8000, '新人部', CURRENT_DATE);
RETURN 'OK';
EXCEPTION
WHEN unique_violation THEN
RETURN 'Duplicate email: ' || p_email;
WHEN foreign_key_violation THEN
RETURN 'FK violation';
WHEN OTHERS THEN
RETURN 'Error: ' || SQLERRM;
END;
$$ LANGUAGE plpgsql;
11.3 触发器(Trigger)
触发器类型
| 类型 | 时机 | 级别 |
|---|---|---|
| BEFORE INSERT | 插入前 | 行级/语句级 |
| AFTER INSERT | 插入后 | 行级/语句级 |
| BEFORE UPDATE | 更新前 | 行级/语句级 |
| AFTER UPDATE | 更新后 | 行级/语句级 |
| BEFORE DELETE | 删除前 | 行级/语句级 |
| AFTER DELETE | 删除后 | 行级/语句级 |
| INSTEAD OF | 替代操作 | 行级(仅视图) |
实战示例
-- 自动更新 updated_at
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_update_timestamp
BEFORE UPDATE ON employees
FOR EACH ROW
EXECUTE FUNCTION update_timestamp();
-- 审计日志
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
table_name TEXT,
operation TEXT,
old_data JSONB,
new_data JSONB,
changed_by TEXT DEFAULT current_user,
changed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO audit_log (table_name, operation, new_data)
VALUES (TG_TABLE_NAME, 'INSERT', to_jsonb(NEW));
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO audit_log (table_name, operation, old_data, new_data)
VALUES (TG_TABLE_NAME, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW));
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO audit_log (table_name, operation, old_data)
VALUES (TG_TABLE_NAME, 'DELETE', to_jsonb(OLD));
END IF;
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_audit_employees
AFTER INSERT OR UPDATE OR DELETE ON employees
FOR EACH ROW
EXECUTE FUNCTION audit_trigger();
-- 条件触发器
CREATE TRIGGER trg_check_salary
BEFORE UPDATE ON employees
FOR EACH ROW
WHEN (NEW.salary < 0)
EXECUTE FUNCTION raise_error();
-- 约束触发器(可延迟)
CREATE CONSTRAINT TRIGGER trg_check_order_items
AFTER INSERT ON order_items
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE FUNCTION validate_order_item();
11.4 事件触发器(Event Trigger)
-- 事件触发器在 DDL 事件上触发
CREATE OR REPLACE FUNCTION log_ddl_event()
RETURNS event_trigger AS $$
DECLARE
obj RECORD;
BEGIN
FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands()
LOOP
RAISE NOTICE 'DDL: % on %', obj.command_tag, obj.object_identity;
END LOOP;
END;
$$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER trg_log_ddl
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE', 'DROP TABLE')
EXECUTE FUNCTION log_ddl_event();
-- 禁止特定 DDL
CREATE OR REPLACE FUNCTION prevent_drop_table()
RETURNS event_trigger AS $$
BEGIN
RAISE EXCEPTION 'DROP TABLE is not allowed in production!';
END;
$$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER trg_prevent_drop
ON sql_drop
WHEN TAG IN ('DROP TABLE')
EXECUTE FUNCTION prevent_drop_table();
11.5 RETURN TABLE 和 SETOF
-- 返回多行
CREATE FUNCTION get_top_employees(p_dept TEXT, p_limit INT)
RETURNS TABLE(name TEXT, salary NUMERIC) AS $$
BEGIN
RETURN QUERY
SELECT e.name, e.salary
FROM employees e
WHERE e.department = p_dept
ORDER BY e.salary DESC
LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM get_top_employees('工程部', 5);
11.6 动态 SQL
CREATE FUNCTION dynamic_query(p_table TEXT, p_column TEXT, p_value TEXT)
RETURNS SETOF RECORD AS $$
DECLARE
query TEXT;
BEGIN
query := format('SELECT * FROM %I WHERE %I = %L', p_table, p_column, p_value);
RETURN QUERY EXECUTE query;
END;
$$ LANGUAGE plpgsql;
-- EXECUTE 的 USING 参数(防注入)
CREATE FUNCTION safe_search(p_table TEXT, p_term TEXT)
RETURNS SETOF RECORD AS $$
BEGIN
RETURN QUERY EXECUTE
format('SELECT * FROM %I WHERE name ILIKE $1', p_table)
USING '%' || p_term || '%';
END;
$$ LANGUAGE plpgsql;
⚠️ 注意事项:动态 SQL 必须使用 format() 的 %I 或 USING 参数防止 SQL 注入。绝对不要用字符串拼接!
业务场景
| 场景 | 推荐方案 |
|---|---|
| 自动更新时间戳 | BEFORE UPDATE 触发器 |
| 数据审计 | AFTER INSERT/UPDATE/DELETE 触发器 |
| 复杂业务逻辑 | PL/pgSQL 存储过程 |
| 数据校验 | 约束触发器 |
| DDL 审计/防护 | 事件触发器 |
| 定时任务 | pg_cron 扩展 |