ClickHouse Tutorial / Data Import (CSV/JSON/Parquet)
Data Import (CSV/JSON/Parquet)
ClickHouse supports many ways to load data, from simple command-line imports to real-time streaming ingestion, covering a wide range of data-integration scenarios.
1. Command-line import
1.1 Direct import with clickhouse-client
# Import a CSV file (the most common case)
clickhouse-client --query "INSERT INTO my_table FORMAT CSV" < data.csv
# CSV with a header row
clickhouse-client --query "INSERT INTO my_table FORMAT CSVWithNames" < data_with_header.csv
# TabSeparated (TSV) format
clickhouse-client --query "INSERT INTO my_table FORMAT TabSeparated" < data.tsv
# TSV with a header row
clickhouse-client --query "INSERT INTO my_table FORMAT TabSeparatedWithNames" < data.tsv
# Run in the background (for large volumes)
nohup clickhouse-client --query "INSERT INTO my_table FORMAT CSV" < big_data.csv &
1.2 Multi-threaded parallel import
# Speed things up by importing chunks in parallel with xargs
# Split the large file into smaller ones
split -l 1000000 big_data.csv chunk_
# Import the chunks in parallel (4 at a time)
ls chunk_* | xargs -P 4 -I {} sh -c 'clickhouse-client --query "INSERT INTO my_table FORMAT CSV" < {}'
1.3 Querying local files with clickhouse-local
# Query a local file (no server required)
clickhouse-local --query "
SELECT *
FROM file('data.csv', 'CSV', 'id UInt64, name String, value Float64')
LIMIT 10
"
# Pipe a local file into a server
# (clickhouse-local emits TabSeparated by default, matching the INSERT format)
clickhouse-local --query "
SELECT * FROM file('data.csv', 'CSV', 'id UInt64, name String, value Float64')
" | clickhouse-client --query "INSERT INTO my_table FORMAT TabSeparated"
2. INSERT INTO … FORMAT
2.1 Supported formats
-- CSV format
INSERT INTO my_table FORMAT CSV
1,'Alice',28,'Beijing'
2,'Bob',35,'Shanghai'
3,'Carol',22,'Guangzhou'
-- CSVWithNames (first line holds the column names)
INSERT INTO my_table FORMAT CSVWithNames
id,name,age,city
1,'Alice',28,'Beijing'
2,'Bob',35,'Shanghai'
-- JSONEachRow (one JSON object per line)
INSERT INTO my_table FORMAT JSONEachRow
{"id": 1, "name": "Alice", "age": 28, "city": "Beijing"}
{"id": 2, "name": "Bob", "age": 35, "city": "Shanghai"}
-- JSONCompactEachRow
INSERT INTO my_table FORMAT JSONCompactEachRow
[1, "Alice", 28, "Beijing"]
[2, "Bob", 35, "Shanghai"]
# Binary formats (Parquet, Arrow, ORC, MsgPack) cannot be typed inline;
# pipe the file through clickhouse-client instead:
clickhouse-client --query "INSERT INTO my_table FORMAT Parquet" < data.parquet
clickhouse-client --query "INSERT INTO my_table FORMAT Arrow" < data.arrow
clickhouse-client --query "INSERT INTO my_table FORMAT ORC" < data.orc
clickhouse-client --query "INSERT INTO my_table FORMAT MsgPack" < data.msgpack
-- Custom delimiter (note: SETTINGS must precede FORMAT, because everything
-- after FORMAT is treated as data)
INSERT INTO my_table
SETTINGS format_custom_escaping_rule = 'CSV',
         format_custom_field_delimiter = '|'
FORMAT CustomSeparated
1|Alice|28|Beijing
2|Bob|35|Shanghai
2.2 Format option settings
-- CSV import options (SETTINGS goes before FORMAT)
INSERT INTO my_table
SETTINGS
    input_format_csv_skip_first_lines = 1,              -- skip the first line (header)
    input_format_csv_allow_single_quotes = 0,           -- disallow single-quoted strings
    input_format_csv_use_default_on_missing_values = 1  -- use column defaults for missing values
FORMAT CSV
-- JSON import options
INSERT INTO my_table
SETTINGS
    input_format_json_named_tuples_as_objects = 1,        -- map named Tuples to JSON objects
    input_format_json_defaults_for_missing_elements = 1,  -- use column defaults for missing fields
    input_format_json_read_objects_as_strings = 0
FORMAT JSONEachRow
-- Parquet import options
INSERT INTO my_table
SETTINGS input_format_parquet_import_nested = 1  -- import nested structures
FORMAT Parquet
⚠️ Note: CSV has no native representation for NULL. Use \N for NULL values; the target column must be Nullable.
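A minimal sketch (with a hypothetical throwaway table null_demo) showing \N being parsed as NULL:
-- hypothetical table used only to demonstrate \N handling
CREATE TABLE null_demo
(
    id UInt64,
    note Nullable(String)
)
ENGINE = MergeTree()
ORDER BY id;
INSERT INTO null_demo FORMAT CSV
1,"hello"
2,\N
-- the second row now has note = NULL
SELECT id, note, isNull(note) AS is_null FROM null_demo ORDER BY id;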
3. INSERT INTO … FROM INFILE
ClickHouse 22.8+ supports importing directly from a file local to the client:
-- Import from a local file (on the client machine)
INSERT INTO my_table FROM INFILE '/tmp/data.csv' FORMAT CSV;
-- Compressed files are supported (detected by extension)
INSERT INTO my_table FROM INFILE '/tmp/data.csv.gz' FORMAT CSV;
-- Import with settings (SETTINGS precedes FORMAT)
INSERT INTO my_table FROM INFILE '/tmp/data.csv'
SETTINGS
    input_format_csv_skip_first_lines = 1,
    max_insert_block_size = 1000000
FORMAT CSV;
-- Parquet file
INSERT INTO my_table FROM INFILE '/tmp/data.parquet' FORMAT Parquet;
-- JSON file
INSERT INTO my_table FROM INFILE '/tmp/data.jsonl' FORMAT JSONEachRow;
💡 Tip: the FROM INFILE path is resolved relative to the client process's working directory, not the server's. This is especially convenient in containerized environments.
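Compression is normally detected from the file extension; when it is not, the COMPRESSION clause makes it explicit. A sketch with a hypothetical zstd-compressed file:
-- state the compression explicitly (file path is hypothetical)
INSERT INTO my_table FROM INFILE '/tmp/data.csv.zst' COMPRESSION 'zstd' FORMAT CSV;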
4. Importing over the HTTP interface
4.1 Basic HTTP import
# Import CSV with curl
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+CSV' \
  --data-binary @data.csv
# Import JSON with curl ($'...' turns \n into a real newline)
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+JSONEachRow' \
  --data-binary $'{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}'
# Import Parquet with curl
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+Parquet' \
  --data-binary @data.parquet
# Import with authentication
curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+CSV' \
  -u default:password \
  --data-binary @data.csv
4.2 Importing in batches
# Import a large file in batches (1,000,000 rows each)
split -l 1000000 big_data.csv chunk_
for f in chunk_*; do
  curl 'http://localhost:8123/?query=INSERT+INTO+my_table+FORMAT+CSV' \
    --data-binary @"$f"
  echo "Imported $f"
  rm "$f"
done
4.3 Querying over HTTP
# Query and export as JSON
curl 'http://localhost:8123/?query=SELECT+*+FROM+my_table+FORMAT+JSON'
# Query and export as CSV
curl 'http://localhost:8123/?query=SELECT+*+FROM+my_table+FORMAT+CSVWithNames'
# Query and download as Parquet
curl 'http://localhost:8123/?query=SELECT+*+FROM+my_table+FORMAT+Parquet' \
  -o output.parquet
5. The File() table engine
5.1 Querying local files with the File engine
-- Create a table backed by a file. Note: on a server, File() takes only the
-- format and stores its data file inside the table's own data directory;
-- the explicit path argument shown below is accepted only by clickhouse-local.
CREATE TABLE temp_csv
(
    id UInt64,
    name String,
    value Float64
)
ENGINE = File(CSV, '/var/lib/clickhouse/user_files/data.csv');
-- Query the file's contents
SELECT * FROM temp_csv;
-- Copy the file's data into another table
INSERT INTO my_table SELECT * FROM temp_csv;
5.2 Compressed files
-- Compression is detected automatically from the file extension
CREATE TABLE temp_compressed
(
    id UInt64,
    data String
)
ENGINE = File(CSV, '/var/lib/clickhouse/user_files/data.csv.gz');
-- Supported compression suffixes include .gz, .zst, .xz, .bz2, .lz4, .sz
CREATE TABLE temp_zst
(
    id UInt64,
    data String
)
ENGINE = File(CSV, '/var/lib/clickhouse/user_files/data.csv.zst');
5.3 Writing to a file
-- Write query results into a local file
CREATE TABLE output_csv
(
    id UInt64,
    name String,
    total Float64
)
ENGINE = File(CSV, '/var/lib/clickhouse/user_files/output.csv');
INSERT INTO output_csv
SELECT id, name, sum(value) AS total
FROM my_table
GROUP BY id, name;
6. The file() table function
6.1 One-off queries over local files
-- Query a file directly with the file() table function (no table required);
-- on a server, paths must live under user_files_path
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV', 'id UInt64, name String, value Float64');
-- Compressed files
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv.gz', 'CSV', 'id UInt64, name String');
-- Peek at the raw contents by reading every column as String
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV', 'c1 String, c2 String, c3 String')
LIMIT 5;
-- Use file() as the source of an INSERT
INSERT INTO my_table
SELECT *
FROM file('/var/lib/clickhouse/user_files/import_data.csv', 'CSV', 'id UInt64, name String, value Float64');
6.2 Automatic schema inference
-- Recent ClickHouse versions infer column types automatically when the
-- structure argument is omitted
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV')
LIMIT 10;
-- Inspect the inferred schema
DESCRIBE file('/var/lib/clickhouse/user_files/data.csv', 'CSV');
💡 Tip: the file() table function is ideal for ad-hoc queries and one-off imports; no persistent table is needed.
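file() also accepts glob patterns (*, ?, {a,b}, {N..M}), which is handy for one-off imports of many files. A sketch with hypothetical file names:
-- import every matching chunk in one statement (paths are hypothetical)
INSERT INTO my_table
SELECT *
FROM file('/var/lib/clickhouse/user_files/chunks/data_*.csv', 'CSV', 'id UInt64, name String, value Float64');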
7. Importing with the s3 function
7.1 Reading data from S3
-- Query an S3 file directly
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.csv',
    'CSV',
    'id UInt64, name String, event_time DateTime'
);
-- Access a private bucket with credentials
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.csv',
    'my-access-key',
    'my-secret-key',
    'CSV',
    'id UInt64, name String'
);
-- Import from S3 into a table
INSERT INTO my_table
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.parquet',
    'Parquet'
);
-- Match multiple files with wildcards
SELECT count()
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events_*.csv',
    'CSV',
    'id UInt64, name String'
);
-- Query a Parquet file (format and schema inferred automatically)
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events.parquet'
)
LIMIT 10;
-- Spread a numeric-range glob across the cluster with s3Cluster
SELECT *
FROM s3Cluster(
    'default',  -- cluster name
    'https://my-bucket.s3.amazonaws.com/data/events_{0..99}.parquet',
    'Parquet'
);
7.2 S3 engine tables
-- Create an S3-backed table (credentials come before the format)
CREATE TABLE s3_table
(
    id UInt64,
    name String,
    value Float64
)
ENGINE = S3(
    'https://my-bucket.s3.amazonaws.com/data/table.csv',
    'my-access-key',
    'my-secret-key',
    'CSV'
);
-- Read from and write to the S3 table
SELECT * FROM s3_table LIMIT 10;
INSERT INTO s3_table VALUES (1, 'test', 3.14);
7.3 Exporting to S3
-- Export query results to S3
INSERT INTO FUNCTION s3(
    'https://my-bucket.s3.amazonaws.com/export/results.parquet',
    'my-access-key',
    'my-secret-key',
    'Parquet'
)
SELECT
    toStartOfHour(event_time) AS hour,
    count() AS events,
    uniq(user_id) AS users
FROM events
GROUP BY hour;
8. The url() table function
-- Query data from an HTTP URL
SELECT *
FROM url(
    'https://example.com/data.csv',
    'CSV',
    'id UInt64, name String'
);
-- Import into a table
INSERT INTO my_table
SELECT *
FROM url(
    'https://example.com/data.jsonl',
    'JSONEachRow',
    'id UInt64, name String'
);
-- url() follows HTTP redirects; the limit is controlled by the
-- max_http_get_redirects setting
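A sketch of raising the redirect limit per query (the URL is a placeholder):
-- allow up to 10 redirects while fetching
SELECT count()
FROM url('https://example.com/data.csv', 'CSV', 'id UInt64, name String')
SETTINGS max_http_get_redirects = 10;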
9. Importing with the Kafka engine
9.1 Kafka engine configuration
-- Create a Kafka engine table
CREATE TABLE kafka_events
(
    event_id UInt64,
    user_id UInt64,
    event_type String,
    event_time DateTime,
    properties String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-broker:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_max_block_size = 1048576,
    kafka_skip_broken_messages = 1;
-- Create the target table
CREATE TABLE events
(
    event_id UInt64,
    user_id UInt64,
    event_type LowCardinality(String),
    event_time DateTime,
    properties Map(String, String)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id);
-- Create a materialized view that moves Kafka data into the target table
CREATE MATERIALIZED VIEW kafka_events_mv TO events AS
SELECT
    event_id,
    user_id,
    event_type,
    event_time,
    JSONExtract(properties, 'Map(String, String)') AS properties
FROM kafka_events;
9.2 Monitoring Kafka consumers
-- Inspect Kafka consumer state (system.kafka_consumers, in recent versions)
SELECT *
FROM system.kafka_consumers
WHERE database = currentDatabase();
-- List Kafka engine tables
SELECT
    database,
    name,
    engine_full
FROM system.tables
WHERE engine = 'Kafka'
  AND database = currentDatabase();
⚠️ Note: consuming from Kafka and writing to the target table are decoupled; the materialized view does the transformation and routing. If the write to the target table fails, the Kafka offset is not committed and the data will be consumed again (at-least-once semantics).
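Because the Kafka table is just a consumer handle, consumption can be paused and resumed without losing data. A sketch:
-- detaching the Kafka table stops consumption
DETACH TABLE kafka_events;
-- ... do maintenance on the target table ...
-- attaching resumes from the last committed offset
ATTACH TABLE kafka_events;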
10. Real-time loading with materialized views
10.1 Basic real-time loading
-- Raw data table
CREATE TABLE raw_logs
(
    log_time DateTime,
    service String,
    level String,
    message String
)
ENGINE = MergeTree()
ORDER BY log_time;
-- Target aggregate table
CREATE TABLE service_stats
(
    log_date Date,
    service LowCardinality(String),
    level LowCardinality(String),
    log_count UInt64
)
ENGINE = SummingMergeTree(log_count)
ORDER BY (log_date, service, level);
-- Materialized view: pre-aggregate in real time
CREATE MATERIALIZED VIEW mv_service_stats TO service_stats AS
SELECT
    toDate(log_time) AS log_date,
    service,
    level,
    count() AS log_count
FROM raw_logs
GROUP BY log_date, service, level;
-- Query the stats (sum() because SummingMergeTree merges asynchronously)
SELECT
    log_date,
    service,
    level,
    sum(log_count) AS total_count
FROM service_stats
GROUP BY log_date, service, level;
10.2 Multiple materialized views per source
-- One source table can feed several materialized views
-- Track the hourly error rate in real time
CREATE MATERIALIZED VIEW mv_hourly_errors
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, service)
AS
SELECT
    toStartOfHour(log_time) AS hour,
    service,
    countState() AS total_count,
    countIfState(level = 'ERROR') AS error_count,
    countIfState(level = 'WARN') AS warn_count
FROM raw_logs
GROUP BY hour, service;
-- Query it
SELECT
    hour,
    service,
    countMerge(total_count) AS total,
    countIfMerge(error_count) AS errors,
    round(countIfMerge(error_count) / countMerge(total_count) * 100, 2) AS error_rate
FROM mv_hourly_errors
GROUP BY hour, service
ORDER BY hour DESC;
11. Import performance tuning
11.1 max_insert_block_size
-- Controls the size of a single INSERT block
SET max_insert_block_size = 1048576; -- the default: 1,048,576 rows
-- Raise it for large bulk loads
SET max_insert_block_size = 4194304; -- ~4.2 million rows
-- Check the current value
SELECT name, value
FROM system.settings
WHERE name = 'max_insert_block_size';
11.2 async_insert (asynchronous inserts)
-- Enable asynchronous INSERTs (suited to frequent small writes)
SET async_insert = 1;
-- Async INSERT tuning
SET async_insert_max_data_size = 10485760; -- flush once 10MB is buffered
SET async_insert_busy_timeout_ms = 200;    -- or after at most 200ms
-- Check that async INSERT takes effect
INSERT INTO my_table (id, name) VALUES (1, 'test');
-- the rows are not visible immediately, only after the buffer is flushed
-- async_insert is a query/user-level setting, not a table-level one, so it
-- cannot go into CREATE TABLE ... SETTINGS; enable it per query instead:
INSERT INTO my_table (id, name) SETTINGS async_insert = 1 VALUES (1, 'test');
-- or set it once for a user via a settings profile
💡 Tip: async_insert suits high-frequency small-batch writers such as Kafka consumers and HTTP clients. ClickHouse buffers the rows in memory and flushes them in one batch once a size or time threshold is hit, which sharply reduces the number of tiny parts created.
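A sketch of per-query control using the companion setting wait_for_async_insert:
-- wait_for_async_insert = 1 (the default) makes the INSERT return only after
-- the buffer is flushed; 0 returns as soon as the data is buffered
INSERT INTO my_table (id, name)
SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES (2, 'fire-and-forget');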
11.3 Bulk import tuning parameters
-- Settings for bulk loads
SET
    max_insert_block_size = 1048576,
    min_insert_block_size_rows = 1048576,
    min_insert_block_size_bytes = 268435456, -- 256MB
    async_insert = 0; -- turn async off for bulk loads
-- Skip insert block deduplication on Replicated tables during bulk loads
SET insert_deduplicate = 0;
-- Skip merge-time transformations (e.g. Summing/Replacing logic) at insert time
SET optimize_on_insert = 0;
-- Timeout for synchronous writes to Distributed tables
-- (in seconds, used together with insert_distributed_sync)
SET insert_distributed_timeout = 30;
-- Review recent write performance
SELECT
    event_time,
    query,
    written_rows,
    formatReadableSize(written_bytes) AS written_size,
    query_duration_ms,
    round(written_rows / query_duration_ms * 1000) AS rows_per_sec
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query LIKE 'INSERT INTO%'
ORDER BY event_time DESC
LIMIT 10;
11.4 Parallel writes
-- Writing to a distributed table in parallel
-- Create the distributed table
CREATE TABLE my_table_distributed AS my_table
ENGINE = Distributed('cluster', 'default', 'my_table', rand());
-- Write through the distributed table;
-- ClickHouse routes the data to the shards automatically
INSERT INTO my_table_distributed
SELECT * FROM s3('https://bucket/data.csv', 'CSV', 'id UInt64, name String');
12. Bulk import strategy
12.1 Designing the import pipeline
Data source → preprocessing → bulk import → validation
                                  │
                                  ▼
                      ┌────────────────────────┐
                      │ 1. Split large files   │
                      │ 2. Write in parallel   │
                      │ 3. Monitor progress    │
                      │ 4. Validate the data   │
                      └────────────────────────┘
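Step 4 can start with a simple row-count comparison. A sketch, assuming the source file is still readable through file() (names and paths are hypothetical):
-- compare source and target row counts after the load
SELECT
    (SELECT count() FROM file('big_data.csv', 'CSV')) AS source_rows,
    (SELECT count() FROM my_table) AS target_rows,
    source_rows = target_rows AS counts_match;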
12.2 Monitoring imports
-- Show INSERT statements currently executing
SELECT
    query_id,
    user,
    query,
    formatReadableSize(memory_usage) AS memory,
    elapsed,
    read_rows,
    written_rows,
    formatReadableSize(written_bytes) AS written_size,
    total_rows_approx
FROM system.processes
WHERE query LIKE 'INSERT INTO%';
-- Review write statistics
SELECT
    event_time,
    query,
    written_rows,
    formatReadableSize(written_bytes) AS written,
    query_duration_ms AS duration_ms,
    round(written_rows / query_duration_ms * 1000, 0) AS rows_per_second
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query LIKE 'INSERT INTO%'
  AND event_time > now() - INTERVAL 1 HOUR
ORDER BY event_time DESC;
12.3 Handling import errors
-- Tolerate a bounded number of bad rows
INSERT INTO my_table
SELECT *
FROM file('data.csv', 'CSV', 'id UInt64, name String')
SETTINGS
    input_format_allow_errors_num = 100,    -- allow up to 100 bad rows
    input_format_allow_errors_ratio = 0.01; -- allow up to a 1% error rate
-- Inspect accumulated parsing errors
SELECT *
FROM system.errors
WHERE name = 'INCORRECT_DATA'
ORDER BY last_error_time DESC
LIMIT 10;
-- Ignore unknown fields in JSON input (SETTINGS precedes FORMAT)
INSERT INTO my_table
SETTINGS input_format_skip_unknown_fields = 1
FORMAT JSONEachRow
{"id": 1, "name": "test", "unknown_field": "value"}
13. Comparing data formats
| Format | Import speed | File size | Self-describing | Typical use |
|---|---|---|---|---|
| CSV | fast | medium | ❌ | simple tabular data |
| TSV | fast | medium | ❌ | ClickHouse's default text format |
| JSONEachRow | medium | large | ✅ | API payloads, logs |
| Parquet | very fast | small | ✅ | big-data ecosystems; recommended |
| Arrow | fast | medium | ✅ | in-memory data interchange |
| ORC | fast | small | ✅ | Hadoop ecosystems |
-- List every supported format
SELECT name, is_input, is_output, supports_parallel_parsing
FROM system.formats
ORDER BY name;
💡 Tip: for bulk imports, Parquet is usually a strong default: it is typically among the fastest formats to load, compresses well, and carries its own column types.
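Following that advice, an existing CSV can be converted to Parquet once and re-imported from then on. A sketch with hypothetical paths (the same statement works in clickhouse-local):
-- one-off CSV → Parquet conversion via the file() table function
INSERT INTO FUNCTION file('/var/lib/clickhouse/user_files/data.parquet', 'Parquet')
SELECT *
FROM file('/var/lib/clickhouse/user_files/data.csv', 'CSV');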