强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第五章:自定义 Lua 测试

第五章:自定义 Lua 测试

5.1 为什么需要自定义测试

虽然 Sysbench 内置了多种 OLTP 测试,但真实业务场景往往与标准测试有差异。通过编写自定义 Lua 脚本,你可以:

  • 模拟特定业务的 SQL 模式(如电商下单、社交 Feed 流)
  • 测试特定表结构(如 JSON 字段、全文索引)
  • 调整读写比例以匹配真实负载
  • 实现自定义的初始化和清理逻辑
  • 测试存储过程 / 函数

5.2 Lua 脚本基础

5.2.1 Sysbench 的 Lua API

Sysbench 使用内嵌的 Lua 5.1 解释器(支持 LuaJIT),并提供以下核心 API:

API说明
sysbench.cmdline命令行参数定义
sysbench.opt访问命令行选项值
sysbench.rand随机数生成
sysbench.sqlSQL 辅助函数
sb_event事件类型常量
sb_status状态查询

5.2.2 脚本生命周期

-- 1. 命令行选项定义(加载时执行)
sysbench.cmdline.options = { ... }

-- 2. thread_init() - 每个线程初始化时调用一次
function thread_init()
  -- 创建每线程的数据库连接
  -- 初始化局部变量
end

-- 3. event() - 每次事件(事务)调用一次
function event()
  -- 核心测试逻辑
  -- 这里放置 SQL 操作
end

-- 4. thread_done() - 每个线程结束时调用一次
function thread_done()
  -- 关闭连接
  -- 清理资源
end

-- 5. prepare() - prepare 阶段调用(可选)
function prepare()
  -- 创建表、插入初始数据
end

-- 6. cleanup() - cleanup 阶段调用(可选)
function cleanup()
  -- 删除表、清理数据
end

-- 7. done() - 所有线程结束后调用(可选,用于报告自定义指标)
function done()
  -- 汇总统计、输出自定义报告
end

5.2.3 最小示例

-- minimal.lua - 最小的自定义测试脚本

function event()
  -- 每次事件什么都不做
end
sysbench --lua-script=minimal.lua run --threads=4 --time=10

5.3 数据库操作 API

5.3.1 连接管理

function thread_init()
  -- 创建数据库连接
  drv = sysbench.sql.driver()
  con = drv:connect()
end

function thread_done()
  -- 断开连接
  con:disconnect()
end

5.3.2 执行 SQL

function event()
  -- 执行查询(返回结果集)
  local rs = con:query("SELECT * FROM users WHERE id = " .. sysbench.rand.uniform(1, 100000))
  
  -- 遍历结果集
  for i = 1, rs.nrows do
    local id = rs[i][1]    -- 第 1 列
    local name = rs[i][2]  -- 第 2 列
  end
  
  -- 执行更新(无返回结果)
  con:query("UPDATE users SET login_count = login_count + 1 WHERE id = 1")
  
  -- 执行插入
  con:query(string.format(
    "INSERT INTO logs (user_id, action, created_at) VALUES (%d, '%s', NOW())",
    sysbench.rand.uniform(1, 100000),
    "login"
  ))
end

5.3.3 预编译语句(Prepared Statement)

function thread_init()
  drv = sysbench.sql.driver()
  con = drv:connect()
  
  -- 准备语句(性能更好)
  stmt = con:prepare("SELECT * FROM users WHERE id = ?")
end

function event()
  -- 绑定参数并执行
  stmt:bind(sysbench.rand.uniform(1, 100000))
  local rs = stmt:execute()
end

function thread_done()
  stmt:close()
  con:disconnect()
end

5.3.4 事务控制

