1"""NWS Weather Alerts — Apify Actor.
2
3Fetches active weather alerts from the official US National Weather Service API
4(api.weather.gov) and returns clean, structured JSON for each alert: event type,
5severity, urgency, affected areas, timing, and the headline/description.
6
7The NWS API returns CAP-style alerts wrapped in GeoJSON with deeply nested
8properties; this actor flattens them into tidy records and lets you filter by
9US state/area, severity, and urgency.
10"""
11
12from __future__ import annotations
13
14import asyncio
15from urllib.parse import urlencode
16
17import httpx
18from apify import Actor
19
# Endpoint used as the base of every request built by this module.
ALERTS_URL = "https://api.weather.gov/alerts/active"

# Allowed values for the `severity` and `urgency` input filters; anything
# else supplied by the user is ignored when the request URL is built.
SEVERITIES = set("Extreme Severe Moderate Minor Unknown".split())
URGENCIES = set("Immediate Expected Future Past Unknown".split())

# Two-letter codes allowed for the `area` input filter: the 50 states plus
# DC, PR, VI, GU, AS and MP (56 codes in total).
US_AREAS = set(
    """
    AL AK AZ AR CA CO CT DE FL GA HI ID IL IN IA
    KS KY LA ME MD MA MI MN MS MO MT NE NV NH NJ
    NM NY NC ND OH OK OR PA RI SC SD TN TX UT VT
    VA WA WV WI WY DC PR VI GU AS MP
    """.split()
)
32
33
def transform_alert(feature: dict) -> dict:
    """Turn one alert feature into a flat, single-level record.

    Args:
        feature: A mapping with an optional ``"properties"`` mapping and an
            optional top-level ``"id"``.

    Returns:
        A dict with a fixed set of keys in a fixed order. Scalar fields that
        are absent from ``properties`` come back as ``None``; ``sameCodes``
        and ``ugcCodes`` come back as ``[]`` when the geocode data is absent.
    """
    props = feature.get("properties") or {}
    codes = props.get("geocode") or {}

    # (output key, key inside ``properties``) in the order they are emitted.
    passthrough = (
        ("event", "event"),
        ("severity", "severity"),
        ("urgency", "urgency"),
        ("certainty", "certainty"),
        ("status", "status"),
        ("messageType", "messageType"),
        ("headline", "headline"),
        ("areaDescription", "areaDesc"),
        ("senderName", "senderName"),
        ("effective", "effective"),
        ("onset", "onset"),
        ("expires", "expires"),
        ("ends", "ends"),
        ("description", "description"),
        ("instruction", "instruction"),
        ("response", "response"),
        ("category", "category"),
    )

    # The id inside ``properties`` wins; the feature-level id is the fallback.
    record = {"id": props.get("id") or feature.get("id")}
    for out_key, src_key in passthrough:
        record[out_key] = props.get(src_key)
    record["sameCodes"] = codes.get("SAME") or []
    record["ugcCodes"] = codes.get("UGC") or []
    return record
65
66
def _select_allowed(raw, allowed, normalize):
    """Return the normalized, de-duplicated tokens of *raw* that are in *allowed*.

    Args:
        raw: User input for one filter. Either a string (optionally
            comma-separated), a list/tuple of values, or a falsy value
            meaning "no filter".
        allowed: Set of accepted, already-normalized values.
        normalize: Callable applied to each stripped token before the
            membership test (for example ``str.upper``).

    Returns:
        Accepted values in first-seen order, without duplicates. Empty when
        nothing valid was supplied.
    """
    if not raw:
        return []
    tokens = raw if isinstance(raw, (list, tuple)) else str(raw).split(",")
    picked: dict[str, None] = {}  # dict keeps insertion order and drops repeats
    for token in tokens:
        value = normalize(str(token).strip())
        if value in allowed:
            picked[value] = None
    return list(picked)


