Apify Run Queue avatar
Apify Run Queue

Pricing

Pay per usage

Go to Store
Apify Run Queue

Apify Run Queue

Developed by

Lexis Solutions

Maintained by Community

Apify utility that queues Actor runs until memory is available. A workaround for memory-exceeded errors on Apify: this Actor retries starting each run after a delay until it succeeds. Open source and free!

5.0 (3)

Pricing

Pay per usage

7

Total users

5

Monthly users

5

Last modified

3 days ago

.dockerignore

1# configurations
2.idea
3
4# crawlee storage folder
5storage
6
7# installed files
8node_modules

.gitignore

1storage
2node_modules
3.venv

Dockerfile

1FROM apify/actor-node:latest AS builder
2
3WORKDIR /app
4
5COPY package.json yarn.lock ./
6
7# install all dependencies (including devDependencies, which the build step needs)
8RUN yarn install --production=false
9
10# copy the source code
11COPY . ./
12
13RUN yarn build
14
15# runner image: installs production dependencies and takes the compiled dist/ from the builder stage
16
17FROM apify/actor-node:latest
18
19WORKDIR /app
20
21COPY package.json yarn.lock ./
22
23RUN yarn install --production
24
25COPY --from=builder /app/dist ./dist
26
27COPY . .
28
29CMD ["yarn", "start"]

package.json

1{
2  "name": "apify-run-queue",
3  "version": "1.0.0",
4  "type": "module",
5  "description": "",
6  "dependencies": {
7    "apify": "^3.2.6",
8    "apify-client": "^2.12.2",
9    "zod": "^3.24.3"
10  },
11  "devDependencies": {
12    "@apify/tsconfig": "^0.1.0",
13    "@types/node": "^20.0.0",
14    "tsx": "^4.4.0",
15    "typescript": "~5.6.0"
16  },
17  "scripts": {
18    "start": "node dist/main.js",
19    "dev": "tsx src/main.ts",
20    "build": "tsc",
21    "test": "echo \"Error: oops, the actor has no tests yet, sad!\" && exit 1"
22  },
23  "author": "",
24  "license": "ISC"
25}

tsconfig.json

1{
2  "extends": "@apify/tsconfig",
3  "compilerOptions": {
4    "module": "ESNext",
5    "target": "ESNext",
6    "moduleResolution": "node",
7    "outDir": "dist",
8    "lib": ["ESNext", "DOM"],
9    "strict": true,
10    "esModuleInterop": true,
11    "resolveJsonModule": true,
12    "skipLibCheck": true
13  },
14  "include": ["./src/**/*"]
15}

.actor/actor.json

1{
2  "actorSpecification": 1,
3  "name": "apify-run-queue",
4  "version": "0.0",
5  "buildTag": "latest",
6  "environmentVariables": {},
7  "input": "./input.json",
8  "defaultMemory": 1024,
9  "storages": {
10    "dataset": "./output.json"
11  }
12}

.actor/input.json

1{
2  "title": "Input schema for Apify Run Queue",
3  "type": "object",
4  "schemaVersion": 1,
5  "properties": {
6    "actorCalls": {
7      "title": "Actor Calls",
8      "type": "array",
9      "description": "The actor calls to run",
10      "editor": "json",
11      "prefill": [
12        {
13          "actorId": "apify/website-content-crawler",
14          "input": {
15            "startUrls": [
16              {
17                "url": "https://www.apify.com"
18              }
19            ]
20          }
21        }
22      ]
23    }
24  },
25  "required": ["actorCalls"]
26}

.actor/output.json

1{
2  "actorSpecification": 1,
3  "views": {
4    "overview": {
5      "title": "Overview",
6      "transformation": {
7        "fields": [
8          "success",
9          "actorId",
10          "invocationId",
11          "runId",
12          "retries",
13          "error"
14        ]
15      },
16      "display": {
17        "component": "table",
18        "properties": {
19          "success": {
20            "label": "Success",
21            "format": "boolean"
22          },
23          "actorId": {
24            "label": "Actor ID",
25            "format": "text"
26          },
27          "invocationId": {
28            "label": "Invocation ID",
29            "format": "text"
30          },
31          "runId": {
32            "label": "Run ID",
33            "format": "text"
34          },
35          "retries": {
36            "label": "Retries",
37            "format": "number"
38          },
39          "error": {
40            "label": "Error",
41            "format": "text"
42          }
43        }
44      }
45    }
46  }
47}

src/main.ts

1import { Actor, ApifyClient } from 'apify';
2import { z } from 'zod';
3
/**
 * Validation schema for one queued actor call.
 *
 * `invocationId` is an optional caller-supplied identifier that is echoed back
 * in the output dataset rows and in throttle notifications; it is normalised
 * to `null` when omitted. `webhooks` defaults to an empty list. The remaining
 * optional fields are forwarded as start options for the target actor.
 */
const actorCallSchema = z.object({
  invocationId: z.string().optional().nullable().default(null),
  actorId: z.string(),
  input: z.record(z.any()),
  timeoutSecs: z.number().optional(),
  webhooks: z.array(z.any()).optional().default([]),
  maxItems: z.number().optional(),
  memory: z.number().optional(),
  build: z.string().optional(),
});

/**
 * Validation schema for the actor input as a whole.
 *
 * - `actorCalls`: the calls to start, processed in the given order.
 * - `throttleNotificationWebhook`: optional URL notified whenever a start is
 *   delayed by the memory limit.
 * - `retryDelaySecs`: pause between start attempts, 30 seconds by default.
 * - `webhooks`: webhooks attached to every started run, in addition to the
 *   per-call ones.
 */
