1
2
3import { Actor, log } from 'apify';
4import { OpenAIProvider, AnthropicProvider, GoogleProvider, getProvider } from './providers/index.js';
5import { Input, OutputItem, ValidatedInput } from './types.js';
6import { parseCustomPreprocessingFunction } from './utils.js';
7
8
// Upper bound on LLM requests issued per minute.
const RATE_LIMIT_PER_MINUTE = 500;
// Delay used between consecutive LLM requests to honour the limit above, rounded up to whole ms.
const REQUEST_INTERVAL_MS = Math.ceil((60 * 1000) / RATE_LIMIT_PER_MINUTE);

// Initialise the Apify Actor runtime; everything below relies on Actor.* APIs.
await Actor.init();
13
14
/**
 * Resolves a dot-separated property path (e.g. `"author.name"`) against `obj`.
 *
 * Walking stops with `undefined` as soon as an intermediate value is `null`/`undefined`.
 * Other falsy intermediates (`0`, `''`, `false`) are indexed normally, so a path that
 * continues past a primitive yields `undefined` instead of leaking that primitive as the result.
 *
 * @param obj - Value to read from (typically a dataset item).
 * @param path - Property names separated by `.`; numeric segments index into arrays.
 * @returns The value found at `path`, or `undefined` if any segment is missing.
 */
function getNestedValue(obj: any, path: string): any {
  return path
    .split('.')
    .reduce((current, key) => (current === undefined || current === null ? undefined : current[key]), obj);
}
18
19
/**
 * Decides whether a resolved field value counts as "empty".
 *
 * `null`/`undefined`, whitespace-only strings, zero-length arrays and objects without own
 * enumerable keys are empty. Every other value (including `0` and `false`) is not.
 */
function isEmpty(value: any): boolean {
  if (value == null) {
    return true;
  }
  switch (typeof value) {
    case 'string':
      return value.trim().length === 0;
    case 'object': {
      const entries = Array.isArray(value) ? value : Object.keys(value);
      return entries.length === 0;
    }
    default:
      return false;
  }
}
27
28
/**
 * Reports whether any `${field.path}` placeholder in the prompt resolves to an empty value on `item`.
 *
 * Placeholder contents are trimmed before lookup, so `${ name }` and `${name}` are equivalent.
 * Returns `false` when the prompt has no placeholders.
 */
function hasEmptyFields(promptStr: string, item: any): boolean {
  for (const [, rawField] of promptStr.matchAll(/\$\{([^}]+)\}/g)) {
    const resolved = getNestedValue(item, rawField.trim());
    if (isEmpty(resolved)) {
      return true;
    }
  }
  return false;
}
37
38
/**
 * Substitutes every `${field.path}` placeholder in `promptStr` with the value found on `item`.
 *
 * - Missing (`undefined`) and `null` values become an empty string (previously `null`
 *   rendered as the literal text "null").
 * - Objects and arrays are serialized as JSON (previously objects rendered as
 *   "[object Object]", which is useless to the model).
 * - All other values are converted with `String(value)`.
 *
 * Placeholder contents are trimmed before lookup, so `${ name }` equals `${name}`.
 */
function replacePlaceholders(promptStr: string, item: any): string {
  return promptStr.replace(/\$\{([^}]+)\}/g, (_match, fieldName: string) => {
    const value = getNestedValue(item, fieldName.trim());
    if (value === undefined || value === null) {
      return '';
    }
    return typeof value === 'object' ? JSON.stringify(value) : String(value);
  });
}
45
/**
 * Reads the Actor input and normalises it into a `ValidatedInput`.
 *
 * The dataset to process is `inputDatasetId`. When that is absent, the function falls back to
 * `payload.resource.defaultDatasetId` (presumably the payload of a webhook fired by an
 * upstream run; confirm against the input schema).
 *
 * Defaults applied:
 * - `multipleColumns` and `testPrompt` become `false`, and `testItemsCount` becomes `3`,
 *   only when the field is `undefined`.
 * - `skipItemIfEmpty` becomes `false` when it is `null` or `undefined`.
 *
 * @returns The normalised input, including the parsed preprocessing function.
 * @throws Error when there is no input or no dataset ID can be determined. Errors from
 *   `parseCustomPreprocessingFunction` propagate unchanged.
 */
