jiateng_ws/utils/local_image_player.py

265 lines
9.3 KiB
Python
Raw Normal View History

import os
import cv2
import time
import threading
import logging
import numpy as np
from glob import glob
from datetime import datetime
from PySide6.QtCore import QObject, Signal
class LocalImagePlayer(QObject):
"""本地图像播放器,用于读取和播放本地图像序列
特点
1. 异步加载图像不阻塞UI线程
2. 可调整帧率
3. 可循环播放
4. 支持暂停和恢复
5. 自动排序图像按时间序列播放
"""
# 信号定义
signal_frame_ready = Signal(object) # 帧准备好信号 (frame)
signal_status = Signal(str) # 状态信号 (message)
signal_progress = Signal(int, int) # 进度信号 (current, total)
signal_error = Signal(str) # 错误信号 (message)
signal_completed = Signal() # 播放完成信号
def __init__(self):
super().__init__()
# 初始化状态变量
self.folder_path = "" # 图像文件夹路径
self.file_patterns = [".jpg", ".jpeg", ".png", ".bmp"] # 支持的图像格式
self.framerate = 15 # 播放帧率
self.loop = True # 是否循环播放
self.images = [] # 图像文件列表
self.current_index = 0 # 当前播放索引
# 播放控制
self.is_playing = False # 是否正在播放
self.is_paused = False # 是否暂停
self.stop_event = threading.Event() # 停止事件
self.playback_thread = None # 播放线程
# 加载线程
self.loading_thread = None
self.is_loading = False
def set_folder(self, folder_path):
"""设置图像文件夹路径
Args:
folder_path: 图像文件夹路径
Returns:
bool: 是否成功设置
"""
if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
self.signal_error.emit(f"文件夹不存在: {folder_path}")
return False
self.folder_path = folder_path
# 异步加载图像列表
self.load_images_async()
return True
def set_file_patterns(self, patterns):
"""设置图像文件格式
Args:
patterns: 支持的图像后缀列表['.jpg', '.png']
"""
if patterns and isinstance(patterns, list):
self.file_patterns = patterns
def set_framerate(self, framerate):
"""设置播放帧率
Args:
framerate: 帧率范围1-60
"""
if 1 <= framerate <= 60:
self.framerate = framerate
def set_loop(self, loop):
"""设置是否循环播放
Args:
loop: 是否循环播放
"""
self.loop = loop
def load_images_async(self):
"""异步加载图像列表"""
if self.is_loading:
return
self.is_loading = True
self.loading_thread = threading.Thread(target=self._load_images_thread)
self.loading_thread.daemon = True
self.loading_thread.start()
def _load_images_thread(self):
"""加载图像列表的线程函数"""
try:
self.signal_status.emit("正在加载图像...")
# 获取所有支持格式的图像文件
image_files = []
for pattern in self.file_patterns:
# 确保pattern是以.开头的扩展名
if not pattern.startswith('.'):
pattern = '.' + pattern
# 查找所有匹配的文件
pattern_files = glob(os.path.join(self.folder_path, f"*{pattern}"))
image_files.extend(pattern_files)
# 按文件名排序(通常包含时间信息)
image_files.sort()
if not image_files:
self.signal_error.emit(f"未找到图像文件,支持的格式: {', '.join(self.file_patterns)}")
self.is_loading = False
return
# 更新图像列表
self.images = image_files
self.current_index = 0
# 发出加载完成信号
self.signal_status.emit(f"已加载 {len(self.images)} 张图像")
self.signal_progress.emit(0, len(self.images))
logging.info(f"已加载 {len(self.images)} 张图像,从 {self.folder_path}")
except Exception as e:
logging.error(f"加载图像时发生错误: {e}")
self.signal_error.emit(f"加载图像失败: {str(e)}")
finally:
self.is_loading = False
def start_playback(self):
"""开始播放图像序列"""
if not self.images:
self.signal_error.emit("没有可播放的图像")
return False
if self.is_playing:
return True
# 重置停止事件
self.stop_event.clear()
self.is_playing = True
self.is_paused = False
# 创建播放线程
self.playback_thread = threading.Thread(target=self._playback_thread)
self.playback_thread.daemon = True
self.playback_thread.start()
logging.info("开始播放本地图像序列")
return True
def pause_playback(self):
"""暂停播放"""
self.is_paused = True
logging.info("暂停播放本地图像序列")
def resume_playback(self):
"""恢复播放"""
self.is_paused = False
logging.info("恢复播放本地图像序列")
def stop_playback(self):
"""停止播放"""
if not self.is_playing:
return
# 设置停止事件
self.stop_event.set()
# 等待线程结束
if self.playback_thread and self.playback_thread.is_alive():
self.playback_thread.join(timeout=1.0)
self.is_playing = False
self.is_paused = False
logging.info("停止播放本地图像序列")
def _playback_thread(self):
"""播放线程函数"""
try:
frame_interval = 1.0 / self.framerate
logging.warning(f"====> 播放线程开始,帧率: {self.framerate} fps帧间隔: {frame_interval}")
while not self.stop_event.is_set():
if self.is_paused:
time.sleep(0.1) # 暂停时降低CPU使用率
continue
# 记录帧开始时间
start_time = time.time()
# 获取当前帧
if 0 <= self.current_index < len(self.images):
image_path = self.images[self.current_index]
frame = cv2.imread(image_path)
if frame is not None:
# OpenCV以BGR格式读取需要转换为RGB才适合大多数显示
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# 输出调试信息
logging.warning(f"====> 读取图像: {image_path}, 尺寸: {frame_rgb.shape}")
# 保存最后一帧,以便重新连接时使用
self.last_frame = frame_rgb
# 发送帧准备好信号
self.signal_frame_ready.emit(frame_rgb)
# 确保不使用cv2.imshow
# 禁用任何可能的cv2.imshow调用
# 更新进度
self.signal_progress.emit(self.current_index + 1, len(self.images))
else:
logging.error(f"无法读取图像文件: {image_path}")
self.signal_error.emit(f"无法读取图像: {os.path.basename(image_path)}")
# 更新索引
self.current_index += 1
# 检查是否播放结束
if self.current_index >= len(self.images):
if self.loop:
self.current_index = 0
logging.debug("本地图像序列播放完成,循环播放")
else:
logging.info("本地图像序列播放完成")
self.signal_completed.emit()
break
# 计算需要等待的时间
elapsed = time.time() - start_time
sleep_time = max(0, frame_interval - elapsed)
# 输出调试信息
logging.debug(f"帧处理时间: {elapsed:.4f}秒,等待时间: {sleep_time:.4f}")
# 等待直到下一帧时间或停止事件被设置
if sleep_time > 0:
self.stop_event.wait(sleep_time)
except Exception as e:
logging.error(f"播放图像序列时发生错误: {e}")
self.signal_error.emit(f"播放错误: {str(e)}")
finally:
self.is_playing = False
logging.warning("====> 播放线程结束")