Julia 教程 / 数据库操作
数据库操作
数据持久化是大多数应用的核心需求。Julia 生态提供了多种数据库驱动包和 ORM 包,支持 SQLite、PostgreSQL、MySQL 以及通用 ODBC 连接。本文介绍最常用的数据库操作方式。
1. SQLite.jl — SQLite 操作
1.1 安装与连接
using Pkg
Pkg.add("SQLite")
using SQLite

# Create (or open, if it already exists) a file-backed database.
db = SQLite.DB("mydata.db")
# Close a handle as soon as it is no longer needed.
close(db)

# Alternatively, open an in-memory database. The file-backed handle above is
# closed before `db` is rebound, so no open handle is dropped.
db = SQLite.DB(":memory:")
close(db)
1.2 创建表与插入数据
db = SQLite.DB("mydata.db")

# Create the table. `email` is UNIQUE, so every inserted address must differ.
SQLite.execute(db, """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE,
        age INTEGER DEFAULT 0,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

# Insert a single row; values are bound to the `?` placeholders.
SQLite.execute(db, """
    INSERT INTO users (name, email, age) VALUES (?, ?, ?)
""", ["Alice", "alice@example.com", 25])

# Insert several rows. Each address is distinct so the UNIQUE constraint on
# `email` is not violated.
names = ["Bob", "Charlie", "Diana"]
emails = ["bob@example.com", "charlie@example.com", "diana@example.com"]
ages = [30, 35, 28]
for (name, email, age) in zip(names, emails, ages)
    SQLite.execute(db, "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
        [name, email, age])
end
1.3 查询数据
# Run a query and load the rows into a DataFrame.
using DataFrames

# `SQLite.execute` only returns a status code. `DBInterface.execute` returns a
# row cursor, which is what `DataFrame` needs as its input.
result = DBInterface.execute(db, "SELECT * FROM users WHERE age > ?", [25])
df = DataFrame(result)
println(df)

# Look up a single row by primary key; `result` is again a row cursor.
result = DBInterface.execute(db, "SELECT name FROM users WHERE id = ?", [1])
1.4 事务处理
# Bulk insert wrapped in a single transaction.
SQLite.transaction(db) do
    foreach(1:1000) do i
        row = ["User $i", "user$i@example.com", rand(18:60)]
        SQLite.execute(db, "INSERT INTO users (name, email, age) VALUES (?, ?, ?)", row)
    end
end  # committed automatically; rolled back automatically if an exception is thrown
2. LibPQ.jl — PostgreSQL
2.1 安装与连接
using Pkg
Pkg.add("LibPQ")
using LibPQ

# Connect to PostgreSQL with a keyword/value connection string.
conn = LibPQ.Connection("host=localhost dbname=mydb user=postgres password=secret")
# Close a connection as soon as it is no longer needed.
close(conn)

# Alternatively, connect with a URL. The connection above is closed before
# `conn` is rebound, so no open connection is dropped.
conn = LibPQ.Connection("postgresql://postgres:secret@localhost/mydb")
close(conn)
2.2 基本操作
# Create the table
execute(conn, """
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
price DECIMAL(10, 2),
category VARCHAR(50),
in_stock BOOLEAN DEFAULT true
)
""")
# Insert a row (parameterized query, guards against SQL injection).
# `\$1` is an escaped dollar sign, so the SQL text contains the placeholder `$1`.
execute(conn, """
INSERT INTO products (name, price, category) VALUES (\$1, \$2, \$3)
""", ["Laptop", 999.99, "Electronics"])
# Query and load the result into a DataFrame
result = execute(conn, "SELECT * FROM products WHERE category = \$1", ["Electronics"])
df = DataFrame(result)
2.3 使用连接池
using ConnectionPools
# NOTE(review): the constructor signature and the `acquire` / `release!` names used
# below are shown as-is; confirm them against the ConnectionPools package documentation.
# Create the connection pool
pool = ConnectionPool{LibPQ.Connection}(
() -> LibPQ.Connection("host=localhost dbname=mydb"),
5, # minimum number of connections
20 # maximum number of connections
)
# Take a connection from the pool
conn = acquire(pool)
try
result = execute(conn, "SELECT * FROM products")
finally
# Hand the connection back even if the query throws.
release!(pool, conn)
end
3. ODBC.jl — 通用 ODBC 连接
3.1 配置
using Pkg
Pkg.add("ODBC")
using ODBC
# List the available drivers
ODBC.drivers()
# List the configured data sources
ODBC.dsns()
3.2 连接与查询
# Connect to a configured data source. ODBC.jl 1.x replaced the old `ODBC.DSN`
# type with `ODBC.Connection`; credentials are passed as keyword arguments.
dsn = ODBC.Connection("MyDatabase"; user="username", password="password")
# Or connect with a full connection string.
dsn = ODBC.Connection("Driver={PostgreSQL};Server=localhost;Database=mydb;")

# Queries go through the DBInterface API (the old `ODBC.query` was removed).
result = DBInterface.execute(dsn, "SELECT * FROM users LIMIT 10")
df = DataFrame(result)

# Parameterized query: values are bound to the `?` placeholders.
result = DBInterface.execute(dsn, "SELECT * FROM users WHERE age > ?", [30])
3.3 支持的数据库
| 数据库 | 驱动 | 连接字符串示例 |
|---|---|---|
| PostgreSQL | psqlODBC | Driver={PostgreSQL};Server=localhost;Database=mydb; |
| MySQL | MySQL ODBC | Driver={MySQL};Server=localhost;Database=mydb; |
| SQL Server | ODBC Driver 17 | Driver={ODBC Driver 17 for SQL Server};Server=.;Database=mydb; |
| SQLite | SQLite3 ODBC | Driver={SQLite3 ODBC};Database=mydb.db; |
4. MySQL.jl
using Pkg
Pkg.add("MySQL")
using MySQL

# Connect. MySQL.jl 1.x is driven through the DBInterface API; the older
# `MySQL.connect` / `MySQL.query` / `MySQL.execute!` / `MySQL.disconnect`
# functions were removed.
conn = DBInterface.connect(MySQL.Connection, "localhost", "root", "password"; db="mydb")

# Query and load the rows into a DataFrame.
result = DBInterface.execute(conn, "SELECT * FROM users")
df = DataFrame(result)

# Insert with bound parameters.
DBInterface.execute(conn, "INSERT INTO users (name, age) VALUES (?, ?)", ["Alice", 25])

# Close the connection.
DBInterface.close!(conn)
5. SearchLight.jl — ORM
5.1 定义模型
using SearchLight
# ORM model for an article. `Base.@kwdef` generates a keyword constructor, so
# every field below can be omitted and falls back to the default shown.
Base.@kwdef mutable struct Article <: AbstractModel
id::DbId = DbId()
title::String = ""
content::String = ""
views::Int = 0
published::Bool = false
end
5.2 迁移
# db/migrations/001_create_articles.jl
# Migration module: `up` creates the `articles` table and a unique index on `title`;
# `down` drops the table again.
module CreateArticles
using SearchLight.Migration
function up()
create_table(:articles) do
[
# NOTE(review): confirm that `column` accepts `primary_key` / `autoincrement`
# keywords; SearchLight's documented examples declare the key with `pk()`.
column(:id, :integer, primary_key=true, autoincrement=true)
column(:title, :string, not_null=true, limit=200)
column(:content, :text)
column(:views, :integer, default=0)
column(:published, :boolean, default=false)
# NOTE(review): confirm that `timestamps()` exists in SearchLight.Migration.
timestamps()
]
end
add_index(:articles, :title, unique=true)
end
function down()
drop_table(:articles)
end
end
# Run the migration
# NOTE(review): confirm that a no-argument `up()` is available in the installed SearchLight version.
SearchLight.Migration.up()
5.3 CRUD 操作
# Create
article = Article(title="Julia 入门", content="这是内容...")
save!(article)

# Query
all_articles = all(Article)
article = findone(Article, title="Julia 入门")
recent = find(Article, SQLWhereExpression("views > ?", [100]))

# Update
article.views += 1
save!(article)

# Delete. SearchLight's removal function is `delete`; `Base.delete!` has no
# single-argument method for models, so `delete!(article)` raises a MethodError.
SearchLight.delete(article)

# Chained queries (if supported)
# articles = Article |> where(:published, "=", true) |> order(:views, :desc) |> limit(10)
6. 连接池管理
6.1 手动实现连接池
"""
    ConnectionPool{T}(factory::Function, max_size::Int=10)

