1"""
2Enhanced MCP Server for Video-to-Playwright Automation
3Improved video analysis and script generation
4"""
5
import asyncio
import json
import os
import re
import sys
from pathlib import Path
from typing import Optional, Any, List

import google.generativeai as genai
import mcp.server.stdio
from apify import Actor
from mcp.server import Server
from mcp.types import Tool, TextContent
16
17
# Read the Gemini API key once at import time. The client library is only
# configured when a key is present; main() rejects a missing key later.
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
21
class VideoPlaywrightMCP:
    """MCP server that turns a screen recording into a Playwright script.

    The server exposes five tools (``analyze_video``, ``modify_script``,
    ``execute_script``, ``get_script``, ``save_script``).  The most recently
    generated script and a simple request/response history are kept on the
    instance, so the tools operate on shared per-process state.
    """

    def __init__(self):
        """Build the MCP server and Gemini model and read env configuration.

        Environment variables read here: ``MCP_SERVER_NAME``, ``GEMINI_MODEL``,
        ``VIDEO_UPLOAD_DIR``, ``MAX_VIDEO_SIZE_MB`` and
        ``VIDEO_PROCESSING_TIMEOUT_S``.  The upload directory is created if it
        does not exist.
        """
        self.server = Server(os.getenv('MCP_SERVER_NAME', 'video-playwright-automation'))
        self.model = genai.GenerativeModel(os.getenv('GEMINI_MODEL', 'gemini-2.5-pro'))
        self.conversation_history = []
        self.generated_script = None
        # NOTE(review): the fallback is a machine-specific Windows path; set
        # VIDEO_UPLOAD_DIR explicitly on any other host.
        self.video_upload_dir = Path(os.getenv('VIDEO_UPLOAD_DIR', 'c:/Users/dilip/OneDrive/Desktop/AI/apify/playwright-mcp/videos'))
        self.video_upload_dir.mkdir(parents=True, exist_ok=True)
        self.max_video_size_mb = int(os.getenv('MAX_VIDEO_SIZE_MB', '100'))
        # Upper bound (seconds) on waiting for the uploaded video to leave the
        # PROCESSING state, so a stuck upload cannot hang a tool call forever.
        self.video_processing_timeout_s = int(os.getenv('VIDEO_PROCESSING_TIMEOUT_S', '600'))

        self.setup_tools()

    def setup_tools(self):
        """Register the tool listing and tool dispatch handlers on the server."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            return [
                Tool(
                    name="analyze_video",
                    description="Analyze a video and generate a Playwright automation script using Gemini AI",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "video_path": {
                                "type": "string",
                                "description": "Path to the video file (supports mp4, avi, mov, webm)"
                            },
                            "task_description": {
                                "type": "string",
                                "description": "Optional description of the task shown in the video"
                            },
                            "include_screenshots": {
                                "type": "boolean",
                                "description": "Include screenshot capture in generated script",
                                "default": False
                            },
                            "slow_mo": {
                                "type": "integer",
                                "description": "Slow motion delay in milliseconds for debugging",
                                "default": 0
                            }
                        },
                        "required": ["video_path"]
                    }
                ),
                Tool(
                    name="modify_script",
                    description="Modify the generated Playwright script based on user feedback",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "modification_request": {
                                "type": "string",
                                "description": "Natural language description of changes to make"
                            }
                        },
                        "required": ["modification_request"]
                    }
                ),
                Tool(
                    name="execute_script",
                    description="Execute the generated Playwright script",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "headless": {
                                "type": "boolean",
                                "description": "Run browser in headless mode",
                                "default": True
                            },
                            "save_output": {
                                "type": "boolean",
                                "description": "Save execution results to Apify dataset",
                                "default": False
                            }
                        }
                    }
                ),
                Tool(
                    name="get_script",
                    description="Retrieve the current generated Playwright script",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "format": {
                                "type": "string",
                                "enum": ["python", "json"],
                                "description": "Output format",
                                "default": "python"
                            }
                        }
                    }
                ),
                Tool(
                    name="save_script",
                    description="Save the generated script to Apify key-value store",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "filename": {
                                "type": "string",
                                "description": "Filename to save the script as",
                                "default": "playwright_script.py"
                            }
                        }
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> list[TextContent]:
            # Tolerate a missing arguments object so .get() below cannot fail.
            arguments = arguments or {}
            try:
                if name == "analyze_video":
                    return await self.analyze_video(
                        arguments.get("video_path"),
                        arguments.get("task_description"),
                        arguments.get("include_screenshots", False),
                        arguments.get("slow_mo", 0)
                    )
                elif name == "modify_script":
                    return await self.modify_script(arguments.get("modification_request"))
                elif name == "execute_script":
                    return await self.execute_script(
                        arguments.get("headless", True),
                        arguments.get("save_output", False)
                    )
                elif name == "get_script":
                    return await self.get_script(arguments.get("format", "python"))
                elif name == "save_script":
                    return await self.save_script(arguments.get("filename", "playwright_script.py"))
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                # Tool boundary: report the failure to the client as text.
                Actor.log.error(f"Error in {name}: {str(e)}")
                return [TextContent(type="text", text=f"❌ Error: {str(e)}")]

    @staticmethod
    def _extract_code(text: str) -> str:
        """Return the contents of the first fenced code block in *text*.

        A ```python fence is preferred over a bare ``` fence.  The extracted
        code is stripped; text without any fence is returned unchanged.
        """
        for fence in ("```python", "```"):
            _, found, rest = text.partition(fence)
            if found:
                return rest.partition("```")[0].strip()
        return text

    @staticmethod
    def _apply_headless(script: str, headless: bool) -> str:
        """Rewrite literal ``headless=True/False`` arguments in *script*.

        Only the literal forms are replaced; a script that sets the option in
        any other way is returned unchanged.
        """
        return re.sub(
            r"\bheadless\s*=\s*(?:True|False)\b",
            f"headless={bool(headless)}",
            script,
        )

    @staticmethod
    def _build_analysis_prompt(
        task_description: Optional[str],
        include_screenshots: bool,
        slow_mo: int
    ) -> str:
        """Build the Gemini prompt asking for a Playwright script for a video.

        The optional pieces are computed before the f-string because
        replacement fields containing ``#`` are a syntax error before
        Python 3.12.
        """
        task_context = f"Task Context: {task_description}" if task_description else ""
        if include_screenshots:
            screenshot_comment = "# Take screenshots"
            # The second line carries the template's own 12-space indentation.
            screenshot_code = (
                "await page.screenshot(path='screenshot.png')\n"
                '            print("Screenshot saved")'
            )
        else:
            screenshot_comment = screenshot_code = ""

        return f"""
