强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

LevelDB 完全指南 / 第 5 章 · 批量写入

第 5 章 · 批量写入

5.1 为什么需要 WriteBatch

单个 Put 操作是原子的,但当你需要同时修改多个 Key 时,单独的 Put 无法保证一致性:

问题场景:转账

账户 A = 100 元,账户 B = 0 元
目标:A → B 转 50 元

操作 1: Put("account:A", "50")    ← 成功
                                ← 此时崩溃!
操作 2: Put("account:B", "50")    ← 未执行

结果:50 元凭空消失!

WriteBatch 解决了这个问题——将多个操作打包成原子单元


5.2 WriteBatch 基本用法

C++ 示例

#include "leveldb/db.h"
#include "leveldb/write_batch.h"

leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::DB::Open(options, "/tmp/testdb", &db);

// 创建 WriteBatch
leveldb::WriteBatch batch;

// 添加操作到批次
batch.Put("user:1001:name", "张三");
batch.Put("user:1001:email", "[email protected]");
batch.Put("user:1001:age", "30");
batch.Delete("user:1001:old_field");

// 原子提交
leveldb::Status s = db->Write(leveldb::WriteOptions(), &batch);
if (!s.ok()) {
    std::cerr << "WriteBatch failed: " << s.ToString() << std::endl;
}

delete db;

Go 示例

batch := new(leveldb.Batch)
batch.Put([]byte("user:1001:name"), []byte("张三"))
batch.Put([]byte("user:1001:email"), []byte("[email protected]"))
batch.Delete([]byte("user:1001:old_field"))

err := db.Write(batch, nil)
if err != nil {
    log.Fatal(err)
}

Python 示例

with db.write_batch() as wb:
    wb.put(b'user:1001:name', '张三'.encode())
    wb.put(b'user:1001:email', b'[email protected]')
    wb.delete(b'user:1001:old_field')
# 退出 with 块时自动提交

5.3 WriteBatch 的内部机制

写入流程

db.Write(WriteOptions, &batch)
    │
    ▼
┌────────────────────────────────────┐
│ 1. 加写锁                          │
│ 2. 序列化 batch 中所有操作          │
│ 3. 将整个 batch 写入 WAL(单条记录)│
│ 4. 将 batch 中每个操作逐个插入 MemTable │
│ 5. 释放写锁                         │
└────────────────────────────────────┘

WAL 格式

WriteBatch 序列化后的格式:

┌─────────────────┬───────────────────────────────────┐
│ count (varint)  │ sequence of Put/Delete records    │
│                 │                                   │
│   record 1: [type=Put][key_len][key][val_len][val] │
│   record 2: [type=Put][key_len][key][val_len][val] │
│   record 3: [type=Del][key_len][key]               │
│   ...                                               │
└─────────────────┴───────────────────────────────────┘

所有操作共享同一个 Sequence Number。

原子性保证

属性说明
原子性要么全部成功,要么全部失败
一致性所有操作使用同一个 Sequence Number
持久性WAL 保证(取决于 sync 选项)
隔离性写入期间阻塞其他写入,但不阻塞读取

⚠️ 注意:WriteBatch 不是事务。它不支持回滚(Rollback),也不提供隔离级别的控制。


5.4 WriteBatch 进阶操作

Clear — 清空批次

leveldb::WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");

batch.Clear();  // 清空所有操作

batch.Put("key3", "value3");  // 重新添加
db->Write(wopts, &batch);     // 只写入 key3

Append — 合并批次

leveldb::WriteBatch batch1;
batch1.Put("user:1:name", "张三");
batch1.Put("user:1:email", "[email protected]");

leveldb::WriteBatch batch2;
batch2.Put("user:2:name", "李四");
batch2.Put("user:2:email", "[email protected]");

// 将 batch2 的操作追加到 batch1
batch1.Append(batch2);

// batch1 现在包含 4 个操作
db->Write(wopts, &batch1);

Iterate — 遍历批次内容

leveldb::WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");
batch.Delete("key3");

// 自定义 Handler 遍历批次
class BatchPrinter : public leveldb::WriteBatch::Handler {
public:
    void Put(const leveldb::Slice& key,
             const leveldb::Slice& value) override {
        std::cout << "PUT: " << key.ToString()
                  << " = " << value.ToString() << std::endl;
    }
    void Delete(const leveldb::Slice& key) override {
        std::cout << "DEL: " << key.ToString() << std::endl;
    }
};

BatchPrinter printer;
batch.Iterate(&printer);

// 输出:
// PUT: key1 = value1
// PUT: key2 = value2
// DEL: key3

5.5 业务场景

场景一:用户注册(多字段原子写入)

struct User {
    int id;
    std::string name;
    std::string email;
    std::string phone;
};

bool RegisterUser(leveldb::DB* db, const User& user) {
    leveldb::WriteBatch batch;
    std::string prefix = "user:" + std::to_string(user.id) + ":";

    // 主记录
    batch.Put(prefix + "name", user.name);
    batch.Put(prefix + "email", user.email);
    batch.Put(prefix + "phone", user.phone);
    batch.Put(prefix + "created_at", std::to_string(time(nullptr)));

    // 索引记录(支持按邮箱查找)
    batch.Put("idx:email:" + user.email, std::to_string(user.id));
    batch.Put("idx:phone:" + user.phone, std::to_string(user.id));

    leveldb::Status s = db->Write(leveldb::WriteOptions(), &batch);
    return s.ok();
}

场景二:计数器批量更新

