1import { Actor } from 'apify';
2import { evalFunctionOrThrow, diff, fetchRuns, fetchTaskRuns, fetchActorRuns } from "./utils.js";
3
await Actor.init();
const client = Actor.newClient();

// Actor.getInput() can resolve to null when the Actor is started without input,
// and individual fields may be omitted. Fall back to empty values so the
// validation below reports the problem instead of a TypeError on destructuring
// or on reading `.length`.
const actorInput = (await Actor.getInput()) ?? {};
const { transformInputSource } = actorInput;
const runIds = actorInput.runIds ?? [];
const actorIds = actorInput.actorIds ?? [];
const taskIds = actorInput.taskIds ?? [];

// PREP & VALIDATION
/**
 * User-supplied function built from the `transformInputSource` input field by
 * evalFunctionOrThrow (from ./utils.js; judging by its name it throws when the
 * source cannot be turned into a function - confirm in utils).
 *
 * It is called once per run with a single context object. The script compares
 * `context.input` and `context.options` against their pre-call state afterwards,
 * so modifications are expected to be made by mutating those objects in place.
 *
 * @param {object} context
 * @param {object} context.input Copy of the value of the run's INPUT record.
 * @param {object} context.options Resurrect options with `build`, `memory` and `timeout` keys.
 */
const transformInput = evalFunctionOrThrow(transformInputSource);

// At least one source of runs is required, otherwise there is nothing to process.
if (!(runIds.length || actorIds.length || taskIds.length)) {
    throw new Error('Did not receive any run IDs, actor IDs or task IDs. Aborting.');
}
20
// FETCH RUNNING RUNS
// Runs are gathered from up to three sources and appended in a fixed order:
// explicit run IDs first, then runs of the given actors, then runs of the given tasks.
const collectedRuns = [];

if (runIds.length > 0) {
    const runsFromIds = await fetchRuns(client, runIds);
    console.log(`Fetched ${runsFromIds.length} runs based on their IDs.`);
    for (const fetchedRun of runsFromIds) {
        collectedRuns.push(fetchedRun);
    }
}

if (actorIds.length > 0) {
    const runsFromActors = await fetchActorRuns(client, actorIds);
    for (const fetchedRun of runsFromActors) {
        collectedRuns.push(fetchedRun);
    }
}

if (taskIds.length > 0) {
    const runsFromTasks = await fetchTaskRuns(client, taskIds);
    for (const fetchedRun of runsFromTasks) {
        collectedRuns.push(fetchedRun);
    }
}
39
// TRANSFORM, ABORT, RESURRECT
/**
 * Processes a single run: loads its INPUT record, lets transformInput modify a
 * copy of the input and of the resurrect options, gracefully aborts the run,
 * overwrites the INPUT record with the modified input and resurrects the run
 * with the modified options. Finally pushes the original input, the new input
 * and both run objects to the dataset.
 *
 * NOTE: if any step after the abort fails, the run is left aborted; the error
 * propagates to the caller so it can be reported.
 *
 * @param {object} run Run object; `run.id` and `run.options` are read.
 * @returns {Promise<void>}
 * @throws {Error} When the run has no INPUT record, or when any API call or
 *   the transformInput function fails.
 */
const processRun = async (run) => {
    const runClient = client.run(run.id);
    const kvsClient = runClient.keyValueStore();

    console.log(`[${run.id}] Fetching input.`);
    const inputRecord = await kvsClient.getRecord('INPUT');
    if (!inputRecord) {
        // Without this guard a missing record surfaces as an opaque destructuring TypeError.
        throw new Error(`[${run.id}] Run has no INPUT record to transform.`);
    }
    const { value: input } = inputRecord;
    // Deep copy so the original input stays untouched for diffing and for the dataset record.
    const newInput = JSON.parse(JSON.stringify(input));
    // The run object contains the options in a different
    // format than the API expects them in resurrect.
    const apiOptions = {
        build: run.options.build,
        memory: run.options.memoryMbytes,
        timeout: run.options.timeoutSecs,
    };
    const newOptions = { ...apiOptions };

    console.log(`[${run.id}] Transforming input with transformInput function.`);
    // Awaited so that an async transformInput finishes its mutations before the
    // diff is computed and the input is saved. Awaiting a non-promise is harmless.
    await transformInput({
        input: newInput,
        options: newOptions,
    });
    const inputDiff = diff(input, newInput);
    if (inputDiff) {
        console.log(`[${run.id}] Changed input:`, inputDiff);
    }
    const optionsDiff = diff(apiOptions, newOptions);
    if (optionsDiff) {
        console.log(`[${run.id}] Changed options:`, optionsDiff);
    }

    console.log(`[${run.id}] Aborting run.`);
    await runClient.abort({ gracefully: true });
    // Graceful abort can take some time.
    await runClient.waitForFinish();

    console.log(`[${run.id}] Saving new input to key-value store.`);
    await kvsClient.setRecord({
        key: 'INPUT',
        value: newInput,
    });

    console.log(`[${run.id}] Resurrecting run.`);
    const resurrectedRun = await runClient.resurrect(newOptions);

    console.log(`[${run.id}] Saving input and run information to dataset.`);
    await Actor.pushData({
        input,
        newInput,
        abortedRun: run,
        resurrectedRun,
    });
};

// allSettled instead of all: with fail-fast semantics a single failing run would
// reject the top-level await and end the process while other runs may already be
// aborted but not yet resurrected. Let every run finish, then report failures.
const settledRuns = await Promise.allSettled(collectedRuns.map((run) => processRun(run)));

const failedRuns = [];
settledRuns.forEach((result, index) => {
    if (result.status === 'rejected') {
        failedRuns.push({ runId: collectedRuns[index].id, reason: result.reason });
    }
});
for (const { runId, reason } of failedRuns) {
    console.error(`[${runId}] Failed to process run:`, reason);
}

const processedRuns = settledRuns.filter((result) => result.status === 'fulfilled');
console.log(`Aborted and resurrected ${processedRuns.length} runs. The end.`);

if (failedRuns.length > 0) {
    // Fail the Actor only after every run had the chance to complete.
    throw new Error(`Failed to process ${failedRuns.length} of ${settledRuns.length} runs. See the log for details.`);
}
await Actor.exit();