1"""Crypto Market Intelligence Scraper — Apify Actor
2多源加密货币数据聚合:Binance 行情 + CoinGecko 排行 + DexScreener 链上数据 + 技术指标。
3所有数据源均为公开 API,无需认证,100% 合法可用。
4"""
5
6import asyncio
7import hashlib
8import hmac
9import time
10from typing import Optional
11
12from apify import Actor
13import httpx
14
15
16
# Base URLs of the public REST APIs queried by the fetch_* coroutines below.
BINANCE_API = "https://api.binance.com/api/v3"
COINGECKO_API = "https://api.coingecko.com/api/v3"
DEXSCREENER_API = "https://api.dexscreener.com"
# Value of the User-Agent header set on the shared httpx client in main().
USER_AGENT = "ApifyCryptoScraper/1.0"
# Timeout passed to httpx for the client and for every individual request.
HTTP_TIMEOUT = 25.0
22
23
24
25
def calc_rsi(closes: list[float], period: int = 14) -> Optional[float]:
    """Compute the Relative Strength Index from the latest price changes.

    Only the last ``period`` close-to-close changes are considered, and the
    gains and losses are averaged with a plain (unsmoothed) mean.

    Args:
        closes: Close prices ordered oldest to newest.
        period: Number of price changes to average.

    Returns:
        ``None`` when fewer than ``period + 1`` closes are available,
        ``100.0`` when the window contains no losses, otherwise the RSI
        rounded to two decimals.
    """
    if len(closes) < period + 1:
        return None

    gain_total = 0
    loss_total = 0
    # Negative offsets walk the trailing window: closes[-period] .. closes[-1],
    # each compared with the close immediately before it.
    for offset in range(-period, 0):
        change = closes[offset] - closes[offset - 1]
        if change > 0:
            gain_total += change
        elif change < 0:
            loss_total += -change

    mean_gain = gain_total / period
    mean_loss = loss_total / period
    if mean_loss == 0:
        # No losing candle in the window: RSI saturates at its maximum.
        return 100.0

    strength = mean_gain / mean_loss
    return round(100 - 100 / (1 + strength), 2)
42
43
def calc_ema(closes: list[float], period: int) -> Optional[float]:
    """Compute the exponential moving average of ``closes``.

    The average is seeded with the simple mean of the first ``period``
    closes and then updated with every later close using the smoothing
    factor ``2 / (period + 1)``.

    Args:
        closes: Close prices ordered oldest to newest.
        period: EMA length; must be at least 1.

    Returns:
        The final EMA rounded to four decimals, or ``None`` when ``period``
        is not positive or fewer than ``period`` closes are available.
    """
    # A non-positive period would divide by zero below, so treat it like
    # "not enough data" instead of raising.
    if period < 1 or len(closes) < period:
        return None

    smoothing = 2 / (period + 1)
    ema = sum(closes[:period]) / period
    for price in closes[period:]:
        ema = price * smoothing + ema * (1 - smoothing)
    return round(ema, 4)
53
54
def _ema_series(values: list[float], period: int) -> list[float]:
    """Return the unrounded EMA for every index from ``period - 1`` onwards.

    The first element is the simple mean of the first ``period`` values; each
    following element applies the ``2 / (period + 1)`` smoothing factor.
    Callers must make sure ``len(values) >= period``.
    """
    smoothing = 2 / (period + 1)
    ema = sum(values[:period]) / period
    series = [ema]
    for value in values[period:]:
        ema = value * smoothing + ema * (1 - smoothing)
        series.append(ema)
    return series


