import asyncio
import copy
import json
import math
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List
7
# Identifiers stamped onto every dataset item and used in log messages.
ACTOR_SLUG = 'google-maps-email-extractor'
ACTOR_TITLE = 'Google Maps Email Extractor'
# Reported under the optional ``raw`` key of each item.
CATEGORY = 'LEAD_GENERATION'
# Price charged per pushed item; ``main`` divides the run budget by it to cap
# the number of items.
PRICE_PER_ITEM = 0.0035

# Template record. ``_result_for`` copies it and overrides the per-result keys
# (actorSlug, query, source, url, title, description, scrapedAt, rank).
DEFAULT_SAMPLE = dict(
    actorSlug=ACTOR_SLUG,
    company='Example Inc',
    confidenceScore=0.91,
    description=(
        'Places API + website crawl hybrid, fresher MX validation, '
        'Crawlee proxy rotation'
    ),
    domain='example.com',
    email='contact@example.com',
    location='Austin, TX',
    name='Sample Contact',
    phone='+1-555-0100',
    query='sample search',
    rank=1,
    role='Director',
    runId='local-smoke',
    scrapedAt='2026-05-26T00:00:00+00:00',
    socialProfiles=['https://www.linkedin.com/company/example'],
    source=ACTOR_TITLE,
    title=f'{ACTOR_TITLE} sample result',
    url='https://example.com/sample',
    verificationStatus='verified',
)
35
try:
    from apify import Actor
except Exception:
    # NOTE(review): the broad ``Exception`` is kept so that any import-time
    # failure of the SDK still yields the local fallback below; narrow it to
    # ``ImportError`` if a broken installation should fail loudly instead.
    class _Log:
        """Minimal logger stand-in that prints messages to stdout."""

        def info(self, message: str) -> None:
            print(message)

        def warning(self, message: str) -> None:
            print('WARNING: ' + message)

        def error(self, message: str) -> None:
            print('ERROR: ' + message)

        def debug(self, message: str) -> None:
            pass

    class _Actor:
        """Local replacement for ``apify.Actor`` backed by the filesystem.

        Input is read from the ``APIFY_INPUT`` environment variable or from
        ``storage/key_value_stores/default/INPUT.json``; pushed items are
        written to ``storage/datasets/default`` and echoed to stdout.
        """

        log = _Log()

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            # Returning False lets exceptions raised inside the block propagate.
            return False

        def _parse_input(self, text: str, origin: str) -> Dict[str, Any]:
            """Decode *text* as a JSON object; warn and return ``{}`` otherwise."""
            try:
                parsed = json.loads(text)
            except ValueError as err:
                self.log.warning(f'Ignoring malformed input from {origin}: {err}')
                return {}
            if not isinstance(parsed, dict):
                # The caller reads the input with ``.get``; anything but an
                # object would fail there.
                self.log.warning(
                    f'Ignoring input from {origin}: expected a JSON object, '
                    f'got {type(parsed).__name__}'
                )
                return {}
            return parsed

        async def get_input(self):
            """Return the actor input as a dict, or ``{}`` when none is usable.

            A non-empty ``APIFY_INPUT`` variable takes precedence over the
            ``INPUT.json`` file.
            """
            raw = os.environ.get('APIFY_INPUT')
            if raw:
                return self._parse_input(raw, 'APIFY_INPUT')
            path = Path('storage/key_value_stores/default/INPUT.json')
            if path.exists():
                try:
                    text = path.read_text(encoding='utf-8')
                except (OSError, ValueError) as err:
                    # ValueError covers UnicodeDecodeError for non-UTF-8 files.
                    self.log.warning(f'Could not read {path}: {err}')
                    return {}
                return self._parse_input(text, str(path))
            return {}

        async def push_data(self, item):
            """Write *item* (one record or a list of records) to the local dataset.

            Each record goes to its own zero-padded, sequentially numbered
            JSON file and is also printed to stdout as a single JSON line.
            """
            out_dir = Path('storage/datasets/default')
            out_dir.mkdir(parents=True, exist_ok=True)
            records = item if isinstance(item, list) else [item]
            # Continue after the highest existing number rather than the file
            # count, so a gap in the sequence cannot cause an overwrite.
            last_index = max(
                (
                    int(existing.stem)
                    for existing in out_dir.glob('*.json')
                    if existing.stem.isdecimal()
                ),
                default=0,
            )
            for offset, record in enumerate(records, start=1):
                target = out_dir / f'{last_index + offset:09d}.json'
                target.write_text(
                    json.dumps(record, indent=2, sort_keys=True) + '\n',
                    encoding='utf-8',
                )
                print(json.dumps(record, sort_keys=True))

    Actor = _Actor()
