
Database Operations

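Data persistence is a core requirement of most applications. Julia offers several database drivers and an ORM, covering SQLite, PostgreSQL, MySQL, and generic ODBC connections. This article walks through the most common ways to work with databases.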


1. SQLite.jl: working with SQLite

1.1 Installation and connection

using Pkg
Pkg.add("SQLite")

using SQLite

# Create or open a database file
db = SQLite.DB("mydata.db")

# Or use an in-memory database
db = SQLite.DB(":memory:")

# Close the connection
close(db)

1.2 Creating tables and inserting data

db = SQLite.DB("mydata.db")

# Create the table
SQLite.execute(db, """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE,
        age INTEGER DEFAULT 0,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

# Insert a single row (the example.com addresses are placeholders)
SQLite.execute(db, """
    INSERT INTO users (name, email, age) VALUES (?, ?, ?)
""", ["Alice", "alice@example.com", 25])

# Insert several rows
names = ["Bob", "Charlie", "Diana"]
emails = ["bob@example.com", "charlie@example.com", "diana@example.com"]
ages = [30, 35, 28]

for (name, email, age) in zip(names, emails, ages)
    SQLite.execute(db, "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                   [name, email, age])
end

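1.3 Querying data

# SQLite.execute only returns a status code. To get rows back, use
# DBInterface.execute (re-exported by SQLite.jl), which returns a Tables.jl source.
using DataFrames

result = DBInterface.execute(db, "SELECT * FROM users WHERE age > ?", [25])
df = DataFrame(result)
println(df)

# Fetch a single row
one = DataFrame(DBInterface.execute(db, "SELECT name FROM users WHERE id = ?", [1]))
name = isempty(one) ? nothing : one.name[1]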

1.4 Transactions

# Wrap a bulk operation in a transaction
SQLite.transaction(db) do
    for i in 1:1000
        SQLite.execute(db, "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                       ["User $i", "user$i@example.com", rand(18:60)])
    end
end  # commits on success; rolls back automatically if an exception is thrown

2. LibPQ.jl: PostgreSQL

2.1 Installation and connection

using Pkg
Pkg.add("LibPQ")

using LibPQ

# Connect to PostgreSQL
conn = LibPQ.Connection("host=localhost dbname=mydb user=postgres password=secret")

# Or use a connection URL
conn = LibPQ.Connection("postgresql://postgres:secret@localhost/mydb")

# Close the connection
close(conn)

2.2 Basic operations

# Create a table
execute(conn, """
    CREATE TABLE IF NOT EXISTS products (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        price DECIMAL(10, 2),
        category VARCHAR(50),
        in_stock BOOLEAN DEFAULT true
    )
""")

# Insert a row (parameterized query, which guards against SQL injection)
execute(conn, """
    INSERT INTO products (name, price, category) VALUES (\$1, \$2, \$3)
""", ["Laptop", 999.99, "Electronics"])

# Query
result = execute(conn, "SELECT * FROM products WHERE category = \$1", ["Electronics"])
df = DataFrame(result)

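2.3 Using a connection pool

# Note: the constructor, acquire, and release! calls below follow this tutorial's
# usage; check them against the ConnectionPools.jl version you install.
using ConnectionPools

# Create the pool
pool = ConnectionPool{LibPQ.Connection}(
    () -> LibPQ.Connection("host=localhost dbname=mydb"),
    5,   # minimum number of connections
    20   # maximum number of connections
)

# Take a connection from the pool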
conn = acquire(pool)
try
    result = execute(conn, "SELECT * FROM products")
finally
    release!(pool, conn)
end

3. ODBC.jl: generic ODBC connections

3.1 Configuration

using Pkg
Pkg.add("ODBC")

using ODBC

# List the installed drivers
ODBC.drivers()

# List the configured data sources
ODBC.dsns()

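3.2 Connecting and querying

# Connect through a configured DSN (ODBC.jl 1.x API; ODBC.DSN and ODBC.query are the pre-1.0 API)
conn = ODBC.Connection("MyDatabase"; user="username", password="password")

# Or pass a full connection string
conn = ODBC.Connection("Driver={PostgreSQL};Server=localhost;Database=mydb;")

# Query
result = DBInterface.execute(conn, "SELECT * FROM users LIMIT 10")
df = DataFrame(result)

# Parameterized query
result = DBInterface.execute(conn, "SELECT * FROM users WHERE age > ?", [30])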

3.3 Supported databases

| Database | Driver | Example connection string |
|---|---|---|
| PostgreSQL | psqlODBC | Driver={PostgreSQL};Server=localhost;Database=mydb; |
| MySQL | MySQL ODBC | Driver={MySQL};Server=localhost;Database=mydb; |
| SQL Server | ODBC Driver 17 | Driver={ODBC Driver 17 for SQL Server};Server=.;Database=mydb; |
| SQLite | SQLite3 ODBC | Driver={SQLite3 ODBC};Database=mydb.db; |

4. MySQL.jl

using Pkg
Pkg.add("MySQL")

using MySQL

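# Connect (MySQL.jl 1.x works through DBInterface, which it re-exports)
conn = DBInterface.connect(MySQL.Connection, "localhost", "root", "password"; db="mydb")

# Query
result = DBInterface.execute(conn, "SELECT * FROM users")
df = DataFrame(result)

# Insert with a prepared statement
stmt = DBInterface.prepare(conn, "INSERT INTO users (name, age) VALUES (?, ?)")
DBInterface.execute(stmt, ["Alice", 25])

# Close the connection
DBInterface.close!(conn)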

5. SearchLight.jl: ORM

5.1 Defining a model

using SearchLight

Base.@kwdef mutable struct Article <: AbstractModel
    id::DbId = DbId()
    title::String = ""
    content::String = ""
    views::Int = 0
    published::Bool = false
end

5.2 Migrations

# db/migrations/001_create_articles.jl
module CreateArticles

using SearchLight.Migration

function up()
    create_table(:articles) do
        [
            column(:id, :integer, primary_key=true, autoincrement=true)
            column(:title, :string, not_null=true, limit=200)
            column(:content, :text)
            column(:views, :integer, default=0)
            column(:published, :boolean, default=false)
            timestamps()
        ]
    end

    add_index(:articles, :title, unique=true)
end

function down()
    drop_table(:articles)
end

end

# Run the migration
SearchLight.Migration.up()

5.3 CRUD operations

# Create
article = Article(title="Introduction to Julia", content="This is the content...")
save!(article)

# Read
all_articles = all(Article)
article = findone(Article, title="Introduction to Julia")
recent = find(Article, SQLWhereExpression("views > ?", [100]))

# Update
article.views += 1
save!(article)

# Delete (SearchLight names this function delete, without the bang)
delete(article)

# Chained queries (if supported)
# articles = Article |> where(:published, "=", true) |> order(:views, :desc) |> limit(10)

6. Managing connection pools

6.1 A hand-rolled connection pool

mutable struct ConnectionPool{T}
    factory::Function
    pool::Vector{T}
    lock::ReentrantLock
    max_size::Int
end

function ConnectionPool{T}(factory::Function, max_size::Int=10) where T
    pool = T[]
    ConnectionPool{T}(factory, pool, ReentrantLock(), max_size)
end

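function acquire!(pool::ConnectionPool{T}) where T
    # A `return` inside a do-block only leaves the closure, so capture its value instead.
    conn = lock(pool.lock) do
        isempty(pool.pool) ? nothing : pop!(pool.pool)
    end
    # Reuse an idle connection if one was available; otherwise open a new one.
    return conn === nothing ? pool.factory() : conn
end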

function release!(pool::ConnectionPool{T}, conn::T) where T
    lock(pool.lock) do
        if length(pool.pool) < pool.max_size
            push!(pool.pool, conn)
        else
            close(conn)
        end
    end
end

# Usage
pool = ConnectionPool{SQLite.DB}(() -> SQLite.DB("mydata.db"), 5)
conn = acquire!(pool)
try
    SQLite.execute(conn, "SELECT 1")
finally
    release!(pool, conn)
end
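
Note that max_size in the pool above caps only the number of idle connections kept for reuse; acquire! will still open a new connection whenever the pool is empty. If a hard upper bound is needed, one option is to park a fixed set of connections in a Channel so that borrowers block until one is free. The following is a minimal sketch of that idea, not part of any package: BlockingPool, with_pooled, FakeConn, and make_conn are hypothetical names, and FakeConn stands in for a real connection type.

```julia
# Hypothetical stand-in for a real connection type such as SQLite.DB.
struct FakeConn
    id::Int
end

# A pool whose Channel capacity is the hard upper bound on connections.
struct BlockingPool{T}
    slots::Channel{T}
end

# Open `size` connections up front and park them in the channel.
function BlockingPool{T}(factory::Function, size::Int) where T
    slots = Channel{T}(size)
    for _ in 1:size
        put!(slots, factory())
    end
    return BlockingPool{T}(slots)
end

# Borrow a connection for the duration of `f`; blocks while none are free.
function with_pooled(f, pool::BlockingPool)
    conn = take!(pool.slots)
    try
        return f(conn)
    finally
        put!(pool.slots, conn)   # hand the connection back even if `f` throws
    end
end

# Demonstration with a counter-based factory (an assumption for illustration).
next_id = Ref(0)
make_conn() = (next_id[] += 1; FakeConn(next_id[]))

pool = BlockingPool{FakeConn}(make_conn, 2)
first_id = with_pooled(conn -> conn.id, pool)
println("borrowed connection ", first_id)
```

The trade-off is that all connections are opened eagerly and a caller can wait indefinitely if connections are never returned, so a real implementation would likely add a timeout.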

6.2 Usage pattern

# do-block syntax manages the connection automatically
function with_connection(f, pool::ConnectionPool)
    conn = acquire!(pool)
    try
        f(conn)
    finally
        release!(pool, conn)
    end
end

# Usage (DBInterface.execute returns rows; SQLite.execute only returns a status code)
with_connection(pool) do conn
    result = DBInterface.execute(conn, "SELECT * FROM users")
    DataFrame(result)
end

7. Transactions

7.1 SQLite transactions

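function transfer_funds(db, from_id, to_id, amount)
    SQLite.transaction(db) do
        # Check the balance (DBInterface.execute returns rows; SQLite.execute does not)
        rows = DataFrame(DBInterface.execute(db,
            "SELECT balance FROM accounts WHERE id = ?", [from_id]))
        isempty(rows) && error("Account $from_id not found")

        if rows.balance[1] < amount
            error("Insufficient funds")
        end

        # Debit the source account
        SQLite.execute(db,
            "UPDATE accounts SET balance = balance - ? WHERE id = ?",
            [amount, from_id])

        # Credit the destination account
        SQLite.execute(db,
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            [amount, to_id])
    end
    # The transaction commits on success; any exception rolls it back
end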

7.2 PostgreSQL transactions

function batch_insert(conn, records)
    execute(conn, "BEGIN")
    try
        for record in records
            execute(conn, "INSERT INTO logs (message) VALUES (\$1)", [record])
        end
        execute(conn, "COMMIT")
    catch e
        execute(conn, "ROLLBACK")
        rethrow(e)
    end
end
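
The BEGIN / COMMIT / ROLLBACK sequence in batch_insert can be factored into a reusable helper so that each caller only supplies the body of the transaction. The sketch below assumes a hypothetical callback run_sql that stands in for something like sql -> execute(conn, sql); here it is replaced by a fake executor that merely records the statements, so the control flow can be checked without a database.

```julia
# Run `f` between BEGIN and COMMIT; issue ROLLBACK and rethrow if anything fails.
# `run_sql` is a hypothetical callback that sends one SQL statement to the database.
function with_transaction(f, run_sql)
    run_sql("BEGIN")
    try
        result = f()
        run_sql("COMMIT")
        return result
    catch
        run_sql("ROLLBACK")
        rethrow()
    end
end

# Fake executor for illustration: it only records the statements it receives.
statements = String[]
record_sql(sql) = push!(statements, sql)

# Successful transaction: BEGIN, the insert, then COMMIT.
with_transaction(record_sql) do
    record_sql("INSERT INTO logs (message) VALUES ('ok')")
end

# Failing transaction: BEGIN, then ROLLBACK, and the error propagates to the caller.
try
    with_transaction(record_sql) do
        error("simulated failure")
    end
catch err
    println("rolled back after: ", err.msg)
end

foreach(println, statements)
```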

8. Building SQL queries

8.1 Raw SQL (recommended for simple cases)

# Parameterized query: prevents SQL injection
function search_users(db, name_pattern, min_age)
    sql = """
        SELECT id, name, email, age
        FROM users
        WHERE name LIKE ? AND age >= ?
        ORDER BY name
        LIMIT 100
    """
    DataFrame(DBInterface.execute(db, sql, [name_pattern, min_age]))
end

8.2 Query builder pattern

# A simple query builder
mutable struct QueryBuilder
    table::String
    conditions::Vector{String}
    params::Vector{Any}
    order::String
    limit_val::Int
end

QueryBuilder(table::String) = QueryBuilder(table, String[], [], "", 0)

function where!(qb::QueryBuilder, condition::String, params...)
    push!(qb.conditions, condition)
    append!(qb.params, collect(params))
    return qb
end

function order_by!(qb::QueryBuilder, col::String, dir::String="ASC")
    qb.order = "$col $dir"
    return qb
end

function limit!(qb::QueryBuilder, n::Int)
    qb.limit_val = n
    return qb
end

function build(qb::QueryBuilder)
    sql = "SELECT * FROM $(qb.table)"
    if !isempty(qb.conditions)
        sql *= " WHERE " * join(qb.conditions, " AND ")
    end
    if !isempty(qb.order)
        sql *= " ORDER BY $(qb.order)"
    end
    if qb.limit_val > 0
        sql *= " LIMIT $(qb.limit_val)"
    end
    return sql, qb.params
end

# Usage
qb = QueryBuilder("users")
where!(qb, "age > ?", 18)
where!(qb, "active = ?", true)
order_by!(qb, "name")
limit!(qb, 50)

sql, params = build(qb)
println(sql)  # SELECT * FROM users WHERE age > ? AND active = ? ORDER BY name LIMIT 50
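
One caveat with the builder above: values go through ? placeholders, but the table name and the ORDER BY column are interpolated straight into the SQL text, and placeholders cannot be used for identifiers. If those names can ever come from user input, they should be validated first. A minimal sketch of such a check follows; safe_identifier and safe_direction are hypothetical helpers, and the allowed character set is an assumption that may be too strict for some schemas.

```julia
# Hypothetical helper: accept only plain identifiers (letters, digits, underscore).
function safe_identifier(name::AbstractString)
    occursin(r"\A[A-Za-z_][A-Za-z0-9_]*\z", name) ||
        throw(ArgumentError("unsafe SQL identifier: " * repr(name)))
    return String(name)
end

# Hypothetical helper: restrict the sort direction to the two valid keywords.
function safe_direction(dir::AbstractString)
    d = uppercase(dir)
    d in ("ASC", "DESC") ||
        throw(ArgumentError("invalid sort direction: " * repr(dir)))
    return d
end

table = safe_identifier("users")
column = safe_identifier("name")
direction = safe_direction("desc")
println("SELECT * FROM $table ORDER BY $column $direction")

# A malicious name is rejected before it ever reaches the SQL string.
try
    safe_identifier("users; DROP TABLE users")
catch err
    println("rejected: ", err.msg)
end
```

These checks could be called inside QueryBuilder's constructor and order_by! so that every query built this way is covered.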

9. Database design patterns

9.1 Repository pattern

abstract type AbstractRepository end

struct UserRepository <: AbstractRepository
    db::SQLite.DB
end

function find_by_id(repo::UserRepository, id::Int)
    result = DBInterface.execute(repo.db, "SELECT * FROM users WHERE id = ?", [id])
    df = DataFrame(result)
    isempty(df) ? nothing : first(df)
end

function find_all(repo::UserRepository; limit=100, offset=0)
    result = DBInterface.execute(repo.db, "SELECT * FROM users LIMIT ? OFFSET ?", [limit, offset])
    DataFrame(result)
end

function save(repo::UserRepository, user::NamedTuple)
    SQLite.execute(repo.db,
        "INSERT OR REPLACE INTO users (id, name, email, age) VALUES (?, ?, ?, ?)",
        [user.id, user.name, user.email, user.age])
end

# Usage
repo = UserRepository(db)
user = find_by_id(repo, 1)
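
A practical benefit of the Repository pattern is that code written against the abstract type can be exercised without a database. As an illustration, the sketch below adds a hypothetical InMemoryUserRepository backed by a Dict; display_name is likewise a made-up consumer. The abstract type is redeclared only so that the block runs on its own.

```julia
# Redeclared so the sketch is self-contained; identical to the definition above.
abstract type AbstractRepository end

# Hypothetical in-memory implementation, keyed by user id.
struct InMemoryUserRepository <: AbstractRepository
    rows::Dict{Int, NamedTuple}
end
InMemoryUserRepository() = InMemoryUserRepository(Dict{Int, NamedTuple}())

find_by_id(repo::InMemoryUserRepository, id::Int) = get(repo.rows, id, nothing)

function save(repo::InMemoryUserRepository, user::NamedTuple)
    repo.rows[user.id] = user   # overwrites an existing id, like INSERT OR REPLACE
    return user
end

# Code written against the interface does not care which repository it receives.
function display_name(repo::AbstractRepository, id::Int)
    user = find_by_id(repo, id)
    return user === nothing ? "(unknown)" : user.name
end

repo = InMemoryUserRepository()
save(repo, (id=1, name="Alice", email="alice@example.com", age=25))
println(display_name(repo, 1))
println(display_name(repo, 2))
```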

9.2 Unit of Work pattern

mutable struct UnitOfWork
    db::SQLite.DB
    pending_inserts::Vector{Tuple{String, Vector{Any}}}
    pending_updates::Vector{Tuple{String, Vector{Any}}}
    pending_deletes::Vector{Tuple{String, Vector{Any}}}
end

UnitOfWork(db) = UnitOfWork(db, [], [], [])

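# `data` maps column names to values, e.g. (name="Alice", age=25).
# (A plain Vector has no column names, so a NamedTuple is used here.)
function register_insert!(uow::UnitOfWork, table::String, data::NamedTuple)
    cols = join(keys(data), ", ")
    placeholders = join(["?" for _ in data], ", ")
    push!(uow.pending_inserts, ("INSERT INTO $table ($cols) VALUES ($placeholders)", Any[values(data)...]))
end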

function commit!(uow::UnitOfWork)
    SQLite.transaction(uow.db) do
        for (sql, params) in uow.pending_inserts
            SQLite.execute(uow.db, sql, params)
        end
        for (sql, params) in uow.pending_updates
            SQLite.execute(uow.db, sql, params)
        end
        for (sql, params) in uow.pending_deletes
            SQLite.execute(uow.db, sql, params)
        end
    end
    empty!(uow.pending_inserts)
    empty!(uow.pending_updates)
    empty!(uow.pending_deletes)
end

10. Case study: a data collection and storage system

module DataCollector

using SQLite, DataFrames, Dates

struct Collector
    db::SQLite.DB
end

function Collector(db_path::String="collector.db")
    db = SQLite.DB(db_path)
    SQLite.execute(db, """
        CREATE TABLE IF NOT EXISTS readings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sensor_id TEXT NOT NULL,
            value REAL NOT NULL,
            unit TEXT,
            timestamp TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)
    SQLite.execute(db, "CREATE INDEX IF NOT EXISTS idx_sensor ON readings(sensor_id)")
    SQLite.execute(db, "CREATE INDEX IF NOT EXISTS idx_ts ON readings(timestamp)")
    Collector(db)
end

function record!(c::Collector, sensor_id::String, value::Float64, unit::String="")
    SQLite.execute(c.db,
        "INSERT INTO readings (sensor_id, value, unit) VALUES (?, ?, ?)",
        [sensor_id, value, unit])
end

function batch_record!(c::Collector, records::AbstractVector{<:NamedTuple})
    SQLite.transaction(c.db) do
        for r in records
            SQLite.execute(c.db,
                "INSERT INTO readings (sensor_id, value, unit) VALUES (?, ?, ?)",
                [r.sensor_id, r.value, r.unit])
        end
    end
end

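function query_range(c::Collector, sensor_id::String, from::DateTime, to::DateTime)
    # CURRENT_TIMESTAMP is stored as "YYYY-MM-DD HH:MM:SS", so format the bounds the same way
    fmt = dateformat"yyyy-mm-dd HH:MM:SS"
    result = DBInterface.execute(c.db, """
        SELECT * FROM readings
        WHERE sensor_id = ? AND timestamp BETWEEN ? AND ?
        ORDER BY timestamp
    """, [sensor_id, Dates.format(from, fmt), Dates.format(to, fmt)])
    DataFrame(result)
end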

function statistics(c::Collector, sensor_id::String)
    result = DBInterface.execute(c.db, """
        SELECT
            COUNT(*) as count,
            AVG(value) as mean,
            MIN(value) as min,
            MAX(value) as max
        FROM readings WHERE sensor_id = ?
    """, [sensor_id])
    DataFrame(result)
end

end

# Usage (Collector is not exported, so qualify it with the module name)
using .DataCollector, Dates
collector = DataCollector.Collector("sensors.db")

DataCollector.record!(collector, "temp_01", 23.5, "°C")
DataCollector.record!(collector, "temp_01", 24.1, "°C")
DataCollector.record!(collector, "humidity_01", 65.2, "%")

stats = DataCollector.statistics(collector, "temp_01")
println(stats)

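Driver comparison

| Driver | Database | Characteristics | Recommended for |
|---|---|---|---|
| SQLite.jl | SQLite | Lightweight, no server required | Embedded use, prototyping |
| LibPQ.jl | PostgreSQL | Full-featured, connection pooling | Production, large datasets |
| MySQL.jl | MySQL | Widely used | Web applications |
| ODBC.jl | Any | Generic interface | Integrating legacy systems |
| SearchLight.jl | Multiple | ORM abstraction | Genie.jl projects |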

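Use cases

Scenario 1: IoT data storage

Store sensor data with SQLite.jl, writing in batches inside transactions for performance. Run periodic statistics queries and export CSV files for the analysis team.

Scenario 2: Web application backend

Build the web application with PostgreSQL, LibPQ.jl, and Genie.jl. A connection pool manages the database connections, and transactions keep the data consistent.

Scenario 3: Data migration tool

Write a Julia script that reads from the old database over ODBC, cleans the data, and writes it to the new PostgreSQL database. This plays to Julia's data-processing strengths and its range of drivers.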
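
The clean-then-write step of such a migration script might look roughly like the sketch below. Everything here is illustrative: clean_row and migrate are hypothetical names, the cleaning rules are assumptions, and the old and new databases are replaced by in-memory stand-ins. In a real script the rows would come from the ODBC query and write_batch would insert each batch into PostgreSQL inside a transaction.

```julia
# Hypothetical cleaning rule: trim whitespace, lowercase emails, drop rows without a name.
function clean_row(row::NamedTuple)
    name = strip(row.name)
    isempty(name) && return nothing
    return (name=String(name), email=lowercase(strip(row.email)))
end

# Clean the rows, then hand them to `write_batch` in fixed-size batches.
# `write_batch` is a callback standing in for "insert these rows in one transaction".
function migrate(write_batch, source_rows; batch_size::Int=500)
    cleaned = filter(!isnothing, map(clean_row, source_rows))
    for batch in Iterators.partition(cleaned, batch_size)
        write_batch(collect(batch))
    end
    return length(cleaned)
end

# In-memory stand-ins for the old and new databases.
old_rows = [
    (name=" Alice ", email="ALICE@Example.com"),
    (name="",        email="nobody@example.com"),
    (name="Bob",     email="bob@example.com "),
]
batches = Any[]
migrated = migrate(b -> push!(batches, b), old_rows; batch_size=2)
println("migrated $migrated rows in $(length(batches)) batch(es)")
```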


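Summary

| Topic | Key points |
|---|---|
| SQLite.jl | Lightweight and embedded; SQLite.execute(db, sql, params) |
| LibPQ.jl | Dedicated PostgreSQL driver |
| ODBC.jl | Generic interface that supports many databases |
| SearchLight.jl | ORM, integrates with Genie.jl |
| Transactions | SQLite.transaction(db) do ... end |
| Connection pools | Hand-rolled, or use ConnectionPools.jl |
| Security | Always use parameterized queries to prevent SQL injection |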

Further reading