async function validateInput(): Promise<ValidatedInput> {
  const input = (await Actor.getInput()) as Input;
  if (!input) {
    throw new Error('No input provided. Please provide the necessary input parameters.');
  }

  // Prefer an explicit dataset ID; otherwise use the one carried in the payload.
  const datasetId = input.inputDatasetId || input.payload?.resource?.defaultDatasetId;
  if (!datasetId) {
    throw new Error('No inputDatasetId provided. Please provide the necessary input parameters.');
  }

  const { multipleColumns = false, testPrompt = false, testItemsCount = 3 } = input;

  return {
    inputDatasetId: datasetId,
    llmProviderApiKey: input.llmProviderApiKey,
    prompt: input.prompt,
    model: input.model,
    temperature: input.temperature,
    maxTokens: input.maxTokens,
    skipItemIfEmpty: input.skipItemIfEmpty ?? false,
    multipleColumns,
    testPrompt,
    testItemsCount,
    preprocessingFunction: parseCustomPreprocessingFunction(input.preprocessingFunction),
  };
}
86
/**
 * Loads the items to process from the input dataset on the Apify platform.
 *
 * In test mode only the first `testItemsCount` items are requested, so a quick prompt test
 * does not download the whole dataset.
 *
 * @param inputDatasetId - ID of the cloud dataset to read.
 * @param testPrompt - When true, return only the first `testItemsCount` items.
 * @param testItemsCount - Item count for test mode. Fractions are floored; negative or
 *   non-finite values are treated as 0.
 * @returns The dataset items in dataset order.
 * @throws Error when the dataset does not exist or cannot be read (the error is logged first).
 */
async function fetchDatasetItems(inputDatasetId: string, testPrompt: boolean, testItemsCount: number): Promise<OutputItem[]> {
  try {
    const dataset = await Actor.apifyClient.dataset(inputDatasetId).get();
    if (!dataset) {
      throw new Error(`Dataset with ID ${inputDatasetId} does not exist`);
    }

    const inputDataset = await Actor.openDataset<OutputItem>(inputDatasetId, { forceCloud: true });

    if (testPrompt) {
      // A negative count would make `slice(0, n)` drop items from the END of the list instead
      // of limiting it, so clamp to a non-negative integer.
      const limit = Number.isFinite(testItemsCount) ? Math.max(0, Math.floor(testItemsCount)) : 0;
      const { items: fetchedItems, total } = await inputDataset.getData({ limit });
      // Slice defensively in case the backend does not honour the limit (e.g. limit 0).
      const items = fetchedItems.slice(0, limit);
      log.info(`Test mode enabled - processing ${items.length} items out of ${total}`);
      return items;
    }

    const { items: fetchedItems } = await inputDataset.getData();
    log.info(`Fetched ${fetchedItems.length} items from the input dataset.`);
    return fetchedItems;
  } catch (error) {
    if (error instanceof Error) {
      log.error(`Error accessing dataset: ${error.message}`);
    } else {
      log.error('Error accessing dataset: Unknown error occurred');
    }
    throw error;
  }
}
115
/**
 * Sends every item to the configured LLM and pushes the results to the default dataset.
 *
 * For each item, the function:
 * 1. Applies the user's preprocessing function.
 * 2. Optionally skips the item when a placeholder used by the prompt resolves to an empty value.
 * 3. Fills the prompt placeholders.
 * 4. Calls the provider that `getProvider` selects for `config.model`.
 * 5. Hands the response to `handleItemResponse`.
 *
 * Consecutive processed items are spaced `REQUEST_INTERVAL_MS` apart.
 *
 * @param items - Raw items from the input dataset.
 * @param providers - Provider clients keyed by the names returned from `getProvider`.
 * @param config - Validated Actor input.
 * @throws Rethrows the first error raised while processing an item, after logging which item failed.
 */
