强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

ClickHouse 教程 / 数据导入(CSV/JSON/Parquet)

数据导入(CSV/JSON/Parquet)

ClickHouse 支持多种数据导入方式,从简单的命令行导入到实时流式摄入,满足各种数据集成场景。


1. 命令行导入

1.1 clickhouse-client 直接导入

# CSV 文件导入(最常用)
clickhouse-client --query "INSERT INTO my_table FORMAT CSV" < data.csv

# 指定分隔符的 CSV
clickhouse-client --query "INSERT INTO my_table FORMAT CSVWithNames" < data_with_header.csv

# TabSeparated(TSV)格式
clickhouse-client --query "INSERT INTO my_table FORMAT TabSeparated" < data.tsv

# 带标题行的 TSV
clickhouse-client --query "INSERT INTO my_table FORMAT TabSeparatedWithNames" < data.tsv

# 后台执行(大数据量时使用)
nohup clickhouse-client --query "INSERT INTO my_table FORMAT CSV" < big_data.csv &

1.2 多线程并行导入

# 使用 parallel 命令加速
# 将大文件拆分为多个小文件
split -l 1000000 big_data.csv chunk_

# 并行导入
ls chunk_* | xargs -P 4 -I {} sh -c 'clickhouse-client --query "INSERT INTO my_table FORMAT CSV" < {}'

1.3 clickhouse-local 查询本地文件

# 查询本地文件(不需要服务器)
clickhouse-local --query "
    SELECT *
    FROM file('data.csv', 'CSV', 'id UInt64, name String, value Float64')
    LIMIT 10
"

# 将本地文件导入服务器
clickhouse-local --query "
    SELECT * FROM file('data.csv', 'CSV', 'id UInt64, name String, value Float64')
" | clickhouse-client --query "INSERT INTO my_table FORMAT TabSeparated"

2. INSERT INTO … FORMAT

2.1 支持的格式

-- CSV 格式
INSERT INTO my_table FORMAT CSV
1,'Alice',28,'Beijing'
2,'Bob',35,'Shanghai'
3,'Carol',22,'Guangzhou'

-- CSVWithNames(第一行为列名)
INSERT INTO my_table FORMAT CSVWithNames
id,name,age,city
1,'Alice',28,'Beijing'
2,'Bob',35,'Shanghai'

-- JSONEachRow(每行一个 JSON 对象)
INSERT INTO my_table FORMAT JSONEachRow
{"id": 1, "name": "Alice", "age": 28, "city": "Beijing"}
{"id": 2, "name": "Bob", "age": 35, "city": "Shanghai"}

-- JSONCompactEachRow
INSERT INTO my_table FORMAT JSONCompactEachRow
[1, "Alice", 28, "Beijing"]
[2, "Bob", 35, "Shanghai"]

-- Parquet / Arrow / ORC / MsgPack 等二进制格式无法以内联数据形式写入 SQL,
-- 需要通过 clickhouse-client 从文件重定向导入:

# Parquet
clickhouse-client --query "INSERT INTO my_table FORMAT Parquet" < data.parquet

# Arrow
clickhouse-client --query "INSERT INTO my_table FORMAT Arrow" < data.arrow

# ORC
clickhouse-client --query "INSERT INTO my_table FORMAT ORC" < data.orc

# MsgPack
clickhouse-client --query "INSERT INTO my_table FORMAT MsgPack" < data.msgpack

-- 自定义分隔符(注意:SETTINGS 必须写在 FORMAT 之前,FORMAT 之后的所有内容都被视为数据)
INSERT INTO my_table
SETTINGS format_custom_escaping_rule = 'CSV',
         format_custom_field_delimiter = '|'
FORMAT CustomSeparated
1|Alice|28|Beijing
2|Bob|35|Shanghai

2.2 格式选项设置

-- CSV 导入选项(SETTINGS 必须位于 FORMAT 之前,否则会被当作数据解析)
INSERT INTO my_table
SETTINGS
    input_format_csv_skip_first_lines = 1,          -- 跳过首行(标题)
    input_format_csv_allow_single_quotes = 0,       -- 禁止单引号
    input_format_defaults_for_omitted_fields = 1    -- 缺失字段用默认值
FORMAT CSV