A minimal pool of idle connections of type `T`.

`factory` is a zero-argument function that opens a new connection. At most
`max_size` idle connections are kept; the number of connections handed out at
the same time is not limited.
"""
mutable struct ConnectionPool{T}
    factory::Function
    pool::Vector{T}
    lock::ReentrantLock
    max_size::Int
end

function ConnectionPool{T}(factory::Function, max_size::Int=10) where T
    return ConnectionPool{T}(factory, T[], ReentrantLock(), max_size)
end

"""
    acquire!(pool::ConnectionPool)

Return an idle connection from `pool`, or open a new one with `pool.factory`
when none is idle.
"""
function acquire!(pool::ConnectionPool{T}) where T
    # A `return` inside the `do` block would only leave the closure, so the
    # closure's value is captured here and inspected afterwards.
    idle = lock(pool.lock) do
        isempty(pool.pool) ? nothing : pop!(pool.pool)
    end
    # The factory is called outside the lock so that opening a connection does
    # not block other tasks using the pool.
    return idle === nothing ? pool.factory() : idle
end

"""
    release!(pool::ConnectionPool{T}, conn::T)

Return `conn` to `pool`. If `pool` already holds `max_size` idle connections,
`conn` is closed instead of being kept.
"""
function release!(pool::ConnectionPool{T}, conn::T) where T
    lock(pool.lock) do
        if length(pool.pool) < pool.max_size
            push!(pool.pool, conn)
        else
            close(conn)
        end
    end
end
# Usage: borrow a connection, run a statement, and always give the connection back
pool = ConnectionPool{SQLite.DB}(() -> SQLite.DB("mydata.db"), 5)
conn = acquire!(pool)
try
SQLite.execute(conn, "SELECT 1")
finally
# Runs whether or not the statement above throws.
release!(pool, conn)
end
6.2 使用模式
"""
    with_connection(f, pool::ConnectionPool)