async function processItems(
  items: OutputItem[],
  providers: Record<string, OpenAIProvider | AnthropicProvider | GoogleProvider>,
  config: ValidatedInput,
): Promise<void> {
  const temperatureNum = parseFloat(config.temperature);
  const { preprocessingFunction } = config;
  // The template (including the optional JSON instruction) is identical for every item.
  const promptTemplate = buildFinalPrompt(config.prompt, config.multipleColumns);
  const lastIndex = items.length - 1;

  for (const [i, rawItem] of items.entries()) {
    try {
      // Inside the try so a failing preprocessing function is reported with the item number.
      const item = preprocessingFunction(rawItem);

      if (config.skipItemIfEmpty && hasEmptyFields(config.prompt, item)) {
        log.info(`Skipping item ${i + 1} due to empty fields`);
        continue;
      }

      const finalPrompt = replacePlaceholders(promptTemplate, item);
      log.info(`Processing item ${i + 1}/${items.length}`, { prompt: finalPrompt });

      const provider = getProvider(config.model);
      const llmProvider = providers[provider];
      if (!llmProvider) {
        throw new Error(`No LLM provider client configured for "${provider}" (model: ${config.model})`);
      }

      const llmresponse = await llmProvider.call(
        finalPrompt,
        config.model,
        temperatureNum,
        config.maxTokens,
      );

      log.info(`Item ${i + 1} response:`, { response: llmresponse });

      await handleItemResponse(item, llmresponse, config.multipleColumns, {
        provider,
        model: config.model,
        temperature: temperatureNum,
        maxTokens: config.maxTokens,
        providers,
        finalPrompt,
      });

      // Throttle between requests; there is nothing to wait for after the final item.
      if (i < lastIndex) {
        await new Promise((resolve) => setTimeout(resolve, REQUEST_INTERVAL_MS));
      }
    } catch (error) {
      if (error instanceof Error) {
        log.error(`Error processing item ${i + 1}: ${error.message}`);
      } else {
        log.error(`Error processing item ${i + 1}: Unknown error occurred`);
      }
      throw error;
    }
  }
}
166
/**
 * Parses `text` as JSON and returns the result only if it is a plain (non-array) object.
 * Returns `undefined` for invalid JSON, primitives, `null` and arrays.
 */
function parseJsonObject(text: string): Record<string, unknown> | undefined {
  try {
    const parsed: unknown = JSON.parse(text);
    if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
      return parsed as Record<string, unknown>;
    }
  } catch {
    // Invalid JSON is handled exactly like a non-object result.
  }
  return undefined;
}

/**
 * Pushes one output record for `item` to the default dataset.
 *
 * - Single-column mode: pushes a copy of `item` with the raw response under `llmresponse`.
 * - Multi-column mode: the response must be a JSON object. Its keys are merged into a copy
 *   of `item`. If parsing fails, or yields an array or primitive, the prompt is re-sent up
 *   to two more times with a "return valid JSON" reminder. If it still fails, the last raw
 *   response is stored under `llmresponse`.
 *
 * `item` itself is never mutated.
 *
 * @param item - The (preprocessed) dataset item the response belongs to.
 * @param llmresponse - The model's first response for this item.
 * @param multipleColumns - Whether the response should be split into one field per JSON key.
 * @param config - Provider and request settings used for the retry calls.
 */
async function handleItemResponse(
  item: any,
  llmresponse: string,
  multipleColumns: boolean,
  config: {
    provider: string;
    model: string;
    temperature: number;
    maxTokens: number;
    providers: Record<string, OpenAIProvider | AnthropicProvider | GoogleProvider>;
    finalPrompt: string;
  },
): Promise<void> {
  if (!multipleColumns) {
    await Actor.pushData({ ...item, llmresponse });
    return;
  }

  const maxRetries = 2;
  let currentResponse = llmresponse;
  let parsedData = parseJsonObject(currentResponse);

  for (let retry = 0; parsedData === undefined && retry < maxRetries; retry++) {
    log.warning(`Failed to parse JSON. Retrying...`);
    const retryPrompt = `${config.finalPrompt}\n\nThe last response was not valid JSON. Please return valid JSON this time.`;
    currentResponse = await config.providers[config.provider].call(
      retryPrompt,
      config.model,
      config.temperature,
      config.maxTokens,
    );
    parsedData = parseJsonObject(currentResponse);
  }

  if (parsedData === undefined) {
    log.error(`Failed to parse JSON after multiple attempts. Using raw response as single column.`);
    await Actor.pushData({ ...item, llmresponse: currentResponse });
    return;
  }

  await Actor.pushData({ ...item, ...parsedData });
}
224
/**
 * Returns the prompt template to send to the model.
 *
 * In multi-column mode an instruction demanding a bare JSON object is appended after a blank
 * line, so the response can be split into one output field per key. Otherwise the template
 * is returned unchanged.
 */
function buildFinalPrompt(promptText: string, multipleColumns: boolean): string {
  const jsonOnlyInstruction =
    'Important: Return only a strict JSON object with the requested fields as keys. No extra text or explanations, no markdown, just JSON.';

  if (multipleColumns) {
    return promptText + '\n\n' + jsonOnlyInstruction;
  }
  return promptText;
}
234
/**
 * Checks, on a single sample item, that the prompt makes the model answer with a JSON object.
 *
 * The filled-in prompt (with the JSON-only instruction appended) is sent up to three times.
 * Retries use the base prompt plus a single "return valid JSON" reminder; the reminder is not
 * accumulated across attempts.
 *
 * @param testItem - A preprocessed dataset item used to fill the prompt placeholders.
 * @param config - Provider clients, model settings and the raw prompt template.
 * @returns `true` as soon as a response parses to a JSON object; `false` after three failures.
 * @throws Error if the API returns an empty response. Errors thrown by the provider call are
 *   logged and rethrown unchanged.
 */