const inputSchema = z.object({
  actorCalls: z.array(actorCallSchema),
  throttleNotificationWebhook: z.string().optional(),
  retryDelaySecs: z.number().optional().default(30),
  webhooks: z.array(z.any()).optional().default([]),
});

// Substring looked for in a start error's message to classify it as a
// memory-limit rejection (retryable) rather than a permanent failure.
const MEMORY_LIMIT_MESSAGE =
  'By launching this job you will exceed the memory limit of';
24
/**
 * Actor entry point: starts every requested actor call, one after another.
 *
 * For each call the target actor is looked up once; a missing actor aborts the
 * whole run with an error. The start is then attempted until it either
 * succeeds or fails with an error that is not a memory-limit rejection:
 *
 * - success: a `success: true` row with the new run ID is pushed to the dataset;
 * - memory-limit rejection: the optional throttle webhook is notified, the
 *   actor sleeps for `retryDelaySecs` and tries the same call again (the
 *   number of attempts is unbounded);
 * - any other error: a `success: false` row with the error message is pushed
 *   and the queue moves on to the next call.
 */
Actor.main(async () => {
  const input = await Actor.getInput();
  const { actorCalls, throttleNotificationWebhook, retryDelaySecs, webhooks } =
    inputSchema.parse(input);
  const apify = new ApifyClient({
    token: process.env.APIFY_TOKEN,
  });

  for (const actorCall of actorCalls) {
    // `actor()` only builds a sub-client, so there is nothing to await here.
    const actor = apify.actor(actorCall.actorId);

    // Looked up once per call instead of once per retry: the result does not
    // change between attempts, so re-fetching it only costs extra API requests.
    const actorInfo = await actor.get();

    if (!actorInfo) {
      throw new Error(`Actor ${actorCall.actorId} not found`);
    }

    // Number of memory-limit rejections seen so far for this call.
    let retries = 0;

    for (;;) {
      try {
        const run = await actor.start(actorCall.input, {
          timeout: actorCall.timeoutSecs,
          maxItems: actorCall.maxItems,
          memory: actorCall.memory,
          build: actorCall.build,
          // Global webhooks first, then the ones specific to this call.
          webhooks: [...webhooks, ...actorCall.webhooks],
        });

        await Actor.pushData({
          invocationId: actorCall.invocationId,
          actorId: actorCall.actorId,
          success: true,
          runId: run.id,
          retries,
          error: null,
        });
        break;
      } catch (error) {
        const isMemoryLimitError =
          error instanceof Error &&
          error.message.includes(MEMORY_LIMIT_MESSAGE);

        if (!isMemoryLimitError) {
          // Permanent failure: record it and continue with the next call.
          await Actor.pushData({
            invocationId: actorCall.invocationId,
            actorId: actorCall.actorId,
            success: false,
            runId: null,
            retries,
            error: error instanceof Error ? error.message : 'Unknown error',
          });
          break;
        }

        const errorMessage =
          retries > 0
            ? `Memory limit exceeded when calling ${actorCall.actorId}, retries: ${retries}`
            : `Memory limit exceeded when calling ${actorCall.actorId}`;

        console.log(errorMessage);
        if (throttleNotificationWebhook) {
          await sendThrottleNotification({
            webhookUrl: throttleNotificationWebhook,
            actorId: actorInfo.id,
            invocationId: actorCall.invocationId,
          });
        }

        // Wait for memory to free up before trying the same call again.
        await new Promise((resolve) =>
          setTimeout(resolve, retryDelaySecs * 1000)
        );
        retries += 1;
      }
    }
  }
});
103
/** Arguments accepted by `sendThrottleNotification`. */
type ThrottleNotificationParams = {
  /** URL that receives the notification as an HTTP POST. */
  webhookUrl: string;
  /** ID of the actor whose start was delayed. */
  actorId: string;
  /** Caller-supplied invocation identifier, or `null` when none was given. */
  invocationId: string | null;
};

// Upper bound for a single notification request, so that an unresponsive
// webhook endpoint cannot hold up the caller indefinitely.
const THROTTLE_NOTIFICATION_TIMEOUT_MS = 10000;

/**
 * POSTs an `ACTOR.RUN.THROTTLED` event, serialised as JSON, to `webhookUrl`.
 *
 * Delivery is best-effort: network errors, timeouts and non-2xx responses are
 * logged and never thrown.
 *
 * @param params - Webhook URL plus the actor and invocation being reported.
 * @returns A promise that resolves once the request has finished, whether it
 *   succeeded or failed, so callers can `await` the delivery attempt.
 */
async function sendThrottleNotification({
  webhookUrl,
  actorId,
  invocationId,
}: ThrottleNotificationParams): Promise<void> {
  console.log(`Sending throttle notification for ${actorId}`);
  try {
    const response = await fetch(webhookUrl, {
      method: 'POST',
      // Without an explicit header a string body is sent as text/plain, which
      // JSON-parsing webhook receivers commonly ignore or reject.
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        createdAt: new Date().toISOString(),
        eventType: 'ACTOR.RUN.THROTTLED',
        eventData: {
          actorId,
        },
        resource: {
          invocationId,
          actId: actorId,
        },
        status: 'THROTTLED',
      }),
      signal: AbortSignal.timeout(THROTTLE_NOTIFICATION_TIMEOUT_MS),
    });

    // fetch only rejects on network-level failures; HTTP error statuses have
    // to be checked explicitly.
    if (!response.ok) {
      console.error(
        `Throttle notification webhook for ${actorId} responded with HTTP ${response.status}`
      );
    }
  } catch (err) {
    console.error(
      `Failed to send throttle notification webhook for ${actorId}: ${err}`
    );
  }
}
135}