-- JSON 导入选项
INSERT INTO my_table
SETTINGS
    input_format_json_named_tuples_as_objects = 1,        -- Tuple 映射为 JSON 对象
    input_format_json_defaults_for_missing_elements = 1,  -- 缺失字段用默认值
    input_format_json_read_objects_as_strings = 0
FORMAT JSONEachRow

-- Parquet 导入选项
INSERT INTO my_table
SETTINGS
    input_format_parquet_import_nested = 1          -- 导入嵌套结构
FORMAT Parquet

⚠️ 注意:CSV 格式不支持 NULL 值的原生表示。使用 \N 表示 NULL,或在 Nullable 列中使用空值。


3. INSERT INTO … FROM INFILE

ClickHouse 22.8+ 支持直接从客户端本地文件导入:

-- 从本地文件导入(客户端机器上的文件)
INSERT INTO my_table FROM INFILE '/tmp/data.csv' FORMAT CSV;

-- 支持压缩文件
INSERT INTO my_table FROM INFILE '/tmp/data.csv.gz' FORMAT CSV;

-- 带设置的导入
INSERT INTO my_table FROM INFILE '/tmp/data.csv' FORMAT CSV
SETTINGS
    input_format_csv_skip_first_lines = 1,
    max_insert_block_size = 1000000;

-- Parquet 文件
INSERT INTO my_table FROM INFILE '/tmp/data.parquet' FORMAT Parquet;

-- JSON 文件
INSERT INTO my_table FROM INFILE '/tmp/data.jsonl' FORMAT JSONEachRow;

💡 提示:FROM INFILE 的文件路径是相对于客户端进程的工作目录,而非服务器目录。这在容器化环境中特别方便。


4. HTTP 接口导入

4.1 基本 HTTP 导入

# 使用 curl 导入 CSV
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+CSV' \
    --data-binary @data.csv

# 使用 curl 导入 JSON(用 $'…' 使 \n 成为真实换行;-d 会丢弃换行,应使用 --data-binary)
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+JSONEachRow' \
    --data-binary $'{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}'

# 使用 curl 导入 Parquet
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+Parquet' \
    --data-binary @data.parquet

# 带认证的导入
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+CSV' \
    -u default:password \
    --data-binary @data.csv

4.2 分批导入

# 大文件分批导入(每批 100 万行)
split -l 1000000 big_data.csv chunk_
for f in chunk_*; do
    curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+CSV' \
        --data-binary @"$f"
    echo "Imported $f"
    rm "$f"
done

4.3 HTTP 接口查询

# 查询并导出为 JSON
curl 'http://localhost:8123/?query=SELECT+*+FROM+my_table+FORMAT+JSON'

# 查询并导出为 CSV
curl 'http://localhost:8123/?query=SELECT+*+FROM+my_table+FORMAT+CSVWithNames'

# 查询并下载 Parquet
curl 'http://localhost:8123/?query=SELECT+*+FROM+my_table+FORMAT+Parquet' \
    -o output.parquet

5. 文件引擎 File()

5.1 使用 File 引擎查询本地文件

-- 创建基于本地文件的表
CREATE TABLE temp_csv
(
    id UInt64,
    name String,
    value Float64
)
ENGINE = File('CSV', '/var/lib/clickhouse/user_files/data.csv');

-- 查询文件内容
SELECT * FROM temp_csv;

-- 将文件数据写入另一张表
INSERT INTO my_table SELECT * FROM temp_csv;

5.2 支持压缩文件

-- 自动检测压缩格式
CREATE TABLE temp_compressed
(
    id UInt64,
    data String
)
ENGINE = File('CSV', '/var/lib/clickhouse/user_files/data.csv.gz');

-- 支持多种压缩格式: .gz, .zst, .xz, .bz2, .lz4, .sz
CREATE TABLE temp_zst
(
    id UInt64,
    data String
)
ENGINE = File('CSV', '/var/lib/clickhouse/user_files/data.csv.zst');

5.3 写入文件

-- 将查询结果写入本地文件
CREATE TABLE output_csv
(
    id UInt64,
    name String,
    total Float64
)
ENGINE = File('CSV', '/var/lib/clickhouse/user_files/output.csv');

INSERT INTO output_csv
SELECT id, name, sum(value) AS total
FROM my_table
GROUP BY id, name;

