1"""LinkedIn Company Employees Scraper — Multi-mode actor with enrichment and confidence scoring."""
2import asyncio
3import datetime
4import hashlib
5import re
6from typing import Any
7
8import httpx
9from apify import Actor
10
# Email addresses. The local part accepts letters, digits and ``. _ % + -`` so
# plus-addressed mailboxes such as ``sales+eu@example.com`` are captured whole
# instead of being truncated to the text after the '+'.
EMAIL_REGEX = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
# Social profile URLs on a fixed set of platforms. The platform alternation is a
# NON-capturing group on purpose: with a capturing group, ``findall`` would
# return only the platform name ("facebook") instead of the matched URL.
SOCIAL_REGEX = re.compile(
    r'(?:https?://)?(?:www\.)?'
    r'(?:facebook|instagram|twitter|linkedin|youtube|tiktok)\.com/[a-zA-Z0-9._-]+'
)
13
14
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a trailing ``'Z'``.

    Uses a timezone-aware ``datetime.now(timezone.utc)`` instead of the
    deprecated ``datetime.utcnow()``; the ``+00:00`` offset that ``isoformat``
    emits for aware values is rewritten to ``'Z'``, so the output format is
    unchanged (e.g. ``2024-01-02T03:04:05.123456Z``).
    """
    return datetime.datetime.now(datetime.timezone.utc).isoformat().replace('+00:00', 'Z')
17
18
def make_cache_key(actor_id: str, mode: str, query: str) -> str:
    """Build a compact cache key for an (actor, mode, query) triple.

    The query is lower-cased and stripped so trivially different spellings of
    the same query share a cache entry. The result is the 32-character hex MD5
    digest of ``'{actor_id}:{mode}:{normalized_query}'``.

    MD5 is used only as a short identifier, never for security, so it is
    requested with ``usedforsecurity=False``; this keeps it available on
    FIPS-restricted Python builds where a plain ``hashlib.md5()`` can be refused.
    """
    raw = f'{actor_id}:{mode}:{query.lower().strip()}'
    return hashlib.md5(raw.encode(), usedforsecurity=False).hexdigest()
22
23
async def check_cache(key: str) -> list | None:
    """Return the unexpired cached list stored under *key*, or ``None``.

    Expects the record shape written by ``set_cache``: a dict holding ``value``
    and an ISO-8601 ``expiresAt`` string (UTC, ``'Z'``-suffixed). Missing,
    malformed, non-list, or expired entries are all treated as a cache miss.

    Storage errors are logged and also reported as a miss: the cache is an
    optimisation and must never abort the run.
    """
    try:
        store = await Actor.open_key_value_store()
        entry = await store.get_value(key)
    except Exception as exc:
        Actor.log.warning(f'Cache read failed for {key}: {exc}')
        return None
    if not isinstance(entry, dict):
        return None
    expires = entry.get('expiresAt')
    if not isinstance(expires, str):
        return None
    try:
        expires_at = datetime.datetime.fromisoformat(expires.replace('Z', ''))
    except ValueError:
        return None
    if expires_at.tzinfo is not None:
        # An offset-aware timestamp cannot be compared with a naive one;
        # normalise it to naive UTC first.
        expires_at = expires_at.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    if expires_at <= now:
        return None
    value = entry.get('value')
    # Honour the declared return type; anything else is a corrupt entry.
    return value if isinstance(value, list) else None
35
36
async def set_cache(key: str, value: list, ttl_seconds: int = 86400) -> None:
    """Store *value* under *key* in the default key-value store with an expiry.

    The stored record is ``{'value', 'createdAt', 'expiresAt'}``. Both
    timestamps are naive-UTC ISO-8601 strings suffixed with ``'Z'``, and both
    are derived from a single clock read so ``expiresAt - createdAt`` is exactly
    *ttl_seconds* (default 86400, one day).

    Caching is best-effort: any failure is logged and otherwise ignored, so a
    cache problem never aborts a run.
    """
    try:
        created = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        expires = created + datetime.timedelta(seconds=ttl_seconds)
        store = await Actor.open_key_value_store()
        await store.set_value(key, {
            'value': value,
            'createdAt': created.isoformat() + 'Z',
            'expiresAt': expires.isoformat() + 'Z',
        })
    except Exception as exc:
        Actor.log.warning(f'Cache write failed for {key}: {exc}')
47
48
async def extract_contacts_from_website(url: str, timeout: float = 8.0) -> dict:
    """Fetch *url* and extract contact details from the raw response body.

    Returns a dict with keys ``emails`` (up to 20), ``socialMedia`` (up to 10
    full profile URLs), ``phones`` and ``techStack``. ``phones`` and
    ``techStack`` are not populated by this function and are always empty.
    Matches are de-duplicated in first-seen order so the output is
    deterministic from run to run.

    Fetching is best-effort: on any request failure the error is logged and the
    empty result is returned instead of raising.

    Args:
        url: Page to fetch; redirects are followed.
        timeout: httpx timeout in seconds for the request.
    """
    result: dict[str, Any] = {'emails': [], 'phones': [], 'socialMedia': [], 'techStack': []}
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            response = await client.get(url, headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
        text = response.text or ''
    except Exception as exc:
        Actor.log.debug(f'Could not fetch {url} for contact extraction: {exc}')
        return result
    # dict.fromkeys de-duplicates while preserving order; a set would make the
    # truncated slice differ between runs.
    result['emails'] = list(dict.fromkeys(EMAIL_REGEX.findall(text)))[:20]
    # Use the whole match (group 0): findall would return capture-group
    # contents instead of URLs if the pattern has a capturing group.
    social_urls = (match.group(0) for match in SOCIAL_REGEX.finditer(text))
    result['socialMedia'] = list(dict.fromkeys(social_urls))[:10]
    return result
62
63
def score_confidence(result: dict) -> float:
    """Return a heuristic 0..1 confidence score for an output record.

    Every record starts at 0.3. Weight is then added for each signal present
    (truthy): emails +0.3, phones +0.15, a ``url`` or ``website`` +0.15,
    techStack +0.1. The total is rounded to two decimals and capped at 1.0.
    """
    # (keys that satisfy the signal, weight) — applied in this fixed order.
    signal_weights = (
        (('emails',), 0.3),
        (('phones',), 0.15),
        (('url', 'website'), 0.15),
        (('techStack',), 0.1),
    )
    total = 0.3
    for keys, weight in signal_weights:
        if any(result.get(name) for name in keys):
            total += weight
    return min(1.0, round(total, 2))
71
72
async def fetch_primary(query: str, max_results: int) -> list[dict]:
    """Placeholder primary fetch — replace with platform-specific logic.

    Currently returns ``min(max_results, 5)`` synthetic rows named
    ``'Result 1'``, ``'Result 2'``, … with an empty ``url`` and ``metrics``.
    """
    count = min(max_results, 5)
    rows: list[dict] = []
    for position in range(1, count + 1):
        rows.append({'query': query, 'name': f'Result {position}', 'url': '', 'metrics': {}})
    return rows
77
78
async def run_fast_lookup(payload: dict) -> list[dict]:
    """Return primary (un-enriched) records per query, reusing cached results.

    Args:
        payload: Actor input. Reads ``queries`` (a list of query strings; a
            single string is treated as one query) and ``maxResults``
            (default 25).

    Returns:
        Records in query order, at most ``maxResults`` per query. Records
        served from the key-value cache are copies tagged
        ``cacheStatus='hit'``; freshly fetched ones are tagged ``'miss'``.
        Nothing is pushed to the dataset here; that is left to the caller.
    """
    queries = payload.get('queries', [])
    if isinstance(queries, str):
        # A bare string would otherwise be iterated character by character.
        queries = [queries]
    max_results = int(payload.get('maxResults', 25))
    results: list[dict] = []
    for query in queries:
        cache_key = make_cache_key('linkedin-company-employees', 'fast_lookup', query)
        cached = await check_cache(cache_key)
        if isinstance(cached, list) and cached:
            # Entries were stored with cacheStatus 'miss'; re-tag copies so the
            # output is truthful. The key does not include maxResults, so trim
            # to the size requested by this run.
            results.extend(
                {**item, 'cacheStatus': 'hit'}
                for item in cached[:max_results]
                if isinstance(item, dict)
            )
            continue
        businesses = await fetch_primary(query, max_results)
        fetched = [{
            'query': query, 'name': b.get('name', ''), 'url': b.get('url', ''),
            'emails': [], 'phones': [], 'socialMedia': [], 'techStack': [],
            'metrics': b.get('metrics', {}), 'confidenceScore': 0.3,
            'sources': ['primary'], 'cacheStatus': 'miss', 'mode': 'fast_lookup',
            'extractedAt': now_iso(),
        } for b in businesses[:max_results]]
        await set_cache(cache_key, fetched)
        results.extend(fetched)
    return results
100
101
async def _scrape_contacts(url: str, timeout: float = 8.0) -> dict | None:
    """Return contacts scraped from *url*, or ``None`` if it timed out or failed."""
    try:
        # wait_for bounds the whole scrape; the httpx timeout alone is per operation.
        return await asyncio.wait_for(extract_contacts_from_website(url), timeout=timeout)
    except asyncio.TimeoutError:
        Actor.log.warning(f'Website enrichment timed out after {timeout}s: {url}')
    except Exception as exc:
        Actor.log.warning(f'Website enrichment failed for {url}: {exc}')
    return None


async def run_enrich(payload: dict) -> list[dict]:
    """Fetch primary records per query, enrich them from their websites, and push them.

    Each record is pushed to the default dataset exactly once, after the
    enrichment attempt, and is also collected into the returned list.

    Args:
        payload: Actor input. Reads ``queries`` (a list of query strings; a
            single string is treated as one query) and ``maxResults``
            (default 25).

    Returns:
        The pushed records, in query order.
    """
    queries = payload.get('queries', [])
    if isinstance(queries, str):
        # A bare string would otherwise be iterated character by character.
        queries = [queries]
    max_results = int(payload.get('maxResults', 25))
    results: list[dict] = []
    for query in queries:
        businesses = await fetch_primary(query, max_results)
        for biz in businesses[:max_results]:
            record: dict[str, Any] = {
                'query': query, 'name': biz.get('name', ''), 'url': biz.get('url', ''),
                'emails': [], 'phones': [], 'socialMedia': [], 'techStack': [],
                'metrics': biz.get('metrics', {}), 'confidenceScore': 0.3,
                'sources': ['primary'], 'cacheStatus': 'miss', 'mode': 'enrich',
                'extractedAt': now_iso(),
            }
            website = biz.get('url')
            contact = await _scrape_contacts(website) if website else None
            if contact is not None:
                for field in ('emails', 'phones', 'socialMedia', 'techStack'):
                    record[field] = contact.get(field, [])
                record['sources'].append('website_scraper')
            # Score the data the record actually holds, so a record with a URL
            # is credited for it even when its site could not be scraped.
            record['confidenceScore'] = score_confidence(record)
            # Push only once, after enrichment, so each business yields a
            # single dataset row rather than a pre- and a post-enrichment copy.
            await Actor.push_data(record)
            results.append(record)
    return results
131
132
async def run_batch(payload: dict) -> list[dict]:
    """Run enrich mode one query at a time so that one failure cannot sink the batch.

    Reads ``queries`` and ``maxResults`` (default 50) from *payload*. A query
    whose enrichment raises is logged and skipped; the records from all other
    queries are returned in order.
    """
    limit = int(payload.get('maxResults', 50))
    collected: list[dict] = []
    for item in payload.get('queries', []):
        single_query_payload = {'queries': [item], 'maxResults': limit}
        try:
            collected += await run_enrich(single_query_payload)
        except Exception as err:
            Actor.log.error(f'Batch item failed: {item} — {err}')
    return collected
144
145
async def _notify_webhook(webhook_url: str, mode: str, result_count: int) -> None:
    """POST a completion notice to *webhook_url*; failures are logged, never raised."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(webhook_url, json={
                'status': 'completed', 'mode': mode,
                'resultCount': result_count, 'timestamp': now_iso(),
            })
            # Surface non-2xx replies in the log instead of treating them as delivered.
            response.raise_for_status()
    except Exception as exc:
        Actor.log.warning(f'Webhook notification to {webhook_url} failed: {exc}')