async function validateJsonFormat(testItem: any, config: {
  providers: Record<string, OpenAIProvider | AnthropicProvider | GoogleProvider>;
  model: string;
  temperature: string;
  maxTokens: number;
  prompt: string;
}): Promise<boolean> {
  const maxAttempts = 3;
  const provider = getProvider(config.model);
  const temperature = parseFloat(config.temperature);
  const basePrompt = replacePlaceholders(buildFinalPrompt(config.prompt, true), testItem);
  const retryPrompt = `${basePrompt}\n\nThe last response was not valid JSON. Please return valid JSON this time.`;

  // Wraps only the provider call, so the "API call failed" log is not emitted for unrelated errors.
  const callModel = async (prompt: string) => {
    try {
      return await config.providers[provider].call(prompt, config.model, temperature, config.maxTokens);
    } catch (apiError: unknown) {
      const details: Record<string, unknown> =
        typeof apiError === 'object' && apiError !== null ? (apiError as Record<string, unknown>) : {};
      log.error('API call failed:', {
        error: details.message,
        type: details.type,
        code: details.code,
        param: details.param,
      });
      throw apiError;
    }
  };

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const testResponse = await callModel(attempt === 1 ? basePrompt : retryPrompt);

    if (!testResponse) {
      // Report to the caller instead of calling Actor.fail from inside a validation helper;
      // the caller turns thrown errors into the Actor's failure status.
      log.error('Empty response received from the API');
      throw new Error('Empty response received from the API');
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(testResponse);
    } catch {
      parsed = undefined;
    }
    // The prompt demands a JSON *object*; arrays and primitives cannot be split into columns.
    if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
      return true;
    }

    if (attempt < maxAttempts) {
      log.warning(`JSON validation attempt ${attempt} failed. Retrying...`);
      log.debug('Response that failed JSON parsing:', { response: testResponse });
    } else {
      log.error('JSON validation attempts exhausted. The prompt may not produce valid JSON.');
      log.debug('Final response that failed JSON parsing:', { response: testResponse });
    }
  }
  return false;
}
293
/**
 * Actor entry point.
 *
 * Steps:
 * 1. Validate the input and load the dataset items.
 * 2. Create provider clients.
 * 3. In multi-column mode, verify on the first item that the prompt yields JSON.
 * 4. Process all items.
 *
 * Any error ends the run through `Actor.fail` with the error's message, or with a generic
 * message for non-Error throwables.
 */
async function run(): Promise<void> {
  try {
    const validatedInput = await validateInput();
    const { inputDatasetId, model, prompt, multipleColumns, llmProviderApiKey } = validatedInput;

    log.info('Configuration details:', {
      datasetId: inputDatasetId,
      model,
      promptTemplate: prompt,
      multipleColumns,
    });

    const items = await fetchDatasetItems(inputDatasetId, validatedInput.testPrompt, validatedInput.testItemsCount);

    // One client per supported provider, all sharing the single API key from the input.
    const providers = {
      openai: new OpenAIProvider(llmProviderApiKey),
      anthropic: new AnthropicProvider(llmProviderApiKey),
      google: new GoogleProvider(llmProviderApiKey),
    };

    const shouldCheckJson = items.length > 0 && multipleColumns;
    if (shouldCheckJson) {
      const sampleItem = validatedInput.preprocessingFunction(items[0]);
      const producesJson = await validateJsonFormat(sampleItem, {
        providers,
        model,
        temperature: validatedInput.temperature,
        maxTokens: validatedInput.maxTokens,
        prompt,
      });

      if (!producesJson) {
        throw new Error('Failed to produce valid JSON after multiple attempts. Please adjust your prompt or disable multiple columns.');
      }
    }

    await processItems(items, providers, validatedInput);

    log.info('Actor finished successfully');
    await Actor.exit();
  } catch (error) {
    if (!(error instanceof Error)) {
      log.error('Actor failed with unknown error');
      await Actor.fail('Unknown error occurred');
      return;
    }
    log.error('Actor failed:', { error: error.message });
    await Actor.fail(error.message);
  }
}

// Start the Actor (top-level await; the module is an ES module).
await run();