1"""RSS & Atom Feed to JSON — Apify Actor.
2
3Fetches one or more RSS 2.0 or Atom feeds and normalizes every item into a
4single clean JSON schema, regardless of source format. Handles the messy
5reality of feeds: RSS vs Atom element names, multiple date formats, namespaced
6elements (content:encoded, dc:creator, media:*), and CDATA.
7
8The value here is the normalization: the same tidy record shape whether the
9source is RSS 2.0 or Atom, so downstream code never has to branch on format.
10"""
11
12from __future__ import annotations
13
14import asyncio
15import re
16from datetime import datetime, timezone
17from email.utils import parsedate_to_datetime
18from xml.etree import ElementTree as ET
19
20import httpx
21from apify import Actor
22
# Prefix -> namespace URI map handed to ElementTree's find()/findall() so the
# parsers can write paths such as "atom:link" or "dc:creator".
NS = dict(
    atom="http://www.w3.org/2005/Atom",
    content="http://purl.org/rss/1.0/modules/content/",
    dc="http://purl.org/dc/elements/1.1/",
    media="http://search.yahoo.com/mrss/",
)
29
30
def _strip(s: str | None) -> str | None:
    """Trim surrounding whitespace from *s*.

    Returns ``None`` when *s* is ``None`` or contains nothing but whitespace,
    so callers can treat "missing" and "blank" identically.
    """
    if s is not None:
        trimmed = s.strip()
        if trimmed:
            return trimmed
    return None
36
37
def _normalize_date(raw: str | None) -> str | None:
    """Convert an RSS (RFC 822) or Atom (ISO 8601) date to ISO 8601 in UTC.

    Args:
        raw: Date text as found in the feed, or ``None``.

    Returns:
        ``None`` when *raw* is ``None``, empty, or only whitespace.
        Otherwise the timestamp rendered with ``datetime.isoformat()`` in UTC;
        values without an offset are assumed to already be UTC. If the text
        cannot be parsed, or cannot be represented in UTC, the stripped
        original text is returned unchanged (best effort, never raises).
    """
    if not raw:
        return None
    raw = raw.strip()
    if not raw:
        return None

    # RFC 822 first: this is what RSS 2.0 <pubDate> uses. Depending on the
    # Python version an unparseable value raises TypeError or ValueError.
    try:
        dt = parsedate_to_datetime(raw)
    except (TypeError, ValueError):
        dt = None

    if dt is None:
        # ISO 8601 / RFC 3339. Older fromisoformat() implementations reject a
        # "Z" designator, so rewrite only a trailing one (either case) rather
        # than every "Z" in the string.
        iso = raw[:-1] + "+00:00" if raw[-1] in "Zz" else raw
        try:
            dt = datetime.fromisoformat(iso)
        except ValueError:
            return raw

    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    try:
        return dt.astimezone(timezone.utc).isoformat()
    except OverflowError:
        # Dates at the edge of the supported range (e.g. year 1 with a
        # positive offset) cannot be shifted to UTC; keep the source text.
        return raw
60
61
def _localname(tag: str) -> str:
    """Return *tag* without its ElementTree ``{namespace}`` qualifier."""
    _, _, local = tag.rpartition("}")
    return local
64
65
def _find_text(el, names: list[str]) -> str | None:
    """Return the text of the first usable child named in *names*.

    Direct children of *el* are scanned in document order and matched on
    their local name, i.e. with any ``{namespace}`` qualifier ignored.

    Args:
        el: Parent element whose direct children are searched.
        names: Acceptable local tag names.

    Returns:
        The whitespace-stripped text of the first matching child that has
        non-blank text, or ``None`` if there is none. A matching child whose
        text is empty or only whitespace is skipped, so a later match can
        still be found.
    """
    for child in el:
        tag = child.tag
        # Comment / processing-instruction nodes carry a non-string tag.
        if not isinstance(tag, str) or tag.rsplit("}", 1)[-1] not in names:
            continue
        text = (child.text or "").strip()
        if text:
            return text
    return None
