12 - Pushgateway
12 - Pushgateway
12.1 概述
Pushgateway 是 Prometheus 生态中唯一支持 Push(推)模式的组件,用于临时存储由短期任务推送的指标数据。
使用场景
| 场景 | 说明 |
|---|---|
| 定时任务(CronJob) | 执行完成后推送结果 |
| 批处理任务 | 处理完成后推送统计 |
| 一次性任务 | 任务生命周期短于采集间隔 |
| 防火墙限制 | 目标不可达,需要主动推送 |
架构
┌──────────┐ Push ┌──────────────┐ Pull ┌──────────────┐
│ CronJob │────────►│ Pushgateway │◄───────│ Prometheus │
│ 批处理 │ │ /metrics │ │ Server │
└──────────┘ └──────────────┘ └──────────────┘
注意:Pushgateway 只是一个中间缓存层。指标被 Push 到 Pushgateway 后,仍然由 Prometheus 通过 Pull 方式从 Pushgateway 抓取。
12.2 安装与启动
# 二进制安装
PUSHGATEWAY_VERSION="1.7.0"
wget https://github.com/prometheus/pushgateway/releases/download/v${PUSHGATEWAY_VERSION}/pushgateway-${PUSHGATEWAY_VERSION}.linux-amd64.tar.gz
tar xvfz pushgateway-${PUSHGATEWAY_VERSION}.linux-amd64.tar.gz
sudo cp pushgateway-${PUSHGATEWAY_VERSION}.linux-amd64/pushgateway /usr/local/bin/
# 启动
pushgateway --web.listen-address=:9091
# Docker
docker run -d \
--name=pushgateway \
-p 9091:9091 \
prom/pushgateway:v1.7.0
Prometheus 配置
scrape_configs:
- job_name: 'pushgateway'
honor_labels: true # 重要!保留推送的 job 和 instance 标签
static_configs:
- targets: ['pushgateway:9091']
重要:必须设置
honor_labels: true,否则 Prometheus 会用 Pushgateway 的地址覆盖推送的instance标签,导致指标归属混乱。
12.3 推送指标
使用 curl 推送
# 推送单个指标
echo "batch_job_duration_seconds 42.5" | \
curl --data-binary @- \
http://pushgateway:9091/metrics/job/batch_job/instance/node1
# 推送多个指标
cat <<EOF | curl --data-binary @- \
http://pushgateway:9091/metrics/job/daily_report/instance/cron01
# TYPE batch_job_duration_seconds gauge
batch_job_duration_seconds 42.5
# TYPE batch_job_processed_total gauge
batch_job_processed_total 1500
# TYPE batch_job_errors_total gauge
batch_job_errors_total 3
EOF
# 推送到分组(多标签)
echo "task_duration_seconds 120" | \
curl --data-binary @- \
'http://pushgateway:9091/metrics/job/backup/instance/db01/type/full'
使用脚本推送
#!/bin/bash
# push_metrics.sh - 批处理任务完成后推送指标
JOB_NAME="data_export"
INSTANCE="worker01"
PUSHGATEWAY="http://pushgateway:9091"
# 执行任务并计时
START_TIME=$(date +%s)
# ... 执行业务逻辑 ...
END_TIME=$(date +%s)
DURATION=$((END_TIME - START_TIME))
# 推送指标
cat <<EOF | curl --data-binary @- \
${PUSHGATEWAY}/metrics/job/${JOB_NAME}/instance/${INSTANCE}
# TYPE data_export_duration_seconds gauge
data_export_duration_seconds ${DURATION}
# TYPE data_export_records_total gauge
data_export_records_total 50000
# TYPE data_export_success gauge
data_export_success 1
# TYPE data_export_last_run_timestamp gauge
data_export_last_run_timestamp ${END_TIME}
EOF
使用 Python 客户端
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
import time
registry = CollectorRegistry()
# 定义指标
duration = Gauge('job_duration_seconds', 'Job duration', registry=registry)
records = Gauge('job_processed_records', 'Processed records', registry=registry)
errors = Gauge('job_errors_total', 'Total errors', registry=registry)
success = Gauge('job_success', 'Job success flag', registry=registry)
last_run = Gauge('job_last_run_timestamp', 'Last run timestamp', registry=registry)
# 执行任务
start = time.time()
try:
# ... 业务逻辑 ...
records.set(50000)
errors.set(0)
success.set(1)
except Exception as e:
errors.set(1)
success.set(0)
finally:
duration.set(time.time() - start)
last_run.set(time.time())
# 推送到 Pushgateway
push_to_gateway(
'http://pushgateway:9091',
job='data_export',
registry=registry,
grouping_key={'instance': 'worker01'}
)
使用 Go 客户端
package main
import (
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)
func main() {
duration := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "batch_job_duration_seconds",
Help: "Duration of the batch job",
})
processed := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "batch_job_processed_total",
Help: "Total processed items",
})
// 执行任务
start := time.Now()
// ... 业务逻辑 ...
duration.Set(time.Since(start).Seconds())
processed.Set(1500)
// 推送
err := push.New("http://pushgateway:9091", "batch_job").
Collector(duration).
Collector(processed).
Grouping("instance", "node1").
Push()
if err != nil {
panic(err)
}
}
12.4 管理指标
查看所有推送的指标
# 浏览器访问
http://pushgateway:9091
# API 查看所有指标
curl http://pushgateway:9091/api/v1/metrics
删除指标
# 删除特定 job 和 instance 的指标
curl -X DELETE http://pushgateway:9091/metrics/job/batch_job/instance/node1
# 删除特定 job 的所有指标
curl -X DELETE http://pushgateway:9091/metrics/job/batch_job
# 删除所有指标
curl -X DELETE http://pushgateway:9091/api/v1/admin/wipe
12.5 Pushgateway 注意事项
指标持久性
Pushgateway 中的指标是持久的,即使推送任务已经结束,指标仍然保留在 Pushgateway 中,直到被显式删除或 Pushgateway 重启。
T=0: 任务推送指标 ──► Pushgateway 存储
T=1m: Prometheus 抓取 (正常)
T=5m: 任务结束
T=6m: Prometheus 仍然能抓到数据 ← 指标不会自动消失!
...
最佳实践:任务应该在下次运行前删除旧指标,或在任务开始时先推送一个成功标志为 0 的指标,完成后再更新为 1。
推送时间戳
# 带时间戳推送(不推荐,使用 Pushgateway 的抓取时间更好)
echo "batch_duration 42 1609459200000" | \
curl --data-binary @- \
http://pushgateway:9091/metrics/job/batch
高可用部署
Pushgateway 本身不支持集群模式。如果需要高可用,可以:
方案1: 单实例 + 持久化
pushgateway --persistence.file=/data/pushgateway.dat
方案2: 多实例 + 负载均衡(写到所有实例)
┌──────────┐
│ 任务 │──► Pushgateway 1
│ │──► Pushgateway 2
└──────────┘
│
Prometheus 从所有实例抓取
12.6 告警规则
groups:
- name: pushgateway
rules:
# 任务执行失败
- alert: BatchJobFailed
expr: job_success == 0
for: 1m
labels:
severity: critical
annotations:
summary: "批处理任务 {{ $labels.job }} 失败"
# 任务超时未完成(数据过旧)
- alert: BatchJobStale
expr: (time() - job_last_run_timestamp) > 86400
for: 1h
labels:
severity: warning
annotations:
summary: "批处理任务 {{ $labels.job }} 超过 24 小时未运行"
# 任务处理时间过长
- alert: BatchJobSlow
expr: job_duration_seconds > 3600
for: 1m
labels:
severity: warning
annotations:
summary: "批处理任务 {{ $labels.job }} 执行时间超过 1 小时"
12.7 本章小结
| 要点 | 说明 |
|---|---|
| 用途 | 短期任务/批处理的指标推送 |
| 模式 | Push → Pushgateway → Prometheus Pull |
| honor_labels | 必须设为 true |
| 指标持久性 | 需要手动删除过期指标 |
| 高可用 | 持久化文件或多实例 |
扩展阅读
上一章:11 - Exporter 生态 下一章:13 - 联邦集群