Python 集成
Python 集成
概述
Python 是 FFmpeg 自动化和集成的常用语言。本章介绍如何使用 Python 操作 FFmpeg,包括 FFmpeg-Python 库、子进程调用、复杂滤镜图构建和批量处理。
FFmpeg-Python 库
安装
pip install ffmpeg-python
基本用法
import ffmpeg
# 基本转码
ffmpeg.input('input.mp4').output('output.mp4').run()
# 指定编解码器
ffmpeg.input('input.mp4').output('output.mp4',
vcodec='libx264',
acodec='aac',
crf=23
).run()
# 覆盖输出文件
ffmpeg.input('input.mp4').output('output.mp4').overwrite_output().run()
输入输出
import ffmpeg
# 单输入单输出
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
# 多输入
video = ffmpeg.input('video.mp4')
audio = ffmpeg.input('audio.mp3')
stream = ffmpeg.output(video, audio, 'output.mp4', vcodec='copy', acodec='aac')
ffmpeg.run(stream)
# 从 URL 输入
stream = ffmpeg.input('rtmp://server/live/stream')
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
# 输出到管道
stream = ffmpeg.input('input.mp4')
process = ffmpeg.run_async(stream, pipe_stdout=True)
编解码器设置
import ffmpeg
# 视频编解码器
ffmpeg.input('input.mp4').output('output.mp4',
vcodec='libx264',
acodec='aac',
video_bitrate='2M',
audio_bitrate='128k'
).run()
# CRF 模式
ffmpeg.input('input.mp4').output('output.mp4',
vcodec='libx264',
crf=23,
preset='medium'
).run()
# 复制流
ffmpeg.input('input.mp4').output('output.mkv',
vcodec='copy',
acodec='copy'
).run()
视频滤镜
import ffmpeg
# 缩放
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'scale', 1280, 720)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
# 裁剪
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'crop', 640, 480, 0, 0)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
# 旋转
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'transpose', 1)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
音频滤镜
import ffmpeg
# 音量调整
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'volume', 2.0)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
# 淡入淡出
stream = ffmpeg.input('input.mp4')
stream = ffmpeg.filter(stream, 'afade', t='in', st=0, d=3)
stream = ffmpeg.filter(stream, 'afade', t='out', st=57, d=3)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
复杂滤镜图
import ffmpeg
# 画中画
main = ffmpeg.input('main.mp4')
pip = ffmpeg.input('pip.mp4')
pip = ffmpeg.filter(pip, 'scale', 320, 240)
stream = ffmpeg.overlay(main, pip, x='W-w-10', y='H-h-10')
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
# 水印叠加
video = ffmpeg.input('input.mp4')
watermark = ffmpeg.input('watermark.png')
stream = ffmpeg.overlay(video, watermark, x=10, y=10)
stream = ffmpeg.output(stream, 'output.mp4')
ffmpeg.run(stream)
获取媒体信息
import ffmpeg
# 获取文件信息
info = ffmpeg.probe('input.mp4')
# 打印信息
print(f"格式: {info['format']['format_name']}")
print(f"时长: {info['format']['duration']} 秒")
print(f"大小: {info['format']['size']} 字节")
# 获取流信息
for stream in info['streams']:
print(f"流类型: {stream['codec_type']}")
print(f"编解码器: {stream['codec_name']}")
错误处理
import ffmpeg
try:
ffmpeg.input('input.mp4').output('output.mp4').run()
except ffmpeg.Error as e:
print(f"FFmpeg 错误: {e.stderr.decode()}")
子进程调用
基本调用
import subprocess
# 基本命令
cmd = ['ffmpeg', '-i', 'input.mp4', 'output.mp4']
subprocess.run(cmd)
# 带参数
cmd = [
'ffmpeg', '-i', 'input.mp4',
'-c:v', 'libx264', '-crf', '23',
'-c:a', 'aac', '-b:a', '128k',
'output.mp4'
]
subprocess.run(cmd)
捕获输出
import subprocess
# 捕获标准输出和错误
cmd = ['ffmpeg', '-i', 'input.mp4', 'output.mp4']
result = subprocess.run(cmd, capture_output=True, text=True)
print("标准输出:", result.stdout)
print("标准错误:", result.stderr)
print("返回码:", result.returncode)
实时输出
import subprocess
# 实时显示输出
cmd = ['ffmpeg', '-i', 'input.mp4', '-c:v', 'libx264', 'output.mp4']
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, universal_newlines=True)
for line in process.stderr:
if 'time=' in line:
print(line.strip())
process.wait()
进度监控
import subprocess
import re
import json
def get_duration(input_file):
"""获取视频时长"""
cmd = [
'ffprobe', '-v', 'quiet',
'-print_format', 'json',
'-show_format', input_file
]
result = subprocess.run(cmd, capture_output=True, text=True)
info = json.loads(result.stdout)
return float(info['format']['duration'])
def transcode_with_progress(input_file, output_file):
"""带进度的转码"""
duration = get_duration(input_file)
cmd = [
'ffmpeg', '-i', input_file,
'-c:v', 'libx264', '-crf', '23',
'-c:a', 'aac',
output_file
]
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, universal_newlines=True)
for line in process.stderr:
time_match = re.search(r'time=(\d+:\d+:\d+\.\d+)', line)
if time_match:
time_str = time_match.group(1)
h, m, s = time_str.split(':')
current = float(h) * 3600 + float(m) * 60 + float(s)
progress = (current / duration) * 100
print(f"\r进度: {progress:.1f}%", end='')
process.wait()
print("\n转码完成")
高级用法
并行处理
import ffmpeg
import os
from concurrent.futures import ThreadPoolExecutor
def process_file(input_file, output_file):
"""处理单个文件"""
try:
ffmpeg.input(input_file).output(
output_file,
vcodec='libx264',
crf=23,
acodec='aac'
).overwrite_output().run(quiet=True)
return True
except ffmpeg.Error:
return False
def batch_process(input_dir, output_dir, max_workers=4):
"""批量处理"""
os.makedirs(output_dir, exist_ok=True)
files = [
f for f in os.listdir(input_dir)
if f.endswith(('.mp4', '.avi', '.mkv'))
]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for f in files:
input_path = os.path.join(input_dir, f)
output_path = os.path.join(output_dir, f)
futures.append(executor.submit(process_file, input_path, output_path))
for future in futures:
future.result()
流媒体处理
import ffmpeg
# RTMP 推流
ffmpeg.input('input.mp4').output(
'rtmp://server/live/stream',
vcodec='libx264',
acodec='aac',
preset='fast',
tune='zerolatency',
video_bitrate='3M',
maxrate='3M',
bufsize='6M',
f='flv'
).run()
# HLS 输出
ffmpeg.input('input.mp4').output(
'playlist.m3u8',
vcodec='libx264',
acodec='aac',
hls_time=4,
hls_list_size=10,
hls_segment_filename='segment_%03d.ts'
).run()
硬件加速
import ffmpeg
# NVIDIA GPU 加速
ffmpeg.input('input.mp4').output(
'output.mp4',
vcodec='h264_nvenc',
preset='fast',
video_bitrate='3M'
).run()
# Intel QSV 加速
ffmpeg.input('input.mp4').output(
'output.mp4',
vcodec='h264_qsv',
preset='fast',
video_bitrate='3M'
).run()
多输出
import ffmpeg
# 多码率输出
stream = ffmpeg.input('input.mp4')
# 1080p
stream_1080p = ffmpeg.filter(stream, 'scale', 1920, 1080)
output_1080p = ffmpeg.output(stream_1080p, 'output_1080p.mp4',
vcodec='libx264', crf=23, preset='medium')
# 720p
stream_720p = ffmpeg.filter(stream, 'scale', 1280, 720)
output_720p = ffmpeg.output(stream_720p, 'output_720p.mp4',
vcodec='libx264', crf=23, preset='medium')
# 480p
stream_480p = ffmpeg.filter(stream, 'scale', 854, 480)
output_480p = ffmpeg.output(stream_480p, 'output_480p.mp4',
vcodec='libx264', crf=23, preset='medium')
# 运行所有输出
ffmpeg.run(output_1080p, output_720p, output_480p)
封装类
FFmpeg 处理器类
import ffmpeg
import subprocess
import json
import os
from typing import Optional, Dict, List
class FFmpegProcessor:
"""FFmpeg 处理器"""
def __init__(self, ffmpeg_path: str = 'ffmpeg', ffprobe_path: str = 'ffprobe'):
self.ffmpeg_path = ffmpeg_path
self.ffprobe_path = ffprobe_path
def get_info(self, input_file: str) -> Dict:
"""获取媒体信息"""
cmd = [
self.ffprobe_path,
'-v', 'quiet',
'-print_format', 'json',
'-show_format',
'-show_streams',
input_file
]
result = subprocess.run(cmd, capture_output=True, text=True)
return json.loads(result.stdout)
def get_duration(self, input_file: str) -> float:
"""获取时长"""
info = self.get_info(input_file)
return float(info['format']['duration'])
def transcode(
self,
input_file: str,
output_file: str,
vcodec: str = 'libx264',
acodec: str = 'aac',
crf: int = 23,
preset: str = 'medium',
video_bitrate: Optional[str] = None,
audio_bitrate: Optional[str] = '128k',
overwrite: bool = True
) -> bool:
"""转码"""
try:
stream = ffmpeg.input(input_file)
kwargs = {
'vcodec': vcodec,
'acodec': acodec,
'crf': crf,
'preset': preset,
}
if video_bitrate:
kwargs['video_bitrate'] = video_bitrate
if audio_bitrate:
kwargs['audio_bitrate'] = audio_bitrate
stream = ffmpeg.output(stream, output_file, **kwargs)
if overwrite:
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True)
return True
except ffmpeg.Error:
return False
def scale(
self,
input_file: str,
output_file: str,
width: int,
height: int,
**kwargs
) -> bool:
"""缩放"""
try:
stream = ffmpeg.input(input_file)
stream = ffmpeg.filter(stream, 'scale', width, height)
stream = ffmpeg.output(stream, output_file, **kwargs)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True)
return True
except ffmpeg.Error:
return False
def extract_audio(
self,
input_file: str,
output_file: str,
acodec: str = 'libmp3lame',
audio_bitrate: str = '192k'
) -> bool:
"""提取音频"""
try:
stream = ffmpeg.input(input_file)
stream = ffmpeg.output(stream, output_file,
vn=None,
acodec=acodec,
audio_bitrate=audio_bitrate)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True)
return True
except ffmpeg.Error:
return False
def add_watermark(
self,
input_file: str,
watermark_file: str,
output_file: str,
x: int = 10,
y: int = 10,
**kwargs
) -> bool:
"""添加水印"""
try:
video = ffmpeg.input(input_file)
watermark = ffmpeg.input(watermark_file)
stream = ffmpeg.overlay(video, watermark, x=x, y=y)
stream = ffmpeg.output(stream, output_file, **kwargs)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True)
return True
except ffmpeg.Error:
return False
def create_thumbnail(
self,
input_file: str,
output_file: str,
timestamp: str = '00:00:10'
) -> bool:
"""创建缩略图"""
try:
stream = ffmpeg.input(input_file, ss=timestamp)
stream = ffmpeg.output(stream, output_file, vframes=1)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True)
return True
except ffmpeg.Error:
return False
def concat(
self,
input_files: List[str],
output_file: str,
**kwargs
) -> bool:
"""拼接视频"""
try:
inputs = [ffmpeg.input(f) for f in input_files]
stream = ffmpeg.concat(*inputs)
stream = ffmpeg.output(stream, output_file, **kwargs)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True)
return True
except ffmpeg.Error:
return False
使用示例
# 创建处理器实例
processor = FFmpegProcessor()
# 获取文件信息
info = processor.get_info('input.mp4')
print(f"时长: {processor.get_duration('input.mp4')} 秒")
# 转码
processor.transcode('input.mp4', 'output.mp4', crf=23, preset='slow')
# 缩放
processor.scale('input.mp4', 'output_720p.mp4', 1280, 720)
# 提取音频
processor.extract_audio('input.mp4', 'audio.mp3')
# 添加水印
processor.add_watermark('input.mp4', 'watermark.png', 'output_watermark.mp4')
# 创建缩略图
processor.create_thumbnail('input.mp4', 'thumbnail.jpg')
# 拼接视频
processor.concat(['part1.mp4', 'part2.mp4', 'part3.mp4'], 'output.mp4')
批量处理工具
批量转码工具
import ffmpeg
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BatchTranscoder:
"""批量转码器"""
def __init__(
self,
input_dir: str,
output_dir: str,
max_workers: int = 4,
vcodec: str = 'libx264',
acodec: str = 'aac',
crf: int = 23,
preset: str = 'medium'
):
self.input_dir = input_dir
self.output_dir = output_dir
self.max_workers = max_workers
self.vcodec = vcodec
self.acodec = acodec
self.crf = crf
self.preset = preset
os.makedirs(output_dir, exist_ok=True)
def _process_file(self, input_file: str) -> Tuple[str, bool, str]:
"""处理单个文件"""
filename = os.path.basename(input_file)
output_file = os.path.join(self.output_dir, filename)
try:
ffmpeg.input(input_file).output(
output_file,
vcodec=self.vcodec,
acodec=self.acodec,
crf=self.crf,
preset=self.preset
).overwrite_output().run(quiet=True)
return filename, True, "成功"
except ffmpeg.Error as e:
return filename, False, str(e.stderr.decode())
def process(self) -> List[Tuple[str, bool, str]]:
"""批量处理"""
# 获取文件列表
files = [
os.path.join(self.input_dir, f)
for f in os.listdir(self.input_dir)
if f.endswith(('.mp4', '.avi', '.mkv', '.mov'))
]
logger.info(f"找到 {len(files)} 个文件")
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(self._process_file, f): f
for f in files
}
for future in as_completed(futures):
filename, success, message = future.result()
if success:
logger.info(f"✓ {filename}")
else:
logger.error(f"✗ {filename}: {message}")
results.append((filename, success, message))
# 统计
success_count = sum(1 for _, success, _ in results if success)
logger.info(f"完成: {success_count}/{len(results)} 成功")
return results
# 使用示例
if __name__ == '__main__':
transcoder = BatchTranscoder(
input_dir='./input',
output_dir='./output',
max_workers=4,
crf=23,
preset='medium'
)
results = transcoder.process()
多分辨率批量工具
import ffmpeg
import os
from concurrent.futures import ThreadPoolExecutor
class MultiResolutionBatch:
"""多分辨率批量处理"""
def __init__(self, input_dir: str, output_dir: str, max_workers: int = 4):
self.input_dir = input_dir
self.output_dir = output_dir
self.max_workers = max_workers
self.resolutions = {
'1080p': (1920, 1080),
'720p': (1280, 720),
'480p': (854, 480),
}
for res in self.resolutions:
os.makedirs(os.path.join(output_dir, res), exist_ok=True)
def _process_file(self, input_file: str, resolution: str, width: int, height: int):
"""处理单个文件"""
filename = os.path.basename(input_file)
output_file = os.path.join(self.output_dir, resolution, filename)
try:
stream = ffmpeg.input(input_file)
stream = ffmpeg.filter(stream, 'scale', width, height)
stream = ffmpeg.output(stream, output_file,
vcodec='libx264',
acodec='aac',
crf=23,
preset='fast')
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True)
return True
except ffmpeg.Error:
return False
def process(self):
"""批量处理"""
files = [
os.path.join(self.input_dir, f)
for f in os.listdir(self.input_dir)
if f.endswith(('.mp4', '.avi', '.mkv'))
]
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for file in files:
for resolution, (width, height) in self.resolutions.items():
futures.append(
executor.submit(self._process_file, file, resolution, width, height)
)
for future in futures:
future.result()
命令行工具封装
CLI 工具
#!/usr/bin/env python3
"""FFmpeg 命令行工具"""
import argparse
import ffmpeg
import sys
import os
def transcode(args):
"""转码命令"""
try:
stream = ffmpeg.input(args.input)
stream = ffmpeg.output(stream, args.output,
vcodec=args.vcodec,
acodec=args.acodec,
crf=args.crf,
preset=args.preset)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream)
print(f"转码完成: {args.output}")
except ffmpeg.Error as e:
print(f"转码失败: {e.stderr.decode()}", file=sys.stderr)
sys.exit(1)
def scale(args):
"""缩放命令"""
try:
stream = ffmpeg.input(args.input)
stream = ffmpeg.filter(stream, 'scale', args.width, args.height)
stream = ffmpeg.output(stream, args.output,
vcodec='libx264',
acodec='aac',
crf=23)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream)
print(f"缩放完成: {args.output}")
except ffmpeg.Error as e:
print(f"缩放失败: {e.stderr.decode()}", file=sys.stderr)
sys.exit(1)
def extract_audio(args):
"""提取音频命令"""
try:
stream = ffmpeg.input(args.input)
stream = ffmpeg.output(stream, args.output,
vn=None,
acodec=args.acodec,
audio_bitrate=args.bitrate)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream)
print(f"音频提取完成: {args.output}")
except ffmpeg.Error as e:
print(f"音频提取失败: {e.stderr.decode()}", file=sys.stderr)
sys.exit(1)
def info(args):
"""信息命令"""
try:
probe = ffmpeg.probe(args.input)
print(f"文件: {args.input}")
print(f"格式: {probe['format']['format_name']}")
print(f"时长: {float(probe['format']['duration']):.2f} 秒")
print(f"大小: {int(probe['format']['size']) / 1024 / 1024:.2f} MB")
for stream in probe['streams']:
if stream['codec_type'] == 'video':
print(f"\n视频流:")
print(f" 编解码器: {stream['codec_name']}")
print(f" 分辨率: {stream['width']}x{stream['height']}")
print(f" 帧率: {stream.get('r_frame_rate', 'N/A')}")
elif stream['codec_type'] == 'audio':
print(f"\n音频流:")
print(f" 编解码器: {stream['codec_name']}")
print(f" 采样率: {stream.get('sample_rate', 'N/A')}")
print(f" 声道数: {stream.get('channels', 'N/A')}")
except ffmpeg.Error as e:
print(f"获取信息失败: {e.stderr.decode()}", file=sys.stderr)
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description='FFmpeg 工具')
subparsers = parser.add_subparsers(dest='command', help='子命令')
# 转码命令
transcode_parser = subparsers.add_parser('transcode', help='转码')
transcode_parser.add_argument('input', help='输入文件')
transcode_parser.add_argument('output', help='输出文件')
transcode_parser.add_argument('--vcodec', default='libx264', help='视频编解码器')
transcode_parser.add_argument('--acodec', default='aac', help='音频编解码器')
transcode_parser.add_argument('--crf', type=int, default=23, help='CRF 值')
transcode_parser.add_argument('--preset', default='medium', help='编码预设')
# 缩放命令
scale_parser = subparsers.add_parser('scale', help='缩放')
scale_parser.add_argument('input', help='输入文件')
scale_parser.add_argument('output', help='输出文件')
scale_parser.add_argument('--width', type=int, required=True, help='宽度')
scale_parser.add_argument('--height', type=int, required=True, help='高度')
# 提取音频命令
audio_parser = subparsers.add_parser('extract-audio', help='提取音频')
audio_parser.add_argument('input', help='输入文件')
audio_parser.add_argument('output', help='输出文件')
audio_parser.add_argument('--acodec', default='libmp3lame', help='音频编解码器')
audio_parser.add_argument('--bitrate', default='192k', help='音频码率')
# 信息命令
info_parser = subparsers.add_parser('info', help='获取信息')
info_parser.add_argument('input', help='输入文件')
args = parser.parse_args()
if args.command == 'transcode':
transcode(args)
elif args.command == 'scale':
scale(args)
elif args.command == 'extract-audio':
extract_audio(args)
elif args.command == 'info':
info(args)
else:
parser.print_help()
if __name__ == '__main__':
main()
使用示例
# 转码
python ffmpeg_tool.py transcode input.mp4 output.mp4 --crf 23 --preset slow
# 缩放
python ffmpeg_tool.py scale input.mp4 output.mp4 --width 1280 --height 720
# 提取音频
python ffmpeg_tool.py extract-audio input.mp4 audio.mp3 --bitrate 320k
# 获取信息
python ffmpeg_tool.py info input.mp4
注意事项
- 错误处理:始终捕获 ffmpeg.Error 异常
- 资源管理:长时间运行的进程注意资源释放
- 并发控制:并行处理时控制线程/进程数量
- 日志记录:记录处理日志便于调试
- 版本兼容:注意 ffmpeg-python 版本与 FFmpeg 版本的兼容性
业务场景
场景 1:视频处理 API
from flask import Flask, request, jsonify
import ffmpeg
import os
import uuid
app = Flask(__name__)
@app.route('/api/transcode', methods=['POST'])
def transcode():
"""转码 API"""
if 'file' not in request.files:
return jsonify({'error': '没有文件'}), 400
file = request.files['file']
filename = f"{uuid.uuid4()}.mp4"
input_path = os.path.join('/tmp', filename)
output_path = os.path.join('/tmp', f"output_{filename}")
file.save(input_path)
try:
ffmpeg.input(input_path).output(
output_path,
vcodec='libx264',
acodec='aac',
crf=23
).overwrite_output().run(quiet=True)
return jsonify({
'success': True,
'output': output_path
})
except ffmpeg.Error as e:
return jsonify({'error': str(e.stderr.decode())}), 500
@app.route('/api/info', methods=['POST'])
def info():
"""获取信息 API"""
if 'file' not in request.files:
return jsonify({'error': '没有文件'}), 400
file = request.files['file']
filename = f"{uuid.uuid4()}.mp4"
input_path = os.path.join('/tmp', filename)
file.save(input_path)
try:
probe = ffmpeg.probe(input_path)
return jsonify({
'success': True,
'info': probe
})
except ffmpeg.Error as e:
return jsonify({'error': str(e.stderr.decode())}), 500
场景 2:视频处理队列
import queue
import threading
import ffmpeg
class VideoQueue:
"""视频处理队列"""
def __init__(self, max_workers: int = 4):
self.queue = queue.Queue()
self.max_workers = max_workers
self.workers = []
self.running = False
def start(self):
"""启动队列"""
self.running = True
for _ in range(self.max_workers):
worker = threading.Thread(target=self._worker)
worker.start()
self.workers.append(worker)
def stop(self):
"""停止队列"""
self.running = False
for worker in self.workers:
worker.join()
def add_task(self, input_file: str, output_file: str, **kwargs):
"""添加任务"""
self.queue.put((input_file, output_file, kwargs))
def _worker(self):
"""工作线程"""
while self.running:
try:
input_file, output_file, kwargs = self.queue.get(timeout=1)
self._process_file(input_file, output_file, **kwargs)
self.queue.task_done()
except queue.Empty:
continue
def _process_file(self, input_file: str, output_file: str, **kwargs):
"""处理文件"""
try:
ffmpeg.input(input_file).output(
output_file, **kwargs
).overwrite_output().run(quiet=True)
print(f"完成: {output_file}")
except ffmpeg.Error as e:
print(f"失败: {output_file} - {e.stderr.decode()}")
# 使用示例
queue = VideoQueue(max_workers=4)
queue.start()
queue.add_task('input1.mp4', 'output1.mp4', vcodec='libx264', crf=23)
queue.add_task('input2.mp4', 'output2.mp4', vcodec='libx264', crf=23)
queue.queue.join()
queue.stop()
扩展阅读
总结
本章介绍了 FFmpeg 的 Python 集成,包括:
- FFmpeg-Python 库的使用
- 子进程调用方法
- 复杂滤镜图构建
- 批量处理工具
- CLI 工具封装
掌握 Python 集成可以帮助您构建自动化的视频处理系统。