Borrow a connection from `pool`, call `f` with it, and return what `f`
returns. The connection is released back to `pool` even when `f` throws.
Taking `f` as the first argument allows `do`-block syntax.
"""
function with_connection(f, pool::ConnectionPool)
    borrowed = acquire!(pool)
    try
        return f(borrowed)
    finally
        release!(pool, borrowed)
    end
end
# Usage. `DBInterface.execute` returns the row cursor that `DataFrame` needs
# (`SQLite.execute` only returns a status code). The DataFrame is built inside
# the block, while the connection is still borrowed.
with_connection(pool) do conn
    result = DBInterface.execute(conn, "SELECT * FROM users")
    DataFrame(result)
end
7. 事务处理
7.1 SQLite 事务
"""
    transfer_funds(db, from_id, to_id, amount)

Move `amount` from account `from_id` to account `to_id` inside one SQLite
transaction.

# Throws
- `ArgumentError` if `amount` is not positive.
- `ErrorException` if the source account does not exist or its balance is
  lower than `amount`.

Any exception raised inside the transaction rolls it back; otherwise it is
committed when the block finishes.
"""
function transfer_funds(db, from_id, to_id, amount)
    # A non-positive amount would move money in the wrong direction.
    amount > 0 || throw(ArgumentError("amount must be positive, got $amount"))
    SQLite.transaction(db) do
        # Read the balance. `SQLite.execute` only returns a status code, so the
        # row has to be fetched through `DBInterface.execute`. The cursor is
        # consumed completely so no statement is left half-stepped.
        rows = DBInterface.execute(db,
            "SELECT balance FROM accounts WHERE id = ?", [from_id])
        balances = [row.balance for row in rows]
        isempty(balances) && error("账户不存在:$from_id")
        if first(balances) < amount
            error("余额不足")
        end
        # Debit the source account
        SQLite.execute(db,
            "UPDATE accounts SET balance = balance - ? WHERE id = ?",
            [amount, from_id])
        # Credit the destination account
        SQLite.execute(db,
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            [amount, to_id])
    end
end
7.2 PostgreSQL 事务
"""
    batch_insert(conn, records)

Insert every element of `records` into `logs.message` inside one explicit
`BEGIN` / `COMMIT` transaction on `conn`. If any statement throws, `ROLLBACK`
is issued and the original exception is rethrown.
"""
function batch_insert(conn, records)
    execute(conn, "BEGIN")
    try
        for record in records
            execute(conn, "INSERT INTO logs (message) VALUES (\$1)", [record])
        end
        execute(conn, "COMMIT")
    catch
        execute(conn, "ROLLBACK")
        # A bare `rethrow()` re-raises the current exception with its backtrace.
        rethrow()
    end
end
8. SQL 查询构建
8.1 原生 SQL(推荐简单场景)
"""
    search_users(db, name_pattern, min_age)

