1import json
2import os
3import time
4import urllib.request
5import urllib.parse
6import asyncio
7from typing import Optional
8
9from apify import Actor
10
def get_max_items_for_user() -> int:
    """Return the maximum number of result items this run may emit.

    The limit is read from the ``ACTOR_MAX_PAID_DATASET_ITEMS`` environment
    variable. When the variable is unset, empty, or not a valid integer, the
    free-tier limit of 10 is returned instead of crashing the run.

    Returns:
        A non-negative item limit. Negative configured values are clamped to
        0, because callers use the result as a slice bound and a negative
        bound would drop items from the end instead of capping the count.
    """
    free_tier_limit = 10
    raw_limit = os.getenv('ACTOR_MAX_PAID_DATASET_ITEMS')
    if not raw_limit:
        return free_tier_limit
    try:
        limit = int(raw_limit)
    except ValueError:
        # A malformed value must not grant more than the free tier.
        return free_tier_limit
    return max(limit, 0)
17
18
def _split_base_url(raw: str) -> tuple[str, str]:
    """Split APIFY_YC_BASE_URL into ``(origin, path_prefix)``.

    Args:
        raw: Absolute base URL, e.g. ``https://host:8443/custom/yc/``.

    Returns:
        ``origin`` is ``scheme://host[:port]``. ``path_prefix`` is the URL
        path with all trailing slashes removed: ``''`` for no path or a bare
        ``/``, otherwise something like ``'/api'`` or ``'/custom/yc'``.
        Query string and fragment are ignored.

    Raises:
        RuntimeError: If ``raw`` has no scheme or no network location.
    """
    parsed = urllib.parse.urlparse(raw)
    if not parsed.scheme or not parsed.netloc:
        raise RuntimeError(f"Invalid APIFY_YC_BASE_URL: {raw}")
    origin = f"{parsed.scheme}://{parsed.netloc}"

    # With a netloc present the parsed path is either empty or starts with
    # '/', so only trailing slashes need normalizing. Stripping all of them
    # makes the root path collapse to '' instead of leaking a lone '/'.
    prefix = parsed.path.rstrip('/')
    return origin, prefix
35
36
def http_request(base_url: str, method: str, path: str, headers: dict, data: Optional[dict] = None):
    """Send a blocking HTTP request and return ``(status, parsed_json_or_None)``.

    Args:
        base_url: Base URL the path is resolved against.
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        path: Path joined onto ``base_url``; leading slashes are stripped
            first so the path is appended rather than replacing the base path.
        headers: Request headers. The mapping is copied, never modified.
        data: Optional payload, sent as a UTF-8 JSON body. When given, a
            ``Content-Type: application/json`` header is added unless the
            caller already supplied ``Content-Type``.

    Returns:
        A tuple of the response status code and the decoded JSON body, or
        ``None`` in place of the body when the response is empty.

    Raises:
        Whatever ``urllib.request.urlopen`` raises (connection errors,
        timeouts after 60 seconds, HTTP error responses) and
        ``json.JSONDecodeError`` when a non-empty body is not valid JSON.

    NOTE: the URL is built with ``urljoin``, so a ``path`` that is itself an
    absolute URL takes precedence over ``base_url``.
    """
    url = urllib.parse.urljoin(base_url.rstrip('/') + '/', path.lstrip('/'))

    # Copy so the caller's dict does not silently gain a Content-Type entry
    # that would then be sent on later, body-less requests reusing it.
    request_headers = dict(headers or {})
    body_bytes = None
    if data is not None:
        body_bytes = json.dumps(data).encode('utf-8')
        request_headers.setdefault('Content-Type', 'application/json')

    req = urllib.request.Request(url=url, data=body_bytes, headers=request_headers, method=method)
    with urllib.request.urlopen(req, timeout=60) as resp:
        resp_body = resp.read().decode('utf-8')
        status = resp.getcode()
    return status, (json.loads(resp_body) if resp_body else None)
