强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

12 - Pushgateway

12 - Pushgateway

12.1 概述

Pushgateway 是 Prometheus 生态中唯一支持 Push(推)模式的组件,用于临时存储由短期任务推送的指标数据。

使用场景

场景说明
定时任务(CronJob)执行完成后推送结果
批处理任务处理完成后推送统计
一次性任务任务生命周期短于采集间隔
防火墙限制目标不可达,需要主动推送

架构

┌──────────┐  Push   ┌──────────────┐  Pull   ┌──────────────┐
│ CronJob  │────────►│ Pushgateway  │◄───────│ Prometheus   │
│ 批处理   │         │ /metrics     │         │ Server       │
└──────────┘         └──────────────┘         └──────────────┘

注意:Pushgateway 只是一个中间缓存层。指标被 Push 到 Pushgateway 后,仍然由 Prometheus 通过 Pull 方式从 Pushgateway 抓取。


12.2 安装与启动

# 二进制安装
PUSHGATEWAY_VERSION="1.7.0"
wget https://github.com/prometheus/pushgateway/releases/download/v${PUSHGATEWAY_VERSION}/pushgateway-${PUSHGATEWAY_VERSION}.linux-amd64.tar.gz
tar xvfz pushgateway-${PUSHGATEWAY_VERSION}.linux-amd64.tar.gz
sudo cp pushgateway-${PUSHGATEWAY_VERSION}.linux-amd64/pushgateway /usr/local/bin/

# 启动
pushgateway --web.listen-address=:9091

# Docker
docker run -d \
  --name=pushgateway \
  -p 9091:9091 \
  prom/pushgateway:v1.7.0

Prometheus 配置

scrape_configs:
  - job_name: 'pushgateway'
    honor_labels: true   # 重要!保留推送的 job 和 instance 标签
    static_configs:
      - targets: ['pushgateway:9091']

重要:必须设置 honor_labels: true,否则 Prometheus 会用 Pushgateway 的地址覆盖推送的 instance 标签,导致指标归属混乱。


12.3 推送指标

使用 curl 推送

# 推送单个指标
echo "batch_job_duration_seconds 42.5" | \
  curl --data-binary @- \
  http://pushgateway:9091/metrics/job/batch_job/instance/node1

# 推送多个指标
cat <<EOF | curl --data-binary @- \
  http://pushgateway:9091/metrics/job/daily_report/instance/cron01
# TYPE batch_job_duration_seconds gauge
batch_job_duration_seconds 42.5
# TYPE batch_job_processed_total gauge
batch_job_processed_total 1500
# TYPE batch_job_errors_total gauge
batch_job_errors_total 3
EOF

# 推送到分组(多标签)
echo "task_duration_seconds 120" | \
  curl --data-binary @- \
  'http://pushgateway:9091/metrics/job/backup/instance/db01/type/full'

使用脚本推送

#!/bin/bash
# push_metrics.sh - 批处理任务完成后推送指标

JOB_NAME="data_export"
INSTANCE="worker01"
PUSHGATEWAY="http://pushgateway:9091"

# 执行任务并计时
START_TIME=$(date +%s)
# ... 执行业务逻辑 ...
END_TIME=$(date +%s)
DURATION=$((END_TIME - START_TIME))

# 推送指标
cat <<EOF | curl --data-binary @- \
  ${PUSHGATEWAY}/metrics/job/${JOB_NAME}/instance/${INSTANCE}
# TYPE data_export_duration_seconds gauge
data_export_duration_seconds ${DURATION}
# TYPE data_export_records_total gauge
data_export_records_total 50000
# TYPE data_export_success gauge
data_export_success 1
# TYPE data_export_last_run_timestamp gauge
data_export_last_run_timestamp ${END_TIME}
EOF

使用 Python 客户端

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
import time

registry = CollectorRegistry()

# 定义指标
duration = Gauge('job_duration_seconds', 'Job duration', registry=registry)
records = Gauge('job_processed_records', 'Processed records', registry=registry)
errors = Gauge('job_errors_total', 'Total errors', registry=registry)
success = Gauge('job_success', 'Job success flag', registry=registry)
last_run = Gauge('job_last_run_timestamp', 'Last run timestamp', registry=registry)