72
73
def parse_rss_item(item) -> dict:
    """Parse an RSS ``<item>`` element into the normalized record shape.

    Args:
        item: The ``<item>`` element to read.

    Returns:
        A dict with the keys ``title``, ``link``, ``summary``, ``content``,
        ``author``, ``published``, ``updated``, ``categories``, ``guid`` and
        ``mediaUrl``. Missing values are ``None`` (``categories`` is a list,
        possibly empty); ``updated`` is always ``None`` for RSS items.

        * ``content`` comes from ``content:encoded``; ``summary`` from
          ``description``.
        * ``author`` prefers ``dc:creator`` and falls back to ``author``.
        * ``published`` is read from ``pubDate`` or ``date`` and normalized.
        * ``mediaUrl`` is the first ``<enclosure>`` URL; when the item has no
          enclosure it falls back to the first Media RSS ``media:content``
          URL, then ``media:thumbnail`` (searched at any depth so
          ``media:group`` wrappers are covered).
    """
    content_el = item.find("content:encoded", NS)
    content_text = _strip(content_el.text) if content_el is not None else None

    creator_el = item.find("dc:creator", NS)
    creator = _strip(creator_el.text) if creator_el is not None else None
    author = creator or _find_text(item, ["author"])

    categories = []
    for child in item:
        if _localname(child.tag) == "category":
            label = _strip(child.text)
            if label:
                categories.append(label)

    media_url = next(
        (
            child.get("url")
            for child in item
            if _localname(child.tag) == "enclosure" and child.get("url")
        ),
        None,
    )
    if media_url is None:
        # No enclosure: fall back to Media RSS, which many feeds use instead.
        media_ns = NS["media"]
        media_url = next(
            (
                el.get("url")
                for name in ("content", "thumbnail")
                for el in item.iter(f"{{{media_ns}}}{name}")
                if el.get("url")
            ),
            None,
        )

    return {
        "title": _find_text(item, ["title"]),
        "link": _find_text(item, ["link"]),
        "summary": _find_text(item, ["description"]),
        "content": content_text,
        "author": author,
        "published": _normalize_date(_find_text(item, ["pubDate", "date"])),
        "updated": None,
        "categories": categories,
        "guid": _find_text(item, ["guid"]),
        "mediaUrl": media_url,
    }
106
107
def parse_atom_entry(entry) -> dict:
    """Parse an Atom ``<entry>`` element into the normalized record shape.

    Args:
        entry: The ``<entry>`` element to read.

    Returns:
        A dict with the same keys as the RSS item parser: ``title``, ``link``,
        ``summary``, ``content``, ``author``, ``published``, ``updated``,
        ``categories``, ``guid`` and ``mediaUrl``. Missing values are ``None``
        (``categories`` is a list, possibly empty).

        * ``link`` is the first ``atom:link`` with an ``href`` whose ``rel``
          is absent or ``alternate``; otherwise the ``href`` of the first
          ``atom:link`` of any kind.
        * ``content`` is the text of ``atom:content``. When that element has
          no direct text but does have child elements (as with
          ``type="xhtml"``), the concatenated text of its descendants is used
          so the body is not silently dropped; markup is not preserved in
          that case.
        * ``author`` is the first non-blank ``atom:author/atom:name``.
        * ``categories`` are the ``term`` attributes of ``atom:category``.
        * ``mediaUrl`` is the ``href`` of the first ``rel="enclosure"`` link.
    """
    links = entry.findall("atom:link", NS)

    def first_href(rels):
        # First link carrying an href whose rel is one of `rels`.
        return next(
            (ln.get("href") for ln in links if ln.get("rel") in rels and ln.get("href")),
            None,
        )

    link = first_href((None, "alternate"))
    if link is None and links:
        link = links[0].get("href")

    content_text = None
    content_el = entry.find("atom:content", NS)
    if content_el is not None:
        content_text = _strip(content_el.text)
        if content_text is None and len(content_el):
            # Inline XHTML keeps its payload in child elements, not in .text.
            content_text = _strip("".join(content_el.itertext()))

    author = None
    for name_el in entry.findall("atom:author/atom:name", NS):
        author = _strip(name_el.text)
        if author:
            break

    categories = [
        cat.get("term") for cat in entry.findall("atom:category", NS) if cat.get("term")
    ]

    return {
        "title": _find_text(entry, ["title"]),
        "link": link,
        "summary": _find_text(entry, ["summary"]),
        "content": content_text,
        "author": author,
        "published": _normalize_date(_find_text(entry, ["published", "issued"])),
        "updated": _normalize_date(_find_text(entry, ["updated", "modified"])),
        "categories": categories,
        "guid": _find_text(entry, ["id"]),
        "mediaUrl": first_href(("enclosure",)),
    }
