import { createHash } from 'crypto'
import os from 'os'
import path from 'path'
import fs from 'fs'

import pg from 'pg'
import pgConnectionString from 'pg-connection-string'
import { config } from 'dotenv'
import findConfig from 'find-config'
import { Client as ElasticClient } from '@elastic/elasticsearch'
import filenamify from 'filenamify'
import { Configuration, Dataset } from 'crawlee'
import { MemoryStorage } from '@crawlee/memory-storage'

// Load environment variables from the nearest .env file up the directory tree
config({ path: findConfig(`.env`) })

const elasticIndexName = `actors-monorepo-shops`

// Attached to every saved object for provenance of the run
const globalLogsProps = {
  __NODE_STARTED: new Date().toISOString(),
}

// Lazily initialized in init() based on the environment
let actorName
let pgClient
let pgClientNormalized
let elasticClient
export async function init ({ actorNameOverride }, restInput) {
  parseEnvFromInput(restInput)

  // Local development (macOS): derive the actor name from the script filename
  // and attach the short git commit hash to every saved object.
  if (os.platform() === `darwin`) {
    const filePath = process.argv[1]
    const basename = path.basename(filePath)
    actorName = actorNameOverride ?? basename.split(`.`)[0]
    // Resolve the current branch from .git/HEAD (assumes a non-detached HEAD)
    const gitBranch = fs.readFileSync(path.join(process.cwd(), `..`, `.git/HEAD`), `utf8`)
      .split(` `)[1]
      .trim()
      .replace(`refs/heads/`, ``)
    const gitCommit = fs.readFileSync(path.join(process.cwd(), `..`, `.git/refs/heads/${gitBranch}`), `utf8`)
    const gitCommitShort = gitCommit.substring(0, 7)
    globalLogsProps.__GIT_COMMIT = gitCommitShort
  }

  if (process.env.APIFY_USE_MEMORY_REQUEST_QUEUE === `true`) {
    Configuration.getGlobalConfig().useStorageClient(new MemoryStorage())
  }

  // On the Apify platform, prefer the explicit override, then the actor ID
  if (process.env.APIFY_IS_AT_HOME) {
    actorName = actorNameOverride ?? process.env.APIFY_ACTOR_ID
  }

  if (process.env.ELASTIC_CLOUD_ID) {
    elasticClient = new ElasticClient({
      cloud: { id: process.env.ELASTIC_CLOUD_ID },
      auth: { apiKey: process.env.ELASTIC_CLOUD_API_KEY },
    })

    // Ensure the index exists and the price fields are mapped as floats
    async function enforceIndexMapping () {
      const doesIndexExist = await elasticClient.indices.exists({ index: elasticIndexName })
      if (!doesIndexExist) await elasticClient.indices.create({ index: elasticIndexName })
      await elasticClient.indices.putMapping({
        index: elasticIndexName,
        body: {
          properties: {
            _discount: { type: `float` },
            originalPrice: { type: `float` },
            currentPrice: { type: `float` },
          },
        },
      })
    }

    try {
      await enforceIndexMapping()
    } catch (err) {
      // Existing field mappings cannot be changed in place, so the index has to be recreated
      if (err.message.includes(`cannot be changed from type`)) {
        console.log(`Elastic index ${elasticIndexName} already exists with incorrect mappings. As existing mappings cannot be changed, the index will be deleted and recreated.`)
        await elasticClient.indices.delete({ index: elasticIndexName })
        await enforceIndexMapping()
      } else {
        throw err
      }
    }
  }

  if (process.env.PG_CONNECTION_STRING) {
    const pgConfig = pgConnectionString.parse(process.env.PG_CONNECTION_STRING)

    pgClient = new pg.Client(pgConfig)
    await pgClient.connect()

    // Fail fast when the target table is missing
    const { rows: tables } = await pgClient.query(`
      SELECT table_name
      FROM information_schema.tables
      WHERE table_schema = 'public'
    `)

    const tableExists = tables.some(({ table_name }) => table_name === process.env.PG_DATA_TABLE)
    if (!tableExists) {
      throw new Error(`Table ${process.env.PG_DATA_TABLE} does not exist in database ${pgConfig.database}`)
    }
  }

  if (process.env.PG_CONNECTION_STRING_NORMALIZED) {
    const pgConfig = pgConnectionString.parse(process.env.PG_CONNECTION_STRING_NORMALIZED)

    pgClientNormalized = new pg.Client(pgConfig)
    await pgClientNormalized.connect()

    // The normalized schema uses one table for product data and one for price history
    const { rows: tables } = await pgClientNormalized.query(`
      SELECT table_name
      FROM information_schema.tables
      WHERE table_schema = 'public'
    `)

    const tableMainExists = tables.some(({ table_name }) => table_name === process.env.PG_DATA_TABLE)
    const tablePricesExists = tables.some(({ table_name }) => table_name === process.env.PG_DATA_PRICE_TABLE)
    if (!tableMainExists) throw new Error(`Table ${process.env.PG_DATA_TABLE} does not exist in database ${pgConfig.database}`)
    if (!tablePricesExists) throw new Error(`Table ${process.env.PG_DATA_PRICE_TABLE} does not exist in database ${pgConfig.database}`)
  }
}

// Stable dedup key: sha256 of the URL with its protocol stripped
export const createUniqueKeyFromUrl = (url) => {
  const hash = createHash(`sha256`)
  const cleanUrl = url.split(`://`)[1]
  hash.update(cleanUrl)
  return hash.digest(`hex`)
}
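// Example (hypothetical URL): createUniqueKeyFromUrl(`https://example.com/p/1`) and
// createUniqueKeyFromUrl(`http://example.com/p/1`) hash the same string
// `example.com/p/1`, so both yield the same 64-character hex key.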

// Resolves once the given datetime has passed; resolves immediately when it
// is already in the past
export const sleepUntil = async (datetime) => {
  const now = new Date()
  const difference = datetime - now
  if (difference > 0) {
    return new Promise((resolve) => {
      setTimeout(resolve, difference)
    })
  }
  return Promise.resolve()
}
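// Usage sketch (hypothetical deadline): pause a crawler until a scheduled
// price update before scraping again:
//   await sleepUntil(new Date(`2030-01-01T06:00:00Z`))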

// Parses "1 299,90 Kč"-style strings into a numeric amount. The last "," or "."
// followed by exactly two digits is treated as the decimal separator.
// NOTE: currency detection is not implemented, so `currency` is always undefined.
export function parsePrice (string) {
  let amount, currency
  const noText = string.replace(/[^\d,.]/g, ``)
  const decimals = noText.match(/([,.])(\d{2})$/)
  if (decimals) {
    const decimalAmount = decimals[2]
    // Everything before the final separator, digits only, so thousands
    // separators of either kind are dropped
    const mainAmount = noText.slice(0, decimals.index).replace(/\D/g, ``)
    amount = parseFloat(mainAmount + `.` + decimalAmount)
  } else {
    const justNumbers = noText.replace(/[,.]/g, ``)
    amount = parseInt(justNumbers, 10)
  }
  return { amount, currency }
}
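// Worked examples (hypothetical inputs), tracing the logic above:
//   parsePrice(`1 299,90 Kč`) → { amount: 1299.9, currency: undefined }
//   parsePrice(`$1,299.90`)   → { amount: 1299.9, currency: undefined }
//   parsePrice(`1299 Kč`)     → { amount: 1299, currency: undefined }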

// Converts to a number; returns null for undefined, null, empty string, or NaN
export function toNumberOrNull (str) {
  if (str === undefined) return null
  if (str === null) return null
  if (str === ``) return null
  const num = Number(str)
  if (Number.isNaN(num)) return null
  return num
}

export async function save (objs) {
  if (!Array.isArray(objs)) objs = [objs]
  if (objs.length === 0) return console.log(`No data to save.`)

  const objsExtended = await Promise.all(objs.map(async (obj) => {
    const objExtended = {
      ...obj,
      actorName,
      ...globalLogsProps,
    }

    // On the platform, tag each object with run metadata and optionally
    // push the raw object to the default dataset
    if (process.env.APIFY_IS_AT_HOME) {
      objExtended.__APIFY_ACTOR_ID = process.env.APIFY_ACTOR_ID
      objExtended.__APIFY_ACTOR_RUN_ID = process.env.APIFY_ACTOR_RUN_ID
      objExtended.__APIFY_ACTOR_BUILD_ID = process.env.APIFY_ACTOR_BUILD_ID
      objExtended.__APIFY_ACTOR_BUILD_NUMBER = process.env.APIFY_ACTOR_BUILD_NUMBER
      objExtended.__APIFY_ACTOR_TASK_ID = process.env.APIFY_ACTOR_TASK_ID
      if (process.env.APIFY_DONT_STORE_IN_DATASET !== `true`) {
        await Dataset.pushData(obj)
      }
    }
    return objExtended
  }))

  // Local development (macOS): dump each object as a JSON file for inspection
  if (os.platform() === `darwin`) {
    const cwd = process.cwd()
    const storageDir = path.join(cwd, `${actorName}.storage`)
    if (!fs.existsSync(storageDir)) fs.mkdirSync(storageDir)
    const dataDir = path.join(storageDir, `data`)
    if (!fs.existsSync(dataDir)) fs.mkdirSync(dataDir)
    for (const objExtended of objsExtended) {
      const id = String(objExtended.id ?? objExtended.pid)
      const fileName = `${filenamify(id)}.json`
      const dataFilePath = path.join(dataDir, fileName)
      fs.writeFileSync(dataFilePath, JSON.stringify(objExtended, null, 2))
    }
  }

  if (pgClient) {
    const objsPg = objs.map((obj) => ({
      ...obj,
      shop: actorName,
      scrapedAt: new Date().toISOString().split(`T`)[0],
    }))

    const columns = getColumns(objsPg)
    const values = getValues(objsPg)
    const queryString = `
      INSERT INTO public."${process.env.PG_DATA_TABLE}" (${columns})
      VALUES (${values})
    `
    try {
      const { rowCount } = await pgClient.query(queryString)
      console.log(`[save] saved to database: ${JSON.stringify(rowCount)}`)
    } catch (err) {
      if (err.message.includes(`violates unique constraint`)) console.warn(`PostgreSQL: violates unique constraint`)
      else throw err
    }
  }

  // Normalized storage: product data and price history go to separate tables
  if (pgClientNormalized) {
    const objsPgData = objs.map((obj) => ({
      shop: actorName,
      pid: obj.pid,
      name: obj.name,
      url: obj.url,
      img: obj.img,
    }))

    const objsPgDataPrice = objs.map((obj) => ({
      shop: actorName,
      pid: obj.pid,
      scrapedAt: new Date().toISOString().split(`T`)[0],
      currentPrice: obj.currentPrice,
      originalPrice: obj.originalPrice,
      inStock: obj.inStock,
    }))

    const queryString = `
      INSERT INTO public."${process.env.PG_DATA_TABLE}" (${getColumns(objsPgData)})
      VALUES (${getValues(objsPgData)})
      ON CONFLICT DO NOTHING
    `
    try {
      const { rowCount } = await pgClientNormalized.query(queryString)
      console.log(`[save] saved to database (data): ${JSON.stringify(rowCount)}`)
    } catch (err) {
      if (err.message.includes(`violates unique constraint`)) console.warn(`PostgreSQL: violates unique constraint`)
      else throw err
    }

    const queryStringPrice = `
      INSERT INTO public."${process.env.PG_DATA_PRICE_TABLE}" (${getColumns(objsPgDataPrice)})
      VALUES (${getValues(objsPgDataPrice)})
      ON CONFLICT DO NOTHING
    `
    try {
      const { rowCount } = await pgClientNormalized.query(queryStringPrice)
      console.log(`[save] saved to database (price): ${JSON.stringify(rowCount)}`)
    } catch (err) {
      if (err.message.includes(`violates unique constraint`)) console.warn(`PostgreSQL: violates unique constraint`)
      else throw err
    }
  }

  if (elasticClient) {
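    // A minimal sketch, assuming an Elasticsearch v8 client: index each
    // extended object one by one; elasticClient.helpers.bulk would be the
    // more efficient choice for large batches.
    for (const objExtended of objsExtended) {
      try {
        await elasticClient.index({
          index: elasticIndexName,
          document: objExtended,
        })
      } catch (err) {
        console.warn(`[save] Elastic indexing failed: ${err.message}`)
      }
    }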
  }
}
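
// Usage sketch (hypothetical fields), e.g. from a crawler's request handler:
//   await save({ pid: `123`, name: `Example product`, url: `https://example.com/p/123`,
//     currentPrice: 9.99, originalPrice: 19.99, inStock: true })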

// Builds the quoted column list for an INSERT from the first object's keys;
// assumes all objects share the same shape
function getColumns (objs) {
  return Object.keys(objs[0]).map((key) => `"${key}"`).join(`, `)
}

// Builds the VALUES tuples by string interpolation. Single quotes are escaped
// by doubling; undefined/null become NULL. Note this is no substitute for
// parameterized queries if untrusted input can reach it.
function getValues (objs) {
  return objs.map(objPg => Object.values(objPg).map((value) => {
    if (typeof value === `string`) return `'${value.replace(/'/g, `''`)}'`
    if (typeof value === `undefined` || value === null) return `NULL`
    return value
  }).join(`, `)).join(`), (`)
}
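
// For illustration (hypothetical rows): with objs = [{ pid: `a`, price: 1 }, { pid: `b`, price: 2 }],
// getColumns(objs) → `"pid", "price"` and getValues(objs) → `'a', 1), ('b', 2`,
// so the surrounding `VALUES (${values})` completes each tuple's parentheses.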

// Copies ALL_CAPS keys from the actor input into process.env, so env-style
// configuration can be passed through the platform input UI
export function parseEnvFromInput (input) {
  const env = {}
  for (const key in input) {
    if (key === key.toUpperCase()) env[key] = input[key]
  }
  console.log(`[parseEnvFromInput] ${JSON.stringify(env)}`)
  Object.assign(process.env, env)
}
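
// Example (hypothetical input): { PG_DATA_TABLE: `shops`, maxRequests: 10 }
// copies only PG_DATA_TABLE into process.env; keys with lowercase letters are ignored.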

// True when the process was started with --inspect or with debug preloads
export const isInspect =
  process.execArgv.join().includes(`--inspect`) ||
  process?._preload_modules?.join(`|`)?.includes(`debug`)