1"""robots.txt Parser & Tester — Apify Actor.
2
3Fetches and parses robots.txt for one or more sites, returning a structured
4breakdown: per-user-agent allow/disallow rules, crawl-delay, declared sitemaps,
5and (optionally) a verdict on whether specific test URLs are allowed for a
6chosen user-agent.
7
8Implements the standard robots.txt matching semantics: longest-match wins
9between Allow and Disallow, `*` wildcards, and `$` end-anchors.
10"""
11
12from __future__ import annotations
13
14import asyncio
15import re
16from urllib.parse import urljoin, urlparse
17
18import httpx
19from apify import Actor
20
21
def parse_robots(text: str) -> dict:
    """Parse robots.txt content into structured groups + sitemaps.

    Consecutive ``User-agent`` lines share the rules that follow them; a
    ``User-agent`` line that comes after any other field starts a new group.
    User-agent names are lower-cased. Rules for a user-agent that appears in
    several groups are merged into one entry.

    Args:
        text: Raw robots.txt content. A leading UTF-8 byte-order mark is
            ignored so the first directive is not lost.

    Returns:
        {'groups': {ua: {'allow': [...], 'disallow': [...], 'crawlDelay': x}},
         'sitemaps': [...]}. ``crawlDelay`` is a non-negative finite float, or
        None when no usable Crawl-delay was declared for that user-agent.
    """
    groups: dict = {}
    sitemaps: list[str] = []
    current_uas: list[str] = []
    last_was_ua = False

    # str.strip() does not treat U+FEFF as whitespace, so a BOM left in the
    # decoded body would otherwise corrupt the first field name.
    for raw in text.lstrip("\ufeff").splitlines():
        # Everything after '#' is a comment.
        line = raw.split("#", 1)[0].strip()
        if ":" not in line:
            # Blank lines and lines without a "field: value" shape are ignored.
            continue
        field, _, value = line.partition(":")
        field = field.strip().lower()
        value = value.strip()

        if field == "user-agent":
            if not last_was_ua:
                # First User-agent after some other field: begin a new group.
                current_uas = []
            ua = value.lower()
            current_uas.append(ua)
            groups.setdefault(ua, {"allow": [], "disallow": [], "crawlDelay": None})
            last_was_ua = True
            continue

        last_was_ua = False

        if field == "sitemap":
            if value:
                sitemaps.append(value)
        elif field in ("allow", "disallow") and current_uas:
            for ua in current_uas:
                groups[ua][field].append(value)
        elif field == "crawl-delay" and current_uas:
            try:
                delay = float(value)
            except ValueError:
                # Malformed value: keep whatever delay was already recorded.
                continue
            # float() accepts "nan", "inf" and negatives; none of them is a
            # meaningful delay. The chained comparison is False for NaN too.
            if not 0 <= delay < float("inf"):
                continue
            for ua in current_uas:
                groups[ua]["crawlDelay"] = delay

    return {"groups": groups, "sitemaps": sitemaps}
71
72
73def _rule_to_regex(pattern: str) -> re.Pattern:
74 """Convert a robots.txt path pattern to a regex (handles * and $)."""
75
76 out = []
77 i = 0
78 for ch in pattern:
79 if ch == "*":
80 out.append(".*")
81 elif ch == "$":
82 out.append("$")
83 else:
84 out.append(re.escape(ch))
85 return re.compile("^" + "".join(out))
86
87
def _match_len(rules: list[str], path: str) -> int:
    """Find how specific the best matching rule is.

    Each non-empty rule is compiled with ``_rule_to_regex`` and tried against
    ``path``. The specificity of a rule is the length of its pattern text.

    Returns:
        The pattern length of the longest rule that matches, or -1 when no
        rule matches (empty rules never match).
    """
    longest = -1
    for pattern in rules:
        if not pattern:
            # An empty Allow/Disallow value carries no restriction.
            continue
        try:
            matched = _rule_to_regex(pattern).match(path) is not None
        except re.error:
            # A rule that cannot be compiled is skipped.
            continue
        if matched and len(pattern) > longest:
            longest = len(pattern)
    return longest
101
102
def select_group(groups: dict, user_agent: str) -> dict | None:
    """Pick the rule group that applies to a user-agent.

    Resolution order (case-insensitive on ``user_agent``):
      1. a group whose name equals the user-agent exactly;
      2. otherwise the most specific (longest) group name that occurs as a
         substring of the user-agent, e.g. ``googlebot-image`` is preferred
         over ``googlebot`` for ``Googlebot-Image/1.0``;
      3. otherwise the wildcard ``*`` group.

    Args:
        groups: Mapping of lower-cased user-agent name to its rule dict, as
            produced by ``parse_robots``.
        user_agent: The user-agent to resolve.

    Returns:
        The selected rule dict, or None when nothing matches and there is no
        ``*`` group.
    """
    ua = user_agent.lower()
    if ua in groups:
        return groups[ua]

    # Taking the first substring hit would depend on declaration order;
    # the longest matching name is the most specific one.
    candidates = [key for key in groups if key and key != "*" and key in ua]
    if candidates:
        return groups[max(candidates, key=len)]
    return groups.get("*")