function event()
  con:query("BEGIN")
  
  -- 执行一系列操作
  con:query("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
  con:query("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
  
  con:query("COMMIT")
  -- 或 con:query("ROLLBACK")
end

5.4 自定义命令行选项

5.4.1 定义选项

-- custom_options.lua

sysbench.cmdline.options = {
  {"用户表大小", "users-table-size", 100000},
  {"订单表大小", "orders-table-size", 500000},
  {"热点数据比例 (百分比)", "hotspot-pct", 20},
  {"是否使用事务", "use-transaction", true},
  {"自定义 SQL", "custom-sql", ""},
}
# 使用自定义选项
sysbench --lua-script=custom_options.lua \
  --users-table-size=500000 \
  --orders-table-size=2000000 \
  --hotspot-pct=10 \
  --threads=16 \
  --time=60 \
  run

5.4.2 在脚本中访问选项

function event()
  -- 通过 sysbench.opt 访问选项值
  local table_size = sysbench.opt.users_table_size  -- 注意:连字符转下划线
  local hotspot_pct = sysbench.opt.hotspot_pct
  
  -- 使用选项值
  local id
  if sysbench.rand.uniform(1, 100) <= hotspot_pct then
    -- 热点数据:访问前 20% 的数据
    id = sysbench.rand.uniform(1, math.floor(table_size * 0.2))
  else
    -- 冷数据:访问后 80% 的数据
    id = sysbench.rand.uniform(math.floor(table_size * 0.2) + 1, table_size)
  end
  
  con:query("SELECT * FROM users WHERE id = " .. id)
end

5.5 完整示例:电商下单测试

5.5.1 场景描述

模拟电商下单场景:

  1. 查询用户信息
  2. 查询商品信息和库存
  3. 创建订单
  4. 减少库存
  5. 所有操作在一个事务中

5.5.2 完整脚本

-- ecommerce_order.lua
-- 电商下单场景测试

sysbench.cmdline.options = {
  {"用户表行数", "users-size", 100000},
  {"商品表行数", "products-size", 10000},
  {"订单表行数", "orders-size", 1000000},
  {"每单商品数", "items-per-order", 3},
  {"热点商品比例 (%)", "hotspot-pct", 10},
}

local drv, con

function prepare()
  local host = sysbench.opt.mysql_host or "127.0.0.1"
  local port = sysbench.opt.mysql_port or 3306
  local user = sysbench.opt.mysql_user or "root"
  local pass = sysbench.opt.mysql_password or ""
  local db   = sysbench.opt.mysql_db or "sbtest"
  
  drv = sysbench.sql.driver()
  con = drv:connect()
  
  -- 创建数据库
  con:query("CREATE DATABASE IF NOT EXISTS " .. db)
  con:query("USE " .. db)
  
  -- 创建用户表
  con:query([[
    CREATE TABLE IF NOT EXISTS ec_users (
      id INT PRIMARY KEY AUTO_INCREMENT,
      username VARCHAR(64) NOT NULL,
      email VARCHAR(128) NOT NULL,
      balance DECIMAL(10,2) DEFAULT 0.00,
      level TINYINT DEFAULT 1,
      created_at DATETIME DEFAULT NOW(),
      INDEX idx_username (username)
    ) ENGINE=InnoDB
  ]])
  
  -- 创建商品表
  con:query([[
    CREATE TABLE IF NOT EXISTS ec_products (
      id INT PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(255) NOT NULL,
      price DECIMAL(10,2) NOT NULL,
      stock INT NOT NULL DEFAULT 0,
      category_id INT DEFAULT 1,
      status TINYINT DEFAULT 1,
      INDEX idx_category (category_id),
      INDEX idx_status (status)
    ) ENGINE=InnoDB
  ]])
  
  -- 创建订单表
  con:query([[
    CREATE TABLE IF NOT EXISTS ec_orders (
      id BIGINT PRIMARY KEY AUTO_INCREMENT,
      user_id INT NOT NULL,
      total_amount DECIMAL(12,2) NOT NULL,
      status TINYINT DEFAULT 0,
      created_at DATETIME DEFAULT NOW(),
      INDEX idx_user_id (user_id),
      INDEX idx_status (status),
      INDEX idx_created (created_at)
    ) ENGINE=InnoDB
  ]])
  
  -- 创建订单明细表
  con:query([[
    CREATE TABLE IF NOT EXISTS ec_order_items (
      id BIGINT PRIMARY KEY AUTO_INCREMENT,
      order_id BIGINT NOT NULL,
      product_id INT NOT NULL,
      quantity INT NOT NULL,
      price DECIMAL(10,2) NOT NULL,
      INDEX idx_order_id (order_id),
      INDEX idx_product_id (product_id)
    ) ENGINE=InnoDB
  ]])
  
  -- 插入用户数据
  print("Inserting users...")
  for i = 1, sysbench.opt.users_size do
    con:query(string.format(
      "INSERT IGNORE INTO ec_users (username, email, balance) VALUES ('user%d', 'user%[email protected]', %.2f)",
      i, i, sysbench.rand.uniform_double() * 10000
    ))
  end
  
  -- 插入商品数据
  print("Inserting products...")
  for i = 1, sysbench.opt.products_size do
    con:query(string.format(
      "INSERT IGNORE INTO ec_products (name, price, stock) VALUES ('Product-%d', %.2f, %d)",
      i,
      sysbench.rand.uniform_double() * 999 + 1,
      sysbench.rand.uniform(10, 1000)
    ))
  end
  
  print("Prepare complete!")
end

function cleanup()
  drv = sysbench.sql.driver()
  con = drv:connect()
  
  con:query("DROP TABLE IF EXISTS ec_order_items")
  con:query("DROP TABLE IF EXISTS ec_orders")
  con:query("DROP TABLE IF EXISTS ec_products")
  con:query("DROP TABLE IF EXISTS ec_users")
end

function thread_init()
  drv = sysbench.sql.driver()
  con = drv:connect()
  con:query("USE " .. (sysbench.opt.mysql_db or "sbtest"))
end

function thread_done()
  con:disconnect()
end

function get_hot_product_id()
  local total = sysbench.opt.products_size
  local hotspot_size = math.floor(total * sysbench.opt.hotspot_pct / 100)
  
  if sysbench.rand.uniform(1, 100) <= 70 then
    -- 70% 概率访问热点商品
    return sysbench.rand.uniform(1, hotspot_size)
  else
    -- 30% 概率访问冷门商品
    return sysbench.rand.uniform(hotspot_size + 1, total)
  end
end

function event()
  local user_id = sysbench.rand.uniform(1, sysbench.opt.users_size)
  local num_items = sysbench.rand.uniform(1, sysbench.opt.items_per_order)
  
  con:query("BEGIN")
  
  -- 1. 查询用户余额
  local rs = con:query("SELECT balance FROM ec_users WHERE id = " .. user_id)
  if rs.nrows == 0 then
    con:query("ROLLBACK")
    return
  end
  
  -- 2. 逐个处理商品
  local total = 0
  local items = {}
  for i = 1, num_items do
    local product_id = get_hot_product_id()
    
    -- 查询商品信息和库存
    local prs = con:query(string.format(
      "SELECT price, stock FROM ec_products WHERE id = %d AND status = 1 FOR UPDATE",
      product_id
    ))
    
    if prs.nrows > 0 then
      local price = tonumber(prs[1][1])
      local stock = tonumber(prs[1][2])
      
      if stock > 0 then
        local qty = math.min(sysbench.rand.uniform(1, 3), stock)
        total = total + price * qty
        table.insert(items, {product_id, qty, price})
        
        -- 减少库存
        con:query(string.format(
          "UPDATE ec_products SET stock = stock - %d WHERE id = %d",
          qty, product_id
        ))
      end
    end
  end
  
  -- 3. 创建订单
  if #items > 0 then
    con:query(string.format(
      "INSERT INTO ec_orders (user_id, total_amount, status) VALUES (%d, %.2f, 0)",
      user_id, total
    ))
    
    -- 获取订单 ID
    local ors = con:query("SELECT LAST_INSERT_ID()")
    local order_id = tonumber(ors[1][1])
    
    -- 插入订单明细
    for _, item in ipairs(items) do
      con:query(string.format(
        "INSERT INTO ec_order_items (order_id, product_id, quantity, price) VALUES (%d, %d, %d, %.2f)",
        order_id, item[1], item[2], item[3]
      ))
    end
  end
  
  con:query("COMMIT")
end

5.5.3 使用自定义脚本

# 准备数据
sysbench --lua-script=ecommerce_order.lua \
  --mysql-host=127.0.0.1 \
  --mysql-user=root \
  --mysql-password=secret \
  --mysql-db=ecommerce_bench \
  --users-size=100000 \
  --products-size=10000 \
  prepare

# 运行测试
sysbench --lua-script=ecommerce_order.lua \
  --mysql-host=127.0.0.1 \
  --mysql-user=root \
  --mysql-password=secret \
  --mysql-db=ecommerce_bench \
  --users-size=100000 \
  --products-size=10000 \
  --hotspot-pct=10 \
  --threads=32 \
  --time=300 \
  --histogram \
  run

# 清理
sysbench --lua-script=ecommerce_order.lua \
  --mysql-host=127.0.0.1 \
  --mysql-user=root \
  --mysql-db=ecommerce_bench \
  cleanup

5.6 完整示例:社交动态 Feed 测试

-- social_feed.lua
-- 社交平台 Feed 流读写测试

sysbench.cmdline.options = {
  {"用户数", "user-count", 100000},
  {"每人关注数", "follows-per-user", 50},
  {"每人动态数", "posts-per-user", 100},
}

local drv, con

function prepare()
  drv = sysbench.sql.driver()
  con = drv:connect()
  con:query("CREATE DATABASE IF NOT EXISTS " .. (sysbench.opt.mysql_db or "sbtest"))
  con:query("USE " .. (sysbench.opt.mysql_db or "sbtest"))
  
  con:query([[
    CREATE TABLE IF NOT EXISTS sf_users (
      id INT PRIMARY KEY AUTO_INCREMENT,
      nickname VARCHAR(64),
      INDEX idx_nickname (nickname)
    ) ENGINE=InnoDB
  ]])
  
  con:query([[
    CREATE TABLE IF NOT EXISTS sf_follows (
      user_id INT NOT NULL,
      follow_id INT NOT NULL,
      created_at DATETIME DEFAULT NOW(),
      PRIMARY KEY (user_id, follow_id),
      INDEX idx_follow (follow_id)
    ) ENGINE=InnoDB
  ]])
  
  con:query([[
    CREATE TABLE IF NOT EXISTS sf_posts (
      id BIGINT PRIMARY KEY AUTO_INCREMENT,
      user_id INT NOT NULL,
      content TEXT,
      like_count INT DEFAULT 0,
      comment_count INT DEFAULT 0,
      created_at DATETIME DEFAULT NOW(),
      INDEX idx_user_time (user_id, created_at DESC)
    ) ENGINE=InnoDB
  ]])
  
  -- 插入用户
  print("Inserting users...")
  local batch_size = 1000
  for i = 1, sysbench.opt.user_count, batch_size do
    local values = {}
    for j = i, math.min(i + batch_size - 1, sysbench.opt.user_count) do
      table.insert(values, string.format("('user_%d')", j))
    end
    con:query("INSERT INTO sf_users (nickname) VALUES " .. table.concat(values, ","))
  end
  
  -- 插入关注关系
  print("Inserting follows...")
  for i = 1, sysbench.opt.user_count do
    for j = 1, sysbench.opt.follows_per_user do
      local follow_id = sysbench.rand.uniform(1, sysbench.opt.user_count)
      con:query(string.format(
        "INSERT IGNORE INTO sf_follows (user_id, follow_id) VALUES (%d, %d)",
        i, follow_id
      ))
    end
  end
  
  -- 插入动态
  print("Inserting posts...")
  for i = 1, sysbench.opt.user_count do
    for j = 1, sysbench.opt.posts_per_user do
      con:query(string.format(
        "INSERT INTO sf_posts (user_id, content, like_count, comment_count) VALUES (%d, 'Post content %d-%d', %d, %d)",
        i, i, j, sysbench.rand.uniform(0, 1000), sysbench.rand.uniform(0, 100)
      ))
    end
  end
  
  print("Prepare complete!")
end

function cleanup()
  drv = sysbench.sql.driver()
  con = drv:connect()
  con:query("DROP TABLE IF EXISTS sf_posts")
  con:query("DROP TABLE IF EXISTS sf_follows")
  con:query("DROP TABLE IF EXISTS sf_users")
end

function thread_init()
  drv = sysbench.sql.driver()
  con = drv:connect()
  con:query("USE " .. (sysbench.opt.mysql_db or "sbtest"))
end

function thread_done()
  con:disconnect()
end

function event()
  local user_id = sysbench.rand.uniform(1, sysbench.opt.user_count)
  local action = sysbench.rand.uniform(1, 10)
  
  if action <= 5 then
    -- 50%: 读取 Feed(关注的人的最新动态)
    con:query(string.format([[
      SELECT p.id, p.user_id, p.content, p.like_count, p.created_at
      FROM sf_posts p
      INNER JOIN sf_follows f ON f.follow_id = p.user_id
      WHERE f.user_id = %d
      ORDER BY p.created_at DESC
      LIMIT 20
    ]], user_id))
    
  elseif action <= 7 then
    -- 20%: 发布动态
    con:query(string.format(
      "INSERT INTO sf_posts (user_id, content, like_count) VALUES (%d, 'New post at %d', 0)",
      user_id, os.time()
    ))
    
  elseif action <= 9 then
    -- 20%: 点赞
    con:query(string.format(
      "UPDATE sf_posts SET like_count = like_count + 1 WHERE id = %d",
      sysbench.rand.uniform(1, sysbench.opt.user_count * sysbench.opt.posts_per_user)
    ))
    
  else
    -- 10%: 查看某用户的个人页
    con:query(string.format(
      "SELECT * FROM sf_posts WHERE user_id = %d ORDER BY created_at DESC LIMIT 20",
      user_id
    ))
  end
end

5.7 随机数 API 详解

5.7.1 随机数函数

-- 均匀分布随机整数 [min, max]
sysbench.rand.uniform(1, 1000)

-- 均匀分布随机浮点数 [0, 1)
sysbench.rand.uniform_double()

-- 高斯分布(正态分布),参数为均值和标准差
sysbench.rand.gaussian(100, 20)

-- 特殊分布(热点数据集中)
-- 由 --rand-spec-pct 和 --rand-spec-res 控制
sysbench.rand.special()

-- Pareto 分布
sysbench.rand.pareto()

-- Zipfian 分布
sysbench.rand.zipfian()

-- 生成指定长度的随机字符串
sysbench.rand.string("#####-@@@@-$$$$")  -- #=数字 @=字母 $=混合

5.7.2 随机字符串模板

字符含义
@随机字母(a-z, A-Z)
#随机数字(0-9)
$随机字母或数字
其他原样输出
-- 生成订单号格式:ORD-20240101-ABCD1234
local order_no = "ORD-" .. sysbench.rand.string("######") .. "-" .. sysbench.rand.string("@@@@####")

-- 生成邮箱:[email protected]
local email = "user" .. sysbench.rand.string("####") .. "@example.com"

5.8 高级技巧

5.8.1 自定义统计报告

-- 使用全局变量收集自定义统计
local custom_stats = {
  order_count = 0,
  total_amount = 0,
  error_count = 0,
}

function event()
  -- ... 执行业务逻辑 ...
  
  -- 更新统计
  custom_stats.order_count = custom_stats.order_count + 1
  custom_stats.total_amount = custom_stats.total_amount + amount
  
  if error then
    custom_stats.error_count = custom_stats.error_count + 1
  end
end

function done()
  -- 输出自定义报告
  print("")
  print("=== 自定义业务统计 ===")
  print(string.format("订单总数: %d", custom_stats.order_count))
  print(string.format("总金额: %.2f", custom_stats.total_amount))
  print(string.format("错误次数: %d", custom_stats.error_count))
  print(string.format("平均每单金额: %.2f", 
    custom_stats.order_count > 0 and custom_stats.total_amount / custom_stats.order_count or 0))
end

注意done() 在所有线程结束后由主线程调用,但自定义变量在多线程环境下是隔离的,需要通过其他方式汇总(如写入共享文件或数据库)。

5.8.2 条件执行与错误处理

function event()
  local ok, err = pcall(function()
    con:query("BEGIN")
    con:query("UPDATE accounts SET balance = balance - 100 WHERE id = 1 AND balance >= 100")
    con:query("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    con:query("COMMIT")
  end)
  
  if not ok then
    -- 错误处理
    pcall(function() con:query("ROLLBACK") end)
    sysbench.metrics.report_event("errors")
  end
end

5.8.3 连接复用与连接池

-- 每个线程使用独立的连接(Sysbench 默认模式)
-- 连接在 thread_init() 中创建,在 thread_done() 中关闭

-- 如果需要多个连接(如同时操作主库和从库)
function thread_init()
  drv = sysbench.sql.driver()
  con_master = drv:connect()  -- 主库连接
  con_slave = drv:connect()   -- 从库连接(需配置不同选项)
end

5.8.4 使用外部文件中的数据

-- 从 CSV 文件加载测试数据
function prepare()
  local file = io.open("test_data.csv", "r")
  if not file then
    error("Cannot open test_data.csv")
  end
  
  for line in file:lines() do
    local fields = {}
    for field in line:gmatch("[^,]+") do
      table.insert(fields, field)
    end
    
    con:query(string.format(
      "INSERT INTO users (name, email, age) VALUES ('%s', '%s', %d)",
      fields[1], fields[2], tonumber(fields[3])
    ))
  end
  
  file:close()
end

5.9 内置 OLTP 脚本的源码结构

理解内置脚本的结构有助于编写自定义脚本。以 oltp_read_write.lua 为例:

-- /usr/share/sysbench/oltp_read_write.lua 的简化结构

pathtest = string.match(testpath, "(.*/)")
-- 加载公共模块
require(pathtest .. "oltp_common")

function prepare()
  -- 调用公共模块的 prepare
  oltp_prepare()
end

function cleanup()
  oltp_cleanup()
end

function thread_init()
  -- 调用公共模块的线程初始化
  oltp_thread_init()
end

function thread_done()
  oltp_thread_done()
end

function event()
  -- 每个事务包含的具体操作
  local table_name = "sbtest" .. sysbench.rand.uniform(1, sysbench.opt.tables)
  local id = sysbench.rand.uniform(1, sysbench.opt.table_size)
  
  con:query("BEGIN")
  
  -- 点查询
  for i = 1, sysbench.opt.point_selects do
    con:query("SELECT c FROM " .. table_name .. " WHERE id = " .. id)
  end
  
  -- 范围查询
  for i = 1, sysbench.opt.simple_ranges do
    local range_start = sysbench.rand.uniform(1, sysbench.opt.table_size)
    local range_end = range_start + sysbench.opt.range_size
    con:query(string.format("SELECT c FROM %s WHERE id BETWEEN %d AND %d",
      table_name, range_start, range_end))
  end
  
  -- ... 更多操作 ...
  
  con:query("COMMIT")
end

公共模块 oltp_common.lua 包含:

  • 表结构定义
  • oltp_prepare() 函数(创建表、插入数据)
  • oltp_cleanup() 函数(删除表)
  • 命令行选项定义

5.10 调试 Lua 脚本

5.10.1 启用调试输出

-- 使用 print() 输出调试信息
function event()
  local id = sysbench.rand.uniform(1, 1000)
  print("DEBUG: Querying id = " .. id)  -- 调试输出
  con:query("SELECT * FROM sbtest WHERE id = " .. id)
end
# 运行时增加 verbosity
sysbench --lua-script=my_test.lua --verbosity=5 --threads=1 --time=5 run

5.10.2 使用断言

function event()
  local rs = con:query("SELECT COUNT(*) FROM users")
  assert(rs.nrows > 0, "Users table is empty!")
  
  local count = tonumber(rs[1][1])
  assert(count > 0, "No users found")
end

5.10.3 单线程调试

# 使用 1 个线程,短时间运行
sysbench --lua-script=my_test.lua --threads=1 --time=5 --verbosity=5 run

5.11 小结

要点说明
脚本结构thread_init → event() 循环 → thread_done
数据库操作con:query() 执行 SQL,con:prepare() 预编译
自定义选项sysbench.cmdline.options 定义,sysbench.opt 访问
随机数sysbench.rand.uniform/uniform_double/gaussian 等
调试方法print() + –verbosity=5 + 单线程运行
最佳实践使用预编译语句、合理使用事务、错误处理

扩展阅读