def calc_macd(closes: list[float]) -> dict:
    """Compute the MACD (12, 26, 9) indicator for the latest close.

    The MACD line is the 12-period EMA minus the 26-period EMA, the signal
    line is the 9-period EMA of the MACD line, and the histogram is their
    difference. All EMAs are kept unrounded until the final result so that
    low-priced assets do not collapse to zero.

    Args:
        closes: Close prices ordered oldest to newest.

    Returns:
        A dict with keys ``macd``, ``signal`` and ``histogram``, each rounded
        to six decimals. All three are ``None`` with fewer than 26 closes;
        ``signal`` and ``histogram`` stay ``None`` until nine MACD values
        exist (34 closes).
    """
    fast_period, slow_period, signal_period = 12, 26, 9

    if len(closes) < slow_period:
        return {"macd": None, "signal": None, "histogram": None}

    fast = _ema_series(closes, fast_period)
    slow = _ema_series(closes, slow_period)
    # The fast series starts 14 candles earlier than the slow one; drop that
    # head so both lists refer to the same candles.
    fast_aligned = fast[slow_period - fast_period:]
    macd_line = [f - s for f, s in zip(fast_aligned, slow)]
    macd = macd_line[-1]

    if len(macd_line) < signal_period:
        return {"macd": round(macd, 6), "signal": None, "histogram": None}

    signal = _ema_series(macd_line, signal_period)[-1]
    return {
        "macd": round(macd, 6),
        "signal": round(signal, 6),
        "histogram": round(macd - signal, 6),
    }
68
69
def calc_bollinger(closes: list[float], period: int = 20) -> Optional[dict]:
    """Compute Bollinger Bands over the most recent ``period`` closes.

    The middle band is the simple moving average and the outer bands sit two
    population standard deviations above and below it.

    Args:
        closes: Close prices ordered oldest to newest.
        period: Window length; must be at least 1.

    Returns:
        ``None`` when ``period`` is not positive or fewer than ``period``
        closes are available. Otherwise a dict with ``upper``, ``middle`` and
        ``lower`` (rounded to four decimals) and ``width_pct``, the band
        width as a percentage of the middle band (rounded to two decimals),
        or ``None`` when the middle band is zero.
    """
    # period <= 0 would make closes[-period:] select the wrong window and
    # divide by zero, so reject it up front.
    if period < 1 or len(closes) < period:
        return None

    window = closes[-period:]
    mean = sum(window) / period
    variance = sum((price - mean) ** 2 for price in window) / period
    std = variance ** 0.5

    # A zero mean (e.g. an all-zero window) has no meaningful relative width.
    width_pct = round(4 * std / mean * 100, 2) if mean else None

    return {
        "upper": round(mean + 2 * std, 4),
        "middle": round(mean, 4),
        "lower": round(mean - 2 * std, 4),
        "width_pct": width_pct,
    }
84
85
def compute_indicators(klines: list[list]) -> dict:
    """Derive every supported technical indicator from a list of kline rows.

    The close price is read from index 4 of each row; rows with fewer than
    five elements are ignored.

    Args:
        klines: Kline rows as returned by the klines endpoint.

    Returns:
        A dict with ``rsi_14``, ``ema_5``, ``ema_12``, ``ema_26``, ``macd``
        (only the MACD line value), ``bollinger``, ``current_price`` (the last
        close, or ``None`` without data) and ``candles_used``.
    """
    close_prices = [float(row[4]) for row in klines if len(row) >= 5]
    latest_close = close_prices[-1] if close_prices else None

    return {
        "rsi_14": calc_rsi(close_prices),
        "ema_5": calc_ema(close_prices, 5),
        "ema_12": calc_ema(close_prices, 12),
        "ema_26": calc_ema(close_prices, 26),
        "macd": calc_macd(close_prices).get("macd"),
        "bollinger": calc_bollinger(close_prices),
        "current_price": latest_close,
        "candles_used": len(close_prices),
    }
