Loading... 可设置录像保留天数、分段时间、分辨率、码率 ```python #!/usr/bin/env python3 import sys import os import time import datetime import threading import glob import gi import signal gi.require_version("Gst", "1.0") from gi.repository import Gst, GLib, GObject # ================= 配置 ================= CONFIG = { # 保存位置 "save_dir": "/home/r/Videos/cctv", # 分段时长 "segment_duration_ns": 60 * 1_000_000_000, # 60s # 保留天数 "retention_days": 7, # 端口 "stream_port": 5000, # 摄像头地址 "usb_device": "/dev/video0", # 分辨率 "width": 1280, "height": 720, # 平均码率 "bitrate": 2000000, #峰值码率 "peak-bitrate": 4000000, } # ======================================= class JetsonDVR: def __init__(self): # 初始化 GStreamer Gst.init(None) self.mainloop = GLib.MainLoop() os.makedirs(CONFIG["save_dir"], exist_ok=True) # 清理线程 self.cleanup_thread = threading.Thread(target=self.cleanup_loop, daemon=True) self.cleanup_thread.start() self.pipeline = self.create_pipeline() # 监听总线消息 bus = self.pipeline.get_bus() bus.add_signal_watch() bus.connect("message", self.on_message) # 优雅退出的信号处理 signal.signal(signal.SIGINT, self.handle_sigint) signal.signal(signal.SIGTERM, self.handle_sigint) def create_pipeline(self): print(f"[*] 初始化管道... 设备: {CONFIG['usb_device']}") source = ( f"v4l2src device={CONFIG['usb_device']} ! " f"image/jpeg, width={CONFIG['width']}, height={CONFIG['height']}, framerate=30/1 ! " "jpegdec ! " "nvvidconv ! " "video/x-raw(memory:NVMM), format=I420" ) # encoder = ( # f" ! nvv4l2h264enc maxperf-enable=1 insert-sps-pps=1 bitrate={CONFIG['bitrate']} ! " # "h264parse ! tee name=t" # ) # 使用 h265 encoder = ( " ! nvv4l2h265enc maxperf-enable=1 insert-sps-pps=1 " f"control-rate=1 bitrate={CONFIG['bitrate']} peak-bitrate={CONFIG['peak-bitrate']} ! " "video/x-h265,stream-format=byte-stream ! " "h265parse ! tee name=t" ) record_branch = ( f" t. ! queue ! " f"splitmuxsink name=sink " f"max-size-time={CONFIG['segment_duration_ns']} " f"muxer=mp4mux async-handling=true" ) # Streaming Branch (推流): stream_branch = ( f" t. ! queue leaky=1 ! " f"matroskamux streamable=true ! " f"tcpserversink host=0.0.0.0 port={CONFIG['stream_port']} " f"recover-policy=keyframe sync-method=latest-keyframe" ) pipeline_str = source + encoder + record_branch + stream_branch print("-" * 60) print("GStreamer Pipeline String:") print(pipeline_str) print("-" * 60) try: pipeline = Gst.parse_launch(pipeline_str) except Exception as e: print("[!] 管道构建失败:", e) sys.exit(1) sink = pipeline.get_by_name("sink") if sink: sink.connect("format-location", self.on_format_location) else: print("[!] 警告: 找不到 splitmuxsink") return pipeline def on_format_location(self, splitmux, fragment_id): now = datetime.datetime.now() filename = now.strftime("%Y%m%d-%H%M%S.mp4") return os.path.join(CONFIG["save_dir"], filename) def cleanup_loop(self): retention_sec = CONFIG["retention_days"] * 24 * 3600 while True: try: now = time.time() for f in glob.glob(os.path.join(CONFIG["save_dir"], "*.mp4")): if os.path.isfile(f): if now - os.path.getmtime(f) > retention_sec: os.remove(f) print(f"[-] 删除过期录像: {os.path.basename(f)}") except Exception as e: print("[!] 清理线程错误:", e) time.sleep(3600) def on_message(self, bus, message): mtype = message.type if mtype == Gst.MessageType.ERROR: err, debug = message.parse_error() print("[!] Pipeline Error:", err) if debug: print("[!] Debug Info:", debug) self.quit() elif mtype == Gst.MessageType.EOS: print("[*] End of Stream") self.quit() def handle_sigint(self, sig, frame): print("\n[*] 接收到中断信号,正在停止...") self.quit() def quit(self): if self.mainloop.is_running(): self.mainloop.quit() def run(self): print("[*] 启动 DVR 系统...") self.pipeline.set_state(Gst.State.PLAYING) try: self.mainloop.run() except KeyboardInterrupt: pass finally: self.stop_pipeline() def stop_pipeline(self): print("[*] 发送 EOS 并停止管道...") # 发送 EOS 以正确结束录像文件(防止 MP4 损坏) self.pipeline.send_event(Gst.Event.new_eos()) # 给一点时间让 EOS 传播 time.sleep(1) self.pipeline.set_state(Gst.State.NULL) print("[*] 已退出") if __name__ == "__main__": dvr = JetsonDVR() dvr.run() ``` Last modification:January 8, 2026 © Allow specification reprint Support Appreciate the author AliPayWeChat Like 0 如果觉得我的文章对你有用,请随意赞赏