1"""Module defines the main entry point for the Apify Actor.
2
3Feel free to modify this file to suit your specific needs.
4
To build Apify Actors, use the Apify SDK; for details, see the official documentation:
6https://docs.apify.com/sdk/python
7"""
8
9from __future__ import annotations
10
import asyncio
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Optional
18
19
20from apify import Actor
21
22
23def _split_base_url(raw: str) -> tuple[str, str]:
24 """Split APIFY_ESP_CATCHALL_BASEURL into (origin, path_prefix).
25 origin = scheme://host[:port]
26 path_prefix = normalized leading path, e.g. '' or '/api' or '/custom/yc'
27 """
28 parsed = urllib.parse.urlparse(raw)
29 if not parsed.scheme or not parsed.netloc:
30 raise RuntimeError(f"Invalid APIFY_ESP_CATCHALL_BASEURL: {raw}")
31 origin = f"{parsed.scheme}://{parsed.netloc}"
32
33 prefix = parsed.path or ''
34 if prefix and not prefix.startswith('/'):
35 prefix = '/' + prefix
36 if prefix.endswith('/') and len(prefix) > 1:
37 prefix = prefix[:-1]
38 return origin, prefix
39
40
41
def http_request(base_url: str, method: str, path: str, headers: dict, data: Optional[dict] = None):
    """Send an HTTP request with an optional JSON body and return ``(status, payload)``.

    Args:
        base_url: URL the request is sent relative to; ``path`` is appended
            to it with exactly one ``/`` in between.
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        path: Endpoint path (leading slashes are ignored when joining).
        headers: Request headers. The mapping is copied and never modified.
        data: Optional JSON-serialisable body. When given, it is sent as
            UTF-8 encoded JSON and ``Content-Type: application/json`` is
            added unless the caller already supplied that header.

    Returns:
        ``(status, payload)``. ``payload`` is the decoded JSON body, or
        ``None`` when the body is empty. For HTTP error responses the status
        code and body are returned the same way (instead of raising) so that
        callers can branch on ``status``; if such an error body is not valid
        JSON, the raw text is returned as the payload.

    Raises:
        ValueError: If the URL is malformed or a successful response body is
            not valid JSON.
        urllib.error.URLError: If the server cannot be reached.
    """
    url = urllib.parse.urljoin(base_url.rstrip('/') + '/', path.lstrip('/'))

    # Work on a copy so the caller's dict is not changed as a side effect.
    request_headers = dict(headers or {})
    body_bytes = None
    if data is not None:
        body_bytes = json.dumps(data).encode('utf-8')
        request_headers.setdefault('Content-Type', 'application/json')

    req = urllib.request.Request(url=url, data=body_bytes, headers=request_headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            status = resp.getcode()
            resp_body = resp.read().decode('utf-8')
    except urllib.error.HTTPError as err:
        # urlopen reports HTTP error statuses by raising; HTTPError doubles
        # as the response object, so read its status and body from it.
        try:
            status = err.code
            error_body = err.read().decode('utf-8', errors='replace')
        finally:
            err.close()
        if not error_body:
            return status, None
        try:
            return status, json.loads(error_body)
        except ValueError:
            # Error pages are often not JSON; keep the text for diagnostics.
            return status, error_body

    return status, json.loads(resp_body) if resp_body else None
53
54
55
async def main() -> None:
    """Run the Actor: submit the input emails for verification and store the outcome.

    Steps:
        1. Read ``emails`` from the Actor input, and the service base URL and
           token from ``APIFY_ESP_CATCHALL_BASEURL`` / ``APIFY_API_TOKEN``.
        2. When ``ACTOR_MAX_PAID_DATASET_ITEMS`` is not set, treat the run as
           a free account and keep only the first email.
        3. POST the job to ``<prefix>/attempt/job`` and poll
           ``<prefix>/attempt/job/<job_id>`` every 30 seconds for up to
           90 minutes.
        4. Once the job returns a non-empty ``results`` list, store the full
           payload under the key ``RESULT`` and push the entries flagged
           ``success`` to the dataset. On timeout, store a short summary
           under ``RESULT`` instead.

    Raises:
        RuntimeError: If the input or environment is invalid, or the job
            cannot be created.
    """
    async with Actor:
        input_ = await Actor.get_input() or {}
        emails = input_.get('emails', [])

        # A non-list value (e.g. a single string) would otherwise be sliced
        # character by character further down.
        if not isinstance(emails, list) or not emails:
            raise RuntimeError('Input must contain "emails" array with email addresses to verify')

        base_url = os.getenv('APIFY_ESP_CATCHALL_BASEURL')
        api_token = os.getenv('APIFY_API_TOKEN')

        if not base_url:
            raise RuntimeError('Missing APIFY_ESP_CATCHALL_BASEURL environment variable')

        if not api_token:
            raise RuntimeError('Missing APIFY_API_TOKEN environment variable')

        try:
            base_origin, path_prefix = _split_base_url(base_url)
        except ValueError as e:
            # URL parsing itself can fail; report it as RuntimeError like the
            # other configuration errors, keeping the original cause attached.
            raise RuntimeError(str(e)) from e

        is_paid_user = os.getenv('ACTOR_MAX_PAID_DATASET_ITEMS') is not None

        if not is_paid_user:
            Actor.log.info('Free account detected - limiting to 1 email per job. Upgrade to paid for unlimited emails.')
            if len(emails) > 1:
                disregarded_count = len(emails) - 1
                emails = emails[:1]
                Actor.log.info(f'Disregarding {disregarded_count} additional emails due to free account limit.')
        else:
            Actor.log.info('Paid account detected - no email limits applied.')

        Actor.log.info('Starting email verification job...')

        auth_header = f"Bearer {api_token}"
        loop = asyncio.get_running_loop()

        async def _request(method, path, data=None):
            # http_request blocks on network I/O; run it in the default
            # thread pool so the event loop is not blocked meanwhile.
            return await loop.run_in_executor(
                None,
                lambda: http_request(
                    base_origin, method, path,
                    headers={'Authorization': auth_header},
                    data=data,
                ),
            )

        job_path = path_prefix + '/attempt/job'
        try:
            status, payload = await _request('POST', job_path, data={"emails": emails})
        except (OSError, ValueError) as e:
            # Network failures (OSError family) and undecodable responses
            # (ValueError) get the same error type as a rejected job.
            raise RuntimeError(f'Failed to trigger email verification job: {e}') from e

        if (status // 100) != 2 or not isinstance(payload, dict) or 'job_id' not in payload:
            raise RuntimeError(f'Failed to trigger email verification job: status={status}, payload={payload}')

        job_id = payload['job_id']
        Actor.log.info(f'Email verification job queued: {job_id}')

        dropped_emails = payload.get('dropped_emails', [])
        if dropped_emails:
            # Entries whose email starts with "...and " are excluded from both
            # the count and the listing, as in the count logic this replaces.
            dropped_addresses = [
                d.get('email', '')
                for d in dropped_emails
                if isinstance(d, dict) and not str(d.get('email', '')).startswith('...and ')
            ]
            Actor.log.info(f'Dropped {len(dropped_addresses)} unsupported emails: {dropped_addresses}')

        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + 90 * 60
        interval = 30
        last_status = None
        # Loop-invariant; the id is quoted so it cannot alter the URL structure.
        status_path = f"{path_prefix}/attempt/job/{urllib.parse.quote(str(job_id), safe='')}"

        while time.monotonic() < deadline:
            try:
                status_code, job_payload = await _request('GET', status_path)
            except Exception as e:
                # Polling is best-effort: log the failure and retry until the deadline.
                Actor.log.warning(f'Error polling job status: {e}')
                await asyncio.sleep(interval)
                continue

            if status_code != 200 or not job_payload or not isinstance(job_payload, dict):
                Actor.log.warning(f'Unexpected job status response: {status_code} {job_payload}')
                await asyncio.sleep(interval)
                continue

            # A non-empty results list is the signal that the job has finished.
            results = job_payload.get('results', [])
            if isinstance(results, list) and results:
                Actor.log.info(f'Found {len(results)} results. Job completed successfully.')
                await Actor.set_value('RESULT', job_payload)

                successful_results = [
                    result for result in results
                    if isinstance(result, dict) and result.get("success", False)
                ]

                if successful_results:
                    await Actor.push_data(successful_results)
                    Actor.log.info(f'Successfully pushed {len(successful_results)} verified emails to dataset.')
                else:
                    Actor.log.info('No successful email verifications found.')

                return

            last_status = job_payload.get('status')
            Actor.log.info(f'Job status: {last_status} - waiting for results...')

            await asyncio.sleep(interval)

        Actor.log.error('Email verification job did not complete within timeout. Returning last known status.')
        await Actor.set_value('RESULT', {
            'job_id': job_id,
            'status': last_status or 'unknown',
            'message': 'timeout',
        })