def build_url(actor_input: dict) -> str:
    """Build the alerts request URL from the actor input.

    Recognized input keys:

    * ``area``: one or more codes from ``US_AREAS``, matched case-insensitively.
    * ``severity``: one or more values from ``SEVERITIES``.
    * ``urgency``: one or more values from ``URGENCIES``.
    * ``event``: free text, passed through after stripping whitespace.

    ``area``, ``severity`` and ``urgency`` are all matched case-insensitively
    and may be given as a comma-separated string or as a list. Unknown values
    are dropped; a filter with no valid value is omitted entirely.

    Args:
        actor_input: The actor's input mapping; missing keys mean "no filter".

    Returns:
        ``ALERTS_URL`` with a URL-encoded query string, or the bare
        ``ALERTS_URL`` when no filter applies.
    """
    params: dict[str, str] = {}

    # (input key, allowed values, normalizer), in query-string order.
    # ``str.capitalize`` maps e.g. "severe"/"SEVERE" onto the "Severe" form
    # stored in SEVERITIES / URGENCIES.
    validated_filters = (
        ("area", US_AREAS, str.upper),
        ("severity", SEVERITIES, str.capitalize),
        ("urgency", URGENCIES, str.capitalize),
    )
    for key, allowed, normalize in validated_filters:
        values = _select_allowed(actor_input.get(key), allowed, normalize)
        if values:
            params[key] = ",".join(values)

    event = (actor_input.get("event") or "").strip()
    if event:
        params["event"] = event

    return f"{ALERTS_URL}?{urlencode(params)}" if params else ALERTS_URL
93
94
_DEFAULT_MAX_ITEMS = 500  # used when ``maxItems`` is missing, null or invalid
_MAX_PAGES = 20  # hard stop on pagination, whatever the API reports
_MAX_ATTEMPTS = 3  # tries per page before giving up


def _coerce_max_items(raw, default: int = _DEFAULT_MAX_ITEMS) -> int:
    """Convert the ``maxItems`` input to an int, falling back to *default*.

    ``None`` and the empty string are treated like a missing value. Anything
    ``int()`` cannot convert falls back to *default* with a warning instead
    of aborting the run.
    """
    if raw is None or raw == "":
        return default
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        Actor.log.warning(f"Invalid maxItems {raw!r}; using {default}.")
        return default


async def _fetch_json(client, url: str, attempts: int = _MAX_ATTEMPTS):
    """GET *url* and return the decoded JSON body, or ``None`` on failure.

    HTTP/transport errors and undecodable bodies are retried up to
    *attempts* times, sleeping ``attempt * 2`` seconds between tries.
    """
    for attempt in range(1, attempts + 1):
        try:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.json()
        except (httpx.HTTPError, ValueError) as exc:
            Actor.log.warning(f"Attempt {attempt} failed: {exc}")
            if attempt < attempts:
                await asyncio.sleep(attempt * 2)
    return None


async def main() -> None:
    """Actor entry point: fetch alerts page by page and push them as records.

    Reads the actor input, builds the request URL with ``build_url``, then
    follows ``pagination.next`` links until there are no more pages, the
    ``maxItems`` limit is reached, the page cap is hit, or a page cannot be
    fetched. Each page is flattened with ``transform_alert`` and pushed in
    one ``Actor.push_data`` call. A failed page ends the run early but keeps
    whatever was already pushed.
    """
    async with Actor:
        actor_input = await Actor.get_input() or {}
        url = build_url(actor_input)
        max_items = _coerce_max_items(actor_input.get("maxItems"))

        Actor.log.info(f"Fetching active NWS alerts: {url}")

        async with httpx.AsyncClient(
            timeout=40.0,
            headers={
                # Identifies this client to the API.
                # NOTE(review): api.weather.gov is understood to expect a
                # descriptive User-Agent; confirm against the NWS API docs.
                "User-Agent": "scrapeworks-nws-weather-alerts/0.1 (https://apify.com/scrapeworks)",
                "Accept": "application/geo+json",
            },
        ) as client:
            pushed = 0
            next_url = url
            page = 0

            while next_url and pushed < max_items and page < _MAX_PAGES:
                data = await _fetch_json(client, next_url)

                # ``None`` means every attempt failed; any other non-object
                # payload cannot carry "features" and is treated the same.
                if not isinstance(data, dict):
                    Actor.log.error("Failed to fetch alerts.")
                    break

                features = data.get("features") or []
                if page == 0:
                    Actor.log.info(f"NWS returned {len(features)} alerts on first page.")

                if not features:
                    break

                # Slice first so the total never exceeds max_items.
                batch = [transform_alert(f) for f in features[: max_items - pushed]]
                if batch:
                    await Actor.push_data(batch)
                    pushed += len(batch)
                    Actor.log.info(f"Pushed {pushed}/{max_items} alerts.")

                pagination = data.get("pagination") or {}
                next_url = pagination.get("next")
                page += 1
                if next_url:
                    # Short pause between pages to stay polite to the API.
                    await asyncio.sleep(1)

        Actor.log.info(f"Done. Returned {pushed} alerts.")