Return a `DataFrame` with up to 100 users whose name matches the SQL `LIKE`
pattern `name_pattern` and whose age is at least `min_age`, ordered by name.

Both values are bound as parameters, never spliced into the SQL text, which
prevents SQL injection.
"""
function search_users(db, name_pattern, min_age)
    sql = """
        SELECT id, name, email, age
        FROM users
        WHERE name LIKE ? AND age >= ?
        ORDER BY name
        LIMIT 100
    """
    # `DBInterface.execute` yields the row cursor; `SQLite.execute` would only
    # return a status code, which `DataFrame` cannot consume.
    return DataFrame(DBInterface.execute(db, sql, [name_pattern, min_age]))
end
8.2 查询构建器模式
"""
    QueryBuilder(table::String)

A small mutable builder for `SELECT * FROM table` statements. Conditions and
their bound parameters are accumulated with `where!`; an empty `order` and a
`limit_val` of `0` mean "no ORDER BY" and "no LIMIT" respectively.
"""
mutable struct QueryBuilder
    table::String
    conditions::Vector{String}
    params::Vector{Any}
    order::String
    limit_val::Int
end

QueryBuilder(table::String) = QueryBuilder(table, String[], Any[], "", 0)

"""
    where!(qb, condition, params...)

Add `condition` (AND-ed with the earlier ones) and record its parameters.
Returns `qb` so calls can be chained.
"""
function where!(qb::QueryBuilder, condition::String, params...)
    push!(qb.conditions, condition)
    for value in params
        push!(qb.params, value)
    end
    return qb
end

"""
    order_by!(qb, col, dir="ASC")

Set the ORDER BY clause to `col` followed by `dir`. Returns `qb`.
"""
function order_by!(qb::QueryBuilder, col::String, dir::String="ASC")
    qb.order = string(col, " ", dir)
    return qb
end

"""
    limit!(qb, n)

Set the LIMIT value; a value of `0` or less disables the clause. Returns `qb`.
"""
function limit!(qb::QueryBuilder, n::Int)
    qb.limit_val = n
    return qb
end

"""
    build(qb)

Return `(sql, params)`: the assembled SQL text and the builder's parameter
vector (the same vector object the builder holds, not a copy).
"""
function build(qb::QueryBuilder)
    # Gather the clauses that are present, then join them with single spaces.
    clauses = ["SELECT * FROM " * qb.table]
    isempty(qb.conditions) || push!(clauses, "WHERE " * join(qb.conditions, " AND "))
    isempty(qb.order) || push!(clauses, "ORDER BY " * qb.order)
    qb.limit_val > 0 && push!(clauses, "LIMIT " * string(qb.limit_val))
    return join(clauses, " "), qb.params
end
# Usage
qb = QueryBuilder("users")
where!(qb, "age > ?", 18)
where!(qb, "active = ?", true)
order_by!(qb, "name")
limit!(qb, 50)
sql, params = build(qb)
println(sql) # SELECT * FROM users WHERE age > ? AND active = ? ORDER BY name ASC LIMIT 50
9. 数据库设计模式
9.1 Repository 模式
# Common supertype for repository types.
abstract type AbstractRepository end
# Repository for the `users` table; wraps the SQLite database handle it queries.
struct UserRepository <: AbstractRepository
db::SQLite.DB
end
"""
    find_by_id(repo::UserRepository, id::Integer)

Return the row of `users` whose `id` equals `id` (the first row of the result
`DataFrame`), or `nothing` when no such row exists.
"""
function find_by_id(repo::UserRepository, id::Integer)
    # `DBInterface.execute` returns the row cursor; `SQLite.execute` would only
    # return a status code.
    rows = DBInterface.execute(repo.db, "SELECT * FROM users WHERE id = ?", [id])
    df = DataFrame(rows)
    return isempty(df) ? nothing : first(df)
end
"""
    find_all(repo::UserRepository; limit=100, offset=0)

Return a `DataFrame` with at most `limit` rows of `users`, skipping the first
`offset` rows.
"""
function find_all(repo::UserRepository; limit=100, offset=0)
    # The cursor must be captured: the query result was previously discarded
    # and an undefined `result` was passed to `DataFrame`.
    rows = DBInterface.execute(
        repo.db, "SELECT * FROM users LIMIT ? OFFSET ?", [limit, offset]
    )
    return DataFrame(rows)
end
"""
    save(repo::UserRepository, user::NamedTuple)

