import os import cv2 import time import threading import logging import numpy as np from glob import glob from datetime import datetime from PySide6.QtCore import QObject, Signal class LocalImagePlayer(QObject): """本地图像播放器,用于读取和播放本地图像序列 特点: 1. 异步加载图像,不阻塞UI线程 2. 可调整帧率 3. 可循环播放 4. 支持暂停和恢复 5. 自动排序图像按时间序列播放 """ # 信号定义 signal_frame_ready = Signal(object) # 帧准备好信号 (frame) signal_status = Signal(str) # 状态信号 (message) signal_progress = Signal(int, int) # 进度信号 (current, total) signal_error = Signal(str) # 错误信号 (message) signal_completed = Signal() # 播放完成信号 def __init__(self): super().__init__() # 初始化状态变量 self.folder_path = "" # 图像文件夹路径 self.file_patterns = [".jpg", ".jpeg", ".png", ".bmp"] # 支持的图像格式 self.framerate = 15 # 播放帧率 self.loop = True # 是否循环播放 self.images = [] # 图像文件列表 self.current_index = 0 # 当前播放索引 # 播放控制 self.is_playing = False # 是否正在播放 self.is_paused = False # 是否暂停 self.stop_event = threading.Event() # 停止事件 self.playback_thread = None # 播放线程 # 加载线程 self.loading_thread = None self.is_loading = False def set_folder(self, folder_path): """设置图像文件夹路径 Args: folder_path: 图像文件夹路径 Returns: bool: 是否成功设置 """ if not os.path.exists(folder_path) or not os.path.isdir(folder_path): self.signal_error.emit(f"文件夹不存在: {folder_path}") return False self.folder_path = folder_path # 异步加载图像列表 self.load_images_async() return True def set_file_patterns(self, patterns): """设置图像文件格式 Args: patterns: 支持的图像后缀列表,如['.jpg', '.png'] """ if patterns and isinstance(patterns, list): self.file_patterns = patterns def set_framerate(self, framerate): """设置播放帧率 Args: framerate: 帧率,范围1-60 """ if 1 <= framerate <= 60: self.framerate = framerate def set_loop(self, loop): """设置是否循环播放 Args: loop: 是否循环播放 """ self.loop = loop def load_images_async(self): """异步加载图像列表""" if self.is_loading: return self.is_loading = True self.loading_thread = threading.Thread(target=self._load_images_thread) self.loading_thread.daemon = True self.loading_thread.start() def _load_images_thread(self): """加载图像列表的线程函数""" try: self.signal_status.emit("正在加载图像...") # 获取所有支持格式的图像文件 image_files = [] for pattern in self.file_patterns: # 确保pattern是以.开头的扩展名 if not pattern.startswith('.'): pattern = '.' + pattern # 查找所有匹配的文件 pattern_files = glob(os.path.join(self.folder_path, f"*{pattern}")) image_files.extend(pattern_files) # 按文件名排序(通常包含时间信息) image_files.sort() if not image_files: self.signal_error.emit(f"未找到图像文件,支持的格式: {', '.join(self.file_patterns)}") self.is_loading = False return # 更新图像列表 self.images = image_files self.current_index = 0 # 发出加载完成信号 self.signal_status.emit(f"已加载 {len(self.images)} 张图像") self.signal_progress.emit(0, len(self.images)) logging.info(f"已加载 {len(self.images)} 张图像,从 {self.folder_path}") except Exception as e: logging.error(f"加载图像时发生错误: {e}") self.signal_error.emit(f"加载图像失败: {str(e)}") finally: self.is_loading = False def start_playback(self): """开始播放图像序列""" if not self.images: self.signal_error.emit("没有可播放的图像") return False if self.is_playing: return True # 重置停止事件 self.stop_event.clear() self.is_playing = True self.is_paused = False # 创建播放线程 self.playback_thread = threading.Thread(target=self._playback_thread) self.playback_thread.daemon = True self.playback_thread.start() logging.info("开始播放本地图像序列") return True def pause_playback(self): """暂停播放""" self.is_paused = True logging.info("暂停播放本地图像序列") def resume_playback(self): """恢复播放""" self.is_paused = False logging.info("恢复播放本地图像序列") def stop_playback(self): """停止播放""" if not self.is_playing: return # 设置停止事件 self.stop_event.set() # 等待线程结束 if self.playback_thread and self.playback_thread.is_alive(): self.playback_thread.join(timeout=1.0) self.is_playing = False self.is_paused = False logging.info("停止播放本地图像序列") def _playback_thread(self): """播放线程函数""" try: frame_interval = 1.0 / self.framerate logging.warning(f"====> 播放线程开始,帧率: {self.framerate} fps,帧间隔: {frame_interval} 秒") while not self.stop_event.is_set(): if self.is_paused: time.sleep(0.1) # 暂停时降低CPU使用率 continue # 记录帧开始时间 start_time = time.time() # 获取当前帧 if 0 <= self.current_index < len(self.images): image_path = self.images[self.current_index] frame = cv2.imread(image_path) if frame is not None: # OpenCV以BGR格式读取,需要转换为RGB才适合大多数显示 frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # 输出调试信息 logging.warning(f"====> 读取图像: {image_path}, 尺寸: {frame_rgb.shape}") # 保存最后一帧,以便重新连接时使用 self.last_frame = frame_rgb # 发送帧准备好信号 self.signal_frame_ready.emit(frame_rgb) # 确保不使用cv2.imshow # 禁用任何可能的cv2.imshow调用 # 更新进度 self.signal_progress.emit(self.current_index + 1, len(self.images)) else: logging.error(f"无法读取图像文件: {image_path}") self.signal_error.emit(f"无法读取图像: {os.path.basename(image_path)}") # 更新索引 self.current_index += 1 # 检查是否播放结束 if self.current_index >= len(self.images): if self.loop: self.current_index = 0 logging.debug("本地图像序列播放完成,循环播放") else: logging.info("本地图像序列播放完成") self.signal_completed.emit() break # 计算需要等待的时间 elapsed = time.time() - start_time sleep_time = max(0, frame_interval - elapsed) # 输出调试信息 logging.debug(f"帧处理时间: {elapsed:.4f}秒,等待时间: {sleep_time:.4f}秒") # 等待直到下一帧时间或停止事件被设置 if sleep_time > 0: self.stop_event.wait(sleep_time) except Exception as e: logging.error(f"播放图像序列时发生错误: {e}") self.signal_error.emit(f"播放错误: {str(e)}") finally: self.is_playing = False logging.warning("====> 播放线程结束")