6. 表函数 file()

6.1 一次性查询本地文件

-- 使用 file() 表函数直接查询(无需建表)
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV', 'id UInt64, name String, value Float64');

-- 带压缩的文件
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv.gz', 'CSV', 'id UInt64, name String');

-- 查看文件结构(自动推断列类型)
DESCRIBE TABLE file('/var/lib/clickhouse/user_files/data.csv', 'CSV');

-- 显式指定结构并预览前几行
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV', 'c1 String, c2 String, c3 String')
LIMIT 5;

-- 使用 file() 写入数据
INSERT INTO my_table
SELECT *
FROM file('/tmp/import_data.csv', 'CSV', 'id UInt64, name String, value Float64');

6.2 自动推断列类型

-- ClickHouse 23.x+ 支持自动推断列类型
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV')
LIMIT 10;

-- 使用 structure 自动推断
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV', 'auto')
LIMIT 10;

💡 提示:file() 表函数适合临时查询和一次性数据导入,无需创建持久表。


7. S3 函数导入

7.1 从 S3 读取数据

-- 直接查询 S3 文件
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.csv',
    'CSV',
    'id UInt64, name String, event_time DateTime'
);

-- 使用凭证访问私有 S3
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.csv',
    'my-access-key',
    'my-secret-key',
    'CSV',
    'id UInt64, name String'
);

-- 从 S3 导入到表
INSERT INTO my_table
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.parquet',
    'Parquet'
);

-- 通配符匹配多个文件
SELECT count()
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events_*.csv',
    'CSV',
    'id UInt64, name String'
);

-- 查询 Parquet 文件(自动推断列类型)
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.parquet'
)
LIMIT 10;

-- 使用分区模式
SELECT *
FROM s3Cluster(
    'default',   -- 集群名
    'https://my-bucket.s3.amazonaws.com/data/events_{0..99}.parquet',
    'Parquet'
);

7.2 S3 引擎表

-- 创建 S3 引擎表
CREATE TABLE s3_table
(
    id UInt64,
    name String,
    value Float64
)
ENGINE = S3(
    'https://my-bucket.s3.amazonaws.com/data/table/',
    'CSV',
    'my-access-key',
    'my-secret-key'
);

-- 读写 S3 表
SELECT * FROM s3_table LIMIT 10;
INSERT INTO s3_table VALUES (1, 'test', 3.14);

7.3 导出到 S3

-- 将查询结果导出到 S3
INSERT INTO FUNCTION s3(
    'https://my-bucket.s3.amazonaws.com/export/results.parquet',
    'my-access-key',
    'my-secret-key',
    'Parquet'
)
SELECT
    toStartOfHour(event_time) AS hour,
    count() AS events,
    uniq(user_id) AS users
FROM events
GROUP BY hour;

8. URL 函数

-- 从 HTTP URL 查询数据
SELECT *
FROM url(
    'https://example.com/data.csv',
    'CSV',
    'id UInt64, name String'
);

-- 导入到表
INSERT INTO my_table
SELECT *
FROM url(
    'https://example.com/data.jsonl',
    'JSONEachRow',
    'id UInt64, name String'
);

-- 使用 URL 处理器处理分页数据
-- ClickHouse 内置支持 HTTP 重定向

9. Kafka 引擎导入

9.1 Kafka 引擎配置

