| |
|
| |
|
| | import json
|
| | import threading
|
| | import queue
|
| | from collections import deque
|
| | from dataclasses import dataclass, field
|
| | from typing import Dict, Optional, Tuple
|
| | from playwright._impl._api_structures import StorageState
|
| |
|
| | import cv2
|
| | import numpy as np
|
| | from playwright.sync_api import sync_playwright, Page, BrowserContext
|
| | from playwright_stealth import Stealth
|
| | from PIL import Image, ImageDraw
|
| | from io import BytesIO
|
| |
|
| |
|
| |
|
| |
|
# Commands for the browser worker. Each item is a dict whose "cmd" key selects
# the handler; {"cmd": "__EXIT__"} asks the worker loop to terminate.
task_queue: "queue.Queue[dict]" = queue.Queue()

# Replies from the browser worker: one (message, screenshot) tuple per command.
# The screenshot slot is None when no image was produced.
result_queue: "queue.Queue[Tuple[str, Optional[Image.Image]]]" = queue.Queue()
|
| |
|
| |
|
| |
|
| |
|
| |
|
def _bounded_log() -> deque:
    """Return an empty log buffer that keeps only the 500 newest entries."""
    return deque(maxlen=500)


@dataclass
class BrowserState:
    """Mutable bookkeeping for one browser session.

    The dataclass itself performs no locking; every field is a plain
    attribute with an independent default per instance.
    """

    # Whether a worker is currently considered started.
    running: bool = False
    # The thread associated with the session, if any.
    thread: Optional[threading.Thread] = None
    # Open tabs, keyed by tab name.
    pages: Dict[str, Page] = field(default_factory=dict)
    # Name (key into ``pages``) of the tab commands act on, or None.
    active_page: Optional[str] = None
    # Rolling network log lines (oldest entries are dropped past 500).
    network_logs: deque = field(default_factory=_bounded_log)
    # Rolling console log lines (oldest entries are dropped past 500).
    console_logs: deque = field(default_factory=_bounded_log)
    # Whether commands are currently being captured into ``macro``.
    recording: bool = False
    # Recorded macro steps.
    macro: list = field(default_factory=list)
    # Monotonic counter for numbering tabs.
    tab_counter: int = 0
|
| |
|
| |
|
# Single module-wide session state instance.
BROWSER_STATE = BrowserState()

