import os import time from utils.logger import get_logger, log_timing logger = get_logger("mtp") try: import win32com.client import pythoncom except ImportError: win32com = None class MTPHandler: def __init__(self): self.cache_dir = os.path.join(os.getcwd(), "mtp_cache") if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir) @log_timing def get_latest_xml_windows(self, device_name, path_str): """ 在 Windows MTP 设备上寻找并复制最新的 XML 文件。 """ logger.info(f"开始同步工作: 设备={device_name}") if win32com is None: return None, "系统未安装 pywin32。" try: pythoncom.CoInitialize() shell = win32com.client.Dispatch("Shell.Application") computer = shell.NameSpace(17) # 此电脑 # 1. 寻找设备 (增加容错:忽略首尾空格和大小写) device_item = None target_name = device_name.strip().lower() logger.debug(f"正在检索设备,目标名称: '{target_name}'") items = computer.Items() available_devices = [] for item in items: available_devices.append(item.Name) if item.Name.strip().lower() == target_name: device_item = item logger.info(f"匹配成功: '{item.Name}'") break if not device_item: logger.warning(f"未找到设备: '{device_name}'") logger.warning(f"当前电脑识别到的设备列表: {available_devices}") return None, f"未找到 MTP 设备: {device_name} (请检查名称)" current_folder = device_item.GetFolder # 2. 深入路径 path_chain = [p.strip() for p in path_str.replace('\\', '/').split('/') if p.strip()] for folder_name in path_chain: found = False for item in current_folder.Items(): if item.Name.lower() == folder_name.lower() and item.IsFolder: current_folder = item.GetFolder found = True break if not found: logger.warning(f"路径中断,找不到文件夹: '{folder_name}'") # 打印一下当前目录下有的东西,帮用户排查 content = [i.Name for i in current_folder.Items()] logger.debug(f"当前所在位置 [ {current_folder.Title} ] 下的可选内容: {content}") return None, f"路径点不存在: {folder_name}" # 3. 扫描文件并识别“最新” logger.info(f"已到达目标目录: [ {current_folder.Title} ]") logger.debug(f"正在列出该目录下所有项目...") logger.debug("-" * 40) xml_items = [] all_items = current_folder.Items() logger.debug(f"目录内总项数: {all_items.Count}") for i in range(all_items.Count): try: item = all_items.Item(i) name = str(item.Name) is_folder = item.IsFolder # 打印每一个文件的详细信息用于排查 # 尝试打印前 6 个详情列,看看哪一列是类型,哪一列是时间 details = [str(current_folder.GetDetailsOf(item, j)) for j in range(6)] if i == 0: logger.debug(f"属性列参考(0-5): {details}") item_type = details[2] # 识别逻辑: # 1. 文件名明确以 .xml 结尾 # 2. 或者类型描述中包含 "XML" (遍历 2, 3, 4 列) # 3. 或者针对您的 PDA 特效:文件名以 "Result_" 开头且名字里没有点 (认为后缀被隐藏了) is_xml_name = name.lower().endswith(".xml") is_xml_type = any("xml" in details[j].lower() for j in [2, 3, 4, 5]) is_pda_result = name.startswith("Result_") and "." not in name if not is_folder and (is_xml_name or is_xml_type or is_pda_result): # 尝试获取最可信的时间 mtime = self._get_best_time(current_folder, item) # 如果文件名包含 Result_YYYYMMDD_HHMMSS,提取它作为辅助排序依据 name_ts = self._extract_time_from_name(name) xml_items.append({ 'item': item, 'name': name if is_xml_name else f"{name}.xml", 'time': mtime, 'name_ts': name_ts }) logger.debug(f"识别 XML: '{name}' | 创建/修改时间: {mtime} | 文件名时间: {name_ts}") else: pass except Exception as e: logger.error(f"读取第 {i} 个项目时出错: {e}") logger.debug("-" * 40) if not xml_items: logger.warning(f"在目录 [ {current_folder.Title} ] 中没有找到任何以 .xml 结尾的文件") return None, "该目录中没有 XML 文件" # 排序逻辑优化: # 1. 优先按照文件名里的时间戳排序 (Result_YYYYMMDD_HHMMSS) # 2. 其次按照系统返回的时间属性排序 # 3. 最后按全名排序 xml_items.sort(key=lambda x: (x['name_ts'], str(x['time']), x['name']), reverse=True) latest = xml_items[0] latest_item = latest['item'] filename = latest['name'] logger.info(f"选定最新文件: {filename}") logger.debug(f" - 属性时间: {latest['time']}") logger.debug(f" - 名称日期: {latest['name_ts']}") # 4. 复制到本地 local_path = os.path.abspath(os.path.join(self.cache_dir, filename)) # 清理旧缓存 for f in os.listdir(self.cache_dir): try: os.remove(os.path.join(self.cache_dir, f)) except: pass logger.info(f"正在拉取文件到本地缓存...") dest_shell_folder = shell.NameSpace(os.path.abspath(self.cache_dir)) dest_shell_folder.CopyHere(latest_item, 4 | 16) # 等待 success = False for i in range(20): if os.path.exists(local_path) and os.path.getsize(local_path) > 0: success = True break time.sleep(0.5) if success: logger.info(f"同步成功: {filename}") return local_path, filename else: return None, "同步超时" except Exception as e: logger.error(f"MTP异常: {e}", exc_info=True) return None, str(e) finally: pythoncom.CoUninitialize() def _extract_time_from_name(self, name): """从文件名 Result_20251222_124145 中提取日期字符串用于排序""" import re # 匹配 8位数字_6位数字 (YYYYMMDD_HHMMSS) match = re.search(r'(\d{8}_\d{6})', name) if match: return match.group(1) # 如果没匹配到,试试只匹配 8位日期 match = re.search(r'(\d{8})', name) if match: return match.group(1) return "" def _get_best_time(self, folder_obj, item_obj): """尝试从 Shell 属性中获取文件时间""" # 索引 3 通常是修改时间,索引 4 通常是创建时间 for idx in [3, 4, 5]: try: t = folder_obj.GetDetailsOf(item_obj, idx) if t and len(t) > 5: # 简单过滤无效短字符串 return t except: pass # 兜底尝试标准属性 try: return str(item_obj.ModifyDate) except: return "" def clear_cache(self): if os.path.exists(self.cache_dir): import shutil shutil.rmtree(self.cache_dir) os.makedirs(self.cache_dir)