-- 创建 Kafka 引擎表
CREATE TABLE kafka_events
(
    event_id UInt64,
    user_id UInt64,
    event_type String,
    event_time DateTime,
    properties String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-broker:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_max_block_size = 1048576,
    kafka_skip_broken_messages = 1;

-- 创建目标表
CREATE TABLE events
(
    event_id UInt64,
    user_id UInt64,
    event_type LowCardinality(String),
    event_time DateTime,
    properties Map(String, String)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id);

-- 创建物化视图自动将 Kafka 数据写入目标表
CREATE MATERIALIZED VIEW kafka_events_mv TO events AS
SELECT
    event_id,
    user_id,
    event_type,
    event_time,
    JSONExtract(properties, 'Map(String, String)') AS properties
FROM kafka_events;

9.2 Kafka 消费者监控

-- 查看 Kafka 消费者状态(ClickHouse 23.8+ 提供 system.kafka_consumers 表)
SELECT *
FROM system.kafka_consumers
WHERE database = currentDatabase();

-- 查看 Kafka 表的消费进度
SELECT
    table,
    metadata_path,
    is_readonly
FROM system.tables
WHERE engine = 'Kafka'
  AND database = currentDatabase();

⚠️ 注意:Kafka 引擎的数据消费和目标表写入是独立的。物化视图负责数据转换和路由。如果目标表写入失败,Kafka 偏移量不会提交,数据会被重新消费。


10. 物化视图实时导入

10.1 基本实时导入

-- 原始数据表
CREATE TABLE raw_logs
(
    log_time DateTime,
    service String,
    level String,
    message String
)
ENGINE = MergeTree()
ORDER BY log_time;

-- 目标统计表
CREATE TABLE service_stats
(
    log_date Date,
    service LowCardinality(String),
    level LowCardinality(String),
    log_count UInt64
)
ENGINE = SummingMergeTree(log_count)
ORDER BY (log_date, service, level);

-- 物化视图:实时预聚合
CREATE MATERIALIZED VIEW mv_service_stats TO service_stats AS
SELECT
    toDate(log_time) AS log_date,
    service,
    level,
    count() AS log_count
FROM raw_logs
GROUP BY log_date, service, level;

-- 查询统计(使用 SummingMergeTree 自动合并)
SELECT
    log_date,
    service,
    level,
    sum(log_count) AS total_count
FROM service_stats
GROUP BY log_date, service, level;

10.2 多目标物化视图

-- 一个原始表可以有多个物化视图
-- 实时统计每小时的错误率
CREATE MATERIALIZED VIEW mv_hourly_errors
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, service)
AS
SELECT
    toStartOfHour(log_time) AS hour,
    service,
    countState() AS total_count,
    countIfState(level = 'ERROR') AS error_count,
    countIfState(level = 'WARN') AS warn_count
FROM raw_logs
GROUP BY hour, service;

-- 查询
SELECT
    hour,
    service,
    countMerge(total_count) AS total,
    countIfMerge(error_count) AS errors,
    round(countIfMerge(error_count) / countMerge(total_count) * 100, 2) AS error_rate
FROM mv_hourly_errors
GROUP BY hour, service
ORDER BY hour DESC;

11. 导入性能优化

11.1 max_insert_block_size

-- 控制单个 INSERT 块的大小
SET max_insert_block_size = 1048576;  -- 默认 1048576 行

-- 大批量导入时可以增大
SET max_insert_block_size = 4194304;  -- 约 420 万行(4 × 1048576)

-- 查看当前设置
SELECT name, value
FROM system.settings
WHERE name = 'max_insert_block_size';

11.2 async_insert(异步写入)

-- 启用异步 INSERT(适合高频小批量写入)
SET async_insert = 1;

-- 异步 INSERT 配置
SET async_insert_max_data_size = 10485760;  -- 10MB 后 flush
SET async_insert_busy_timeout_ms = 200;      -- 最长等待 200ms

-- 验证异步 INSERT 是否生效
INSERT INTO my_table (id, name) VALUES (1, 'test');
-- 数据不会立即可见,等待 flush 后可见

-- 注意:async_insert 是查询/用户级设置,不是 MergeTree 表级设置,
-- 不能写在 CREATE TABLE ... SETTINGS 中。
-- 可在单条 INSERT 上指定:
INSERT INTO my_table SETTINGS async_insert = 1 VALUES (1, 'test');

-- 或在用户 profile 中全局启用(users.xml):
-- <profiles><default><async_insert>1</async_insert></default></profiles>

💡 提示:async_insert 适合 Kafka 消费者、HTTP 写入等高频小批量场景。ClickHouse 会在内存中缓冲数据,达到阈值或超时后批量写入,显著减少小 part 的产生。

11.3 批量导入优化参数

-- 批量导入优化设置
SET
    max_insert_block_size = 1048576,
    min_insert_block_size_rows = 1048576,
    min_insert_block_size_bytes = 268435456,  -- 256MB
    async_insert = 0;  -- 批量导入时关闭异步

-- 禁用 Replicated 表的插入块去重(重复导入同一批数据时需谨慎使用)
SET insert_deduplicate = 0;