48
49
async def main() -> None:
    """Trigger a YC search job on the backend and publish its outcome.

    Steps:
      1. Read ``url`` from the Actor input and the backend settings from the
         ``APIFY_YC_BASE_URL``, ``APIFY_YC_KEY_ID`` and ``APIFY_YC_KEY_SECRET``
         environment variables.
      2. POST the URL to ``<prefix>/yc/search`` and take ``job_id`` from the
         response.
      3. Poll ``<prefix>/yc/async/job/<job_id>`` for up to 15 minutes,
         sleeping 15 seconds between attempts. Polling errors and unexpected
         responses are logged and retried.
      4. On ``completed`` or ``failed`` store the job payload under the
         ``RESULT`` key; on ``completed`` also pass the ``companies`` list,
         truncated to the user's item limit, to ``Actor.push_data``.
      5. On timeout store a record with the last known status under
         ``RESULT``.

    Raises:
        RuntimeError: If the input has no ``url``, a required environment
            variable is missing or the base URL is invalid, or the trigger
            response is not a 200 carrying a ``job_id``.
        Exception: Errors raised by the trigger request itself propagate
            unchanged.
    """
    async with Actor:
        input_ = await Actor.get_input() or {}
        search_url: str = input_.get('url')
        if not search_url:
            raise RuntimeError('Input must contain "url"')

        base_url = os.getenv('APIFY_YC_BASE_URL')
        key_id = os.getenv('APIFY_YC_KEY_ID')
        key_secret = os.getenv('APIFY_YC_KEY_SECRET')
        if not base_url or not key_id or not key_secret:
            raise RuntimeError('Missing APIFY_YC_BASE_URL, APIFY_YC_KEY_ID or APIFY_YC_KEY_SECRET environment variables')

        try:
            base_origin, path_prefix = _split_base_url(base_url)
        except Exception as e:
            # Keep the original exception chained for debugging.
            raise RuntimeError(str(e)) from e

        authorization = f'Bearer {key_id}:{key_secret}'

        Actor.log.info('Triggering YC search job...')

        # http_request blocks (urlopen with a 60 s timeout); run it in a
        # worker thread so the event loop is not stalled while waiting.
        status, payload = await asyncio.to_thread(
            http_request,
            base_origin, 'POST', f'{path_prefix}/yc/search',
            headers={'Authorization': authorization},
            data={'url': search_url},
        )
        if status != 200 or not payload or 'job_id' not in payload:
            raise RuntimeError(f'Failed to trigger search job: status={status}, payload={payload}')

        job_id = payload['job_id']
        Actor.log.info(f'Job queued: {job_id}')

        # The status path does not change between polls.
        status_path = f'{path_prefix}/yc/async/job/{job_id}'
        interval = 15
        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + 15 * 60
        last_status = None
        while time.monotonic() < deadline:
            try:
                Actor.log.debug(f"GET path={status_path}")
                status_code, job_payload = await asyncio.to_thread(
                    http_request,
                    base_origin, 'GET', status_path,
                    # Fresh dict per call so no request can affect the next.
                    headers={'Authorization': authorization},
                )
            except Exception as e:
                # Best effort: a failed poll is retried until the deadline.
                Actor.log.warning(f'Error polling job status: {e}')
                await asyncio.sleep(interval)
                continue

            if status_code != 200 or not job_payload:
                Actor.log.warning(f'Unexpected job status response: {status_code} {job_payload}')
                await asyncio.sleep(interval)
                continue

            last_status = job_payload.get('status')
            Actor.log.info(f'Job status: {last_status}')
            if last_status in ('completed', 'failed'):
                await Actor.set_value('RESULT', job_payload)
                if last_status == 'failed':
                    Actor.log.error('Job failed.')
                else:
                    companies = job_payload.get("companies") or []
                    max_items = get_max_items_for_user()

                    limited_companies = companies[:max_items]
                    await Actor.push_data(limited_companies)

                    if len(companies) > max_items:
                        Actor.log.info(f'Limited results to {max_items} items due to subscription limits. Free subscription (10), paid (unlimited according to budget).')
                # Terminal state reached: stop polling.
                return

            await asyncio.sleep(interval)

        Actor.log.error('Job did not complete within timeout. Returning last known status.')
        await Actor.set_value('RESULT', {
            'job_id': job_id,
            'status': last_status or 'unknown',
            'message': 'timeout',
        })