107
108
109
110
async def fetch_binance_tickers(
    client: httpx.AsyncClient, symbols: list[str]
) -> list[dict]:
    """Fetch 24-hour ticker statistics from Binance for the given symbols.

    Symbols are requested in batches of up to 100 via ``/ticker/24hr``. A
    failing batch is logged as a warning and skipped; entries converted
    before the failure are kept.

    Args:
        client: Shared HTTP client used for the requests.
        symbols: Trading pair symbols such as ``"BTCUSDT"``.

    Returns:
        One dict per ticker with the symbol and its price, change percentage,
        24h high/low and volumes converted to ``float``.
    """
    # (output key, key in the Binance payload) for every numeric field.
    numeric_fields = (
        ("price", "lastPrice"),
        ("price_change_pct", "priceChangePercent"),
        ("high_24h", "highPrice"),
        ("low_24h", "lowPrice"),
        ("volume_24h", "volume"),
        ("volume_usdt", "quoteVolume"),
    )
    chunk_size = 100
    tickers: list[dict] = []

    for start in range(0, len(symbols), chunk_size):
        chunk = symbols[start:start + chunk_size]
        quoted = ",".join(f'"{name}"' for name in chunk)
        url = f"{BINANCE_API}/ticker/24hr?symbols=[{quoted}]"
        try:
            response = await client.get(url, timeout=HTTP_TIMEOUT)
            response.raise_for_status()
            for entry in response.json():
                row = {"symbol": entry["symbol"]}
                for out_key, source_key in numeric_fields:
                    row[out_key] = float(entry[source_key])
                tickers.append(row)
        except Exception as e:
            await _log_warning(f"Binance ticker 失败: {e}")

    return tickers
139
140
async def fetch_binance_klines(
    client: httpx.AsyncClient, symbols: list[str], timeframes: list[str]
) -> dict[str, dict[str, list]]:
    """Fetch the latest 100 klines from Binance for every symbol/interval pair.

    Requests are issued one after another with a short pause after each
    successful call. A failed request is logged as a warning and recorded as
    an empty list.

    Args:
        client: Shared HTTP client used for the requests.
        symbols: Trading pair symbols to query.
        timeframes: Kline intervals to query for each symbol.

    Returns:
        A mapping ``symbol -> interval -> kline rows``.
    """
    candles: dict[str, dict[str, list]] = {}

    for symbol in symbols:
        per_interval: dict[str, list] = {}
        candles[symbol] = per_interval

        for interval in timeframes:
            url = f"{BINANCE_API}/klines?symbol={symbol}&interval={interval}&limit=100"
            try:
                response = await client.get(url, timeout=HTTP_TIMEOUT)
                response.raise_for_status()
                per_interval[interval] = response.json()
                # Brief pause between successful requests.
                await asyncio.sleep(0.05)
            except Exception as e:
                await _log_warning(f"K线 {symbol} {interval} 失败: {e}")
                per_interval[interval] = []

    return candles
161
162
async def fetch_coingecko_gainers(
    client: httpx.AsyncClient,
) -> list[dict]:
    """Fetch up to 30 coins from CoinGecko's ``/search/trending`` endpoint.

    Despite the function name, the data comes from the trending-search
    endpoint rather than a price-gainers ranking.

    Args:
        client: Shared HTTP client used for the request.

    Returns:
        A list of dicts with ``name``, ``symbol``, ``market_cap_rank`` and
        ``score``; an empty list when the request or parsing fails (the
        failure is logged as a warning).
    """
    url = f"{COINGECKO_API}/search/trending"
    try:
        response = await client.get(url, timeout=HTTP_TIMEOUT)
        response.raise_for_status()
        entries = response.json().get("coins", [])[:30]

        trending = []
        for entry in entries:
            coin = entry["item"]
            trending.append({
                "name": coin["name"],
                "symbol": coin["symbol"],
                "market_cap_rank": coin.get("market_cap_rank"),
                "score": coin.get("score"),
            })
        return trending
    except Exception as e:
        await _log_warning(f"CoinGecko trending 失败: {e}")
        return []
185
186
async def _log_warning(msg: str):
    """Log a warning through the Actor logger, falling back to ``print``.

    ``Actor.log`` is a regular synchronous logger, so its ``warning`` method
    must not be awaited: awaiting its ``None`` return value raises
    ``TypeError`` after the message has already been logged, which made every
    warning appear twice. The coroutine signature is kept so existing
    ``await _log_warning(...)`` call sites keep working.

    Args:
        msg: The message to log.
    """
    try:
        Actor.log.warning(msg)
    except Exception:
        # Logging is best-effort and must never break a fetch; fall back to
        # stdout if the Actor logger is unusable.
        print(f"[WARN] {msg}")