# 执行任务
start = time.time()
try:
    # ... 业务逻辑 ...
    records.set(50000)
    errors.set(0)
    success.set(1)
except Exception as e:
    errors.set(1)
    success.set(0)
finally:
    duration.set(time.time() - start)
    last_run.set(time.time())

# 推送到 Pushgateway
push_to_gateway(
    'http://pushgateway:9091',
    job='data_export',
    registry=registry,
    grouping_key={'instance': 'worker01'}
)

使用 Go 客户端

package main

import (
    "net/http"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/push"
)

func main() {
    duration := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "batch_job_duration_seconds",
        Help: "Duration of the batch job",
    })
    processed := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "batch_job_processed_total",
        Help: "Total processed items",
    })

    // 执行任务
    start := time.Now()
    // ... 业务逻辑 ...
    duration.Set(time.Since(start).Seconds())
    processed.Set(1500)

    // 推送
    err := push.New("http://pushgateway:9091", "batch_job").
        Collector(duration).
        Collector(processed).
        Grouping("instance", "node1").
        Push()
    if err != nil {
        panic(err)
    }
}

12.4 管理指标

查看所有推送的指标

# 浏览器访问
http://pushgateway:9091

# API 查看所有指标
curl http://pushgateway:9091/api/v1/metrics

删除指标

# 删除特定 job 和 instance 的指标
curl -X DELETE http://pushgateway:9091/metrics/job/batch_job/instance/node1

# 删除特定 job 的所有指标
curl -X DELETE http://pushgateway:9091/metrics/job/batch_job

# 删除所有指标
curl -X DELETE http://pushgateway:9091/api/v1/admin/wipe

12.5 Pushgateway 注意事项

指标持久性

Pushgateway 中的指标是持久的,即使推送任务已经结束,指标仍然保留在 Pushgateway 中,直到被显式删除或 Pushgateway 重启。

T=0:   任务推送指标 ──► Pushgateway 存储
T=1m:  Prometheus 抓取 (正常)
T=5m:  任务结束
T=6m:  Prometheus 仍然能抓到数据 ← 指标不会自动消失!
...

最佳实践:任务应该在下次运行前删除旧指标,或在任务开始时先推送一个成功标志为 0 的指标,完成后再更新为 1。

推送时间戳

# 带时间戳推送(不推荐,使用 Pushgateway 的抓取时间更好)
echo "batch_duration 42 1609459200000" | \
  curl --data-binary @- \
  http://pushgateway:9091/metrics/job/batch

高可用部署

Pushgateway 本身不支持集群模式。如果需要高可用,可以:

方案1: 单实例 + 持久化
  pushgateway --persistence.file=/data/pushgateway.dat

方案2: 多实例 + 负载均衡(写到所有实例)
  ┌──────────┐
  │ 任务     │──► Pushgateway 1
  │          │──► Pushgateway 2
  └──────────┘
       │
  Prometheus 从所有实例抓取

12.6 告警规则

groups:
  - name: pushgateway
    rules:
      # 任务执行失败
      - alert: BatchJobFailed
        expr: job_success == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "批处理任务 {{ $labels.job }} 失败"

      # 任务超时未完成(数据过旧)
      - alert: BatchJobStale
        expr: (time() - job_last_run_timestamp) > 86400
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "批处理任务 {{ $labels.job }} 超过 24 小时未运行"

      # 任务处理时间过长
      - alert: BatchJobSlow
        expr: job_duration_seconds > 3600
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "批处理任务 {{ $labels.job }} 执行时间超过 1 小时"

12.7 本章小结

要点说明
用途短期任务/批处理的指标推送
模式Push → Pushgateway → Prometheus Pull
honor_labels必须设为 true
指标持久性需要手动删除过期指标
高可用持久化文件或多实例

扩展阅读


上一章11 - Exporter 生态 下一章13 - 联邦集群