Analyze this video FRAME BY FRAME and identify EVERY single user interaction in chronological order.

{task_context}

CRITICAL ANALYSIS STEPS:
1. **Watch the entire video carefully** - Note every mouse movement, click, keyboard input, scroll, and navigation
2. **Identify the starting URL** - What webpage does the video begin on?
3. **Track each interaction** - For EACH action, note:
   - What element is being interacted with? (button, input field, link, dropdown, etc.)
   - What is the visible text or label of that element?
   - What type of action? (click, type, press Enter, select, scroll, etc.)
   - What happens after the action? (page loads, dropdown opens, search results appear, etc.)
4. **Note timing** - Identify when to wait for elements, page loads, or animations
5. **Identify text inputs** - What exact text is typed into each field?

IMPORTANT RULES:
- If the user searches for something, USE THE SEARCH FUNCTIONALITY instead of expecting content on homepage
- If a specific video/content is clicked, search for it first to ensure it's available
- Use simple, reliable selectors that work across sessions
- Do NOT assume personalized content (like YouTube homepage videos) will be the same
- Add proper error handling for dynamic content

Generate a script that will ACTUALLY WORK in any session, not just replay the exact video scenario.

PLAYWRIGHT SCRIPT REQUIREMENTS:

```python
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
import asyncio

async def main():
    async with async_playwright() as p:
        # Launch browser
        browser = await p.chromium.launch(
            headless=False,
            slow_mo={slow_mo}
        )

        context = await browser.new_context(
            viewport={{'width': 1920, 'height': 1080}},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        )

        page = await context.new_page()

        try:
            # STEP 1: Navigate to starting URL
            print("Step 1: Navigating to [URL]...")
            await page.goto('[URL]', wait_until='domcontentloaded', timeout=30000)
            await asyncio.sleep(2)  # Wait for page to settle

            # STEP 2: [First interaction]
            print("Step 2: [Description]...")
            # Use page.locator() with CSS selector or text
            element = page.locator('[SELECTOR]')
            await element.wait_for(state='visible', timeout=15000)
            await element.click()
            await asyncio.sleep(1)

            # IMPORTANT: If video shows clicking on specific content (like a YouTube video):
            # - First search for it using the search box
            # - Then click on the search result
            # Example for YouTube (click search icon first to activate search):
            # await page.click('ytd-masthead #search-icon-legacy')
            # await asyncio.sleep(1)
            # await page.fill('input[name="search_query"]', 'video title')
            # await page.keyboard.press('Enter')
            # await asyncio.sleep(3)
            # video = page.locator('a#video-title').nth(0)
            # await video.wait_for(state='visible', timeout=15000)
            # await video.click()

            # Add more steps as needed for each action in the video
            # CORRECT PATTERNS:
            # element = page.locator('selector').nth(0)
            # await element.wait_for(state='visible', timeout=15000)
            # await element.click()
            # OR
            # await page.fill('input#id', 'text')
            # await page.click('button#id')

            {screenshot_comment}
            {screenshot_code}

            print("[SUCCESS] Automation completed!")

        except PlaywrightTimeoutError as e:
            print(f"[TIMEOUT] Element not found: {{e}}")
            await page.screenshot(path='error.png')

        except Exception as e:
            print(f"[ERROR] {{type(e).__name__}}: {{e}}")
            await page.screenshot(path='error.png')

        finally:
            await asyncio.sleep(3)  # Keep browser open briefly
            await context.close()
            await browser.close()
            print("[CLEANUP] Browser closed")

if __name__ == "__main__":
    asyncio.run(main())
```

