Perl 完全指南 / 第 25 章:实战项目
第 25 章:实战项目
“纸上得来终觉浅,绝知此事要躬行” — 陆游
本章通过三个完整的实战项目,将前面所学知识融会贯通。每个项目都是可运行的生产级代码。
项目一:日志分析工具
需求
分析 Nginx/Apache 访问日志,输出统计报告:
- 请求总数和时间段
- 状态码分布
- Top 10 IP 地址
- Top 10 请求路径
- 平均响应大小
- 请求速率(QPS)
完整代码
#!/usr/bin/env perl
# log_analyzer.pl — 日志分析工具
use strict;
use warnings;
use Getopt::Long;
use POSIX qw(strftime);
my %opt = (
top => 10,
);
GetOptions(
"file|f=s" => \$opt{file},
"top|t=i" => \$opt{top},
"help|h" => sub { usage(); exit },
) or die "参数错误\n";
usage() and exit unless $opt{file};
die "文件不存在: $opt{file}\n" unless -f $opt{file};
# Nginx 日志正则
my $log_re = qr/
^(?<ip>\S+) # 客户端 IP
\s+\S+ # -
\s+\S+ # 用户标识
\s+\[(?<time>[^\]]+)\] # 时间
\s+"(?<method>\S+) # 请求方法
\s+(?<path>\S+) # 请求路径
\s+\S+" # HTTP 版本
\s+(?<status>\d{3}) # 状态码
\s+(?<size>\d+) # 响应大小
/x;
# 统计数据
my %stats = (
total => 0,
by_status => {},
by_ip => {},
by_path => {},
total_size => 0,
first_time => "",
last_time => "",
);
# 解析日志
open my $fh, '<:encoding(UTF-8)', $opt{file} or die "打开失败: $!\n";
while (my $line = <$fh>) {
if ($line =~ $log_re) {
$stats{total}++;
$stats{by_status}{$+{status}}++;
$stats{by_ip}{$+{ip}}++;
$stats{by_path}{$+{path}}++;
$stats{total_size} += $+{size};
$stats{first_time} //= $+{time};
$stats{last_time} = $+{time};
}
}
close $fh;
# 输出报告
print_report(\%stats, $opt{top});
sub print_report {
my ($s, $top_n) = @_;
print "=" x 60, "\n";
print "日志分析报告\n";
print "=" x 60, "\n\n";
# 总览
printf "日志文件: %s\n", $opt{file};
printf "时间范围: %s ~ %s\n", $s->{first_time}, $s->{last_time};
printf "请求总数: %s\n", format_number($s->{total});
printf "总流量: %s\n", format_size($s->{total_size});
printf "平均请求大小: %s\n", $s->{total} ? format_size($s->{total_size} / $s->{total}) : "N/A";
# 状态码分布
print "\n", "-" x 40, "\n";
print "状态码分布:\n";
print "-" x 40, "\n";
for my $code (sort keys %{$s->{by_status}}) {
my $count = $s->{by_status}{$code};
my $pct = $count / $s->{total} * 100;
my $bar = "█" x int($pct / 2);
printf " %s: %8s (%5.1f%%) %s\n",
$code, format_number($count), $pct, $bar;
}
# Top N IP
print "\n", "-" x 40, "\n";
printf "Top %d IP 地址:\n", $top_n;
print "-" x 40, "\n";
my @top_ips = sort { $s->{by_ip}{$b} <=> $s->{by_ip}{$a} }
keys %{$s->{by_ip}};
for my $i (0 .. $top_n - 1) {
last unless $top_ips[$i];
printf " %2d. %-18s %8s\n",
$i + 1, $top_ips[$i],
format_number($s->{by_ip}{$top_ips[$i]});
}
# Top N 路径
print "\n", "-" x 40, "\n";
printf "Top %d 请求路径:\n", $top_n;
print "-" x 40, "\n";
my @top_paths = sort { $s->{by_path}{$b} <=> $s->{by_path}{$a} }
keys %{$s->{by_path}};
for my $i (0 .. $top_n - 1) {
last unless $top_paths[$i];
my $path = $top_paths[$i];
$path = substr($path, 0, 35) . "..." if length($path) > 38;
printf " %2d. %-38s %8s\n",
$i + 1, $path,
format_number($s->{by_path}{$top_paths[$i]});
}
print "\n", "=" x 60, "\n";
}
sub format_number {
my ($n) = @_;
my $str = reverse sprintf("%d", $n);
$str =~ s/(\d{3})(?=\d)/$1,/g;
return reverse $str;
}
sub format_size {
my ($bytes) = @_;
return sprintf "%.1f B", $bytes if $bytes < 1024;
return sprintf "%.1f KB", $bytes / 1024 if $bytes < 1048576;
return sprintf "%.1f MB", $bytes / 1048576 if $bytes < 1073741824;
return sprintf "%.1f GB", $bytes / 1073741824;
}
sub usage {
print <<USAGE;
用法: $0 -f <日志文件> [-t <Top N>]
选项:
-f, --file FILE 日志文件路径(必填)
-t, --top N 显示 Top N 条目(默认: 10)
-h, --help 显示帮助
示例:
$0 -f /var/log/nginx/access.log
$0 -f access.log -t 20
USAGE
}
项目二:系统管理自动化工具
需求
一个系统巡检工具,检查:
- 磁盘使用率
- 内存使用
- CPU 负载
- 重要服务状态
- 安全检查(SSH 失败登录)
完整代码
#!/usr/bin/env perl
# syscheck.pl — 系统巡检工具
use strict;
use warnings;
use POSIX qw(strftime);
use Getopt::Long;
my %opt = (
disk_warn => 80,
disk_crit => 90,
mem_warn => 80,
load_warn => 5.0,
email => "",
);
GetOptions(
"disk-warn=i" => \$opt{disk_warn},
"disk-crit=i" => \$opt{disk_crit},
"mem-warn=i" => \$opt{mem_warn},
"load-warn=f" => \$opt{load_warn},
"email=s" => \$opt{email},
"help|h" => sub { usage(); exit },
) or die;
my $timestamp = strftime("%Y-%m-%d %H:%M:%S", localtime);
my $hostname = `hostname -f` || `hostname`;
chomp $hostname;
my @issues;
my @report;
push @report, "=" x 60;
push @report, "系统巡检报告";
push @report, "主机: $hostname";
push @report, "时间: $timestamp";
push @report, "=" x 60;
push @report, "";
# 1. 磁盘检查
check_disk();
# 2. 内存检查
check_memory();
# 3. 负载检查
check_load();
# 4. 服务检查
check_services();
# 5. 安全检查
check_security();
# 输出报告
print join("\n", @report), "\n";
if (@issues) {
print "\n", "!" x 60, "\n";
print "发现 ", scalar(@issues), " 个问题:\n";
print "!" x 60, "\n";
for my $issue (@issues) {
print " ⚠ $issue\n";
}
# 发送邮件告警(如果有配置)
send_alert() if $opt{email};
}
# === 子程序 ===
sub check_disk {
push @report, "--- 磁盘使用 ---";
push @report, sprintf " %-20s %8s %8s %8s %6s %s",
"挂载点", "总计", "已用", "可用", "使用率", "状态";
push @report, " " . "-" x 70;
open my $fh, '-|', 'df -h --output=target,size,used,avail,pcent 2>/dev/null || df -h';
while (my $line = <$fh>) {
chomp $line;
next if $line =~ /^Filesystem|^文件系统/;
my @f = split /\s+/, $line;
next unless @f >= 5;
my ($mount, $size, $used, $avail, $pct) = @f;
$pct =~ s/%//;
my $status = "✅";
if ($pct >= $opt{disk_crit}) {
$status = "🔴";
push @issues, "磁盘 $mount 使用率 ${pct}% (超过 ${opt{disk_crit}}% 阈值)";
} elsif ($pct >= $opt{disk_warn}) {
$status = "🟡";
push @issues, "磁盘 $mount 使用率 ${pct}% (超过 ${opt{disk_warn}}% 阈值)";
}
push @report, sprintf " %-20s %8s %8s %8s %6s %s",
$mount, $size, $used, $avail, "${pct}%", $status;
}
close $fh;
push @report, "";
}
sub check_memory {
push @report, "--- 内存使用 ---";
open my $fh, '<', '/proc/meminfo' or do {
push @report, " 无法读取 /proc/meminfo\n";
return;
};
my %mem;
while (<$fh>) {
if (/^(\w+):\s+(\d+)/) {
$mem{$1} = $2;
}
}
close $fh;
my $total = $mem{MemTotal} || 1;
my $available = $mem{MemAvailable} // ($mem{MemFree} + $mem{Buffers} + $mem{Cached});
my $used = $total - $available;
my $pct = sprintf "%.1f", ($used / $total) * 100;
my $status = "✅";
if ($pct >= $opt{mem_warn}) {
$status = "🟡";
push @issues, "内存使用率 ${pct}%";
}
push @report, sprintf " 总计: %d MB | 已用: %d MB | 可用: %d MB | 使用率: %s%% %s",
$total / 1024, $used / 1024, $available / 1024, $pct, $status;
# Swap
if ($mem{SwapTotal}) {
my $swap_total = $mem{SwapTotal};
my $swap_free = $mem{SwapFree};
my $swap_used = $swap_total - $swap_free;
my $swap_pct = sprintf "%.1f", ($swap_used / $swap_total) * 100;
push @report, sprintf " Swap: %d MB / %d MB (%s%%)",
$swap_used / 1024, $swap_total / 1024, $swap_pct;
}
push @report, "";
}
sub check_load {
push @report, "--- CPU 负载 ---";
open my $fh, '<', '/proc/loadavg' or do {
push @report, " 无法读取 /proc/loadavg\n";
return;
};
my $line = <$fh>;
close $fh;
chomp $line;
my @load = split /\s+/, $line;
my $status = "✅";
if ($load[0] > $opt{load_warn}) {
$status = "🟡";
push @issues, "CPU 负载过高: $load[0]";
}
push @report, sprintf " 1分钟: %s | 5分钟: %s | 15分钟: %s %s",
$load[0], $load[1], $load[2], $status;
push @report, sprintf " 运行进程: %s", $load[3];
push @report, "";
}
sub check_services {
push @report, "--- 服务状态 ---";
my @services = qw(nginx sshd cron);
for my $svc (@services) {
my $status = `systemctl is-active $svc 2>/dev/null` || "unknown";
chomp $status;
my $icon = $status eq "active" ? "✅" : "🔴";
push @report, sprintf " %-15s %s %s", $svc, $icon, $status;
if ($status ne "active") {
push @issues, "服务 $svc 未运行 ($status)";
}
}
push @report, "";
}
sub check_security {
push @report, "--- 安全检查 ---";
# 检查 SSH 失败登录
if (-f '/var/log/auth.log') {
my $fail_count = `grep -c "Failed password" /var/log/auth.log 2>/dev/null || echo 0`;
chomp $fail_count;
if ($fail_count > 100) {
push @issues, "SSH 失败登录次数较多: $fail_count";
}
push @report, " SSH 失败登录: $fail_count 次";
}
# 检查最近登录
my @last = `last -n 5 2>/dev/null`;
if (@last) {
push @report, " 最近登录:";
for my $line (@last[0..4]) {
chomp $line;
push @report, " $line" if $line;
}
}
push @report, "";
}
sub send_alert {
use MIME::Lite;
my $body = join("\n", @report) . "\n\n问题:\n" .
join("\n", map { " - $_" } @issues);
my $msg = MIME::Lite->new(
From => 'monitor@localhost',
To => $opt{email},
Subject => "[告警] 系统巡检 - $hostname",
Data => $body,
);
$msg->send('sendmail', '/usr/sbin/sendmail -t');
print "\n告警邮件已发送至 $opt{email}\n";
}
sub usage {
print <<USAGE;
用法: $0 [选项]
选项:
--disk-warn N 磁盘使用率警告阈值(默认: 80%)
--disk-crit N 磁盘使用率严重阈值(默认: 90%)
--mem-warn N 内存使用率警告阈值(默认: 80%)
--load-warn N CPU 负载警告阈值(默认: 5.0)
--email ADDR 发送告警邮件地址
-h, --help 显示帮助
USAGE
}
项目三:RESTful Web API 服务
需求
一个任务管理 REST API:
- CRUD 操作
- JWT 认证
- SQLite 存储
- 输入验证
- 错误处理
- API 文档
完整代码
#!/usr/bin/env perl
# task_api.pl — 任务管理 REST API
use Mojolicious::Lite -signatures;
use Mojo::SQLite;
use Mojo::JWT;
use Crypt::Passphrase;
use Crypt::Passphrase::Bcrypt;
# 配置
app->secrets(['your-secret-key-change-this']);
my $jwt_secret = 'jwt-secret-change-this';
# 数据库
my $sql = Mojo::SQLite->new('sqlite:tasks.db');
$sql->db->query(<<'SQL');
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
created DATETIME DEFAULT CURRENT_TIMESTAMP
)
SQL
$sql->db->query(<<'SQL');
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
description TEXT DEFAULT '',
status TEXT DEFAULT 'pending',
priority INTEGER DEFAULT 0,
created DATETIME DEFAULT CURRENT_TIMESTAMP,
updated DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
SQL
my $passphrase = Crypt::Passphrase->new(
encoder => 'Bcrypt',
);
# === 认证中间件 ===
under '/api' => sub ($c) {
my $auth = $c->req->headers->authorization // '';
if ($auth =~ /^Bearer\s+(.+)$/) {
my $token = $1;
eval {
my $payload = Mojo::JWT->new(secret => $jwt_secret)->decode($token);
$c->stash(user_id => $payload->{user_id});
};
unless ($@) {
return 1;
}
}
$c->render(json => {error => "未认证"}, status => 401);
return 0;
};
# === 认证路由(不需要认证)===
group {
# 前面的 under 已经限定了 /api 前缀
# 这里放不需要认证的路由
};
# 重新定义路由(不含认证)
app->routes->post('/api/register' => sub ($c) {
my $data = $c->req->json;
my $username = $data->{username} // '';
my $password = $data->{password} // '';
unless ($username =~ /^\w{3,20}$/ && length($password) >= 6) {
return $c->render(json => {
error => "用户名 3-20 字符,密码至少 6 位"
}, status => 400);
}
my $hashed = $passphrase->hash_password($password);
eval {
$sql->db->query(
'INSERT INTO users (username, password) VALUES (?, ?)',
$username, $hashed
);
};
if ($@ && $@ =~ /unique/i) {
return $c->render(json => {error => "用户名已存在"}, status => 409);
} elsif ($@) {
return $c->render(json => {error => "注册失败"}, status => 500);
}
$c->render(json => {success => 1, message => "注册成功"}, status => 201);
});
app->routes->post('/api/login' => sub ($c) {
my $data = $c->req->json;
my $username = $data->{username} // '';
my $password = $data->{password} // '';
my $user = $sql->db->query(
'SELECT id, password FROM users WHERE username = ?', $username
)->hash;
unless ($user && $passphrase->verify_password($password, $user->{password})) {
return $c->render(json => {error => "用户名或密码错误"}, status => 401);
}
my $token = Mojo::JWT->new(
secret => $jwt_secret,
expires => time + 86400, # 24 小时
claims => { user_id => $user->{id} },
)->encode;
$c->render(json => {token => $token, user_id => $user->{id}});
});
# === 任务 CRUD(需要认证)===
app->routes->get('/api/tasks' => sub ($c) {
my $user_id = $c->stash('user_id');
my $status = $c->param('status');
my $limit = $c->param('limit') // 50;
my $offset = $c->param('offset') // 0;
my @where = ("user_id = ?");
my @bind = ($user_id);
if ($status && $status =~ /^(pending|in_progress|done)$/) {
push @where, "status = ?";
push @bind, $status;
}
my $where = join(" AND ", @where);
my $tasks = $sql->db->query(
"SELECT * FROM tasks WHERE $where ORDER BY priority DESC, created DESC LIMIT ? OFFSET ?",
@bind, $limit, $offset
)->hashes;
my $total = $sql->db->query(
"SELECT COUNT(*) FROM tasks WHERE $where", @bind
)->array->[0];
$c->render(json => {tasks => $tasks, total => $total});
});
app->routes->post('/api/tasks' => sub ($c) {
my $user_id = $c->stash('user_id');
my $data = $c->req->json;
unless ($data->{title} && length($data->{title}) > 0) {
return $c->render(json => {error => "title 必填"}, status => 400);
}
my $id = $sql->db->query(
'INSERT INTO tasks (user_id, title, description, priority) VALUES (?, ?, ?, ?)',
$user_id, $data->{title}, $data->{description} // '', $data->{priority} // 0
)->last_insert_id;
my $task = $sql->db->query('SELECT * FROM tasks WHERE id = ?', $id)->hash;
$c->render(json => $task, status => 201);
});
app->routes->get('/api/tasks/:id' => sub ($c) {
my $user_id = $c->stash('user_id');
my $id = $c->param('id');
my $task = $sql->db->query(
'SELECT * FROM tasks WHERE id = ? AND user_id = ?', $id, $user_id
)->hash;
$task ? $c->render(json => $task)
: $c->render(json => {error => "任务不存在"}, status => 404);
});
app->routes->put('/api/tasks/:id' => sub ($c) {
my $user_id = $c->stash('user_id');
my $id = $c->param('id');
my $data = $c->req->json;
my $task = $sql->db->query(
'SELECT * FROM tasks WHERE id = ? AND user_id = ?', $id, $user_id
)->hash;
return $c->render(json => {error => "任务不存在"}, status => 404)
unless $task;
my %fields;
$fields{title} = $data->{title} if exists $data->{title};
$fields{description} = $data->{description} if exists $data->{description};
$fields{status} = $data->{status} if exists $data->{status};
$fields{priority} = $data->{priority} if exists $data->{priority};
if (exists $data->{status} && $data->{status} !~ /^(pending|in_progress|done)$/) {
return $c->render(json => {error => "status 无效"}, status => 400);
}
if (%fields) {
my $sets = join(", ", map { "$_ = ?" } keys %fields);
$sql->db->query(
"UPDATE tasks SET $sets, updated = CURRENT_TIMESTAMP WHERE id = ?",
values %fields, $id
);
}
$task = $sql->db->query('SELECT * FROM tasks WHERE id = ?', $id)->hash;
$c->render(json => $task);
});
app->routes->delete('/api/tasks/:id' => sub ($c) {
my $user_id = $c->stash('user_id');
my $id = $c->param('id');
my $rows = $sql->db->query(
'DELETE FROM tasks WHERE id = ? AND user_id = ?', $id, $user_id
)->rows;
$rows > 0 ? $c->render(json => {success => 1})
: $c->render(json => {error => "任务不存在"}, status => 404);
});
# === 统计接口 ===
app->routes->get('/api/stats' => sub ($c) {
my $user_id = $c->stash('user_id');
my $stats = $sql->db->query(<<'SQL', $user_id)->hash;
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress,
SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) as done
FROM tasks WHERE user_id = ?
SQL
$c->render(json => $stats);
});
app->start;
使用示例
# 启动服务
morbo task_api.pl
# 注册
curl -X POST http://localhost:3000/api/register \
-H 'Content-Type: application/json' \
-d '{"username":"test","password":"123456"}'
# 登录获取 Token
TOKEN=$(curl -s -X POST http://localhost:3000/api/login \
-H 'Content-Type: application/json' \
-d '{"username":"test","password":"123456"}' | jq -r '.token')
# 创建任务
curl -X POST http://localhost:3000/api/tasks \
-H "Authorization: Bearer $TOKEN" \
-H 'Content-Type: application/json' \
-d '{"title":"学习 Perl","description":"完成第 25 章","priority":1}'
# 查询任务
curl http://localhost:3000/api/tasks \
-H "Authorization: Bearer $TOKEN"
# 更新状态
curl -X PUT http://localhost:3000/api/tasks/1 \
-H "Authorization: Bearer $TOKEN" \
-H 'Content-Type: application/json' \
-d '{"status":"done"}'
# 统计
curl http://localhost:3000/api/stats \
-H "Authorization: Bearer $TOKEN"
本章小结
| 项目 | 技术栈 | 核心知识点 |
|---|---|---|
| 日志分析 | 正则、文件 I/O、格式化输出 | 文本处理、数据统计 |
| 系统巡检 | 系统命令、文件读取、告警 | 系统管理、Shell 集成 |
| REST API | Mojolicious、SQLite、JWT | Web 开发、数据库、认证 |
练习
- 为日志分析工具添加 IP 地理位置查询功能
- 为系统巡检工具添加 cron 定时执行和邮件报告
- 为 REST API 编写完整的 Test::Mojo 测试套件
- 将 REST API 改为使用 Mojolicious 框架(非 Lite 版本)
- 为三个项目编写 Dockerfile 并容器化部署
扩展阅读
- Mojolicious Cookbook
- Mojo::SQLite
- Mojo::JWT
- Perl Power Tools — Unix 工具的 Perl 实现