Insert `user` into `users`, replacing an existing row that conflicts with it
(`INSERT OR REPLACE`). `user` must have the fields `id`, `name`, `email` and
`age`.
"""
function save(repo::UserRepository, user::NamedTuple)
    sql = "INSERT OR REPLACE INTO users (id, name, email, age) VALUES (?, ?, ?, ?)"
    row = [user.id, user.name, user.email, user.age]
    return SQLite.execute(repo.db, sql, row)
end
# Usage: wrap an open database handle and look up the user with id 1
repo = UserRepository(db)
user = find_by_id(repo, 1)
9.2 Unit of Work 模式
"""
    UnitOfWork(db)

Collects pending statements for `db` in three queues (inserts, updates,
deletes). Each queue entry is a `(sql, params)` tuple.
"""
mutable struct UnitOfWork
    db::SQLite.DB
    pending_inserts::Vector{Tuple{String,Vector{Any}}}
    pending_updates::Vector{Tuple{String,Vector{Any}}}
    pending_deletes::Vector{Tuple{String,Vector{Any}}}
end

function UnitOfWork(db)
    # Every queue starts out empty; each call builds a separate vector.
    new_queue() = Tuple{String,Vector{Any}}[]
    return UnitOfWork(db, new_queue(), new_queue(), new_queue())
end
"""
    register_insert!(uow::UnitOfWork, table::String, data)

Queue an `INSERT INTO table (...) VALUES (...)` statement on `uow`.

`data` is any collection supporting `pairs`, mapping column names to values,
such as a `Dict` or a `NamedTuple`. Values are bound as parameters. `table`
and the column names are spliced into the SQL text, so they must come from
trusted code, never from user input.

# Throws
- `ArgumentError` if `data` is empty.
"""
function register_insert!(uow::UnitOfWork, table::String, data)
    # Walk the pairs once so column names and values stay in the same order.
    cols = String[]
    vals = Any[]
    for (col, val) in pairs(data)
        push!(cols, string(col))
        push!(vals, val)
    end
    isempty(cols) && throw(ArgumentError("data must contain at least one column"))
    placeholders = join(fill("?", length(cols)), ", ")
    sql = "INSERT INTO $table ($(join(cols, ", "))) VALUES ($placeholders)"
    return push!(uow.pending_inserts, (sql, vals))
end
"""
    commit!(uow::UnitOfWork)

Run all queued statements (inserts, then updates, then deletes) inside one
SQLite transaction and then empty the three queues. If the transaction throws,
the queues are left untouched. Returns the emptied delete queue.
"""
function commit!(uow::UnitOfWork)
    queues = (uow.pending_inserts, uow.pending_updates, uow.pending_deletes)
    SQLite.transaction(uow.db) do
        for queue in queues, (sql, params) in queue
            SQLite.execute(uow.db, sql, params)
        end
    end
    # Only reached when the transaction did not throw.
    for queue in queues
        empty!(queue)
    end
    return uow.pending_deletes
end
10. 实际案例:数据采集与存储系统
"""
    DataCollector

Stores sensor readings in a SQLite database and offers simple range and
statistics queries over them.
"""
module DataCollector

using SQLite, DataFrames, Dates

export Collector, record!, batch_record!, query_range, statistics

# Shared INSERT statement for single and batched writes.
const INSERT_READING_SQL =
    "INSERT INTO readings (sensor_id, value, unit) VALUES (?, ?, ?)"

# SQLite's CURRENT_TIMESTAMP is stored as "YYYY-MM-DD HH:MM:SS". Range bounds
# must use the same layout: `string(::DateTime)` puts a `T` between date and
# time, which sorts after the space and breaks text comparison.
const TIMESTAMP_FORMAT = dateformat"yyyy-mm-dd HH:MM:SS"

"""
    Collector(db_path::AbstractString="collector.db")

Open (or create) the database at `db_path` and make sure the `readings` table
and its indexes on `sensor_id` and `timestamp` exist.
"""
struct Collector
    db::SQLite.DB
end

function Collector(db_path::AbstractString="collector.db")
    db = SQLite.DB(db_path)
    SQLite.execute(db, """
        CREATE TABLE IF NOT EXISTS readings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sensor_id TEXT NOT NULL,
            value REAL NOT NULL,
            unit TEXT,
            timestamp TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)
    SQLite.execute(db, "CREATE INDEX IF NOT EXISTS idx_sensor ON readings(sensor_id)")
    SQLite.execute(db, "CREATE INDEX IF NOT EXISTS idx_ts ON readings(timestamp)")
    return Collector(db)