CRITICAL SELECTOR STRATEGY (MUST FOLLOW EXACTLY):
1. Use `page.locator('css-selector')` and store in a variable
2. Use `page.locator('text=Exact Text')` for buttons/links with visible text
3. Use `page.fill('#input-id', 'text')` for input fields
4. NEVER use `.first()` - instead use `.nth(0)` or make selector more specific
5. Pattern: `element = page.locator('selector'); await element.wait_for(state='visible'); await element.click()`
6. Add `await asyncio.sleep(1-2)` after major actions to let page settle

CORRECT Examples:
```python
# Click first matching element
search_btn = page.locator('button.search').nth(0)
await search_btn.wait_for(state='visible', timeout=15000)
await search_btn.click()

# Or use more specific selector
video_link = page.locator('a#video-title').filter(has_text='Minecraft')
await video_link.wait_for(state='visible', timeout=15000)
await video_link.click()

# Fill input
await page.fill('input#search', 'search term')
await asyncio.sleep(1)
```

For YouTube:
- Search box: Try clicking search icon first, then fill `input[name="search_query"]` or `input#search`
- Pattern: await page.click('button#search-icon-legacy'); await asyncio.sleep(1); await page.fill('input[name="search_query"]', 'text')
- Search button: `button#search-icon-legacy`
- Video links: `a#video-title`
- Handle consent: Check for button with aria-label containing "Accept" or "Reject"

For searches:
- If user clicks on specific content, search for it first rather than expecting it on homepage
- Example: await page.fill('input#search', 'search term'); await page.keyboard.press('Enter'); await asyncio.sleep(2)

WRONG - DO NOT USE:
- `.first()` followed by parentheses
- `.get_by_role()` without proper chaining

