1import { Actor, ApifyClient } from 'apify';
2import { z } from 'zod';
3
/**
 * Schema of a single entry in `actorCalls`: which actor to start, the input
 * to hand it, and optional per-run start options.
 */
const actorCallSchema = z.object({
  // Caller-chosen correlation id; normalised to `null` when absent.
  invocationId: z.string().optional().nullable().default(null),
  actorId: z.string(),
  input: z.record(z.any()),
  timeoutSecs: z.number().optional(),
  // Webhooks for this call only; appended after the top-level `webhooks`.
  webhooks: z.array(z.any()).optional().default([]),
  maxItems: z.number().optional(),
  memory: z.number().optional(),
  build: z.string().optional(),
});

/**
 * Schema of the whole actor input.
 *
 * - `actorCalls`: the actor runs to start, processed in order.
 * - `throttleNotificationWebhook`: optional URL notified whenever a start is
 *   rejected with the memory-limit error.
 * - `retryDelaySecs`: pause before retrying after such a rejection
 *   (defaults to 30).
 * - `webhooks`: webhooks attached to every started run (defaults to none).
 */
const inputSchema = z.object({
  actorCalls: z.array(actorCallSchema),
  throttleNotificationWebhook: z.string().optional(),
  retryDelaySecs: z.number().optional().default(30),
  webhooks: z.array(z.any()).optional().default([]),
});
21
/**
 * Substring searched for in a thrown error's message to recognise that a run
 * could not be started because it would exceed the memory limit.
 */
const MEMORY_LIMIT_MESSAGE = 'By launching this job you will exceed the memory limit of';
24
/**
 * Entry point: starts every actor listed in the input, one after another.
 *
 * For each call exactly one record is pushed to the default dataset:
 * - `success: true` with the started run's id, or
 * - `success: false` with the error message when starting failed for any
 *   reason other than the memory limit.
 *
 * When a start is rejected with the memory-limit error, the call is retried
 * indefinitely after `retryDelaySecs`, optionally notifying
 * `throttleNotificationWebhook` before each wait. `retries` in the record is
 * the number of throttled attempts that preceded the final outcome.
 *
 * Throws (aborting the remaining calls) when an actor id cannot be resolved,
 * or when the input does not match `inputSchema`.
 */
Actor.main(async () => {
  const parsedInput = inputSchema.parse(await Actor.getInput());
  const apify = new ApifyClient({
    token: process.env.APIFY_TOKEN,
  });

  const {
    actorCalls,
    throttleNotificationWebhook,
    retryDelaySecs,
    webhooks: sharedWebhooks,
  } = parsedInput;

  for (const actorCall of actorCalls) {
    // Resolve the actor once per call; the lookup does not depend on the
    // retry attempt, so it is kept out of the retry loop below.
    const actor = apify.actor(actorCall.actorId);
    const actorInfo = await actor.get();

    if (!actorInfo) {
      throw new Error(`Actor ${actorCall.actorId} not found`);
    }

    let retries = 0;

    // Retry loop: left via `break` once a success or failure record has been
    // pushed; repeats only while the start is throttled by the memory limit.
    for (;;) {
      try {
        const run = await actor.start(actorCall.input, {
          timeout: actorCall.timeoutSecs,
          maxItems: actorCall.maxItems,
          memory: actorCall.memory,
          build: actorCall.build,
          // Input-wide webhooks first, then the ones specific to this call.
          webhooks: [...sharedWebhooks, ...actorCall.webhooks],
        });

        await Actor.pushData({
          invocationId: actorCall.invocationId,
          actorId: actorCall.actorId,
          success: true,
          runId: run.id,
          retries,
          error: null,
        });
        break;
      } catch (error) {
        const isThrottled =
          error instanceof Error &&
          error.message.includes(MEMORY_LIMIT_MESSAGE);

        if (!isThrottled) {
          // Any other failure is final for this call: record it and move on.
          await Actor.pushData({
            invocationId: actorCall.invocationId,
            actorId: actorCall.actorId,
            success: false,
            runId: null,
            retries,
            error: error instanceof Error ? error.message : 'Unknown error',
          });
          break;
        }

        const errorMessage =
          retries > 0
            ? `Memory limit exceeded when calling ${actorCall.actorId}, retries: ${retries}`
            : `Memory limit exceeded when calling ${actorCall.actorId}`;
        console.log(errorMessage);

        if (throttleNotificationWebhook) {
          await sendThrottleNotification({
            webhookUrl: throttleNotificationWebhook,
            actorId: actorInfo.id,
            invocationId: actorCall.invocationId,
          });
        }

        // Wait before trying the same call again.
        await new Promise((resolve) =>
          setTimeout(resolve, retryDelaySecs * 1000)
        );
        retries += 1;
      }
    }
  }
});
103
/** Arguments of {@link sendThrottleNotification}. */
type ThrottleNotificationParams = {
  /** URL that receives the POST request. */
  webhookUrl: string;
  /** Id of the actor whose start was throttled. */
  actorId: string;
  /** Correlation id of the throttled call, or `null` when none was given. */
  invocationId: string | null;
};

/**
 * POSTs an `ACTOR.RUN.THROTTLED` event as JSON to `webhookUrl`.
 *
 * Delivery is best-effort: network errors and non-2xx responses are logged
 * with `console.error` and never thrown, so a broken webhook cannot fail the
 * caller. The returned promise settles only after the request has finished,
 * which makes `await sendThrottleNotification(...)` actually wait for it.
 *
 * @param params - Target URL plus the ids placed in the event payload.
 * @returns A promise that always resolves (never rejects).
 */
async function sendThrottleNotification({
  webhookUrl,
  actorId,
  invocationId,
}: ThrottleNotificationParams): Promise<void> {
  console.log(`Sending throttle notification for ${actorId}`);
  try {
    const response = await fetch(webhookUrl, {
      method: 'POST',
      // Declare the payload type; without it a string body is sent as text/plain.
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        createdAt: new Date().toISOString(),
        eventType: 'ACTOR.RUN.THROTTLED',
        eventData: {
          actorId,
        },
        resource: {
          invocationId,
          actId: actorId,
        },
        status: 'THROTTLED',
      }),
    });

    // fetch only rejects on network-level failures; surface HTTP errors too.
    if (!response.ok) {
      console.error(
        `Failed to send throttle notification webhook for ${actorId}: HTTP ${response.status}`
      );
    }
  } catch (err) {
    console.error(
      `Failed to send throttle notification webhook for ${actorId}: ${err}`
    );
  }
}