# Guards access to BROWSER_STATE. This is a plain (non-reentrant) lock, so code
# that already holds it must not call anything that acquires it again.
BROWSER_LOCK = threading.Lock()
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def take_screenshot(page: Optional[Page]) -> Optional[Image.Image]:
    """Capture *page* as a PIL image, or return ``None`` if that is not possible.

    Args:
        page: The page to capture; ``None`` is accepted.

    Returns:
        The decoded screenshot, or ``None`` when *page* is ``None`` or when
        taking or decoding the screenshot raised any ``Exception``.
    """
    if page is not None:
        try:
            return Image.open(BytesIO(page.screenshot()))
        except Exception:
            # Best effort: a failed capture yields None instead of propagating.
            pass
    return None
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
class PlaywrightWorker:
    """Runs a headless Chromium context and executes queued commands on it.

    ``run`` is the worker loop: it takes command dicts from ``task_queue``,
    dispatches on their ``"cmd"`` key, and puts exactly one
    ``(message, screenshot)`` tuple on ``result_queue`` per command.

    Every ``handle_*`` method receives the command dict and the page that was
    active when the command was dequeued (possibly ``None``) and returns a
    ``(message, screenshot)`` tuple, where ``screenshot`` may be ``None``.

    ``BROWSER_LOCK`` is a non-reentrant lock that the page/context event
    listeners also acquire, so handlers only hold it for pure state updates and
    never while calling into Playwright or into another method that locks.
    """

    # Commands that are appended to the macro while recording is switched on.
    _RECORDABLE_CMDS = frozenset(
        {"goto", "click", "type", "new_tab", "close_tab", "switch_tab"}
    )

    # Minimum normalised correlation score accepted as a template match.
    _MATCH_THRESHOLD = 0.55

    def __init__(self, state: BrowserState, storage_state: Optional[StorageState] = None):
        """Store the shared state and the optional storage state for the context.

        Args:
            state: Shared session bookkeeping mutated by this worker.
            storage_state: Passed as ``storage_state`` to ``browser.new_context``.
        """
        self.state = state
        self.context: Optional[BrowserContext] = None
        self.storage_state = storage_state

    def run(self):
        """Launch the browser, open a first tab and serve commands until ``__EXIT__``.

        The browser is closed when the loop ends, including when it ends
        because of an unexpected exception.
        """
        with Stealth().use_sync(sync_playwright()) as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            try:
                self.context = browser.new_context(storage_state=self.storage_state)
                self._attach_network_listeners(self.context)

                # Always start with one tab so there is an active page.
                self._create_new_tab()

                handlers = self._build_handlers()

                while True:
                    task = task_queue.get()
                    cmd = task.get("cmd")

                    if cmd == "__EXIT__":
                        break

                    result_queue.put(self._execute(cmd, task, handlers))
            finally:
                try:
                    browser.close()
                except Exception:
                    # Shutdown is best effort; the browser may already be gone.
                    pass

    def _build_handlers(self) -> dict:
        """Return the mapping from command name to bound handler method."""
        return {
            "eval": self.handle_eval,
            "goto": self.handle_goto,
            "click": self.handle_click,
            "click_xy": self.handle_click_xy,
            "type": self.handle_type,
            "new_tab": self.handle_new_tab,
            "close_tab": self.handle_close_tab,
            "switch_tab": self.handle_switch_tab,
            "inspect": self.handle_inspect,
            "get_network_logs": self.handle_get_network_logs,
            "get_console_logs": self.handle_get_console_logs,
            "clear_logs": self.handle_clear_logs,
            "start_record": self.handle_start_record,
            "stop_record": self.handle_stop_record,
            "play_macro": self.handle_play_macro,
            "take_screenshot": self.handle_take_screenshot,
            "find_template": self.handle_find_template,
        }

    def _execute(self, cmd, task: dict, handlers: dict):
        """Run one command and return its ``(message, screenshot)`` reply.

        Any exception raised while recording or handling is converted into an
        ``"Error: ..."`` message so the worker loop keeps running.
        """
        result_text = ""
        screenshot = None
        try:
            page = self._get_active_page()
            self._record_step(cmd, task)

            handler = handlers.get(cmd)
            if handler is None:
                result_text = f"Unknown command: {cmd}"
                screenshot = take_screenshot(page)
            else:
                result_text, screenshot = handler(task, page)
        except Exception as e:
            result_text = f"Error: {type(e).__name__}: {e}"
        return result_text, screenshot

    def _record_step(self, cmd, task: dict) -> None:
        """Append *task* to the macro if recording is on and *cmd* is recordable.

        Steps replayed from a macro (``from_macro`` truthy) are never recorded,
        and the ``from_macro`` key itself is stripped from the stored copy.
        """
        if task.get("from_macro", False) or cmd not in self._RECORDABLE_CMDS:
            return
        with BROWSER_LOCK:
            if self.state.recording:
                self.state.macro.append(
                    {k: v for k, v in task.items() if k != "from_macro"}
                )

    def _attach_console_listener(self, page: Page):
        """Log every console message of *page* as ``[type] text``."""

        def on_console(msg):
            with BROWSER_LOCK:
                self.state.console_logs.append(f"[{msg.type}] {msg.text}")

        page.on("console", on_console)

    def _attach_network_listeners(self, context: BrowserContext):
        """Log every request and response seen by *context*."""

        def on_request(request):
            with BROWSER_LOCK:
                self.state.network_logs.append(f"[REQUEST] {request.method} {request.url}")

        def on_response(response):
            with BROWSER_LOCK:
                self.state.network_logs.append(f"[RESPONSE] {response.status} {response.url}")

        context.on("request", on_request)
        context.on("response", on_response)

    def _create_new_tab(self) -> Tuple[str, Page]:
        """Open a page, register it as ``Tab-<n>`` and make it the active tab.

        Returns:
            The new tab's name and its page.
        """
        page = self.context.new_page()
        with BROWSER_LOCK:
            self.state.tab_counter += 1
            tab_name = f"Tab-{self.state.tab_counter}"
            self.state.pages[tab_name] = page
            self.state.active_page = tab_name
        self._attach_console_listener(page)
        return tab_name, page

    def _get_active_page(self) -> Optional[Page]:
        """Return the page of the active tab, or ``None`` if there is none.

        Acquires ``BROWSER_LOCK``; must not be called while it is already held.
        """
        with BROWSER_LOCK:
            name = self.state.active_page
            page = self.state.pages.get(name) if name else None
        return page

    def handle_eval(self, task: dict, page: Optional[Page]):
        """Evaluate ``task["code"]`` as a Python expression and report its repr.

        The expression runs in this process, not inside the page.
        """
        if page is None:
            return "No active page.", None

        code = task.get("code", "")
        # SECURITY NOTE(review): emptying __builtins__ is NOT a sandbox; Python
        # expressions can still reach arbitrary objects through attribute
        # access. Do not route untrusted input to this command.
        safe_globals = {"__builtins__": {}}
        safe_locals = {}

        try:
            result = eval(code, safe_globals, safe_locals)
        except Exception as e:
            return f"Eval error: {type(e).__name__}: {e}", take_screenshot(page)

        return f"Eval result: {result!r}", take_screenshot(page)

    def handle_goto(self, task: dict, page: Optional[Page]):
        """Navigate the active page to ``task["url"]``."""
        if page is None:
            return "No active page.", None
        url = task.get("url", "")
        try:
            page.goto(url)
            return f"Navigated to {url}", take_screenshot(page)
        except Exception as e:
            return f"Goto error: {e}", take_screenshot(page)

    def handle_click(self, task: dict, page: Optional[Page]):
        """Click the element matching the CSS selector in ``task["selector"]``.

        This is the selector-based click (not the coordinate click). It is used
        for:
          - the normal click command
          - macro playback

        The selector is awaited for up to 5 seconds before clicking.
        """
        if page is None:
            return "No active page.", None

        selector = task.get("selector", "")
        if not selector:
            return "No selector provided.", take_screenshot(page)

        try:
            page.wait_for_selector(selector, timeout=5000)
            page.click(selector)
            return f"Clicked selector: {selector}", take_screenshot(page)
        except Exception as e:
            return f"Click error on '{selector}': {e}", take_screenshot(page)

    def handle_click_xy(self, task: dict, page: Optional[Page]):
        """Perform a mouse action at image coordinates scaled to the viewport.

        ``task`` supplies ``x``/``y`` (position inside an image of size
        ``img_w`` x ``img_h``) and ``click_type``: ``"left"`` (default),
        ``"double"``, ``"right"`` or ``"hover"``. Coordinates are rescaled by
        the ratio between the page viewport and the image size.

        Missing or non-numeric values, a non-positive image size, and unknown
        click types are reported in the message instead of raising.
        """
        if page is None:
            return "No active page.", None

        click_type = task.get("click_type", "left")

        try:
            x = float(task.get("x"))
            y = float(task.get("y"))
            img_w = float(task.get("img_w"))
            img_h = float(task.get("img_h"))
        except (TypeError, ValueError):
            return "Invalid click coordinates or image size.", take_screenshot(page)

        # A zero or negative image size would divide by zero or flip the axes.
        if img_w <= 0 or img_h <= 0:
            return "Invalid image size for click.", take_screenshot(page)

        # Without a fixed viewport, fall back to a 1:1 mapping.
        vp = page.viewport_size or {"width": img_w, "height": img_h}

        real_x = x * (vp["width"] / img_w)
        real_y = y * (vp["height"] / img_h)

        if click_type == "left":
            page.mouse.click(real_x, real_y)
        elif click_type == "double":
            page.mouse.dblclick(real_x, real_y)
        elif click_type == "right":
            page.mouse.click(real_x, real_y, button="right")
        elif click_type == "hover":
            page.mouse.move(real_x, real_y)
        else:
            # Previously this fell through and reported a click that never happened.
            return f"Unknown click type: {click_type}", take_screenshot(page)

        return f"{click_type} click at {real_x},{real_y}", take_screenshot(page)

    def handle_type(self, task: dict, page: Optional[Page]):
        """Fill the element matching ``task["selector"]`` with ``task["text"]``."""
        if page is None:
            return "No active page.", None
        selector = task.get("selector", "")
        text = task.get("text", "")
        try:
            page.fill(selector, text)
            return f"Typed into {selector}: {text}", take_screenshot(page)
        except Exception as e:
            return f"Type error: {e}", take_screenshot(page)

    def handle_new_tab(self, task: dict, page: Optional[Page]):
        """Open a new tab and make it active."""
        tab_name, new_page = self._create_new_tab()
        return f"Opened new tab: {tab_name}", take_screenshot(new_page)

    def handle_close_tab(self, task: dict, page: Optional[Page]):
        """Close the tab named ``task["tab"]``.

        If the closed tab was active, the first remaining tab (if any) becomes
        active. The screenshot shows whichever tab is active afterwards.
        """
        name = task.get("tab", "")

        # Only the bookkeeping happens under the lock. The page is closed
        # afterwards because the event listeners acquire the same
        # non-reentrant lock and could otherwise block against this thread.
        with BROWSER_LOCK:
            closing = self.state.pages.pop(name, None)
            if closing is not None and self.state.active_page == name:
                self.state.active_page = next(iter(self.state.pages), None)

        if closing is None:
            msg = f"Tab {name} not found."
        else:
            try:
                closing.close()
            except Exception:
                # Best effort: the tab is already deregistered either way.
                pass
            msg = f"Closed {name}"

        return msg, take_screenshot(self._get_active_page())

    def handle_switch_tab(self, task: dict, page: Optional[Page]):
        """Make the tab named ``task["tab"]`` the active tab.

        If no such tab exists the active tab is left unchanged and the
        screenshot shows the currently active tab.
        """
        name = task.get("tab", "")
        with BROWSER_LOCK:
            target = self.state.pages.get(name)
            if target is not None:
                self.state.active_page = name

        if target is not None:
            return f"Switched to {name}", take_screenshot(target)

        # Looked up after releasing the lock: _get_active_page() acquires
        # BROWSER_LOCK itself, so calling it while holding the lock deadlocks.
        return f"Tab {name} not found.", take_screenshot(self._get_active_page())

    def handle_inspect(self, task: dict, page: Optional[Page]):
        """Describe the first element matching ``task["selector"]``.

        The report contains the selector, a computed XPath, the inner text,
        the attribute map and the first 1000 characters of the inner HTML.
        """
        if page is None:
            return "No active page.", None
        selector = task.get("selector", "")
        try:
            el = page.query_selector(selector)
            if not el:
                return f"No element found for selector: {selector}", take_screenshot(page)

            inner_text = el.inner_text()
            inner_html = el.inner_html()
            attrs = page.evaluate(
                """(el) => {
                    const out = {};
                    for (const a of el.attributes) out[a.name] = a.value;
                    return out;
                }""",
                el,
            )
            xpath = page.evaluate(
                """(el) => {
                    function getXPath(node) {
                        if (node.id)
                            return 'id(\"' + node.id + '\")';
                        if (node === document.body)
                            return '/html/body';
                        let ix = 0;
                        const siblings = node.parentNode ? node.parentNode.childNodes : [];
                        for (let i=0; i<siblings.length; i++) {
                            const sibling = siblings[i];
                            if (sibling === node)
                                return getXPath(node.parentNode) + '/' + node.tagName.toLowerCase() + '[' + (ix+1) + ']';
                            if (sibling.nodeType === 1 && sibling.tagName === node.tagName)
                                ix++;
                        }
                    }
                    return getXPath(el);
                }""",
                el,
            )

            info = (
                f"Selector: {selector}\n"
                f"XPath: {xpath}\n\n"
                f"Inner Text:\n{inner_text}\n\n"
                f"Attributes:\n{attrs}\n\n"
                f"Inner HTML (truncated):\n{inner_html[:1000]}"
            )
            return info, take_screenshot(page)
        except Exception as e:
            return f"Inspect error: {e}", take_screenshot(page)

    def handle_get_network_logs(self, task: dict, page: Optional[Page]):
        """Return the 100 most recent network log lines."""
        with BROWSER_LOCK:
            logs = list(self.state.network_logs)[-100:]
        text = "=== Network Logs (last 100) ===\n" + "\n".join(logs)
        return text, take_screenshot(page)

    def handle_get_console_logs(self, task: dict, page: Optional[Page]):
        """Return the 100 most recent console log lines."""
        with BROWSER_LOCK:
            logs = list(self.state.console_logs)[-100:]
        text = "=== Console Logs (last 100) ===\n" + "\n".join(logs)
        return text, take_screenshot(page)

    def handle_clear_logs(self, task: dict, page: Optional[Page]):
        """Empty both the network and the console log buffers."""
        with BROWSER_LOCK:
            self.state.network_logs.clear()
            self.state.console_logs.clear()
        return "Network & console logs cleared.", take_screenshot(page)

    def handle_start_record(self, task: dict, page: Optional[Page]):
        """Start macro recording, discarding any previously recorded macro."""
        with BROWSER_LOCK:
            self.state.recording = True
            self.state.macro = []
        return "Macro recording started.", take_screenshot(page)

    def handle_stop_record(self, task: dict, page: Optional[Page]):
        """Stop macro recording and report how many steps were captured."""
        with BROWSER_LOCK:
            self.state.recording = False
            steps = len(self.state.macro)
        return f"Macro recording stopped. {steps} steps recorded.", take_screenshot(page)

    def handle_play_macro(self, task: dict, page: Optional[Page]):
        """Replay the recorded macro against whichever tab is active per step.

        Each step is tagged ``from_macro`` on a copy of the stored dict. Steps
        whose command is not one of the replayable commands are skipped. The
        message reports the total step count and the last executed step's
        result.
        """
        with BROWSER_LOCK:
            macro_steps = list(self.state.macro)

        if not macro_steps:
            return "Macro is empty.", take_screenshot(self._get_active_page())

        replayable = {
            "goto": self.handle_goto,
            "click": self.handle_click,
            "type": self.handle_type,
            "new_tab": self.handle_new_tab,
            "close_tab": self.handle_close_tab,
            "switch_tab": self.handle_switch_tab,
        }

        last_result = ""
        for step in macro_steps:
            step_cmd = dict(step)
            step_cmd["from_macro"] = True
            handler = replayable.get(step_cmd.get("cmd"))
            if handler is None:
                continue
            # Re-resolve the active page each time: tab steps may change it.
            last_result, _ = handler(step_cmd, self._get_active_page())

        final_page = self._get_active_page()
        return (
            f"Macro executed. {len(macro_steps)} steps.\nLast step: {last_result}",
            take_screenshot(final_page),
        )

    def handle_take_screenshot(self, task: dict, page: Optional[Page]):
        """Capture the active page."""
        if page is None:
            return "No active page.", None
        return "Screenshot captured.", take_screenshot(page)

    @staticmethod
    def _to_gray(image):
        """Convert an image-like object to a 2-D grayscale array.

        Arrays that are already 2-D are returned unchanged; anything else is
        converted with ``cv2.COLOR_RGB2GRAY``.
        """
        arr = np.array(image)
        if arr.ndim == 2:
            return arr
        return cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)

    def handle_find_template(self, task: dict, page: Optional[Page]):
        """Locate ``task["template"]`` in a screenshot of the active page.

        Uses normalised cross-correlation (``cv2.TM_CCOEFF_NORMED``). When the
        best score reaches the threshold, a yellow rectangle is drawn around
        the best match on the returned screenshot.
        """
        template = task.get("template")
        if page is None:
            return "No active page.", None
        if template is None:
            return "Upload template first.", take_screenshot(page)

        img = take_screenshot(page)
        if img is None:
            return "Screenshot error.", None

        img_np = self._to_gray(img)
        tpl_np = self._to_gray(template)

        th, tw = tpl_np.shape
        # matchTemplate requires the template to fit inside the searched image.
        if th > img_np.shape[0] or tw > img_np.shape[1]:
            return "Template is larger than the screenshot.", img

        res = cv2.matchTemplate(img_np, tpl_np, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(res)

        if max_val < self._MATCH_THRESHOLD:
            return f"No strong match. Score={max_val:.2f}", img

        left, top = max_loc
        ImageDraw.Draw(img).rectangle(
            [left, top, left + tw, top + th],
            outline="yellow",
            width=3,
        )

        return f"Match: score={max_val:.2f}", img
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def start_worker_thread(storage_state: Optional[StorageState] = None):
    """Reset the shared browser state and start a new daemon worker thread.

    The state is reset *before* the thread starts. The worker registers its
    first tab in ``BROWSER_STATE`` as soon as the browser is up, so resetting
    after ``start()`` could wipe that registration.

    Args:
        storage_state: Forwarded to ``PlaywrightWorker``; ``None`` is accepted.
    """
    worker = PlaywrightWorker(BROWSER_STATE, storage_state)
    t = threading.Thread(target=worker.run, daemon=True)
    with BROWSER_LOCK:
        BROWSER_STATE.pages.clear()
        BROWSER_STATE.active_page = None
        BROWSER_STATE.network_logs.clear()
        BROWSER_STATE.console_logs.clear()
        BROWSER_STATE.macro.clear()
        BROWSER_STATE.recording = False
        BROWSER_STATE.tab_counter = 0
        # Starting inside the lock makes reset + publish atomic for observers.
        # The worker cannot touch the state until the lock is released.
        t.start()
        BROWSER_STATE.thread = t
        BROWSER_STATE.running = True
|
| |
|
| |
|
def stop_worker_thread():
    """Ask the worker thread to exit and wait up to 5 seconds for it.

    Does nothing if the state is not marked as running. The ``__EXIT__``
    sentinel is only enqueued when the worker thread is still alive. If the
    thread has already died, nobody would consume the sentinel, and it would
    make the next worker exit on its first dequeue.
    """
    with BROWSER_LOCK:
        if not BROWSER_STATE.running:
            return
        BROWSER_STATE.running = False
        t = BROWSER_STATE.thread

    if t is None or not t.is_alive():
        return

    task_queue.put({"cmd": "__EXIT__"})
    t.join(timeout=5)