Generate the COMPLETE, EXECUTABLE script with ALL steps from the video.
Include detailed comments for each step explaining what you observed in the video.
The script must work end-to-end without modifications.
"""

    async def analyze_video(
        self,
        video_path: str,
        task_description: Optional[str] = None,
        include_screenshots: bool = False,
        slow_mo: int = 0
    ) -> "list[TextContent]":
        """Upload a video to Gemini and generate a Playwright script from it.

        Args:
            video_path: Absolute path, or a path relative to the upload dir.
            task_description: Optional context added to the prompt.
            include_screenshots: Ask for a screenshot step in the script.
            slow_mo: Value substituted for ``slow_mo`` in the script template.

        Returns:
            A single text item containing the script, or an error message.
            Failures are reported in the returned text, not raised.
        """
        try:
            if not video_path:
                raise ValueError("video_path is required")

            input_path = Path(video_path)
            if not input_path.is_absolute():
                input_path = self.video_upload_dir / input_path

            if not input_path.exists():
                raise FileNotFoundError(f"Video not found at: {input_path}")

            size_mb = input_path.stat().st_size / (1024 * 1024)
            if size_mb > self.max_video_size_mb:
                raise ValueError(f"Video size {size_mb:.1f}MB exceeds limit {self.max_video_size_mb}MB")

            Actor.log.info(f"Analyzing video: {input_path}")

            # The genai client calls are synchronous; run them in a worker
            # thread so the event loop keeps serving the MCP connection.
            video_file = await asyncio.to_thread(genai.upload_file, path=str(input_path))
            Actor.log.info("Video uploaded, waiting for processing...")

            loop = asyncio.get_running_loop()
            deadline = loop.time() + self.video_processing_timeout_s
            while video_file.state.name == "PROCESSING":
                if loop.time() >= deadline:
                    raise TimeoutError(
                        f"Video processing did not finish within {self.video_processing_timeout_s}s"
                    )
                await asyncio.sleep(2)
                video_file = await asyncio.to_thread(genai.get_file, video_file.name)

            if video_file.state.name == "FAILED":
                raise ValueError("Video processing failed")

            Actor.log.info("Video processed successfully")

            prompt = self._build_analysis_prompt(task_description, include_screenshots, slow_mo)

            Actor.log.info("Generating enhanced Playwright script...")
            response = await asyncio.to_thread(self.model.generate_content, [video_file, prompt])

            # Keep only the code if the model wrapped it in a markdown fence.
            script_content = self._extract_code(response.text)

            self.generated_script = script_content
            self.conversation_history.append({
                "role": "user",
                "content": f"Video: {video_path}, Task: {task_description or 'Not specified'}"
            })
            self.conversation_history.append({
                "role": "assistant",
                "content": script_content
            })

            # Best-effort persistence: a storage failure must not discard the
            # script that was just generated.
            try:
                await Actor.set_value('generated_script.py', script_content)
                Actor.log.info("Script saved to key-value store as 'generated_script.py'")
            except Exception as save_error:
                Actor.log.warning(f"Could not save script: {save_error}")

            Actor.log.info("Enhanced script generated successfully")

            return [
                TextContent(
                    type="text",
                    text=(
                        "✅ Video analyzed with enhanced detection!\n\n"
                        "**Generated Playwright Script:**\n\n"
                        f"```python\n{script_content}\n```\n\n"
                        "**Next Steps:**\n"
                        "- Review the script to ensure all steps match your video\n"
                        "- Use `modify_script` if any steps are missing or incorrect\n"
                        "- Use `execute_script` to test the automation\n"
                        "- Use `save_script` to store in Apify KV store"
                    )
                )
            ]

        except Exception as e:
            Actor.log.error(f"Error analyzing video: {str(e)}")
            return [TextContent(type="text", text=f"❌ Error analyzing video: {str(e)}")]

    async def modify_script(self, modification_request: str) -> "list[TextContent]":
        """Ask the model to rewrite the current script per *modification_request*.

        Returns a single text item with the updated script, or an error
        message when no script exists, the request is empty, or the model
        call fails.
        """
        if not self.generated_script:
            return [TextContent(
                type="text",
                text="❌ No script has been generated yet. Use `analyze_video` first."
            )]

        if not modification_request:
            return [TextContent(
                type="text",
                text="❌ Error modifying script: modification_request is required"
            )]

        try:
            Actor.log.info(f"Modifying script: {modification_request}")

            prompt = f"""