void UpdateStats(leveldb::DB* db, const std::string& event_type) {
    leveldb::WriteBatch batch;

    // 读取当前计数
    std::string total_key = "stats:total";
    std::string event_key = "stats:event:" + event_type;
    std::string total_val, event_val;

    db->Get(leveldb::ReadOptions(), total_key, &total_val);
    db->Get(leveldb::ReadOptions(), event_key, &event_val);

    int total = total_val.empty() ? 0 : std::stoi(total_val);
    int event_count = event_val.empty() ? 0 : std::stoi(event_val);

    // 更新计数
    batch.Put(total_key, std::to_string(total + 1));
    batch.Put(event_key, std::to_string(event_count + 1));
    batch.Put("stats:last_event_at", std::to_string(time(nullptr)));

    db->Write(leveldb::WriteOptions(), &batch);
}

场景三:数据迁移(带删除)

void MigrateKey(leveldb::DB* db,
                const std::string& old_key,
                const std::string& new_key) {
    std::string value;
    leveldb::Status s = db->Get(leveldb::ReadOptions(), old_key, &value);
    if (!s.ok()) return;

    leveldb::WriteBatch batch;
    batch.Put(new_key, value);      // 写入新位置
    batch.Delete(old_key);           // 删除旧位置
    db->Write(leveldb::WriteOptions(), &batch);
}

5.6 WriteBatch vs 逐条 Put 对比

对比项逐条 PutWriteBatch
原子性每条单独原子整批原子
WAL 写入次数N 次1 次
fsync 次数N 次(sync=true)1 次(sync=true)
性能(大批量)较慢快 2-10 倍
内存占用每条操作独立批次对象占用内存
适用场景单条写入多条关联写入

性能对比代码

#include <chrono>
#include "leveldb/db.h"

void BenchmarkIndividual(leveldb::DB* db, int n) {
    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < n; i++) {
        std::string key = "bench:" + std::to_string(i);
        std::string value(100, 'x');
        db->Put(leveldb::WriteOptions(), key, value);
    }
    auto end = std::chrono::high_resolution_clock::now();
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << "逐条 Put " << n << " 次: " << ms.count() << " ms" << std::endl;
}

void BenchmarkBatch(leveldb::DB* db, int n) {
    auto start = std::chrono::high_resolution_clock::now();
    leveldb::WriteBatch batch;
    for (int i = 0; i < n; i++) {
        std::string key = "bench:" + std::to_string(i);
        std::string value(100, 'x');
        batch.Put(key, value);
    }
    db->Write(leveldb::WriteOptions(), &batch);
    auto end = std::chrono::high_resolution_clock::now();
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << "WriteBatch " << n << " 次: " << ms.count() << " ms" << std::endl;
}

5.7 注意事项与陷阱

陷阱一:批次太大导致内存暴涨

// ❌ 危险:一次写入 100 万个 key
leveldb::WriteBatch batch;
for (int i = 0; i < 1000000; i++) {
    batch.Put("key:" + std::to_string(i), large_value);
}
db->Write(wopts, &batch);  // 内存可能暴涨

// ✅ 正确:分批写入
const int BATCH_SIZE = 10000;
for (int start = 0; start < 1000000; start += BATCH_SIZE) {
    leveldb::WriteBatch batch;
    int end = std::min(start + BATCH_SIZE, 1000000);
    for (int i = start; i < end; i++) {
        batch.Put("key:" + std::to_string(i), large_value);
    }
    db->Write(wopts, &batch);
}

陷阱二:WriteBatch 不支持回滚

// ❌ 错误理解:以为 WriteBatch 像数据库事务一样可以回滚
leveldb::WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");
// 没有 rollback 方法!

// ✅ 正确做法:提交前验证,提交后处理错误
if (!ValidateBatch(batch)) {
    // 不调用 db->Write 即可
    return;
}
db->Write(wopts, &batch);

陷阱三:WriteBatch 内部的读写

// ❌ 不能在 WriteBatch 内部读取刚刚写入的值
leveldb::WriteBatch batch;
batch.Put("key1", "value1");

std::string value;
db->Get(ropts, "key1", &value);  // 读不到!batch 还未提交

// ✅ 如果需要读写依赖,使用应用层缓存
std::string cached_value = "value1";
batch.Put("key1", cached_value);

陷阱四:sync=true 的性能影响

// sync=true 会等待 fsync 完成,性能急剧下降
leveldb::WriteOptions sync_opts;
sync_opts.sync = true;
db->Write(sync_opts, &batch);  // 安全但慢

// 通常做法:批量写入时 sync=false,定期 sync
leveldb::WriteOptions no_sync;
no_sync.sync = false;
db->Write(no_sync, &batch);  // 快速

// 每 N 次批量写入后做一次 sync
if (batch_count % 100 == 0) {
    leveldb::WriteOptions force_sync;
    force_sync.sync = true;
    leveldb::WriteBatch empty_batch;
    db->Write(force_sync, &empty_batch);  // 触发 fsync
}

5.8 本章小结

要点内容
核心 APIWriteBatch::Put, WriteBatch::Delete, DB::Write
原子性要么全部成功,要么全部失败
性能WAL 只写一次,比逐条 Put 快数倍
限制不支持回滚、不支持读取未提交数据
最佳实践分批处理大数据量,控制内存使用

扩展阅读

  1. WriteBatch 实现db/write_batch.cc
  2. WAL 格式db/log_format.h
  3. 事务隔离级别:了解数据库事务的四个隔离级别与 WriteBatch 的对应关系

第 4 章 · 基本操作 | 第 6 章 · 自定义比较器