1import { Actor, ApifyClient } from 'apify';
2import { z } from 'zod';
3
/**
 * Shape of a single entry in `actorCalls`: which actor or task to start and
 * the options forwarded to that start request.
 */
const actorCallSchema = z.object({
  // Selects the task client instead of the actor client for `actorId`.
  isTask: z.boolean().optional().default(false),
  // Caller-supplied correlation ID; echoed into the dataset item and the
  // throttle notification. Defaults to null when omitted.
  invocationId: z.string().optional().nullable().default(null),
  // ID of the actor (or of the task when `isTask` is true) to start.
  actorId: z.string(),
  // Input object passed to the started run.
  input: z.record(z.any()),
  // Forwarded as the `timeout` start option.
  timeoutSecs: z.number().optional(),
  // Per-call webhooks, appended after the top-level `webhooks`.
  webhooks: z.array(z.any()).optional().default([]),
  // Forwarded unchanged as the `maxItems`, `memory` and `build` start options.
  maxItems: z.number().optional(),
  memory: z.number().optional(),
  build: z.string().optional(),
});

/**
 * Schema of the actor input: the list of calls to start plus settings shared
 * by all of them.
 */
const inputSchema = z.object({
  actorCalls: z.array(actorCallSchema),
  // URL that receives a POST whenever a start is rejected for exceeding the
  // memory limit; no notification is sent when omitted.
  throttleNotificationWebhook: z.string().optional(),
  // Seconds to wait before retrying a start rejected for the memory limit.
  retryDelaySecs: z.number().optional().default(30),
  // Webhooks attached to every started run, ahead of the per-call ones.
  webhooks: z.array(z.any()).optional().default([]),
});
22
/**
 * Substring searched for in a start error's message to decide that the run
 * was rejected for exceeding the memory limit and should be retried.
 */
const MEMORY_LIMIT_MESSAGE = 'By launching this job you will exceed the memory limit of';
25
/**
 * Actor entry point.
 *
 * Validates the actor input, then starts each configured actor or task run
 * one after another, pushing exactly one dataset item per call:
 *
 * - A start that fails with an error whose message contains
 *   `MEMORY_LIMIT_MESSAGE` is retried after `retryDelaySecs` seconds, with no
 *   upper bound on the number of attempts. When `throttleNotificationWebhook`
 *   is set, a notification is sent before each wait.
 * - Any other failure is recorded as an unsuccessful item and processing
 *   moves on to the next call.
 *
 * @throws Error when `get()` returns nothing for an `actorId`; this aborts
 *   the remaining calls.
 */
Actor.main(async () => {
  const parsedInput = inputSchema.parse(await Actor.getInput());
  const { actorCalls, throttleNotificationWebhook, retryDelaySecs } =
    parsedInput;
  const apify = new ApifyClient({
    token: process.env.APIFY_TOKEN,
  });

  for (const actorCall of actorCalls) {
    const { actorId, invocationId, isTask } = actorCall;

    // `task()` / `actor()` only build a resource client, so there is nothing
    // to await until `get()` or `start()` is called on it.
    const runner = isTask ? apify.task(actorId) : apify.actor(actorId);

    // Looked up once per call; the result does not change between retries.
    const actorInfo = await runner.get();
    if (!actorInfo) {
      throw new Error(`Actor ${actorId} not found`);
    }

    // Shared webhooks first, then the ones specific to this call.
    const webhooks = [...parsedInput.webhooks, ...actorCall.webhooks];

    // Number of memory-limit rejections seen so far for this call.
    let retries = 0;
    for (;;) {
      try {
        const run = await runner.start(actorCall.input, {
          timeout: actorCall.timeoutSecs,
          maxItems: actorCall.maxItems,
          memory: actorCall.memory,
          build: actorCall.build,
          webhooks,
        });

        await Actor.pushData({
          invocationId,
          actorId,
          isTask,
          success: true,
          runId: run.id,
          retries,
          error: null,
        });
        break;
      } catch (error: unknown) {
        const isMemoryLimitError =
          error instanceof Error &&
          error.message.includes(MEMORY_LIMIT_MESSAGE);

        if (!isMemoryLimitError) {
          // Not retryable: record the failure with the same fields as a
          // success item (including `isTask`) and move on to the next call.
          await Actor.pushData({
            invocationId,
            actorId,
            isTask,
            success: false,
            runId: null,
            retries,
            error: error instanceof Error ? error.message : 'Unknown error',
          });
          break;
        }

        const retrySuffix = retries > 0 ? `, retries: ${retries}` : '';
        console.log(
          `Memory limit exceeded when calling ${actorId}${retrySuffix}`
        );

        if (throttleNotificationWebhook) {
          await sendThrottleNotification({
            webhookUrl: throttleNotificationWebhook,
            actorId: actorInfo.id,
            invocationId,
          });
        }

        // Wait before the next attempt so running jobs can free up memory.
        await new Promise((resolve) =>
          setTimeout(resolve, retryDelaySecs * 1000)
        );
        retries += 1;
      }
    }
  }
});
107
/** Arguments accepted by {@link sendThrottleNotification}. */
type ThrottleNotificationParams = {
  /** URL that receives the POST request. */
  webhookUrl: string;
  /** ID reported as both `eventData.actorId` and `resource.actId`. */
  actorId: string;
  /** Caller-supplied correlation ID, or `null` when none was given. */
  invocationId: string | null;
};

/**
 * POSTs an `ACTOR.RUN.THROTTLED` event as JSON to `webhookUrl`.
 *
 * Delivery is best-effort: network errors and non-2xx responses are logged
 * and never thrown, so the returned promise always resolves.
 *
 * @returns A promise that settles once the request has completed or failed,
 *   so that `await`ing the call actually waits for delivery.
 */
async function sendThrottleNotification({
  webhookUrl,
  actorId,
  invocationId,
}: ThrottleNotificationParams): Promise<void> {
  console.log(`Sending throttle notification for ${actorId}`);
  try {
    const response = await fetch(webhookUrl, {
      method: 'POST',
      // Without this header a string body is sent as text/plain, which JSON
      // body parsers on the receiving side commonly ignore.
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        createdAt: new Date().toISOString(),
        eventType: 'ACTOR.RUN.THROTTLED',
        eventData: {
          actorId,
        },
        resource: {
          invocationId,
          actId: actorId,
        },
        status: 'THROTTLED',
      }),
    });

    // fetch only rejects on network failure; HTTP errors must be checked.
    if (!response.ok) {
      console.error(
        `Failed to send throttle notification webhook for ${actorId}: HTTP ${response.status}`
      );
    }
  } catch (err: unknown) {
    console.error(
      `Failed to send throttle notification webhook for ${actorId}: ${String(err)}`
    );
  }
}