1import json
2import os
3import time
4import urllib.request
5import urllib.parse
6from typing import Optional
7
8from apify import Actor
9
10
11import hmac
12import hashlib
13import asyncio
14
def compute_signature(secret: str, ts: int, method: str, path: str) -> str:
    """Return the hex-encoded HMAC-SHA256 signature of a request.

    The signed message is ``"<ts>:<method>:<path>"``. Both the message and
    *secret* (the HMAC key) are encoded with ``str.encode()`` (UTF-8) before
    hashing.
    """
    key_bytes = secret.encode()
    canonical = "{}:{}:{}".format(ts, method, path)
    digest = hmac.new(key_bytes, canonical.encode(), digestmod=hashlib.sha256)
    return digest.hexdigest()
18
19
def build_auth_header(key_id: str, secret: str, method: str, path: str, ts: Optional[int] = None) -> str:
    """Build the ``Authorization`` header value for an HMAC-signed request.

    Args:
        key_id: Public key identifier embedded in the header.
        secret: Shared secret used as the HMAC key.
        method: HTTP method that is part of the signed message.
        path: Request path that is part of the signed message.
        ts: Unix timestamp in seconds to sign. When omitted, the current time
            (``int(time.time())``) is used.

    Returns:
        A string of the form ``"HMAC keyId=<key_id>,ts=<ts>,sig=<hex>"``.
    """
    timestamp = int(time.time()) if ts is None else ts
    signature = compute_signature(secret, timestamp, method, path)
    return f"HMAC keyId={key_id},ts={timestamp},sig={signature}"
25
26
27def _split_base_url(raw: str) -> tuple[str, str]:
28 """Split APIFY_YC_BASE_URL into (origin, path_prefix).
29 origin = scheme://host[:port]
30 path_prefix = normalized leading path, e.g. '' or '/api' or '/custom/yc'
31 """
32 parsed = urllib.parse.urlparse(raw)
33 if not parsed.scheme or not parsed.netloc:
34 raise RuntimeError(f"Invalid APIFY_YC_BASE_URL: {raw}")
35 origin = f"{parsed.scheme}://{parsed.netloc}"
36
37 prefix = parsed.path or ''
38 if prefix and not prefix.startswith('/'):
39 prefix = '/' + prefix
40 if prefix.endswith('/') and len(prefix) > 1:
41 prefix = prefix[:-1]
42 return origin, prefix
43
44
def http_request(base_url: str, method: str, path: str, headers: dict, data: Optional[dict] = None,
                 timeout: float = 60):
    """Send an HTTP request with an optional JSON body and decode a JSON reply.

    The target URL is *path* resolved under *base_url*. Leading slashes on
    *path* are stripped and *base_url* gets a single trailing slash before
    ``urljoin``.

    Args:
        base_url: Base URL to resolve *path* against.
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        path: Request path.
        headers: Request headers. This dict is NOT modified; a copy is sent.
        data: If given, JSON-encoded (UTF-8) as the request body, and
            ``Content-Type: application/json`` is added unless already set.
        timeout: Socket timeout in seconds passed to ``urlopen`` (default 60).

    Returns:
        ``(status_code, payload)``. ``payload`` is the decoded JSON body, or
        ``None`` when the body is empty.

    Raises:
        urllib.error.HTTPError: on non-2xx responses. ``urlopen`` raises these
            instead of returning them, so the returned status is a 2xx code.
        urllib.error.URLError: on connection failures.
        json.JSONDecodeError: if a non-empty body is not valid JSON.
    """
    url = urllib.parse.urljoin(base_url.rstrip('/') + '/', path.lstrip('/'))
    # Work on a copy so setdefault below does not leak into the caller's dict.
    request_headers = dict(headers)
    body_bytes = None
    if data is not None:
        body_bytes = json.dumps(data).encode('utf-8')
        request_headers.setdefault('Content-Type', 'application/json')
    req = urllib.request.Request(url=url, data=body_bytes, headers=request_headers, method=method)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        resp_body = resp.read().decode('utf-8')
        status = resp.getcode()
    return status, json.loads(resp_body) if resp_body else None