async def main() -> None:
    """Actor entry point: dispatch on ``mode`` and optionally notify a webhook.

    Input may be given flat or nested under an ``input`` object. Supported
    modes are ``fast_lookup``, ``enrich`` (the default) and ``batch``. The
    ``enrich`` and ``batch`` runners push records themselves; ``fast_lookup``
    results are pushed here in one call. If ``webhookUrl`` is set and there
    are results, a completion notice is POSTed to it.

    Raises:
        ValueError: if ``mode`` is not supported. Raising inside
            ``async with Actor`` lets the SDK mark the run as failed instead of
            reporting success with an empty dataset.
    """
    async with Actor:
        actor_input = await Actor.get_input() or {}
        nested = actor_input.get('input')
        # Accept both flat and 'input'-wrapped payloads. A non-dict 'input'
        # value would otherwise crash on payload.get() below.
        payload = nested if isinstance(nested, dict) else actor_input
        mode = payload.get('mode', 'enrich')
        Actor.log.info(f'Starting in mode: {mode}')
        runners = {
            'fast_lookup': run_fast_lookup,
            'enrich': run_enrich,
            'batch': run_batch,
        }
        runner = runners.get(mode) if isinstance(mode, str) else None
        if runner is None:
            raise ValueError(f'Unknown mode: {mode!r}; expected one of {sorted(runners)}')
        results = await runner(payload)
        if mode == 'fast_lookup' and results:
            await Actor.push_data(results)
        Actor.log.info(f'Extraction complete. Total results: {len(results)}')
        webhook_url = payload.get('webhookUrl')
        if webhook_url and results:
            await _notify_webhook(webhook_url, mode, len(results))