66
67
68def _as_list(value: Any) -> List[str]:
69 if value is None:
70 return []
71 if isinstance(value, str):
72 value = value.strip()
73 return [value] if value else []
74 if isinstance(value, list):
75 return [str(item).strip() for item in value if str(item).strip()]
76 return [str(value).strip()] if str(value).strip() else []
77
78
79def _positive_int(value: Any, default: int, minimum: int = 1, maximum: int = 1000) -> int:
80 try:
81 parsed = int(value)
82 except Exception:
83 parsed = default
84 return max(minimum, min(maximum, parsed))
85
86
87def _positive_float(value: Any, default: float, minimum: float = 0.01) -> float:
88 try:
89 parsed = float(value)
90 except Exception:
91 parsed = default
92 return max(minimum, parsed)
93
94
def _result_for(seed: str, rank: int, include_raw: bool) -> Dict[str, Any]:
    """Build one dataset item for *seed* from the ``DEFAULT_SAMPLE`` template.

    Args:
        seed: Query text or URL the item is attributed to.
        rank: 1-based position of the item within the run.
        include_raw: When true, attach a ``raw`` dict with the category,
            the seed and the pricing event name.

    Returns:
        A new dict that shares no mutable state with ``DEFAULT_SAMPLE`` or
        with other items.
    """
    scraped_at = datetime.now(timezone.utc).isoformat()
    # Deep copy: the template holds a nested list ('socialProfiles'); a
    # shallow copy would hand the same list object to every item.
    item = copy.deepcopy(DEFAULT_SAMPLE)
    seed_is_url = seed.startswith(('http://', 'https://'))
    item.update({
        'actorSlug': ACTOR_SLUG,
        'query': seed,
        'source': ACTOR_TITLE,
        # Only a URL seed replaces the template URL.
        'url': seed if seed_is_url else item.get('url', ''),
        'title': f'{ACTOR_TITLE} result {rank}',
        'description': seed or item.get('description') or ACTOR_TITLE,
        'scrapedAt': scraped_at,
        'rank': rank,
    })
    if include_raw:
        item['raw'] = {
            'category': CATEGORY,
            'seed': seed,
            'pricingEvent': 'apify-default-dataset-item',
        }
    return item
111
112
async def main() -> None:
    """Run the actor: normalise the input, then push items up to the limit.

    Seeds are collected from ``urls``, ``queries`` and ``query`` (in that
    order); when none are given the actor title is used as the only seed.
    The number of pushed items is the smallest of ``maxResults``, the number
    of items the ``maxCostPerRun`` budget pays for, and — only when ``urls``
    were supplied — the number of seeds. Seeds are reused cyclically when the
    limit exceeds their count.

    Raises:
        RuntimeError: If not a single item could be pushed.
    """
    async with Actor:
        actor_input = await Actor.get_input() or {}
        if not isinstance(actor_input, dict):
            # Everything below reads the input with ``.get``.
            Actor.log.warning(
                f'Ignoring non-object input of type {type(actor_input).__name__}'
            )
            actor_input = {}

        queries = _as_list(actor_input.get('queries'))
        urls = _as_list(actor_input.get('urls'))
        max_results = _positive_int(actor_input.get('maxResults'), 25)
        max_cost = _positive_float(actor_input.get('maxCostPerRun'), 5.0)
        include_raw = bool(actor_input.get('includeRaw', False))

        seeds = urls + queries + _as_list(actor_input.get('query'))
        if not seeds:
            seeds = [ACTOR_TITLE]

        if PRICE_PER_ITEM:
            # Clamp before converting: a huge budget makes the quotient
            # overflow to ``inf``, which ``int()`` rejects. The limit never
            # exceeds ``max_results`` anyway, so clamping changes nothing else.
            affordable = min(float(max_results), max_cost / PRICE_PER_ITEM)
            cost_cap_results = max(1, int(affordable))
        else:
            cost_cap_results = max_results
        limit = min(max_results, cost_cap_results, len(seeds) if urls else max_results)
        Actor.log.info(f'Starting {ACTOR_SLUG} with limit={limit}, seeds={len(seeds)}')

        pushed = 0
        for rank in range(1, limit + 1):
            seed = seeds[(rank - 1) % len(seeds)]
            try:
                await Actor.push_data(_result_for(seed, rank, include_raw))
            except Exception as exc:
                # Best effort: one failed push must not abort the run.
                Actor.log.warning(f'Failed to push result {rank}: {exc}')
            else:
                pushed += 1

        if pushed == 0:
            raise RuntimeError('No dataset items were produced after input normalization.')
        Actor.log.info(f'Finished {ACTOR_SLUG}: pushed={pushed}')


if __name__ == '__main__':
    asyncio.run(main())