Here is the current Playwright script:
```python
{self.generated_script}
```

User modification request: {modification_request}

Please modify the script according to the request. Ensure:
1. All actions are properly sequenced
2. Appropriate waits are added (wait_for_selector, wait_for_load_state, wait_for_timeout)
3. Selectors are robust and specific
4. Error handling is comprehensive
5. The script remains complete and executable
6. Comments explain what each step does

If the user mentions missing steps or actions that didn't work:
- Add explicit waits before interactions
- Try alternative selectors
- Add visibility/enabled checks
- Consider if consent dialogs or popups need to be handled first

Return ONLY the complete updated Python code with detailed comments.
"""

            # Synchronous client call; keep it off the event loop.
            response = await asyncio.to_thread(self.model.generate_content, prompt)
            modified_script = self._extract_code(response.text)

            self.generated_script = modified_script
            self.conversation_history.append({
                "role": "user",
                "content": f"Modify: {modification_request}"
            })
            self.conversation_history.append({
                "role": "assistant",
                "content": modified_script
            })

            Actor.log.info("Script modified successfully")

            return [
                TextContent(
                    type="text",
                    text=f"✅ Script modified successfully!\n\n```python\n{modified_script}\n```"
                )
            ]

        except Exception as e:
            Actor.log.error(f"Error modifying script: {str(e)}")
            return [TextContent(type="text", text=f"❌ Error modifying script: {str(e)}")]

    async def execute_script(self, headless: bool = True, save_output: bool = False) -> "list[TextContent]":
        """Run the current script in a child Python process and report output.

        Args:
            headless: Applied to literal ``headless=True/False`` arguments in
                the executed copy of the script; the stored script is not
                modified.
            save_output: When true, push the result dict to the Apify dataset.

        Returns:
            A single text item with stdout on success, stderr on a non-zero
            exit code, or an error message if the process could not be run.
        """
        if not self.generated_script:
            return [TextContent(
                type="text",
                text="❌ No script has been generated yet. Use `analyze_video` first."
            )]

        try:
            Actor.log.info("Executing Playwright script...")

            script_path = self.video_upload_dir / "temp_playwright_script.py"
            script_path.write_text(
                self._apply_headless(self.generated_script, headless),
                encoding="utf-8"
            )

            # Force UTF-8 in the child so the decode below matches its output.
            env = os.environ.copy()
            env["PYTHONUTF8"] = "1"
            env["PYTHONIOENCODING"] = "utf-8"
            process = await asyncio.create_subprocess_exec(
                # Use the interpreter running this server so the child sees the
                # same installed packages; fall back to PATH lookup.
                sys.executable or "python", str(script_path),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )

            stdout, stderr = await process.communicate()

            stdout_text = stdout.decode('utf-8', errors='replace')
            stderr_text = stderr.decode('utf-8', errors='replace')

            Actor.log.info(f"Script output:\n{stdout_text}")
            if stderr_text:
                Actor.log.warning(f"Script errors:\n{stderr_text}")

            result = {
                "success": process.returncode == 0,
                "stdout": stdout_text,
                "stderr": stderr_text,
                "return_code": process.returncode
            }

            if save_output:
                await Actor.push_data(result)
                Actor.log.info("Execution results saved to dataset")

            if result["success"]:
                Actor.log.info("Script executed successfully")
                return [TextContent(
                    type="text",
                    text=f"✅ Script executed successfully!\n\n**Output:**\n```\n{result['stdout']}\n```"
                )]
            else:
                Actor.log.error(f"Script execution failed: {result['stderr']}")
                return [TextContent(
                    type="text",
                    text=(
                        f"❌ Script execution failed:\n```\n{result['stderr']}\n```\n\n"
                        "**Tip:** Use `modify_script` to fix the issues. Common problems:\n"
                        "- Incorrect selectors\n"
                        "- Missing waits\n"
                        "- Elements not visible/enabled\n"
                        "- Page not loaded"
                    )
                )]

        except Exception as e:
            Actor.log.error(f"Error executing script: {str(e)}")
            return [TextContent(type="text", text=f"❌ Error executing script: {str(e)}")]

    async def get_script(self, format: str = "python") -> "list[TextContent]":
        """Return the current script as a fenced Python block or as JSON.

        The ``"json"`` format also includes the conversation history; any
        other value yields the Python rendering.
        """
        if not self.generated_script:
            return [TextContent(type="text", text="❌ No script has been generated yet.")]

        if format == "json":
            script_data = {
                "script": self.generated_script,
                "conversation_history": self.conversation_history,
                "format": "python"
            }
            return [TextContent(
                type="text",
                text=f"```json\n{json.dumps(script_data, indent=2)}\n```"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"**Current Playwright Script:**\n\n```python\n{self.generated_script}\n```"
            )]

    async def save_script(self, filename: str) -> "list[TextContent]":
        """Store the current script under *filename* in the Apify key-value store."""
        if not self.generated_script:
            return [TextContent(type="text", text="❌ No script has been generated yet.")]

        try:
            await Actor.set_value(filename, self.generated_script)
            Actor.log.info(f"Script saved to key-value store as {filename}")
            return [TextContent(
                type="text",
                text=f"✅ Script saved to Apify key-value store as `{filename}`"
            )]
        except Exception as e:
            Actor.log.error(f"Error saving script: {str(e)}")
            return [TextContent(type="text", text=f"❌ Error saving script: {str(e)}")]

    async def run(self):
        """Serve MCP requests over stdio until the streams close."""
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
568
async def main():
    """Apify actor entry point.

    Requires ``GEMINI_API_KEY``.  With ``AUTO_ANALYZE_VIDEO=true`` the actor
    analyzes ``VIDEO_FILE`` once and, unless ``EXECUTE_AFTER`` is false, runs
    the generated script; otherwise it serves MCP requests over stdio.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is not set.
    """
    async with Actor:
        Actor.log.info("Starting Enhanced Video-to-Playwright MCP Server...")

        if not GEMINI_API_KEY:
            Actor.log.error("GEMINI_API_KEY not found in environment variables!")
            raise ValueError("GEMINI_API_KEY is required")

        # Named to avoid shadowing the imported `mcp` package.
        automation = VideoPlaywrightMCP()

        if os.getenv('AUTO_ANALYZE_VIDEO', 'false').lower() != 'true':
            await automation.run()
            return

        video_file = os.getenv('VIDEO_FILE', 'test_1.mp4')
        include_screenshots = os.getenv('INCLUDE_SCREENSHOTS', 'false').lower() == 'true'
        execute_after = os.getenv('EXECUTE_AFTER', 'true').lower() == 'true'
        slow_mo = int(os.getenv('SLOW_MO', '0'))

        Actor.log.info(f"Auto-analyze enabled. Video: {video_file}")
        try:
            await automation.analyze_video(
                video_file,
                include_screenshots=include_screenshots,
                slow_mo=slow_mo
            )
            if not automation.generated_script:
                # analyze_video reports failures in its return value instead
                # of raising, so check for a script before trying to run one.
                Actor.log.error("Auto-analyze failed: no script was generated; skipping execution")
            elif execute_after:
                await automation.execute_script(
                    headless=os.getenv('PLAYWRIGHT_HEADLESS', 'true').lower() == 'true'
                )
        except Exception as e:
            Actor.log.error(f"Auto-analyze failed: {e}")
602
# Script entry point: start the actor only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())