LevelDB 完全指南 / 第 5 章 · 批量写入
第 5 章 · 批量写入
5.1 为什么需要 WriteBatch
单个 Put 操作是原子的,但当你需要同时修改多个 Key 时,单独的 Put 无法保证一致性:
问题场景:转账
账户 A = 100 元,账户 B = 0 元
目标:A → B 转 50 元
操作 1: Put("account:A", "50") ← 成功
← 此时崩溃!
操作 2: Put("account:B", "50") ← 未执行
结果:50 元凭空消失!
WriteBatch 解决了这个问题——将多个操作打包成原子单元。
5.2 WriteBatch 基本用法
C++ 示例
#include "leveldb/db.h"
#include "leveldb/write_batch.h"
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::DB::Open(options, "/tmp/testdb", &db);
// 创建 WriteBatch
leveldb::WriteBatch batch;
// 添加操作到批次
batch.Put("user:1001:name", "张三");
batch.Put("user:1001:email", "[email protected]");
batch.Put("user:1001:age", "30");
batch.Delete("user:1001:old_field");
// 原子提交
leveldb::Status s = db->Write(leveldb::WriteOptions(), &batch);
if (!s.ok()) {
std::cerr << "WriteBatch failed: " << s.ToString() << std::endl;
}
delete db;
Go 示例
batch := new(leveldb.Batch)
batch.Put([]byte("user:1001:name"), []byte("张三"))
batch.Put([]byte("user:1001:email"), []byte("[email protected]"))
batch.Delete([]byte("user:1001:old_field"))
err := db.Write(batch, nil)
if err != nil {
log.Fatal(err)
}
Python 示例
with db.write_batch() as wb:
wb.put(b'user:1001:name', '张三'.encode())
wb.put(b'user:1001:email', b'[email protected]')
wb.delete(b'user:1001:old_field')
# 退出 with 块时自动提交
5.3 WriteBatch 的内部机制
写入流程
db.Write(WriteOptions, &batch)
│
▼
┌────────────────────────────────────┐
│ 1. 加写锁 │
│ 2. 序列化 batch 中所有操作 │
│ 3. 将整个 batch 写入 WAL(单条记录)│
│ 4. 将 batch 中每个操作逐个插入 MemTable │
│ 5. 释放写锁 │
└────────────────────────────────────┘
WAL 格式
WriteBatch 序列化后的格式:
┌─────────────────┬───────────────────────────────────┐
│ count (varint) │ sequence of Put/Delete records │
│ │ │
│ record 1: [type=Put][key_len][key][val_len][val] │
│ record 2: [type=Put][key_len][key][val_len][val] │
│ record 3: [type=Del][key_len][key] │
│ ... │
└─────────────────┴───────────────────────────────────┘
所有操作共享同一个 Sequence Number。
原子性保证
| 属性 | 说明 |
|---|---|
| 原子性 | 要么全部成功,要么全部失败 |
| 一致性 | 所有操作使用同一个 Sequence Number |
| 持久性 | WAL 保证(取决于 sync 选项) |
| 隔离性 | 写入期间阻塞其他写入,但不阻塞读取 |
⚠️ 注意:WriteBatch 不是事务。它不支持回滚(Rollback),也不提供隔离级别的控制。
5.4 WriteBatch 进阶操作
Clear — 清空批次
leveldb::WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");
batch.Clear(); // 清空所有操作
batch.Put("key3", "value3"); // 重新添加
db->Write(wopts, &batch); // 只写入 key3
Append — 合并批次
leveldb::WriteBatch batch1;
batch1.Put("user:1:name", "张三");
batch1.Put("user:1:email", "[email protected]");
leveldb::WriteBatch batch2;
batch2.Put("user:2:name", "李四");
batch2.Put("user:2:email", "[email protected]");
// 将 batch2 的操作追加到 batch1
batch1.Append(batch2);
// batch1 现在包含 4 个操作
db->Write(wopts, &batch1);
Iterate — 遍历批次内容
leveldb::WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");
batch.Delete("key3");
// 自定义 Handler 遍历批次
class BatchPrinter : public leveldb::WriteBatch::Handler {
public:
void Put(const leveldb::Slice& key,
const leveldb::Slice& value) override {
std::cout << "PUT: " << key.ToString()
<< " = " << value.ToString() << std::endl;
}
void Delete(const leveldb::Slice& key) override {
std::cout << "DEL: " << key.ToString() << std::endl;
}
};
BatchPrinter printer;
batch.Iterate(&printer);
// 输出:
// PUT: key1 = value1
// PUT: key2 = value2
// DEL: key3
5.5 业务场景
场景一:用户注册(多字段原子写入)
struct User {
int id;
std::string name;
std::string email;
std::string phone;
};
bool RegisterUser(leveldb::DB* db, const User& user) {
leveldb::WriteBatch batch;
std::string prefix = "user:" + std::to_string(user.id) + ":";
// 主记录
batch.Put(prefix + "name", user.name);
batch.Put(prefix + "email", user.email);
batch.Put(prefix + "phone", user.phone);
batch.Put(prefix + "created_at", std::to_string(time(nullptr)));
// 索引记录(支持按邮箱查找)
batch.Put("idx:email:" + user.email, std::to_string(user.id));
batch.Put("idx:phone:" + user.phone, std::to_string(user.id));
leveldb::Status s = db->Write(leveldb::WriteOptions(), &batch);
return s.ok();
}
场景二:计数器批量更新
void UpdateStats(leveldb::DB* db, const std::string& event_type) {
leveldb::WriteBatch batch;
// 读取当前计数
std::string total_key = "stats:total";
std::string event_key = "stats:event:" + event_type;
std::string total_val, event_val;
db->Get(leveldb::ReadOptions(), total_key, &total_val);
db->Get(leveldb::ReadOptions(), event_key, &event_val);
int total = total_val.empty() ? 0 : std::stoi(total_val);
int event_count = event_val.empty() ? 0 : std::stoi(event_val);
// 更新计数
batch.Put(total_key, std::to_string(total + 1));
batch.Put(event_key, std::to_string(event_count + 1));
batch.Put("stats:last_event_at", std::to_string(time(nullptr)));
db->Write(leveldb::WriteOptions(), &batch);
}
场景三:数据迁移(带删除)
void MigrateKey(leveldb::DB* db,
const std::string& old_key,
const std::string& new_key) {
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), old_key, &value);
if (!s.ok()) return;
leveldb::WriteBatch batch;
batch.Put(new_key, value); // 写入新位置
batch.Delete(old_key); // 删除旧位置
db->Write(leveldb::WriteOptions(), &batch);
}
5.6 WriteBatch vs 逐条 Put 对比
| 对比项 | 逐条 Put | WriteBatch |
|---|---|---|
| 原子性 | 每条单独原子 | 整批原子 |
| WAL 写入次数 | N 次 | 1 次 |
| fsync 次数 | N 次(sync=true) | 1 次(sync=true) |
| 性能(大批量) | 较慢 | 快 2-10 倍 |
| 内存占用 | 每条操作独立 | 批次对象占用内存 |
| 适用场景 | 单条写入 | 多条关联写入 |
性能对比代码
#include <chrono>
#include "leveldb/db.h"
void BenchmarkIndividual(leveldb::DB* db, int n) {
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < n; i++) {
std::string key = "bench:" + std::to_string(i);
std::string value(100, 'x');
db->Put(leveldb::WriteOptions(), key, value);
}
auto end = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "逐条 Put " << n << " 次: " << ms.count() << " ms" << std::endl;
}
void BenchmarkBatch(leveldb::DB* db, int n) {
auto start = std::chrono::high_resolution_clock::now();
leveldb::WriteBatch batch;
for (int i = 0; i < n; i++) {
std::string key = "bench:" + std::to_string(i);
std::string value(100, 'x');
batch.Put(key, value);
}
db->Write(leveldb::WriteOptions(), &batch);
auto end = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "WriteBatch " << n << " 次: " << ms.count() << " ms" << std::endl;
}
5.7 注意事项与陷阱
陷阱一:批次太大导致内存暴涨
// ❌ 危险:一次写入 100 万个 key
leveldb::WriteBatch batch;
for (int i = 0; i < 1000000; i++) {
batch.Put("key:" + std::to_string(i), large_value);
}
db->Write(wopts, &batch); // 内存可能暴涨
// ✅ 正确:分批写入
const int BATCH_SIZE = 10000;
for (int start = 0; start < 1000000; start += BATCH_SIZE) {
leveldb::WriteBatch batch;
int end = std::min(start + BATCH_SIZE, 1000000);
for (int i = start; i < end; i++) {
batch.Put("key:" + std::to_string(i), large_value);
}
db->Write(wopts, &batch);
}
陷阱二:WriteBatch 不支持回滚
// ❌ 错误理解:以为 WriteBatch 像数据库事务一样可以回滚
leveldb::WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");
// 没有 rollback 方法!
// ✅ 正确做法:提交前验证,提交后处理错误
if (!ValidateBatch(batch)) {
// 不调用 db->Write 即可
return;
}
db->Write(wopts, &batch);
陷阱三:WriteBatch 内部的读写
// ❌ 不能在 WriteBatch 内部读取刚刚写入的值
leveldb::WriteBatch batch;
batch.Put("key1", "value1");
std::string value;
db->Get(ropts, "key1", &value); // 读不到!batch 还未提交
// ✅ 如果需要读写依赖,使用应用层缓存
std::string cached_value = "value1";
batch.Put("key1", cached_value);
陷阱四:sync=true 的性能影响
// sync=true 会等待 fsync 完成,性能急剧下降
leveldb::WriteOptions sync_opts;
sync_opts.sync = true;
db->Write(sync_opts, &batch); // 安全但慢
// 通常做法:批量写入时 sync=false,定期 sync
leveldb::WriteOptions no_sync;
no_sync.sync = false;
db->Write(no_sync, &batch); // 快速
// 每 N 次批量写入后做一次 sync
if (batch_count % 100 == 0) {
leveldb::WriteOptions force_sync;
force_sync.sync = true;
leveldb::WriteBatch empty_batch;
db->Write(force_sync, &empty_batch); // 触发 fsync
}
5.8 本章小结
| 要点 | 内容 |
|---|---|
| 核心 API | WriteBatch::Put, WriteBatch::Delete, DB::Write |
| 原子性 | 要么全部成功,要么全部失败 |
| 性能 | WAL 只写一次,比逐条 Put 快数倍 |
| 限制 | 不支持回滚、不支持读取未提交数据 |
| 最佳实践 | 分批处理大数据量,控制内存使用 |
扩展阅读
- WriteBatch 实现:
db/write_batch.cc - WAL 格式:
db/log_format.h - 事务隔离级别:了解数据库事务的四个隔离级别与 WriteBatch 的对应关系
← 第 4 章 · 基本操作 | 第 6 章 · 自定义比较器 →