☰ 补单逻辑管理
系统状态:运行中 ● 👤 admin
补单逻辑管理
修改代码需要操作密码确认;查看、复制不需要密码。保存前自动备份,可回滚最近版本。
"""Narrow patch area for the order re-issue / order lookup logic.

The guiding principle is "only act on orders already judged re-issuable":

1. Lookup: search the online-recharge list by order number and return the
   order number / status / amount / UID and related fields.
2. Re-issue: click "审核" on the order row -> choose operation type
   "人工到账" -> remark "bd" -> Commit / 确认 / 确定.

Final logic of this version: the time filter is forced by intercepting and
rewriting the list request, bypassing the UI time picker.

Additional logic:
- Login state is checked first; no query is executed when not logged in.
- Order not found: a formatted notification is emitted.
- Order found but not successful: an extended query looks for successful
  orders of the same UID and amount and, if any, notifies the drop-order
  group.

NOTE(review): this module was reconstructed from a whitespace-collapsed
capture. Notification/log texts are reproduced exactly as captured (single
spaces); if the original templates contained line breaks, restore them.
"""
from __future__ import annotations

import time
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from audit_db import setting


def _cmd_log(msg: str):
    """Print *msg* to the console with a millisecond timestamp (best effort)."""
    try:
        ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        print(f"[{ts}] {msg}", flush=True)
    except Exception:
        # Console logging must never break the business flow.
        pass


def _tick() -> float:
    """Return a monotonic start marker for `_cost`."""
    return time.perf_counter()


def _cost(label: str, start: float):
    """Log the milliseconds elapsed since *start* under *label* (best effort)."""
    try:
        _cmd_log(f"[耗时] {label} {int((time.perf_counter() - start) * 1000)}ms")
    except Exception:
        pass


def _add_run_log(action, detail, result, case_id):
    """Write one entry through `ocr_engine.add_run_log`.

    The import is local, as in every original call site. Unlike `_run_log`,
    errors are NOT swallowed here; they propagate to the caller.
    """
    from ocr_engine import add_run_log

    add_run_log(action, detail, result, case_id)


def _run_log(case_id, action: str, detail: str = "", result: str = "成功"):
    """Log everything to the console; persist only key events to the run log.

    An entry is persisted when *case_id* is set and either *action* is one of
    the key actions below or *result* is "失败". Persistence failures are
    logged to the console and otherwise ignored.
    """
    _cmd_log(f"[{action}] {result} {detail}")
    if not case_id:
        return
    db_actions = {"查询条件", "查询结果", "查询命中", "查询异常", "扩展查询", "掉单群通知", "登录检查"}
    if action not in db_actions and result != "失败":
        return
    try:
        _add_run_log(action, detail, result, case_id)
    except Exception as e:
        _cmd_log(f"[运行日志写入失败] action={action} error={e}")


def _safe_name(value: str, default: str = "na") -> str:
    """Return *value* with every char outside [0-9A-Za-z_-] replaced by '_'.

    Falls back to *default* when the stripped input is empty.
    """
    s = str(value or "").strip()
    s = re.sub(r"[^0-9A-Za-z_-]", "_", s)
    return s or default