113
114
def is_allowed(parsed: dict, url_path: str, user_agent: str) -> bool:
    """Decide whether ``user_agent`` may fetch ``url_path``.

    The applicable group is chosen with ``select_group``; within it the
    longest matching pattern wins, and Allow wins a tie. With no applicable
    group, or no matching Disallow rule, the path is allowed.
    """
    rules = select_group(parsed["groups"], user_agent)
    if not rules:
        return True

    permitted = _match_len(rules["allow"], url_path)
    blocked = _match_len(rules["disallow"], url_path)
    if blocked == -1:
        return True
    # A missing Allow match is -1, which is always shorter than a real
    # Disallow match, so this single comparison covers that case as well.
    return permitted >= blocked
128
129
async def fetch_robots(client: httpx.AsyncClient, robots_url: str, log) -> tuple[str | None, int | None]:
    """GET ``robots_url``, retrying when the request raises ``httpx.HTTPError``.

    Up to three attempts are made, sleeping 2 s and then 4 s between them.
    Any response, whatever its status code, ends the retries.

    Returns:
        ``(body, status_code)`` where ``body`` is None for status codes of
        400 and above, or ``(None, None)`` when every attempt raised.
    """
    max_attempts = 3
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        try:
            resp = await client.get(robots_url)
            body = resp.text if resp.status_code < 400 else None
            return body, resp.status_code
        except httpx.HTTPError as exc:
            log.warning(f"Fetch attempt {attempt} failed for {robots_url}: {exc}")
            if attempt != max_attempts:
                # Back off a little longer after each failure.
                await asyncio.sleep(attempt * 2)
    return None, None
140
141
def _robots_test_path(entry: str) -> str:
    """Reduce a user-supplied test entry to the string robots.txt rules are matched against."""
    if re.match(r"^https?://", entry, re.I):
        parts = urlparse(entry)
        path = parts.path or "/"
        # Rules such as "Disallow: /*?sort=" target the query string, so it
        # must be kept, just as it is for entries given as a bare path.
        return f"{path}?{parts.query}" if parts.query else path
    return entry if entry.startswith("/") else "/" + entry


async def main() -> None:
    """Actor entry point: fetch, parse and report robots.txt for each input site.

    Input keys read:
        sites: list of site URLs or bare hostnames ("https://" is assumed
            when no http(s) scheme is given).
        userAgent: user-agent whose rule group is reported (default "*").
        testPaths: optional paths or full URLs to evaluate against the rules.

    One record is pushed per site. Sites that cannot be parsed or whose
    robots.txt cannot be fetched get a record with ``success: False``.
    """
    async with Actor:
        actor_input = await Actor.get_input() or {}
        sites = [s.strip() for s in (actor_input.get("sites", []) or []) if s and s.strip()]
        user_agent = (actor_input.get("userAgent") or "*").strip() or "*"
        test_paths = [p.strip() for p in (actor_input.get("testPaths", []) or []) if p and p.strip()]

        if not sites:
            Actor.log.warning("No sites provided.")
            await Actor.push_data([])
            return

        async with httpx.AsyncClient(
            timeout=30.0, follow_redirects=True,
            headers={"User-Agent": "scrapeworks-robots-txt/0.1 (https://apify.com/scrapeworks)"},
        ) as client:
            for site in sites:
                if not re.match(r"^https?://", site, re.I):
                    site = "https://" + site

                # urlparse raises ValueError on some malformed inputs (e.g. an
                # unbalanced IPv6 bracket); one bad entry must not abort the run.
                try:
                    parsed_url = urlparse(site)
                    host = parsed_url.netloc
                except ValueError:
                    host = ""
                if not host:
                    await Actor.push_data([{
                        "site": site, "robotsUrl": None, "success": False,
                        "statusCode": None, "error": "Invalid site URL",
                    }])
                    Actor.log.warning(f" {site}: invalid site URL, skipped")
                    continue

                # robots.txt always lives at the root of the host, whatever
                # path the site entry carried.
                robots_url = f"{parsed_url.scheme}://{host}/robots.txt"

                text, status = await fetch_robots(client, robots_url, Actor.log)
                if text is None:
                    await Actor.push_data([{
                        "site": site, "robotsUrl": robots_url, "success": False,
                        "statusCode": status, "error": "No robots.txt found or not reachable",
                    }])
                    Actor.log.info(f" {robots_url}: not available (status {status})")
                    continue

                parsed = parse_robots(text)

                group = select_group(parsed["groups"], user_agent)
                record = {
                    "site": site,
                    "robotsUrl": robots_url,
                    "success": True,
                    "statusCode": status,
                    "userAgentChecked": user_agent,
                    "sitemaps": parsed["sitemaps"],
                    "userAgentsDeclared": sorted(parsed["groups"].keys()),
                    "appliedGroupAllow": group["allow"] if group else [],
                    "appliedGroupDisallow": group["disallow"] if group else [],
                    "crawlDelay": group["crawlDelay"] if group else None,
                }

                if test_paths:
                    # "path" echoes the entry exactly as the user supplied it.
                    record["testResults"] = [
                        {"path": p, "allowed": is_allowed(parsed, _robots_test_path(p), user_agent)}
                        for p in test_paths
                    ]

                await Actor.push_data([record])
                Actor.log.info(f" {robots_url}: parsed, {len(parsed['sitemaps'])} sitemap(s), "
                               f"{len(parsed['groups'])} UA group(s)")

        Actor.log.info("Done.")