193
194
def _to_float(value) -> float:
    """Convert an API value (number, numeric string or None) to ``float``.

    Returns ``0.0`` for missing, empty or non-numeric values.
    """
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


def _normalize_dex_pair(raw: dict) -> dict:
    """Flatten one DexScreener pair object into the record shape emitted here.

    Nested objects that are missing or ``null`` are treated as empty.
    """
    base_symbol = (raw.get("baseToken") or {}).get("symbol", "")
    quote_symbol = (raw.get("quoteToken") or {}).get("symbol", "")
    return {
        "chain": raw.get("chainId", ""),
        "dex": raw.get("dexId", ""),
        "pair": f"{base_symbol}/{quote_symbol}",
        "price_usd": _to_float(raw.get("priceUsd")),
        "volume_24h": _to_float((raw.get("volume") or {}).get("h24")),
        "liquidity_usd": _to_float((raw.get("liquidity") or {}).get("usd")),
        "price_change_24h": _to_float((raw.get("priceChange") or {}).get("h24")),
        "pair_address": raw.get("pairAddress", ""),
        "url": raw.get("url", ""),
    }


async def fetch_dex_pairs(
    client: httpx.AsyncClient, max_pairs: int
) -> list[dict]:
    """Collect the highest-volume DEX pairs from DexScreener searches.

    Runs one search per hard-coded token term, merges the results, sorts them
    by 24h volume (descending) and keeps the first ``max_pairs`` entries with
    a distinct, non-empty pair address.

    Args:
        client: Shared HTTP client used for the requests.
        max_pairs: Maximum number of pairs to return.

    Returns:
        A list of normalized pair dicts; empty when ``max_pairs`` is not
        positive or every search fails. Failed searches are logged as
        warnings and skipped.
    """
    # Nothing can be returned for a non-positive limit, so skip the requests.
    if max_pairs <= 0:
        return []

    search_terms = ["WETH", "USDC", "WBTC", "SOL", "PEPE"]
    all_pairs = []

    for term in search_terms:
        url = f"{DEXSCREENER_API}/latest/dex/search?q={term}"
        try:
            resp = await client.get(url, timeout=HTTP_TIMEOUT)
            resp.raise_for_status()
            data = resp.json()
            # "pairs" may be present but null; treat that as no results.
            for raw_pair in data.get("pairs") or []:
                if not isinstance(raw_pair, dict):
                    continue
                all_pairs.append(_normalize_dex_pair(raw_pair))
            # Pause between successful searches.
            await asyncio.sleep(0.3)
        except Exception as e:
            await _log_warning(f"DexScreener search '{term}' 失败: {e}")

    seen = set()
    unique_pairs = []
    for pair in sorted(all_pairs, key=lambda x: x["volume_24h"], reverse=True):
        address = pair["pair_address"]
        if not address or address in seen:
            continue
        seen.add(address)
        unique_pairs.append(pair)
        if len(unique_pairs) >= max_pairs:
            break

    return unique_pairs