143
144
def parse_feed(xml_text: str) -> tuple[dict, list[dict], str]:
    """Parse a feed document into ``(feed_meta, items, feed_type)``.

    The format is chosen from the root element's local name:

    * ``rss``  -> items are the ``<item>`` children of ``<channel>``;
      ``feed_type`` is ``"rss"``. With no ``<channel>`` the result is
      ``({}, [], "rss")``.
    * ``feed`` -> items are the ``atom:entry`` children; ``feed_type`` is
      ``"atom"``.
    * anything else is handled as RSS 1.0 / RDF: items are the root's direct
      ``item`` children (any namespace) and metadata is read from the root's
      ``channel`` child when present; ``feed_type`` is ``"rdf"``.

    Args:
        xml_text: The feed document.

    Returns:
        ``feed_meta`` holds ``feedTitle``, ``feedLink`` and
        ``feedDescription`` (each possibly ``None``), except for the
        channel-less RSS case noted above. ``items`` is a list of normalized
        item dicts.

    Raises:
        xml.etree.ElementTree.ParseError: If *xml_text* is not well-formed XML.
    """
    # NOTE(security): feeds are untrusted input. The Python documentation
    # warns that ElementTree can be exposed to entity-expansion attacks
    # depending on the underlying Expat version; consider a hardened parser
    # if this runs against arbitrary URLs.
    root = ET.fromstring(xml_text)
    tag = _localname(root.tag)

    if tag == "rss":
        channel = root.find("channel")
        if channel is None:
            return {}, [], "rss"
        meta = {
            "feedTitle": _find_text(channel, ["title"]),
            "feedLink": _find_text(channel, ["link"]),
            "feedDescription": _find_text(channel, ["description"]),
        }
        items = [parse_rss_item(it) for it in channel.findall("item")]
        return meta, items, "rss"

    if tag == "feed":
        meta = {
            "feedTitle": _find_text(root, ["title"]),
            "feedLink": None,
            "feedDescription": _find_text(root, ["subtitle"]),
        }
        for ln in root.findall("atom:link", NS):
            if ln.get("rel") in (None, "alternate") and ln.get("href"):
                meta["feedLink"] = ln.get("href")
                break
        items = [parse_atom_entry(e) for e in root.findall("atom:entry", NS)]
        return meta, items, "atom"

    # RSS 1.0 / RDF: <channel> and the <item> elements are siblings under the
    # root, and all of them are namespaced, so match on local names.
    meta = {"feedTitle": None, "feedLink": None, "feedDescription": None}
    channel = next((c for c in root if _localname(c.tag) == "channel"), None)
    if channel is not None:
        meta = {
            "feedTitle": _find_text(channel, ["title"]),
            "feedLink": _find_text(channel, ["link"]),
            "feedDescription": _find_text(channel, ["description"]),
        }
    items = [parse_rss_item(it) for it in root if _localname(it.tag) == "item"]
    return meta, items, "rdf"
178
179
_FETCH_ATTEMPTS = 3
_DEFAULT_MAX_ITEMS = 100
# Client-error statuses that are still worth retrying.
_RETRYABLE_STATUS_CODES = frozenset({408, 425, 429})