end

"""
    record!(c::Collector, sensor_id, value, unit="")

Insert one reading. `value` may be any real number; it is stored as a float.
"""
function record!(
    c::Collector, sensor_id::AbstractString, value::Real, unit::AbstractString=""
)
    return SQLite.execute(c.db, INSERT_READING_SQL, [sensor_id, Float64(value), unit])
end

"""
    batch_record!(c::Collector, records)

Insert every element of `records` inside one transaction. Each record must
have the fields `sensor_id`, `value` and `unit`.
"""
function batch_record!(c::Collector, records::AbstractVector{<:NamedTuple})
    # `AbstractVector{<:NamedTuple}` also matches vectors whose element type is
    # a concrete NamedTuple type; `Vector{NamedTuple}` would reject those.
    SQLite.transaction(c.db) do
        for r in records
            SQLite.execute(c.db, INSERT_READING_SQL, [r.sensor_id, r.value, r.unit])
        end
    end
end

"""
    query_range(c::Collector, sensor_id, from::DateTime, to::DateTime)

Return a `DataFrame` with the readings of `sensor_id` whose timestamp lies
between `from` and `to` (inclusive), ordered by timestamp.
"""
function query_range(
    c::Collector, sensor_id::AbstractString, from::DateTime, to::DateTime
)
    bounds = [Dates.format(from, TIMESTAMP_FORMAT), Dates.format(to, TIMESTAMP_FORMAT)]
    rows = DBInterface.execute(c.db, """
        SELECT * FROM readings
        WHERE sensor_id = ? AND timestamp BETWEEN ? AND ?
        ORDER BY timestamp
    """, [sensor_id, bounds[1], bounds[2]])
    return DataFrame(rows)
end

"""
    statistics(c::Collector, sensor_id)

Return a one-row `DataFrame` with `count`, `mean`, `min` and `max` of the
readings of `sensor_id`.
"""
function statistics(c::Collector, sensor_id::AbstractString)
    rows = DBInterface.execute(c.db, """
        SELECT
            COUNT(*) as count,
            AVG(value) as mean,
            MIN(value) as min,
            MAX(value) as max
        FROM readings WHERE sensor_id = ?
    """, [sensor_id])
    return DataFrame(rows)
end

end
# Usage. Every name is qualified with the module, so the snippet works whether
# or not the module exports it.
using .DataCollector, Dates
collector = DataCollector.Collector("sensors.db")
DataCollector.record!(collector, "temp_01", 23.5, "°C")
DataCollector.record!(collector, "temp_01", 24.1, "°C")
DataCollector.record!(collector, "humidity_01", 65.2, "%")
stats = DataCollector.statistics(collector, "temp_01")
println(stats)
各数据库驱动对比
| 驱动 | 数据库 | 特点 | 推荐场景 |
|---|---|---|---|
| SQLite.jl | SQLite | 轻量、无需服务器 | 嵌入式、原型开发 |
| LibPQ.jl | PostgreSQL | 功能完整,可配合连接池使用 | 生产环境、大数据 |
| MySQL.jl | MySQL | 广泛使用 | Web 应用 |
| ODBC.jl | 任意 | 通用接口 | 遗留系统集成 |
| SearchLight.jl | 多种 | ORM 抽象 | Genie.jl 项目 |
业务场景
场景一:IoT 数据存储
使用 SQLite.jl 存储传感器数据,批量写入+事务保证性能。定期查询统计,导出 CSV 给分析团队。
场景二:Web 应用后端
使用 PostgreSQL + LibPQ.jl 配合 Genie.jl 构建 Web 应用。连接池管理数据库连接,事务保证数据一致性。
场景三:数据迁移工具
编写 Julia 脚本从旧数据库(ODBC)读取数据,清洗后写入新数据库(PostgreSQL)。利用 Julia 的数据处理能力和多驱动支持。
总结
| 主题 | 关键要点 |
|---|---|
| SQLite.jl | 轻量嵌入式;写入用 SQLite.execute(db, sql, params),查询用 DBInterface.execute(db, sql, params) |
| LibPQ.jl | PostgreSQL 专业驱动 |
| ODBC.jl | 通用接口,支持多种数据库 |
| SearchLight.jl | ORM,与 Genie.jl 集成 |
| 事务 | SQLite.transaction(db) do ... end |
| 连接池 | 手动实现或使用 ConnectionPools.jl |
| 安全 | 始终使用参数化查询,防止 SQL 注入 |