1"""
2Trustpilot Scraper Pro
3Enhanced scraper with dual-API (Firecrawl primary, Tavily backup) and AI enrichment.
4Built for Trustpilot review and business data extraction.
5"""
6
import asyncio
import json
import os
import re
from typing import Any, Dict, List, Optional

import aiohttp
import requests
from apify import Actor
from bs4 import BeautifulSoup
15
16
17FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY", "")
18TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "")
19APIFY_TOKEN = os.getenv("APIFY_TOKEN", "")
20
class TrustpilotScraperPro:
    """Enhanced scraper with multi-API fallback and enrichment.

    Pipeline: Firecrawl (primary) -> Tavily (fallback) -> minimal direct
    fallback, then best-effort contact enrichment and keyword-based
    sentiment analysis on each result.
    """

    # Compiled once at class creation; the original re-ran `import re` and
    # recompiled these patterns inside per-result loops.
    _EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    _PHONE_RE = re.compile(
        r'(?:\+1[-.\s]?)?(?:\([2-9][0-9]{2}\)[-.\s]?|[2-9][0-9]{2}[-.\s]?)?'
        r'[0-9]{3}[-.\s]?[0-9]{4}'
    )

    def __init__(self):
        # One shared session: connection pooling + a browser-like UA for
        # every direct HTTP call. (Previously created but never used — all
        # requests went through module-level `requests.post` without the UA.)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    async def scrape(self, search_term: str, location: str, max_results: int) -> List[Dict]:
        """Run the full scrape pipeline.

        Args:
            search_term: What to search for (e.g. "restaurant").
            location: Geographic qualifier appended to the query.
            max_results: Hard cap on the number of records returned.

        Returns:
            Up to ``max_results`` enriched result dicts.
        """
        Actor.log.info(f"Starting scrape: {search_term} in {location} (max: {max_results})")

        results = []

        # Primary source: Firecrawl (only attempted when a key is set).
        if FIRECRAWL_API_KEY:
            try:
                Actor.log.info("Attempting scrape via Firecrawl API...")
                results = await self._scrape_firecrawl(search_term, location, max_results)
                Actor.log.info(f"Firecrawl returned {len(results)} results")
            except Exception as e:
                Actor.log.warning(f"Firecrawl failed: {e}. Falling back to Tavily...")
                results = []

        # Fallback: Tavily. Also covers Firecrawl returning an empty list.
        if not results and TAVILY_API_KEY:
            try:
                Actor.log.info("Attempting scrape via Tavily API...")
                results = await self._scrape_tavily(search_term, location, max_results)
                Actor.log.info(f"Tavily returned {len(results)} results")
            except Exception as e:
                Actor.log.error(f"Tavily also failed: {e}")

        # Last resort: keyless direct scrape with minimal placeholder data.
        if not results:
            Actor.log.warning("Both APIs failed. Trying direct scrape...")
            results = await self._scrape_direct(search_term, location, max_results)

        # Post-processing: contact enrichment, then sentiment tagging.
        enriched_results = await self._add_enrichment(results)
        final_results = await self._add_sentiment_analysis(enriched_results)

        return final_results[:max_results]

    async def _scrape_firecrawl(self, search_term: str, location: str, max_results: int) -> List[Dict]:
        """Scrape using the Firecrawl API (primary method).

        Raises:
            Exception: re-raised after logging so ``scrape`` can fall back.
        """
        try:
            # Imported lazily so the actor still starts when firecrawl-py
            # is not installed and only Tavily is configured.
            from firecrawl import FirecrawlApp
            app = FirecrawlApp(api_key=FIRECRAWL_API_KEY)

            query = f"{search_term} in {location}"
            url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}"

            # Firecrawl's client is synchronous; run it in a worker thread
            # so the event loop is not blocked for the whole request.
            result = await asyncio.to_thread(app.scrape, url=url)

            # NOTE(review): assumes the response object exposes `.markdown`
            # — confirm against the installed firecrawl-py version.
            places = self._parse_firecrawl_result(result.markdown, max_results)
            return places
        except Exception as e:
            Actor.log.error(f"Firecrawl scrape error: {e}")
            raise

    def _parse_firecrawl_result(self, markdown: str, max_results: int) -> List[Dict]:
        """Parse Firecrawl markdown output into structured data.

        TODO(review): parsing is not implemented — this always returns an
        empty list, which silently forces ``scrape`` into the Tavily
        fallback even when Firecrawl succeeded.
        """
        places: List[Dict] = []
        return places[:max_results]

    async def _scrape_tavily(self, search_term: str, location: str, max_results: int) -> List[Dict]:
        """Scrape using the Tavily search API (fallback method).

        Raises:
            requests.HTTPError: on a non-2xx response from Tavily.
        """
        url = "https://api.tavily.com/search"
        payload = {
            "api_key": TAVILY_API_KEY,
            "query": f"{search_term} {location}",
            "search_depth": "basic",
            "max_results": max_results
        }

        # requests is blocking; run it in a worker thread so the event loop
        # stays responsive. Uses the shared session (browser-like UA).
        response = await asyncio.to_thread(self.session.post, url, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json()

        results = []
        for item in data.get('results', [])[:max_results]:
            results.append({
                'title': item.get('title', ''),
                'url': item.get('url', ''),
                # NOTE(review): Tavily results typically carry 'content',
                # not 'snippet' — verify the field name against the API.
                'snippet': item.get('snippet', ''),
                'source': 'tavily'
            })
        return results

    async def _scrape_direct(self, search_term: str, location: str, max_results: int) -> List[Dict]:
        """Direct scrape fallback (minimal, no external API).

        Returns a single placeholder record so downstream stages always
        have something to process.
        """
        Actor.log.warning("Using direct scrape - limited data")
        return [{
            'title': f'{search_term} results for {location}',
            'location': location,
            'source': 'direct_fallback'
        }]

    async def _add_enrichment(self, places: List[Dict]) -> List[Dict]:
        """Add free enrichment (emails/phones) via Tavily, mutating in place.

        Best-effort: a per-place failure is logged and the place is kept
        unenriched; the list is always returned.
        """
        for place in places:
            try:
                if place.get('url') and TAVILY_API_KEY:
                    company_url = place['url']
                    tavily_payload = {
                        "api_key": TAVILY_API_KEY,
                        "query": f"site:{company_url} contact email phone",
                        "search_depth": "basic",
                        "max_results": 2
                    }

                    # Off-thread to avoid blocking the event loop.
                    response = await asyncio.to_thread(
                        self.session.post,
                        "https://api.tavily.com/search",
                        json=tavily_payload,
                        timeout=10
                    )

                    if response.status_code == 200:
                        tavily_data = response.json()
                        enrichment = place.get('enrichment', {})
                        enrichment.update({
                            'emails': self._extract_emails(tavily_data),
                            'phones': self._extract_phones(tavily_data),
                            'enrichment_source': 'tavily'
                        })
                        place['enrichment'] = enrichment
            except Exception as e:
                Actor.log.warning(f"Enrichment error: {e}")
        return places

    def _extract_emails(self, tavily_data: Dict) -> List[str]:
        """Extract up to five unique (lowercased) emails from Tavily results."""
        emails: List[str] = []
        for result in tavily_data.get('results', []):
            text = (result.get('snippet', '') + ' ' + result.get('content', '')).lower()
            emails.extend(self._EMAIL_RE.findall(text))
        # dict.fromkeys dedupes while keeping first-seen order; the original
        # list(set(...)) made the output order nondeterministic.
        return list(dict.fromkeys(emails))[:5]

    def _extract_phones(self, tavily_data: Dict) -> List[str]:
        """Extract up to three unique NANP-style phone numbers from Tavily results."""
        phones: List[str] = []
        for result in tavily_data.get('results', []):
            text = result.get('snippet', '') + ' ' + result.get('content', '')
            phones.extend(self._PHONE_RE.findall(text))
        # Order-stable dedupe (see _extract_emails).
        return list(dict.fromkeys(phones))[:3]

    async def _add_sentiment_analysis(self, places: List[Dict]) -> List[Dict]:
        """Tag each place with keyword-based sentiment (mutates in place).

        Free, coarse heuristic: counts positive vs. negative keywords in up
        to three reviews (or the snippet when no reviews exist). Failures
        are logged and skipped; the list is always returned.
        """
        for place in places:
            try:
                # Prefer review text; fall back to the search snippet.
                text = ' '.join(place.get('reviews', [])[:3])
                if not text:
                    text = place.get('snippet', '')

                positive = ['good', 'great', 'excellent', 'amazing', 'best', 'love', 'happy']
                negative = ['bad', 'terrible', 'awful', 'worst', 'hate', 'poor', 'disappointed']

                text_lower = text.lower()
                pos = sum(1 for w in positive if w in text_lower)
                neg = sum(1 for w in negative if w in text_lower)

                # Score is anchored at 0.5 (neutral) and shifted 0.1 per
                # keyword hit, clamped to [0.0, 1.0].
                if pos > neg:
                    sentiment = 'positive'
                    score = min(0.5 + (pos * 0.1), 1.0)
                elif neg > pos:
                    sentiment = 'negative'
                    score = max(0.5 - (neg * 0.1), 0.0)
                else:
                    sentiment = 'neutral'
                    score = 0.5

                place['ai_analysis'] = {
                    'review_sentiment': sentiment,
                    'average_sentiment_score': round(score, 2),
                    'analysis_source': 'keyword_basic'
                }
            except Exception as e:
                Actor.log.warning(f"Sentiment error: {e}")
        return places
212
213
async def main():
    """Apify actor entry point: read input, run the scraper, push results.

    Input fields (all optional): ``searchTerm``, ``location``,
    ``maxResults`` — defaulted below when absent.
    """
    async with Actor as actor:
        actor_input = await actor.get_input() or {}

        search_term = actor_input.get('searchTerm', 'restaurant')
        location = actor_input.get('location', 'New York')
        max_results = actor_input.get('maxResults', 100)

        # BUG FIX: `__class__` is only defined inside a class body's
        # methods; in a module-level function it raised NameError. Name
        # the scraper class explicitly instead.
        actor.log.info(f"{TrustpilotScraperPro.__name__}: Scraping {search_term} in {location}")

        scraper = TrustpilotScraperPro()
        results = await scraper.scrape(search_term, location, max_results)

        await actor.push_data(results)
        actor.log.info(f"Scraped {len(results)} results")