async def _capture_login_debug_artifacts(page, case_id=None, order_no: str = "", reason: str = "") -> dict:
    """Save a screenshot, the page HTML and a metadata file for a login failure.

    Files are written under ``debug_logs/login_failures``. Each capture step
    is independent: a failing step records its error text instead of raising.

    Returns:
        dict with the ``meta`` / ``screenshot`` / ``html`` paths plus
        ``screenshot_error`` / ``html_error`` strings (empty on success).
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
    case_tag = _safe_name(f"case_{case_id}" if case_id else "case_na")
    order_tag = _safe_name(order_no, "order_na")
    prefix = f"{ts}_{case_tag}_{order_tag}"
    out_dir = Path("debug_logs") / "login_failures"
    out_dir.mkdir(parents=True, exist_ok=True)
    png_path = out_dir / f"{prefix}.png"
    html_path = out_dir / f"{prefix}.html"
    meta_path = out_dir / f"{prefix}.txt"

    screenshot_error = ""
    html_error = ""
    title = ""
    current_url = ""
    try:
        current_url = page.url
    except Exception as e:
        current_url = f"<read_url_failed:{e}>"
    try:
        title = await page.title()
    except Exception as e:
        title = f"<read_title_failed:{e}>"
    try:
        await page.screenshot(path=str(png_path), full_page=True)
    except Exception as e:
        screenshot_error = str(e)
    try:
        html = await page.content()
        html_path.write_text(html, encoding="utf-8")
    except Exception as e:
        html_error = str(e)

    meta = [
        f"time={datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"case_id={case_id}",
        f"order_no={order_no}",
        f"reason={reason}",
        f"url={current_url}",
        f"title={title}",
        f"screenshot={png_path.as_posix()}",
        f"html={html_path.as_posix()}",
        f"screenshot_error={screenshot_error}",
        f"html_error={html_error}",
    ]
    try:
        meta_path.write_text("\n".join(meta) + "\n", encoding="utf-8")
    except Exception:
        pass
    _cmd_log(f"[登录留证] 已保存: {meta_path.as_posix()}")
    return {
        "meta": meta_path.as_posix(),
        "screenshot": png_path.as_posix(),
        "html": html_path.as_posix(),
        "screenshot_error": screenshot_error,
        "html_error": html_error,
    }


async def _js_click_button_by_text(page, texts):
    """Click the last visible, enabled button whose text contains any of *texts*.

    The click is done in page JavaScript, so Playwright's actionability checks
    and scrolling are skipped. Whitespace is removed from both sides before
    comparing.

    Returns:
        ``{"ok": True, "text", "index"}`` on success, else
        ``{"ok": False, "error"}``.
    """
    if isinstance(texts, str):
        texts = [texts]
    try:
        # Raw string: the JS regex /\s+/g must reach the browser unchanged and
        # "\s" is an invalid escape in a normal Python string literal.
        return await page.evaluate(
            r"""
            (texts) => {
                const visible = (el) => {
                    const r = el.getBoundingClientRect();
                    const st = window.getComputedStyle(el);
                    return r.width > 0 && r.height > 0 && st.display !== 'none' && st.visibility !== 'hidden';
                };
                const disabled = (el) => el.disabled || el.getAttribute('disabled') !== null || el.getAttribute('aria-disabled') === 'true' || (el.className || '').toString().includes('disabled');
                const btns = Array.from(document.querySelectorAll('button, .arco-btn, .el-button, [role="button"]'));
                for (let i = btns.length - 1; i >= 0; i--) {
                    const b = btns[i];
                    const txt = (b.innerText || b.textContent || '').replace(/\s+/g, '').trim();
                    if (!visible(b) || disabled(b)) continue;
                    if (texts.some(t => txt.includes(String(t).replace(/\s+/g, '')))) {
                        b.click();
                        return {ok: true, text: txt, index: i};
                    }
                }
                return {ok: false, error: 'button_not_found'};
            }
            """,
            texts,
        )
    except Exception as e:
        return {"ok": False, "error": str(e)}


async def _click_button_by_text(page, texts, timeout: int = 800):
    """Click a button by text: fast JS click first, real Playwright click as fallback.

    Returns:
        A dict with ``ok`` and, on success, the ``method`` used
        (``js_fast`` or ``playwright_fallback``).
    """
    if isinstance(texts, str):
        texts = [texts]
    ret = await _js_click_button_by_text(page, texts)
    if ret.get("ok"):
        ret["method"] = "js_fast"
        return ret

    last_error = ret.get("error") or ""
    for t in texts:
        selectors = [
            f"button:has-text('{t}')",
            f".arco-btn:has-text('{t}')",
            f"[role='button']:has-text('{t}')",
        ]
        for sel in selectors:
            try:
                loc = page.locator(sel).last
                if await loc.count():
                    await loc.click(timeout=timeout)
                    return {"ok": True, "text": t, "method": "playwright_fallback", "selector": sel}
            except Exception as e:
                last_error = str(e)
    return {"ok": False, "error": last_error or "button_not_found"}


def parse_datetime_from_order_no(order_no: str) -> str:
    """Parse the order creation time embedded in an order number.

    Format example: ``s032605051412372959708``. The first 3 characters
    (``s03``) are skipped and the next 12 are read as ``YYMMDDhhmmss``:
    ``260505141237`` -> ``2026-05-05 14:12:37``.

    Returns:
        ``"%Y-%m-%d %H:%M:%S"`` text, or ``""`` when the order number is too
        short, the time part is not 12 ASCII digits, or it is not a valid
        date/time. This function never raises for malformed input.
    """
    if not order_no or len(order_no) < 15:
        return ""
    # Skip the 3-character prefix, then take 12 characters.
    time_part = order_no[3:15]
    if len(time_part) != 12:
        _cmd_log(f"[订单号解析] 时间部分长度不对: {time_part}, 订单号: {order_no}")
        return ""
    # Arbitrary table cells are passed in here; reject non-numeric text
    # instead of letting int() raise.
    if not (time_part.isascii() and time_part.isdigit()):
        _cmd_log(f"[订单号解析] 时间部分不是纯数字: {time_part}, 订单号: {order_no}")
        return ""
    _cmd_log(f"[订单号解析] 订单号: {order_no}, 时间部分: {time_part}")

    year_suffix = time_part[0:2]      # e.g. 26
    month_day = time_part[2:6]        # e.g. 0505
    hour_min_sec = time_part[6:12]    # e.g. 141237
    year = 2000 + int(year_suffix)
    time_str = (
        f"{year}-{month_day[0:2]}-{month_day[2:4]} "
        f"{hour_min_sec[0:2]}:{hour_min_sec[2:4]}:{hour_min_sec[4:6]}"
    )
    try:
        dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    except ValueError as e:
        _cmd_log(f"[订单号解析] 失败: {e}, time_str={time_str}")
        return ""
    _cmd_log(f"[订单号解析] 成功: {order_no} -> {dt}")
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def _parse_backend_pay_time(pay_time: str):
    """Strictly parse a backend query time.

    'T' is treated as a space and '/' as '-'. Tries the first 19 characters as
    ``%Y-%m-%d %H:%M:%S``, then the first 16 as ``%Y-%m-%d %H:%M``.

    Returns:
        A ``datetime`` or ``None`` when nothing parses.
    """
    if not pay_time:
        return None
    v = str(pay_time).strip().replace('T', ' ').replace('/', '-')
    if len(v) >= 19:
        try:
            return datetime.strptime(v[:19], '%Y-%m-%d %H:%M:%S')
        except Exception:
            pass
    if len(v) >= 16:
        try:
            return datetime.strptime(v[:16], '%Y-%m-%d %H:%M')
        except Exception:
            pass
    return None


def _calc_time_range_from_order_no(order_no: str) -> tuple[str, str]:
    """Return (start, end) = order time -1 minute .. order time +1 minute.

    Raises:
        ValueError: when no time can be parsed from *order_no*.
    """
    time_str = parse_datetime_from_order_no(order_no)
    if not time_str:
        raise ValueError(f"无法从订单号解析时间: {order_no}")
    base = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    start_time = base - timedelta(minutes=1)
    end_time = base + timedelta(minutes=1)
    _cmd_log(f"[时间计算] 订单号: {order_no} -> 订单时间: {base} -> 查询范围: {start_time} ~ {end_time}")
    return start_time.strftime('%Y-%m-%d %H:%M:%S'), end_time.strftime('%Y-%m-%d %H:%M:%S')


def _calc_prev_minutes_range(base: datetime, minutes: int, exact_end: bool = False) -> tuple[str, str]:
    """Return the window from *minutes* before *base* up to *base*.

    *minutes* is clamped to at least 1. With ``exact_end=False`` the end is
    *base* with its seconds set to 59 (end of that minute); with
    ``exact_end=True`` the end is *base* itself.
    """
    if minutes < 1:
        minutes = 1
    start_time = base - timedelta(minutes=minutes)
    if exact_end:
        end_time = base
    else:
        end_time = base.replace(second=59, microsecond=0)
    return start_time.strftime('%Y-%m-%d %H:%M:%S'), end_time.strftime('%Y-%m-%d %H:%M:%S')


def _resolve_query_base_dt(order_no: str, pay_time: str):
    """Pick the base time for the primary query.

    Prefers *pay_time*; falls back to the time embedded in *order_no*.

    Returns:
        ``(datetime | None, source)`` where source is ``ocr_pay_time``,
        ``order_no_time`` or ``no_query_time``.
    """
    base = _parse_backend_pay_time(pay_time)
    if base:
        return base, 'ocr_pay_time'
    order_time = parse_datetime_from_order_no(order_no)
    base = _parse_any_dt(order_time)
    if base:
        return base, 'order_no_time'
    return None, 'no_query_time'


def _calc_primary_query_range(order_no: str, pay_time: str, range_mode: str = 'prev6') -> tuple[str, str, str]:
    """Compute the primary query window and report where its base time came from.

    Modes: ``prev6`` (6 minutes back, end at second 59), ``prev6_exact``
    (6 minutes back, end at the base time); any other value uses the
    order-number window of +/- 1 minute.

    Raises:
        ValueError: when no base time can be determined, or (other modes)
            when the order number carries no parseable time.
    """
    base, source = _resolve_query_base_dt(order_no, pay_time)
    if not base:
        raise ValueError(f"无法确定后台查询时间: order={order_no}")
    mode = str(range_mode or 'prev6').strip().lower()
    if mode == 'prev6':
        start_time, end_time = _calc_prev_minutes_range(base, 6, exact_end=False)
    elif mode == 'prev6_exact':
        start_time, end_time = _calc_prev_minutes_range(base, 6, exact_end=True)
    else:
        start_time, end_time = _calc_time_range_from_order_no(order_no)
    return start_time, end_time, source


def _calc_extended_time_range(pay_time: str) -> tuple[str, str]:
    """Window for the UID extended query: look back N minutes, end at pay time.

    N comes from setting ``extended_query_lookback_minutes`` (default 6,
    minimum 1). Returns ``("", "")`` when *pay_time* cannot be parsed.
    """
    base = _parse_backend_pay_time(pay_time)
    if not base:
        return "", ""
    try:
        lookback_minutes = int(str(setting("extended_query_lookback_minutes", "6") or "6").strip())
    except Exception:
        lookback_minutes = 6
    if lookback_minutes < 1:
        lookback_minutes = 1
    return _calc_prev_minutes_range(base, lookback_minutes, exact_end=True)


def _calc_days_window_range(pay_time: str, days: int) -> tuple[str, str]:
    """Return the window from *days* (min 1) before *pay_time* up to *pay_time*.

    Returns ``("", "")`` when *pay_time* cannot be parsed.
    """
    base = _parse_backend_pay_time(pay_time)
    if not base:
        return "", ""
    if days < 1:
        days = 1
    start_time = base - timedelta(days=days)
    end_time = base
    return start_time.strftime('%Y-%m-%d %H:%M:%S'), end_time.strftime('%Y-%m-%d %H:%M:%S')


def _extract_uid_from_text(uid_text: str) -> str:
    """Extract the numeric UID, preferring the digits before a parenthesis.

    Example: ``1458559(HAMZA K)`` -> ``1458559``. Otherwise the first run of
    5+ digits is returned, or ``""`` when there is none.
    """
    s = str(uid_text or '').strip()
    if '(' in s:
        left = s.split('(', 1)[0].strip()
        if left.isdigit():
            return left
    m = re.search(r"\d{5,}", s)
    return m.group(0) if m else ""


def _extract_uid_raw(uid_text: str) -> str:
    """Return the UID cell text as-is (stripped), including any parenthesis part."""
    return str(uid_text or '').strip()


def _parse_any_dt(value: str):
    """Leniently parse a time string; return ``None`` on failure.

    'T' is treated as a space and '/' as '-'; only the first 19 characters
    are considered.
    """
    s = str(value or '').strip().replace('T', ' ').replace('/', '-')
    if not s:
        return None
    for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M'):
        try:
            return datetime.strptime(s[:19], fmt)
        except ValueError:
            pass
    return None


def _parse_amount(value) -> float:
    """Leniently parse an amount such as ``1,000.00`` or ``₹1000``; 0.0 on failure."""
    s = str(value or '').strip().replace(',', '')
    s = re.sub(r'[^0-9.\-]', '', s)
    if s in ('', '-', '.', '-.'):
        return 0.0
    try:
        return float(s)
    except Exception:
        return 0.0


def _extract_third_party_name(row: list) -> str:
    """Extract the third-party payment channel name from a backend table row.

    Column index 6 is used when it is non-empty, not a placeholder and not a
    pure number of 8+ digits. Otherwise the row is scanned for a cell that
    looks like a channel name such as ``YD3-xxx``.
    """
    if not row:
        return ""
    if len(row) > 6:
        v = str(row[6] or '').strip()
        if v and v not in ('-', '--', 'N/A') and not re.fullmatch(r"\d{8,}", v):
            return v
    # Fallback: scan the whole row for a channel-like token.
    for cell in row:
        text = str(cell or '').strip()
        if re.search(r"[A-Za-z]{2,}\d*[-_][A-Za-z0-9_-]{2,}", text):
            return text
    return ""


def _parse_backend_row(row: list) -> dict:
    """Map one backend table row (list of cell texts) to a dict.

    Column indexes used: 1 order number, 2 UID, 6 third-party/UTR, 9 status,
    11 amount, 15 UTR fallback, 17 pay time. When the pay-time cell does not
    parse, the time embedded in the order number is used for ``pay_time_dt``.
    """
    uid_raw = row[2] if len(row) > 2 else ""
    order_no = row[1] if len(row) > 1 else ""
    status_text = (row[9] if len(row) > 9 else "").strip()
    amount_text = row[11] if len(row) > 11 else "0"
    pay_time_str = row[17] if len(row) > 17 else ""
    pay_time_dt = _parse_any_dt(pay_time_str)
    if not pay_time_dt:
        row_time_from_order = parse_datetime_from_order_no(order_no)
        pay_time_dt = _parse_any_dt(row_time_from_order)
    utr_text = row[6] if len(row) > 6 else ""
    if not utr_text or utr_text == "-":
        utr_text = row[15] if len(row) > 15 else ""
    return {
        'order_no': order_no,
        'uid': _extract_uid_from_text(uid_raw),
        'uid_raw': _extract_uid_raw(uid_raw),
        'status': status_text,
        'amount': _parse_amount(amount_text),
        'amount_text': amount_text,
        'pay_time': pay_time_str,
        'pay_time_dt': pay_time_dt,
        'third_party_name': _extract_third_party_name(row),
        'utr': utr_text if utr_text and utr_text != '-' else '',
        'raw_row': row,
    }


def _pick_best_duplicate_order(candidates: list[dict], base_time) -> Optional[dict]:
    """Pick the candidate closest to *base_time*, preferring ones at or before it.

    Candidates without ``pay_time_dt`` are only used as a last resort (the
    first candidate). Returns ``None`` for an empty list.
    """
    if not candidates:
        return None
    if not base_time:
        return candidates[0]
    before = []
    around = []
    for item in candidates:
        dt = item.get('pay_time_dt')
        if not dt:
            continue
        if dt <= base_time:
            before.append(item)
        around.append(item)
    if before:
        before.sort(key=lambda x: x['pay_time_dt'], reverse=True)
        return before[0]
    if around:
        around.sort(key=lambda x: abs((x['pay_time_dt'] - base_time).total_seconds()))
        return around[0]
    return candidates[0]


async def check_login_status(page, case_id=None) -> tuple[bool, str]:
    """Heuristically decide whether the backend page is logged in.

    Checks, in order: logout button, backend keywords in the body text,
    captcha / two-step keywords, login form + login button, "please log in"
    hints, user-info element, and finally "login" in the URL. When nothing
    matches, the page is assumed to be logged in.

    Returns:
        ``(is_logged_in, description)``. Any exception yields
        ``(False, "检查异常: ...")``.
    """
    try:
        current_url = page.url
        lower_url = str(current_url or '').lower()

        login_check = await page.evaluate("""
            () => {
                // Logout / sign-out button present?
                const elements = Array.from(document.querySelectorAll('button, a, span, div'));
                const hasLogoutBtn = elements.some(el => {
                    const text = (el.innerText || '').trim();
                    return text === '退出' || text === '注销' || text === 'Logout' || text === 'Sign out';
                });
                // Login form present?
                const hasLoginForm = !!document.querySelector('input[type="password"]');
                const hasLoginBtn = elements.some(el => {
                    const text = (el.innerText || '').trim();
                    return text === '登录' || text === 'Login' || text === 'Sign in';
                });
                // User-info element present?
                const userInfo = document.querySelector('.user-info, .avatar, [class*="user"]');
                return {
                    hasLogoutBtn: hasLogoutBtn,
                    hasLoginForm: hasLoginForm,
                    hasLoginBtn: hasLoginBtn,
                    hasUserInfo: !!userInfo
                };
            }
        """)
        _cmd_log(f"[登录检查] 检查结果: {login_check} url={current_url}")

        # A logout button means we are logged in.
        if login_check.get('hasLogoutBtn'):
            return True, "已登录"

        # Backend keywords take priority over the URL to avoid false negatives.
        body_text = await page.locator("body").inner_text()
        if any(keyword in body_text for keyword in ["在线充值", "Online Recharge", "订单管理", "订单", "会员ID", "充值"]):
            return True, "检测到后台页面关键字"

        # Report captcha / two-step pages separately instead of "not logged in".
        if any(keyword in body_text for keyword in ["谷歌验证码", "动态码", "Authenticator", "两步验证", "6位", "验证码"]):
            if any(ch.isdigit() for ch in body_text) or "验证码" in body_text:
                return False, "等待验证码/二步验证"

        # Only a login form together with a login button counts as logged out.
        if login_check.get('hasLoginForm') and login_check.get('hasLoginBtn'):
            return False, "检测到登录表单"

        if any(keyword in body_text for keyword in ["请先登录", "请登录", "登录后查看", "Unauthorized", "401"]):
            return False, "页面包含未登录提示"

        # A user-info area suggests a logged-in session.
        if login_check.get('hasUserInfo'):
            return True, "检测到用户信息"

        # URL contains "login" and no backend keyword matched.
        if "login" in lower_url:
            _cmd_log(f"[登录检查] 当前在登录页面: {current_url}")
            return False, "当前在登录页面"

        # Default: assume logged in.
        return True, "假定已登录"
    except Exception as e:
        _cmd_log(f"[登录检查] 检查失败: {e}")
        return False, f"检查异常: {e}"


async def ensure_logged_in(page, account, case_id=None) -> tuple[bool, str]:
    """Make sure the backend account is logged in, auto-logging in if needed.

    *account* is a mapping read for ``username``, ``password``, ``login_url``
    and ``online_url`` (the URLs fall back to settings). Captcha / two-step
    states are never automated; they are returned as a failure with the
    status text. Failures save debug artifacts.

    Returns:
        ``(ok, message)``.
    """
    is_logged, status = await check_login_status(page, case_id)
    if is_logged:
        return True, status
    if status and ('验证码' in status or '二步' in status):
        _run_log(case_id, "登录检查", f"后台账号待验证: {status}", "警告")
        await _capture_login_debug_artifacts(page, case_id=case_id, reason=f"待验证:{status}")
        return False, status

    _run_log(case_id, "登录检查", f"后台账号未登录: {status},尝试自动登录", "警告")
    try:
        username = account.get("username", "")
        password = account.get("password", "")
        login_url = account.get("login_url") or setting("backend_login_url", "")
        online_url = account.get("online_url") or setting("backend_online_url", "")
        if not username or not password:
            _run_log(case_id, "登录检查", "缺少用户名或密码,无法自动登录", "失败")
            return False, "缺少用户名或密码"

        # Go to the business page first to validate the session, rather than
        # forcing the login page straight away.
        target = online_url or login_url
        if target:
            await page.goto(target, wait_until="domcontentloaded", timeout=30000)
            await page.wait_for_timeout(2000)

        # Already online after navigation: done.
        is_logged, status_after_nav = await check_login_status(page, case_id)
        if is_logged:
            return True, status_after_nav
        if status_after_nav and ('验证码' in status_after_nav or '二步' in status_after_nav):
            return False, status_after_nav

        # Fill in the user name.
        user_selectors = [
            "input[placeholder*='用户名']",
            "input[placeholder*='账号']",
            "input[name='username']",
            "input[type='text']:first-of-type",
        ]
        user_filled = False
        for sel in user_selectors:
            loc = page.locator(sel)
            if await loc.count():
                await loc.first.fill(username)
                user_filled = True
                break
        if not user_filled:
            await _capture_login_debug_artifacts(page, case_id=case_id, reason="自动登录失败:未找到用户名输入框")
            return False, "未找到用户名输入框"

        # Fill in the password.
        pwd_selectors = [
            "input[placeholder*='密码']",
            "input[name='password']",
            "input[type='password']",
        ]
        pwd_filled = False
        for sel in pwd_selectors:
            loc = page.locator(sel)
            if await loc.count():
                await loc.first.fill(password)
                pwd_filled = True
                break
        if not pwd_filled:
            await _capture_login_debug_artifacts(page, case_id=case_id, reason="自动登录失败:未找到密码输入框")
            return False, "未找到密码输入框"

        # Click the login button.
        login_selectors = [
            "button:has-text('登录')",
            "button:has-text('Login')",
            "button[type='submit']",
        ]
        login_clicked = False
        for sel in login_selectors:
            loc = page.locator(sel)
            if await loc.count():
                await loc.first.click()
                login_clicked = True
                break
        if not login_clicked:
            await _capture_login_debug_artifacts(page, case_id=case_id, reason="自动登录失败:未找到登录按钮")
            return False, "未找到登录按钮"

        await page.wait_for_timeout(3000)

        # Verify the outcome.
        is_logged, new_status = await check_login_status(page, case_id)
        if is_logged:
            _run_log(case_id, "登录检查", "自动登录成功", "成功")
            return True, "自动登录成功"
        if new_status and ('验证码' in new_status or '二步' in new_status):
            _run_log(case_id, "登录检查", f"自动登录后待验证: {new_status}", "警告")
            await _capture_login_debug_artifacts(page, case_id=case_id, reason=f"自动登录后待验证:{new_status}")
            return False, new_status
        _run_log(case_id, "登录检查", f"自动登录失败: {new_status}", "失败")
        await _capture_login_debug_artifacts(page, case_id=case_id, reason=f"自动登录失败:{new_status}")
        return False, f"自动登录失败: {new_status}"
    except Exception as e:
        _run_log(case_id, "登录检查", f"登录异常: {e}", "失败")
        await _capture_login_debug_artifacts(page, case_id=case_id, reason=f"自动登录异常:{e}")
        return False, f"登录异常: {e}"


async def _js_fill_identity(page, order_no: str = "", uid: str = ""):
    """Fill the query identity inputs; order number and UID are mutually exclusive.

    The order number wins when both are given. Both inputs (``#order_id`` and
    ``#uid``) are cleared first, then the chosen one is set through the native
    value setter with input/change/Enter/blur events dispatched.

    Returns:
        ``{"ok": True, "mode", "orderValue", "uidValue"}`` or
        ``{"ok": False, "error"}``.
    """
    order_no = str(order_no or "").strip()
    uid = str(uid or "").strip()
    if order_no:
        uid = ""
    elif uid:
        order_no = ""
    try:
        return await page.evaluate(
            """
            ({orderNo, uid}) => {
                const visible = (el) => {
                    const r = el.getBoundingClientRect();
                    const st = window.getComputedStyle(el);
                    return r.width > 0 && r.height > 0 && st.display !== 'none' && st.visibility !== 'hidden';
                };
                const setVal = (node, val) => {
                    if (!node) return false;
                    const proto = node instanceof HTMLTextAreaElement ? HTMLTextAreaElement.prototype : HTMLInputElement.prototype;
                    const setter = Object.getOwnPropertyDescriptor(proto, 'value')?.set;
                    node.scrollIntoView({block:'center', inline:'center'});
                    try { node.click(); } catch(e) {}
                    node.focus();
                    if (setter) setter.call(node, ''); else node.value = '';
                    node.setAttribute('value', '');
                    try {
                        node.dispatchEvent(new InputEvent('input', {bubbles:true, inputType:'deleteContentBackward', data:null}));
                    } catch(e) {
                        node.dispatchEvent(new Event('input', {bubbles:true}));
                    }
                    node.dispatchEvent(new Event('change', {bubbles:true}));
                    if (setter) setter.call(node, val); else node.value = val;
                    node.setAttribute('value', val);
                    try {
                        node.dispatchEvent(new InputEvent('input', {bubbles:true, inputType: val ? 'insertText' : 'deleteContentBackward', data: val || null}));
                    } catch(e) {
                        node.dispatchEvent(new Event('input', {bubbles:true}));
                    }
                    node.dispatchEvent(new Event('change', {bubbles:true}));
                    node.dispatchEvent(new KeyboardEvent('keydown', {bubbles:true, key:'Enter', code:'Enter', keyCode:13, which:13}));
                    node.dispatchEvent(new KeyboardEvent('keyup', {bubbles:true, key:'Enter', code:'Enter', keyCode:13, which:13}));
                    node.blur();
                    node.dispatchEvent(new Event('blur', {bubbles:true}));
                    return true;
                };
                const uidDiv = document.querySelector('#uid');
                let uidInput = null;
                if (uidDiv) {
                    uidInput = uidDiv.querySelector('input.arco-input, input');
                }
                const orderDiv = document.querySelector('#order_id');
                let orderInput = null;
                if (orderDiv) {
                    orderInput = orderDiv.querySelector('input.arco-input, input');
                }
                if (!uidInput && !orderInput) {
                    return {ok: false, error: 'uid_and_order_inputs_not_found'};
                }
                if (orderNo && !orderInput) return {ok: false, error: 'order_input_not_found'};
                if (uid && !uidInput) return {ok: false, error: 'uid_input_not_found'};
                if (uidInput) setVal(uidInput, '');
                if (orderInput) setVal(orderInput, '');
                if (orderNo) setVal(orderInput, orderNo);
                else if (uid) setVal(uidInput, uid);
                return {
                    ok: true,
                    mode: orderNo ? 'order_no' : (uid ? 'uid' : 'empty'),
                    orderValue: orderInput ? (orderInput.value || '') : null,
                    uidValue: uidInput ? (uidInput.value || '') : null,
                };
            }
            """,
            {"orderNo": order_no, "uid": uid},
        )
    except Exception as e:
        return {"ok": False, "error": str(e)}


async def _js_fill_order_no(page, order_no: str):
    """Backward-compatible wrapper: fill the order number and clear the UID."""
    return await _js_fill_identity(page, order_no=order_no, uid="")


async def _force_query_with_time_range(
    page, order_no: str, uid: str, start_time: str, end_time: str, case_id=None
) -> dict:
    """Run a list query with the create-time range forced via request interception.

    The GET request to ``/api/recharge/online/list`` is rewritten so that
    ``create_time`` carries *start_time* and *end_time*. The query is by order
    number or by UID (mutually exclusive, order number wins). In UID mode the
    page size is raised to setting ``extended_query_page_size`` (default 200,
    minimum 20) and the page index is reset to 1.

    Returns:
        ``{"ok": True, "order_no", "uid", "matched", "rows", "matched_row"}``
        or ``{"ok": False, "error", "order_no", "uid"}``.
    """
    _run_log(case_id, "请求拦截", f"强制设置时间范围: {start_time} ~ {end_time}")
    order_no = str(order_no or '').strip()
    uid = str(uid or '').strip()
    if order_no:
        uid = ""
    try:
        extended_page_size = int(str(setting("extended_query_page_size", "200") or "200").strip())
    except Exception:
        extended_page_size = 200
    if extended_page_size < 20:
        extended_page_size = 20

    request_intercepted = False

    async def handle_route(route):
        """Rewrite the recharge-list GET request; pass everything else through."""
        nonlocal request_intercepted
        request = route.request
        url = request.url
        method = request.method
        if method == "GET" and "/api/recharge/online/list" in url:
            _cmd_log("[请求拦截] 🎯 捕获 GET 请求")
            request_intercepted = True
            parsed = urlparse(url)
            params = parse_qs(parsed.query)
            # Rebuild the query string, replacing create_time.
            new_params = []
            for key, values in params.items():
                if key == "create_time":
                    new_params.append(("create_time", start_time))
                    new_params.append(("create_time", end_time))
                    _cmd_log(f"[请求拦截] ✏️ 替换 create_time: {start_time} 和 {end_time}")
                elif key == "pageSize" and uid and not order_no:
                    new_params.append(("pageSize", str(extended_page_size)))
                elif key == "current" and uid and not order_no:
                    new_params.append(("current", "1"))
                elif key == "order_id" and not order_no:
                    continue
                elif key in ("uid", "member_id"):
                    if uid:
                        new_params.append((key, uid))
                    continue
                else:
                    for v in values:
                        new_params.append((key, v))
            # NOTE(review): in order-number mode an order_id already present
            # in the request is kept AND appended again below — confirm the
            # backend tolerates the repeated parameter.
            if order_no:
                new_params.append(("order_id", order_no))
            elif uid and not any(k in ("uid", "member_id") for k, _ in new_params):
                new_params.append(("uid", uid))
            new_query = urlencode(new_params)
            new_url = urlunparse(parsed._replace(query=new_query))
            _cmd_log(f"[请求拦截] ✅ 修改后 URL: {new_url[:150]}...")
            _cmd_log(f"[请求拦截] ✅ 修改后参数: {new_query}")
            response = await route.fetch(url=new_url, method=method, headers=request.headers)
            await route.fulfill(response=response)
            return
        await route.continue_()

    await page.route("**/*", handle_route)
    try:
        await page.wait_for_timeout(1000)

        # Mirror the range into the visible picker inputs. The values are
        # passed as evaluate() arguments instead of being spliced into the
        # JavaScript source.
        await page.evaluate(
            """
            ([startTime, endTime]) => {
                const picker = document.querySelector('.arco-picker-range');
                if (picker) {
                    const inputs = picker.querySelectorAll('input');
                    if (inputs.length >= 2) {
                        inputs[0].value = startTime;
                        inputs[0].dispatchEvent(new Event('input', {bubbles: true}));
                        inputs[0].dispatchEvent(new Event('change', {bubbles: true}));
                        inputs[1].value = endTime;
                        inputs[1].dispatchEvent(new Event('input', {bubbles: true}));
                        inputs[1].dispatchEvent(new Event('change', {bubbles: true}));
                    }
                }
            }
            """,
            [start_time, end_time],
        )

        identity_ret = await _js_fill_identity(page, order_no=order_no, uid=uid)
        if not identity_ret.get("ok"):
            return {"ok": False, "error": f"填写订单号/UID失败: {identity_ret}", "order_no": order_no, "uid": uid}
        if order_no:
            _run_log(case_id, "页面行为", f"填写订单号: {order_no}")
        elif uid:
            _run_log(case_id, "页面行为", f"扩展查询,填写UID: {uid}")
        else:
            _run_log(case_id, "页面行为", "扩展查询,订单号和UID都为空")
        await page.wait_for_timeout(500)

        # Click the query button: Playwright click first, JS click otherwise.
        _run_log(case_id, "页面行为", "点击查询按钮(拦截模式)")
        query_btn = page.locator("button:has-text('查询')").first
        if await query_btn.count():
            await query_btn.click()
            _cmd_log("[点击查询] 使用 Playwright 点击")
        else:
            await page.evaluate("""() => {
                const btns = Array.from(document.querySelectorAll('button'));
                const q = btns.find(b => b.innerText.includes('查询'));
                if (q) q.click();
            }""")
            _cmd_log("[点击查询] 使用 JS 点击")

        # Give the request time to be sent.
        await page.wait_for_timeout(3000)
        if not request_intercepted:
            _cmd_log("[请求拦截] ⚠️ 未拦截到请求")
            return {"ok": False, "error": "未拦截到请求", "order_no": order_no, "uid": uid}

        # Read the rendered table. In UID mode the request asked for
        # extended_page_size rows, so read that many instead of stopping at 50.
        await page.wait_for_timeout(2000)
        max_rows = max(50, extended_page_size) if uid else 50
        rows = await _dump_rows_preview(page, max_rows=max_rows)

        matched_row = None
        if order_no:
            for row in rows:
                if any(str(order_no) in str(cell) for cell in row):
                    matched_row = row
                    break
        return {
            "ok": True,
            "order_no": order_no,
            "uid": uid,
            "matched": matched_row is not None,
            "rows": rows,
            "matched_row": matched_row,
        }
    except Exception as e:
        _cmd_log(f"[请求拦截] ❌ 异常: {e}")
        return {"ok": False, "error": str(e), "order_no": order_no, "uid": uid}
    finally:
        # Always remove the route so later navigation is not intercepted.
        await page.unroute("**/*", handle_route)


async def _dump_rows_preview(page, max_rows: int = 50):
    """Return up to *max_rows* visible table rows as lists of cell texts.

    Several table selectors (Element, Ant, Arco, plain table) are tried in
    order and the first that yields rows is used. Whitespace in each cell is
    collapsed. Returns ``[]`` on any error.
    """
    try:
        # Raw string: the JS regex /\s+/g must reach the browser unchanged.
        rows = await page.evaluate(
            r"""
            (maxRows) => {
                const selectors = [
                    '.el-table__body-wrapper table tbody tr',
                    '.el-table__body tbody tr',
                    'table tbody tr',
                    '.ant-table-tbody > tr',
                    '.arco-table-tbody tr'
                ];
                let trs = [];
                for (const sel of selectors) {
                    trs = Array.from(document.querySelectorAll(sel));
                    if (trs.length) break;
                }
                const isVisible = (el) => {
                    const r = el.getBoundingClientRect();
                    const st = window.getComputedStyle(el);
                    return r.width > 0 && r.height > 0 && st.display !== 'none' && st.visibility !== 'hidden' && st.opacity !== '0';
                };
                return trs
                    .filter(tr => isVisible(tr))
                    .slice(0, maxRows)
                    .map(tr => Array.from(tr.querySelectorAll('td')).map(td => (td.textContent || '').replace(/\s+/g,' ').trim()))
                    .filter(r => r.length);
            }
            """,
            max_rows,
        )
        return rows or []
    except Exception:
        return []


async def query_order_status_on_page(page, order_no: str, pay_time: str = '', range_mode: str = 'prev6', case_id=None, account: Optional[Dict[str, Any]] = None) -> dict:
    """Look up one order using request interception (UI time picker bypassed).

    Flow: login check (with optional auto re-login when *account* is given)
    -> primary query window -> intercepted query -> classify the result.

    ``result["status"]`` is one of ``not_logged_in``, ``error``,
    ``not_found``, ``success``, ``not_success`` or ``duplicate_payment``.
    For a non-successful order an extended UID query searches for same-amount
    orders; a successful one turns the status into ``duplicate_payment`` and
    sets ``drop_notice``.

    Returns:
        The result dict; exceptions are caught and reported as status
        ``error``.
    """
    _query_total_t = _tick()
    result = {
        "order_no": order_no,
        "status": "unknown",
        "matched_order_no": "",
        "amount": None,
        "uid": "",
        "uid_raw": "",
        "raw_text": "",
        "time_range_applied": "",
        "rows_preview": [],
        "same_amount_order": None,
        "utr": "",
        "pay_time": "",
        "third_party_name": "",
        "order_time": "",
        "drop_notice": "",
        "same_amount_success_orders": [],
        "same_amount_non_success_orders": [],
    }
    try:
        _run_log(case_id, "后台查询开始", f"order={order_no}")

        # ---------- login check ----------
        is_logged, login_status = await check_login_status(page, case_id)
        if not is_logged and account:
            _run_log(case_id, "登录检查", f"检测到未登录({login_status}),尝试自动重新登录", "警告")
            relogin_ok, relogin_msg = await ensure_logged_in(page, account, case_id)
            if relogin_ok:
                is_logged = True
                login_status = relogin_msg or login_status
            else:
                login_status = f"{login_status}; 自动重登失败: {relogin_msg}"
        if not is_logged:
            result["status"] = "not_logged_in"
            result["error"] = f"后台账号未登录: {login_status}"
            artifacts = await _capture_login_debug_artifacts(
                page,
                case_id=case_id,
                order_no=order_no,
                reason=f"query_order_status_not_logged_in:{login_status}",
            )
            _run_log(case_id, "登录检查", f"后台账号未登录: {login_status},停止查询", "失败")
            _run_log(case_id, "登录检查", f"登录留证: {artifacts.get('meta', '')}", "警告")
            msg = (
                f"【查询失败】 订单号:{order_no} "
                f"原因:后台账号未登录 ({login_status}) 请先登录后再查询"
            )
            _add_run_log("掉单群通知", msg, "需人工处理", case_id)
            result["drop_notice"] = msg
            return result
        _run_log(case_id, "登录检查", f"登录状态正常: {login_status}", "成功")

        # The primary query prefers the OCR pay time and only falls back to
        # the time embedded in the order number.
        try:
            start_time, end_time, time_source = _calc_primary_query_range(order_no, pay_time, range_mode=range_mode)
            result["order_time"] = parse_datetime_from_order_no(order_no)
            _run_log(case_id, "查询条件", f"下单时间范围: {start_time} ~ {end_time} source={time_source} mode={range_mode or 'prev6'}")
            result["time_range_applied"] = f"{start_time} ~ {end_time}"
        except ValueError as e:
            result["status"] = "error"
            result["error"] = str(e)
            _run_log(case_id, "查询异常", result["error"], "失败")
            msg = (
                f"【查询结果】 订单号:{order_no} UTR:- 金额:- 时间:- 状态:未识别 "
                f"未识别到订单号/UTR,请人工补充。"
            )
            _add_run_log("掉单群通知", msg, "需人工处理", case_id)
            result["drop_notice"] = msg
            return result

        # ---------- intercepted query ----------
        query_result = await _force_query_with_time_range(
            page, order_no, "", start_time, end_time, case_id
        )
        if not query_result.get("ok"):
            result["status"] = "error"
            result["error"] = query_result.get("error", "拦截查询失败")
            _run_log(case_id, "查询异常", result["error"], "失败")
            msg = f"【查询失败】 订单号:{order_no} 原因:{result['error']}"
            _add_run_log("掉单群通知", msg, "需人工处理", case_id)
            result["drop_notice"] = msg
            return result

        rows = query_result.get("rows", [])
        matched_row = query_result.get("matched_row")
        result["rows_preview"] = rows

        # ---------- case 1: order not found ----------
        if not matched_row:
            result["status"] = "not_found"
            result["matched_order_no"] = ""
            _run_log(case_id, "查询结果", f"未找到订单 {order_no}")
            msg = (
                f"【查询结果】 订单号:{order_no} UTR:- 金额:- 时间:- 状态:未识别 "
                f"未识别到订单号/UTR,请人工补充。"
            )
            _add_run_log("掉单群通知", msg, "需人工处理", case_id)
            result["drop_notice"] = msg
            return result

        # ---------- order found: basic fields ----------
        result["matched_order_no"] = order_no
        matched_info = _parse_backend_row(matched_row)
        result["uid_raw"] = matched_info["uid_raw"]
        result["uid"] = matched_info["uid"]
        result["amount"] = matched_info["amount"]
        result["backend_status_text"] = matched_info["status"]
        result["pay_time"] = matched_info["pay_time"]
        result["utr"] = matched_info["utr"]
        result["third_party_name"] = matched_info["third_party_name"]
        status_text = matched_info["status"]
        _run_log(case_id, "查询命中", f"首次命中金额: order={order_no} amount={result['amount']} status={status_text}")

        # ---------- case 2: order is successful ----------
        success_status = ["已支付", "人工到账", "success", "paid"]
        if status_text in success_status:
            result["status"] = "success"
            _run_log(case_id, "查询命中", f"订单成功: order={order_no} uid={result['uid']} amount={result['amount']} status={status_text}")
            msg = (
                f"【查询结果】 订单号:{order_no} UTR:{result['utr'] or '-'} "
                f"金额:{result['amount']} 时间:{result['pay_time'] or result['order_time']} "
                f"状态:{status_text}"
            )
            _add_run_log("查询结果", msg, "成功", case_id)
            return result

        # ---------- case 3: order is not successful ----------
        result["status"] = "not_success"
        _run_log(case_id, "查询命中", f"订单非成功状态: order={order_no} status={status_text},开始查找同金额成功订单")

        # The extended query must use the OCR time; falling back to the
        # order-number time is not allowed here.
        base_time = _parse_backend_pay_time(pay_time)
        found_duplicate = False
        duplicate_order_no = ""
        if not base_time:
            _run_log(case_id, "扩展查询", "OCR识别时间为空,跳过扩展查询(不允许回退订单号时间)", "失败")

        if base_time and result["uid"]:
            extended_start, extended_end = _calc_extended_time_range(base_time.strftime("%Y-%m-%d %H:%M:%S"))
            if extended_start and extended_end:
                # The logged defaults match the ones actually applied
                # (6 minutes look-back, page size 200).
                _run_log(
                    case_id,
                    "扩展查询",
                    f"查找同时间段同金额订单: {extended_start} ~ {extended_end}, UID={result['uid']} lookback_minutes={setting('extended_query_lookback_minutes', '6')} page_size={setting('extended_query_page_size', '200')}"
                )
                extended_result = await _force_query_with_time_range(
                    page, "", result["uid"], extended_start, extended_end, case_id
                )
                if extended_result.get("ok"):
                    extended_rows = extended_result.get("rows", [])
                    _run_log(case_id, "扩展查询结果", f"找到 {len(extended_rows)} 条订单")
                    target_uid = result["uid"]
                    target_amount = result["amount"]
                    candidate_orders = []
                    non_success_same_amount_orders = []
                    for row in extended_rows:
                        if len(row) < 12:
                            continue
                        info = _parse_backend_row(row)
                        row_uid = info['uid']
                        row_amount = info['amount']
                        row_amount_str = info['amount_text']
                        row_status = info['status']
                        row_order_no = info['order_no']
                        row_third_name = info['third_party_name']
                        row_pay_time = info['pay_time']
                        row_pay_dt = info['pay_time_dt']
                        _run_log(
                            case_id,
                            "扩展查询明细",
                            f"订单号={row_order_no} 三方={row_third_name or '-'} 金额原文={row_amount_str} 金额解析={row_amount} 状态={row_status}"
                        )
                        if (row_uid == target_uid
                                and abs(row_amount - target_amount) < 0.01
                                and row_amount > 0):
                            # Skip the order being looked up itself.
                            if row_order_no == order_no:
                                continue
                            item = {
                                "order_no": row_order_no,
                                "amount": row_amount_str,
                                "status": row_status,
                                "third_party_name": row_third_name,
                                "pay_time": row_pay_time,
                                "pay_time_dt": row_pay_dt,
                            }
                            if row_status in success_status:
                                candidate_orders.append(item)
                                _run_log(case_id, "找到候选订单", f"订单号: {row_order_no} 金额: {row_amount_str} 状态: {row_status}")
                            else:
                                if row_order_no:
                                    non_success_same_amount_orders.append(item)

                    if non_success_same_amount_orders:
                        # Exposed in the result for the flow layer
                        # (same-amount non-successful orders).
                        seen_non_success = set()
                        uniq_non_success = []
                        for item in non_success_same_amount_orders:
                            o = str(item.get("order_no") or "").strip()
                            if not o or o in seen_non_success:
                                continue
                            seen_non_success.add(o)
                            uniq_non_success.append(item)
                        result["same_amount_non_success_orders"] = uniq_non_success
                        _run_log(case_id, "同金额非成功候选", f"找到 {len(uniq_non_success)} 条: {[x.get('order_no') for x in uniq_non_success[:50]]}")

                    if candidate_orders:
                        seen_success = set()
                        uniq_success = []
                        for item in candidate_orders:
                            o = str(item.get("order_no") or "").strip()
                            if not o or o in seen_success:
                                continue
                            seen_success.add(o)
                            uniq_success.append(item)
                        result["same_amount_success_orders"] = uniq_success
                        _run_log(case_id, "同金额成功候选", f"找到 {len(uniq_success)} 条: {[x.get('order_no') for x in uniq_success[:50]]}")

                    if candidate_orders:
                        matched_same_amount = _pick_best_duplicate_order(candidate_orders, base_time) or candidate_orders[0]
                        found_duplicate = True
                        duplicate_order_no = matched_same_amount["order_no"]
                        result["same_amount_order"] = matched_same_amount
                        result["same_amount_order_no"] = duplicate_order_no
                        msg = (
                            f"【查询结果】 订单号:{order_no} UTR:{result['utr'] or '-'} "
                            f"金额:{result['amount']} 时间:{result['pay_time'] or result['order_time']} "
                            f"备注:该会员同时间段已有相同金额订单到账,订单号:{duplicate_order_no}"
                        )
                        # NOTE(review): with a case_id set, the _run_log call
                        # below persists this notice a second time — confirm
                        # the double entry is intended.
                        _add_run_log("掉单群通知", msg, "需人工处理", case_id)
                        _run_log(case_id, "掉单群通知", msg)
                        result["drop_notice"] = msg
                        result["status"] = "duplicate_payment"

        if not found_duplicate:
            msg = (
                f"【查询结果】 订单号:{order_no} UTR:{result['utr'] or '-'} "
                f"金额:{result['amount']} 时间:{result['pay_time'] or result['order_time']} "
                f"状态:{status_text}"
            )
            _add_run_log("查询结果", msg, f"状态={status_text}", case_id)

        _cost("后台查询总耗时", _query_total_t)
        return result
    except Exception as e:
        result["status"] = "error"
        result["error"] = str(e)
        _run_log(case_id, "查询异常", f"order={order_no} error={e}", "失败")
        _cost("后台查询总耗时", _query_total_t)
        return result


async def query_uid_orders_on_page(page, uid: str, pay_time: str, days: int = 90, case_id=None, account: Optional[Dict[str, Any]] = None) -> dict:
    """List a UID's orders within *days* before *pay_time* (newest first).

    Rows with fewer than 12 cells, another UID, no order number, a duplicate
    order number, or a time after *pay_time* are dropped.

    Returns:
        ``{"ok", "uid", "orders", "error", "time_range_applied"}``.
    """
    result = {
        'ok': False,
        'uid': str(uid or '').strip(),
        'orders': [],
        'error': '',
        'time_range_applied': '',
    }
    uid = str(uid or '').strip()
    if not uid:
        result['error'] = 'uid为空'
        return result

    is_logged, login_status = await check_login_status(page, case_id)
    if not is_logged and account:
        _run_log(case_id, '登录检查', f"90天查询检测到未登录({login_status}),尝试自动重新登录", '警告')
        relogin_ok, relogin_msg = await ensure_logged_in(page, account, case_id)
        if relogin_ok:
            is_logged = True
        else:
            login_status = f"{login_status}; 自动重登失败: {relogin_msg}"
    if not is_logged:
        result['error'] = f"后台账号未登录: {login_status}"
        artifacts = await _capture_login_debug_artifacts(
            page,
            case_id=case_id,
            order_no='',
            reason=f"query_uid_orders_not_logged_in:{login_status}",
        )
        _run_log(case_id, '登录检查', f"登录留证: {artifacts.get('meta', '')}", '警告')
        return result

    start_time, end_time = _calc_days_window_range(pay_time, days)
    if not start_time or not end_time:
        result['error'] = '支付时间为空,无法执行90天查询'
        return result
    result['time_range_applied'] = f"{start_time} ~ {end_time}"
    _run_log(case_id, '90天扩展查询', f"UID={uid} time={result['time_range_applied']}")

    query_result = await _force_query_with_time_range(page, '', uid, start_time, end_time, case_id)
    if not query_result.get('ok'):
        result['error'] = query_result.get('error') or '90天扩展查询失败'
        return result

    base_time = _parse_backend_pay_time(pay_time)
    seen = set()
    orders = []
    for row in query_result.get('rows', []):
        if len(row) < 12:
            continue
        info = _parse_backend_row(row)
        order_no = str(info.get('order_no') or '').strip()
        if not order_no or order_no in seen:
            continue
        if info.get('uid') != uid:
            continue
        pay_dt = info.get('pay_time_dt')
        if base_time and pay_dt and pay_dt > base_time:
            continue
        seen.add(order_no)
        orders.append(info)
    # Rows without a parsed time sort last.
    orders.sort(key=lambda x: x.get('pay_time_dt') or datetime.min, reverse=True)
    result['ok'] = True
    result['orders'] = orders
    return result


async def perform_manual_reissue(page, order_no: str, remark: str = "bd") -> dict:
    """Re-issue an order: 审核 -> 人工到账 -> remark -> Commit/确认/确定/提交.

    The "审核" control is searched inside the row containing *order_no* when
    such a row exists, otherwise on the whole page.

    Returns:
        ``{"ok": True, "order_no", "remark_filled", "action_selected"}`` or
        ``{"ok": False, "order_no", "error"}``. ``action_selected`` is False
        when no "人工到账" option could be clicked; the submit button is still
        clicked in that case, so callers should check this flag.
    """
    try:
        row = None
        if order_no and await page.locator(f"tr:has-text('{order_no}')").count():
            row = page.locator(f"tr:has-text('{order_no}')").first
        target = row if row else page

        clicked = False
        for sel in ["text=审核", "button:has-text('审核')", "a:has-text('审核')"]:
            loc = target.locator(sel)
            if await loc.count():
                await loc.first.click()
                clicked = True
                break
        if not clicked:
            raise RuntimeError("未找到审核按钮")
        await page.wait_for_timeout(800)

        dialog = page.locator(".el-dialog:visible, .modal:visible, [role='dialog']:visible, .arco-modal:visible").first
        if not await dialog.count():
            dialog = page

        # Open the operation-type dropdown.
        for sel in ["text=请选择", ".arco-select", ".el-select", "input[placeholder*='请选择']"]:
            if await dialog.locator(sel).count():
                await dialog.locator(sel).first.click()
                await page.wait_for_timeout(300)
                break

        # Choose "人工到账"; options are searched on the whole page.
        action_selected = False
        for sel in ["text=人工到账", ".arco-select-option:has-text('人工到账')", ".el-select-dropdown:visible text=人工到账"]:
            if await page.locator(sel).count():
                await page.locator(sel).last.click()
                await page.wait_for_timeout(300)
                action_selected = True
                break

        filled = False
        for sel in ["textarea[placeholder*='备注']", "input[placeholder*='备注']", "textarea", "input[name*='remark']"]:
            if await dialog.locator(sel).count():
                await dialog.locator(sel).first.fill(remark)
                filled = True
                break

        btn = dialog.locator("button:has-text('Commit'), button:has-text('确认'), button:has-text('确定'), button:has-text('提交')")
        if not await btn.count():
            btn = page.locator("button:has-text('Commit'), button:has-text('确认'), button:has-text('确定'), button:has-text('提交')")
        await btn.first.click()
        await page.wait_for_timeout(1500)
        return {
            "ok": True,
            "order_no": order_no,
            "remark_filled": filled,
            "action_selected": action_selected,
        }
    except Exception as e:
        return {"ok": False, "order_no": order_no, "error": str(e)}
保存代码
重新加载
最近备份
文件
时间
操作
暂无备份