56
57
async def _signed_request(
    base_origin: str,
    path_prefix: str,
    key_id: str,
    key_secret: str,
    method: str,
    raw_path: str,
    data: Optional[dict] = None,
):
    """Sign ``path_prefix + raw_path`` with HMAC and send it via ``http_request``.

    Returns ``http_request``'s ``(status, payload)`` tuple and propagates any
    exception it raises.
    """
    # A trailing '/' on the prefix would make the signed path '//yc/...'.
    # http_request strips leading slashes before joining, so the server would
    # receive '/yc/...' and the signature would not match.
    path = path_prefix.rstrip('/') + raw_path
    auth_header = build_auth_header(key_id, key_secret, method, path)
    Actor.log.debug(f"{method} path={path}")
    # http_request uses blocking urllib calls; run it in a worker thread so the
    # event loop is not stalled for up to the request timeout.
    return await asyncio.to_thread(
        http_request,
        base_origin,
        method,
        path,
        headers={'Authorization': auth_header},
        data=data,
    )


async def main() -> None:
    """Actor entry point: trigger a YC search job and poll it until it finishes.

    Steps:
      1. Read ``url`` from the Actor input.
      2. Read the ``APIFY_YC_BASE_URL``, ``APIFY_YC_KEY_ID`` and
         ``APIFY_YC_KEY_SECRET`` environment variables.
      3. POST ``{'url': ...}`` to ``<prefix>/yc/search``.
      4. Poll ``<prefix>/yc/async/job/<job_id>`` every 30 s for up to 10 minutes.

    Outcomes:
      - ``completed``: the job payload is stored under ``RESULT`` and pushed
        to the dataset.
      - ``failed``: the job payload is stored under ``RESULT`` and an error
        is logged.
      - Timeout: a ``{'job_id', 'status', 'message': 'timeout'}`` record is
        stored under ``RESULT``.

    Raises:
        RuntimeError: if input or environment configuration is missing, the
            base URL is invalid, or the trigger response lacks ``job_id``.
    """
    poll_timeout_s = 10 * 60
    poll_interval_s = 30

    async with Actor:
        input_ = await Actor.get_input() or {}
        search_url: str = input_.get('url')
        if not search_url:
            raise RuntimeError('Input must contain "url"')

        base_url = os.getenv('APIFY_YC_BASE_URL')
        key_id = os.getenv('APIFY_YC_KEY_ID')
        key_secret = os.getenv('APIFY_YC_KEY_SECRET')
        if not base_url or not key_id or not key_secret:
            raise RuntimeError('Missing APIFY_YC_BASE_URL, APIFY_YC_KEY_ID or APIFY_YC_KEY_SECRET environment variables')

        try:
            base_origin, path_prefix = _split_base_url(base_url)
        except Exception as e:
            # Normalize any parsing failure to RuntimeError, keeping the cause.
            raise RuntimeError(str(e)) from e

        Actor.log.info('Triggering YC search job...')
        status, payload = await _signed_request(
            base_origin, path_prefix, key_id, key_secret,
            'POST', '/yc/search',
            data={'url': search_url},
        )
        if status != 200 or not payload or 'job_id' not in payload:
            raise RuntimeError(f'Failed to trigger search job: status={status}, payload={payload}')

        job_id = payload['job_id']
        Actor.log.info(f'Job queued: {job_id}')

        status_path = f'/yc/async/job/{job_id}'
        # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + poll_timeout_s
        last_status = None
        while time.monotonic() < deadline:
            try:
                status_code, job_payload = await _signed_request(
                    base_origin, path_prefix, key_id, key_secret,
                    'GET', status_path,
                )
            except Exception as e:
                # Best-effort polling: transient errors are logged and retried.
                Actor.log.warning(f'Error polling job status: {e}')
                await asyncio.sleep(poll_interval_s)
                continue

            if status_code != 200 or not job_payload:
                Actor.log.warning(f'Unexpected job status response: {status_code} {job_payload}')
                await asyncio.sleep(poll_interval_s)
                continue

            last_status = job_payload.get('status')
            Actor.log.info(f'Job status: {last_status}')
            if last_status in ('completed', 'failed'):
                await Actor.set_value('RESULT', job_payload)
                if last_status == 'failed':
                    Actor.log.error('Job failed.')
                else:
                    await Actor.push_data(job_payload)
                    Actor.log.info('Job completed successfully.')
                return

            await asyncio.sleep(poll_interval_s)

        Actor.log.error('Job did not complete within timeout. Returning last known status.')
        await Actor.set_value('RESULT', {
            'job_id': job_id,
            'status': last_status or 'unknown',
            'message': 'timeout',
        })