-- 禁用插入时按表引擎逻辑对数据块预合并(如 SummingMergeTree 的预聚合)
SET optimize_on_insert = 0;

-- 同步写入 Distributed 表的超时,单位为秒(仅在 insert_distributed_sync = 1 时生效)
SET insert_distributed_timeout = 30;  -- 30 秒

-- 查看当前写入性能
SELECT
    event_time,
    query,
    written_rows,
    formatReadableSize(written_bytes) AS written_size,
    query_duration_ms,
    round(written_rows / query_duration_ms * 1000) AS rows_per_sec
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query LIKE 'INSERT INTO%'
ORDER BY event_time DESC
LIMIT 10;

11.4 并行写入

-- 分布式表并行写入
-- 创建分布式表
CREATE TABLE my_table_distributed AS my_table
ENGINE = Distributed('cluster', 'default', 'my_table', rand());

-- 并行写入分布式表
-- ClickHouse 自动将数据路由到不同分片
INSERT INTO my_table_distributed
SELECT * FROM s3('https://bucket/data.csv', 'CSV', 'id UInt64, name String');

12. 大批量导入策略

12.1 导入流程设计

数据源 → 预处理 → 批量导入 → 数据验证
                │
                ▼
        ┌───────────────────┐
        │  1. 拆分大文件     │
        │  2. 并行写入       │
        │  3. 监控写入进度   │
        │  4. 数据校验       │
        └───────────────────┘

12.2 导入监控

-- 查看当前正在执行的 INSERT 语句
SELECT
    query_id,
    user,
    query,
    formatReadableSize(memory_usage) AS memory,
    elapsed,
    read_rows,
    written_rows,
    formatReadableSize(written_bytes) AS written_size,
    progress
FROM system.processes
WHERE query LIKE 'INSERT INTO%';

-- 查看系统写入统计
SELECT
    event_time,
    query,
    written_rows,
    formatReadableSize(written_bytes) AS written,
    query_duration_ms AS duration_ms,
    round(written_rows / query_duration_ms * 1000, 0) AS rows_per_second
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query LIKE 'INSERT INTO%'
  AND event_time > now() - INTERVAL 1 HOUR
ORDER BY event_time DESC;

12.3 导入错误处理

-- 允许一定比例的错误行
INSERT INTO my_table
SELECT *
FROM file('data.csv', 'CSV', 'id UInt64, name String')
SETTINGS
    input_format_allow_errors_num = 100,     -- 允许 100 行错误
    input_format_allow_errors_ratio = 0.01;  -- 允许 1% 的错误率

-- 查看导入错误(system.errors 按错误名统计,没有 event_time 列;code 为数值)
SELECT name, code, value, last_error_time, last_error_message
FROM system.errors
WHERE name = 'INCORRECT_DATA'
ORDER BY last_error_time DESC
LIMIT 10;

-- 跳过 JSON 中的未知字段(SETTINGS 必须位于 FORMAT 之前;
-- 跳过整条损坏消息请使用 input_format_allow_errors_num/ratio)
INSERT INTO my_table
SETTINGS input_format_skip_unknown_fields = 1
FORMAT JSONEachRow
{"id": 1, "name": "test", "unknown_field": "value"}

13. 数据格式对比

格式          | 导入速度 | 文件大小 | 自描述 | 适用场景
------------- | -------- | -------- | ------ | --------------------
CSV           | 快       | 中       | 否     | 简单表格数据
TSV           | 快       | 中       | 否     | ClickHouse 原生格式
JSONEachRow   | 中       | 大       | 是     | API 数据、日志
Parquet       | 极快     | 小       | 是     | 大数据生态、推荐
Arrow         | 极快     | 小       | 是     | 内存数据交换
ORC           | 快       | 小       | 是     | Hadoop 生态
-- 查看所有支持的格式
SELECT name, is_input, is_output, supports_parallel_parsing
FROM system.formats
ORDER BY name;

💡 提示:大批量数据导入推荐使用 Parquet 格式,它具有最高的导入速度和最小的文件大小,且支持列类型自描述。


扩展阅读

  1. INSERT INTO 语法
  2. 数据格式
  3. S3 表函数
  4. Kafka 引擎
  5. 异步 INSERT
  6. FROM INFILE