237
238
async def fetch_coingecko_prices(
    client: httpx.AsyncClient, coin_ids: list[str]
) -> dict:
    """Fetch USD prices and 24h changes from CoinGecko's ``/simple/price``.

    Only the first 50 ids are sent in the request.

    Args:
        client: Shared HTTP client used for the request.
        coin_ids: CoinGecko coin identifiers.

    Returns:
        The decoded JSON response, or an empty dict when the request fails
        (the failure is logged as a warning).
    """
    id_list = ",".join(coin_ids[:50])
    url = f"{COINGECKO_API}/simple/price?ids={id_list}&vs_currencies=usd&include_24hr_change=true"
    try:
        response = await client.get(url, timeout=HTTP_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
    except Exception as e:
        await _log_warning(f"CoinGecko prices 失败: {e}")
        return {}
    return payload
252
253
254
255
async def main():
    """Actor entry point: read the input, fetch all sources, push one record.

    Recognised input keys (all optional):
        symbols: list or comma-separated string of Binance symbols
            (default ``["BTCUSDT", "ETHUSDT"]``); values are upper-cased.
        timeframes: list or comma-separated string of kline intervals
            (default ``["1h", "4h"]``).
        includeDexData, includeIndicators, includeTopGainers: feature flags,
            all defaulting to ``True``.
        maxDexPairs: maximum number of DEX pairs to keep (default 50).

    The enabled sources are fetched concurrently and a single dataset item is
    pushed with ``timestamp``, per-symbol ``ticker``/``indicators``,
    ``top_gainers`` and ``dex_top_pairs``.
    """
    async with Actor:
        # get_input() yields None when the run has no input at all.
        actor_input = await Actor.get_input() or {}

        symbols = actor_input.get("symbols", ["BTCUSDT", "ETHUSDT"])
        if isinstance(symbols, str):
            symbols = symbols.split(",")
        symbols = [s.strip().upper() for s in symbols if s.strip()]

        # Intervals are case-sensitive, so they are trimmed but not upper-cased.
        timeframes = actor_input.get("timeframes", ["1h", "4h"])
        if isinstance(timeframes, str):
            timeframes = timeframes.split(",")
        timeframes = [tf.strip() for tf in timeframes if tf.strip()]

        include_dex = actor_input.get("includeDexData", True)
        include_indicators = actor_input.get("includeIndicators", True)
        include_gainers = actor_input.get("includeTopGainers", True)
        max_dex_pairs = actor_input.get("maxDexPairs", 50)

        # Actor.log is a synchronous logger; its methods must not be awaited.
        Actor.log.info(
            f"Crypto Market Intelligence 启动\n"
            f"  币种: {len(symbols)} 个\n"
            f"  周期: {timeframes}\n"
            f"  DEX: {include_dex}, 指标: {include_indicators}, 涨幅: {include_gainers}"
        )

        async with httpx.AsyncClient(
            headers={"User-Agent": USER_AGENT},
            timeout=HTTP_TIMEOUT,
            limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
        ) as client:
            # Disabled sources are replaced by an immediately-resolving
            # awaitable that yields the matching empty result.
            tickers_task = fetch_binance_tickers(client, symbols)
            klines_task = (
                fetch_binance_klines(client, symbols, timeframes)
                if include_indicators  # klines are only used for indicators
                else asyncio.sleep(0, result={})
            )
            gainers_task = (
                fetch_coingecko_gainers(client)
                if include_gainers
                else asyncio.sleep(0, result=[])
            )
            dex_task = (
                fetch_dex_pairs(client, max_dex_pairs)
                if include_dex
                else asyncio.sleep(0, result=[])
            )

            tickers, klines, gainers, dex_pairs = await asyncio.gather(
                tickers_task, klines_task, gainers_task, dex_task
            )

        # Index tickers by symbol, keeping the first entry per symbol.
        ticker_by_symbol = {}
        for ticker in tickers:
            ticker_by_symbol.setdefault(ticker["symbol"], ticker)

        output = {
            "timestamp": int(time.time()),
            "symbols": [],
            "top_gainers": gainers,
            "dex_top_pairs": dex_pairs,
        }

        for symbol in symbols:
            symbol_data = {
                "symbol": symbol,
                "ticker": ticker_by_symbol.get(symbol, {}),
                "indicators": {},
            }

            if include_indicators and symbol in klines:
                for tf in timeframes:
                    rows = klines[symbol].get(tf)
                    if rows:
                        symbol_data["indicators"][tf] = compute_indicators(rows)

            output["symbols"].append(symbol_data)

        await Actor.push_data(output)

        Actor.log.info(
            f"完成: {len(output['symbols'])} 个币种, "
            f"{len(gainers)} 个热门币, {len(dex_pairs)} 个 DEX 交易对"
        )