def _coerce_max_items(value) -> int:
    """Turn the raw ``maxItemsPerFeed`` input into a non-negative item cap.

    ``None``, non-numeric and negative values fall back to the default; a
    negative number would otherwise slice items off the *end* of the feed.
    """
    if value is None:
        return _DEFAULT_MAX_ITEMS
    try:
        limit = int(value)
    except (TypeError, ValueError, OverflowError):
        limit = -1
    if limit < 0:
        Actor.log.warning(f"Invalid maxItemsPerFeed {value!r}; using {_DEFAULT_MAX_ITEMS}.")
        return _DEFAULT_MAX_ITEMS
    return limit


def _is_retryable(exc: Exception) -> bool:
    """Return whether another fetch attempt could plausibly succeed."""
    if isinstance(exc, httpx.InvalidURL):
        return False
    if isinstance(exc, httpx.HTTPStatusError):
        status = exc.response.status_code
        return status >= 500 or status in _RETRYABLE_STATUS_CODES
    return True


async def _fetch_feed(client, url: str) -> str | None:
    """Download *url* and return the body text, or ``None`` if it failed.

    Makes up to three attempts, sleeping 2s then 4s between them. Permanent
    failures (malformed URL, most 4xx responses) are not retried.
    """
    for attempt in range(1, _FETCH_ATTEMPTS + 1):
        try:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text
        # InvalidURL is not an HTTPError subclass, so it is named explicitly.
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            Actor.log.warning(f"Fetch attempt {attempt} failed for {url}: {exc}")
            if attempt == _FETCH_ATTEMPTS or not _is_retryable(exc):
                break
            await asyncio.sleep(attempt * 2)
    return None


async def main() -> None:
    """Actor entry point: fetch each configured feed and push normalized items.

    Reads ``feedUrls``, ``maxItemsPerFeed`` (default 100) and
    ``includeContent`` (default true) from the Actor input. For every URL the
    feed is fetched and parsed; up to ``maxItemsPerFeed`` items are pushed to
    the dataset, each tagged with ``feedUrl`` and ``feedTitle``. A feed that
    cannot be fetched or is not well-formed XML produces a single record with
    ``feedUrl`` and ``error`` instead, and processing continues with the next
    URL.
    """
    async with Actor:
        actor_input = await Actor.get_input() or {}

        raw_urls = actor_input.get("feedUrls") or []
        if isinstance(raw_urls, str):
            # A bare string would otherwise be iterated character by character.
            raw_urls = [raw_urls]
        urls = [u.strip() for u in raw_urls if isinstance(u, str) and u.strip()]
        max_items = _coerce_max_items(actor_input.get("maxItemsPerFeed", _DEFAULT_MAX_ITEMS))
        include_content = bool(actor_input.get("includeContent", True))

        if not urls:
            Actor.log.warning("No feed URLs provided. Add one or more RSS/Atom feed URLs.")
            await Actor.push_data([])
            return

        Actor.log.info(f"Fetching {len(urls)} feed(s).")

        async with httpx.AsyncClient(
            timeout=40.0,
            follow_redirects=True,
            headers={
                "User-Agent": "scrapeworks-rss-to-json/0.1",
                "Accept": "application/rss+xml, application/atom+xml, application/xml, text/xml",
            },
        ) as client:
            for url in urls:
                xml_text = await _fetch_feed(client, url)
                if xml_text is None:
                    await Actor.push_data([{"feedUrl": url, "error": "Failed to fetch feed"}])
                    continue

                try:
                    meta, items, ftype = parse_feed(xml_text)
                except ET.ParseError as exc:
                    Actor.log.warning(f"Could not parse {url}: {exc}")
                    await Actor.push_data([{"feedUrl": url, "error": f"Invalid feed XML: {exc}"}])
                    continue

                Actor.log.info(f" {url}: {ftype} feed, {len(items)} items")
                batch = []
                for it in items[:max_items]:
                    if not include_content:
                        it.pop("content", None)
                    it["feedUrl"] = url
                    it["feedTitle"] = meta.get("feedTitle")
                    batch.append(it)
                if batch:
                    await Actor.push_data(batch)

        Actor.log.info("Done.")