1import { createHash } from 'crypto'
2import os from "os"
3import path from "path"
4
5import fs from "fs"
6import pg from "pg"
7import pgConnectionString from 'pg-connection-string'
8import { config } from 'dotenv'
9import findConfig from "find-config"
10import { Client as ElasticClient } from "@elastic/elasticsearch"
11import filenamify from 'filenamify'
12import { Configuration, Dataset } from 'crawlee'
13import { MemoryStorage } from '@crawlee/memory-storage'
14
// Load environment variables from the nearest .env file up the directory tree.
config({ path: findConfig(`.env`) })

// Single Elastic index shared by all shop actors in the monorepo.
const elasticIndexName = `actors-monorepo-shops`

// Properties merged into every saved record for run traceability.
const globalLogsProps = {
  __NODE_STARTED: new Date().toISOString(),
}

// Module-level handles; populated by `init()` depending on the environment.
let actorName
let pgClient
let pgClientNormalized
let elasticClient
/**
 * Initializes module-level state for a scraper run: resolves the actor name,
 * wires optional storage backends (Elastic, Postgres) from environment
 * variables, and captures git metadata when running locally on macOS.
 *
 * @param {Object} options
 * @param {string} [options.actorNameOverride] - explicit actor name; otherwise
 *   derived from the entry script filename (local) or APIFY_ACTOR_ID (platform)
 * @param {Object} restInput - remaining actor input; ALL-CAPS keys are copied
 *   into `process.env` via `parseEnvFromInput`
 * @throws {Error} when a configured Postgres table does not exist, or Elastic
 *   index setup fails for a reason other than an incompatible mapping
 */
export async function init ({ actorNameOverride }, restInput) {
  parseEnvFromInput(restInput)

  // Local development (macOS): name the actor after the entry script and tag
  // every record with the current git commit.
  if (os.platform() === `darwin`) {
    const filePath = process.argv[1]
    const basename = path.basename(filePath)
    actorName = actorNameOverride ?? basename.split(`.`)[0]
    // NOTE(review): assumes cwd is one level below the repo root and HEAD is a
    // regular branch checkout — packed refs or a detached HEAD would break
    // these reads; confirm against the repo layout.
    const gitBranch = fs.readFileSync(path.join(process.cwd(), `..`, `.git/HEAD`), `utf8`)
      .split(` `)[1]
      .trim()
      .replace(`refs/heads/`, ``)
    const gitCommit = fs.readFileSync(path.join(process.cwd(), `..`, `.git/refs/heads/${gitBranch}`), `utf8`)
    const gitCommitShort = gitCommit.substring(0, 7)
    globalLogsProps.__GIT_COMMIT = gitCommitShort
  }

  if (process.env.APIFY_USE_MEMORY_REQUEST_QUEUE === `true`) {
    Configuration.getGlobalConfig().useStorageClient(new MemoryStorage())
  }

  // On the Apify platform the actor id is authoritative.
  if (process.env.APIFY_IS_AT_HOME) {
    actorName = actorNameOverride ?? process.env.APIFY_ACTOR_ID
  }

  if (process.env.ELASTIC_CLOUD_ID) {
    elasticClient = new ElasticClient({
      cloud: { id: process.env.ELASTIC_CLOUD_ID },
      auth: { apiKey: process.env.ELASTIC_CLOUD_API_KEY },
    })

    // Ensures the index exists and that price fields are mapped as floats.
    async function enforceIndexMapping () {
      const doesIndexExist = await elasticClient.indices.exists({ index: elasticIndexName })
      if (!doesIndexExist) await elasticClient.indices.create({ index: elasticIndexName })
      await elasticClient.indices.putMapping({
        index: elasticIndexName,
        body: {
          properties: {
            _discount: { type: `float` },
            originalPrice: { type: `float` },
            currentPrice: { type: `float` },
          },
        },
      })
    }

    try {
      await enforceIndexMapping()
    } catch (err) {
      if (err.message.includes(`cannot be changed from type`)) {
        console.log(`Elastic index ${elasticIndexName} already exists with incorrect mappings. As existing mapping cannot be changed, index will be deleted and recreated.`)
        await elasticClient.indices.delete({ index: elasticIndexName })
        await enforceIndexMapping()
      } else {
        // FIX: previously any other failure (auth, network, …) was silently
        // swallowed, leaving elasticClient half-configured; fail fast instead.
        throw err
      }
    }
  }

  if (process.env.PG_CONNECTION_STRING) {
    const pgConfig = pgConnectionString(process.env.PG_CONNECTION_STRING)

    pgClient = new pg.Client(pgConfig)
    await pgClient.connect()

    // Fail early when the target table is missing instead of erroring on the
    // first INSERT.
    const { rows: tables } = await pgClient.query(`
      SELECT table_name
      FROM information_schema.tables
      WHERE table_schema = 'public'
    `)

    const tableExists = tables.some(({ table_name }) => table_name === process.env.PG_DATA_TABLE)
    if (!tableExists) {
      throw new Error(`Table ${process.env.PG_DATA_TABLE} does not exist in database ${pgConfig.database}`)
    }
  }

  if (process.env.PG_CONNECTION_STRING_NORMALIZED) {
    const pgConfig = pgConnectionString(process.env.PG_CONNECTION_STRING_NORMALIZED)

    pgClientNormalized = new pg.Client(pgConfig)
    await pgClientNormalized.connect()

    const { rows: tables } = await pgClientNormalized.query(`
      SELECT table_name
      FROM information_schema.tables
      WHERE table_schema = 'public'
    `)

    const tableMainExists = tables.some(({ table_name }) => table_name === process.env.PG_DATA_TABLE)
    const tablePricesExists = tables.some(({ table_name }) => table_name === process.env.PG_DATA_PRICE_TABLE)
    if (!tableMainExists) throw new Error(`Table ${process.env.PG_DATA_TABLE} does not exist in database ${pgConfig.database}`)
    if (!tablePricesExists) throw new Error(`Table ${process.env.PG_DATA_PRICE_TABLE} does not exist in database ${pgConfig.database}`)
  }
}
136
137
138
/**
 * Derives a deterministic storage key from a URL by SHA-256 hashing it with
 * the protocol stripped (so the http/https variants map to the same key).
 *
 * @param {string} url
 * @returns {string} hex-encoded SHA-256 digest
 */
export const createUniqueKeyFromUrl = (url) => {
  // FIX: a protocol-less URL used to make `split('://')[1]` yield undefined,
  // and `hash.update(undefined)` then threw; fall back to the whole string.
  const [, withoutProtocol] = url.split(`://`)
  const cleanUrl = withoutProtocol ?? url
  return createHash(`sha256`).update(cleanUrl).digest(`hex`)
}
145
146
147
148
149
150
/**
 * Resolves once the given datetime has passed; resolves immediately when the
 * target is already in the past.
 *
 * @param {Date|number} datetime - target wall-clock time
 * @returns {Promise<void>}
 */
export const sleepUntil = async (datetime) => {
  const msRemaining = datetime - new Date()
  if (msRemaining <= 0) return
  await new Promise((resolve) => setTimeout(resolve, msRemaining))
}
161
162
/**
 * Extracts a numeric amount from a localized price string such as
 * `"1 299,90 Kč"` or `"$19.99"`.
 *
 * A trailing `,` or `.` followed by exactly two digits is treated as the
 * decimal separator; all other `,`/`.` characters are treated as thousands
 * separators and dropped.
 *
 * @param {string} string - raw price text
 * @returns {{ amount: number, currency: undefined }} `amount` is `NaN` when
 *   the input contains no digits; `currency` is currently never detected and
 *   is kept in the shape for forward compatibility.
 */
export function parsePrice (string) {
  let amount
  // TODO: currency detection is not implemented; always undefined for now.
  let currency
  const noText = string.replace(/[^\d,.]/g, ``)
  const decimals = noText.match(/([,.])(\d{2})$/)
  if (decimals) {
    const decimalSeparator = decimals[1]
    const decimalAmount = decimals[2]
    const mainAmount = noText.split(decimalSeparator)[0].replace(/\D/g, ``)
    amount = Number.parseFloat(`${mainAmount}.${decimalAmount}`)
  } else {
    const justNumbers = noText.replace(/[,.]/g, ``)
    // FIX: always pass the radix to parseInt.
    amount = Number.parseInt(justNumbers, 10)
  }
  return { amount, currency }
}
179
/**
 * Coerces a string to a number, returning `null` for missing, empty, or
 * non-numeric input.
 *
 * @param {string|null|undefined} str
 * @returns {number|null}
 */
export function toNumberOrNull (str) {
  // null/undefined and the empty string mean "missing", not zero
  if (str === undefined || str === null || str === ``) return null
  const parsed = Number(str)
  return Number.isNaN(parsed) ? null : parsed
}
189
/**
 * Persists scraped objects to every configured sink: the Apify dataset, local
 * JSON files (macOS development), raw Postgres, normalized Postgres, and
 * (eventually) Elastic.
 *
 * @param {Object|Object[]} objs - scraped record(s); `id`/`pid` is used as the
 *   local filename, `pid`/`name`/`url`/`img`/price fields feed the normalized DB
 * @returns {Promise<void>}
 * @throws {Error} rethrows database errors other than unique-constraint
 *   violations (those are only warned about)
 */
export async function save (objs) {
  if (!Array.isArray(objs)) objs = [objs]
  if (objs.length === 0) return console.log(`No data to save.`)

  // FIX: the async mapper returns an array of Promises — without awaiting
  // them, the local-file branch below iterated pending Promises and
  // serialized each one as `{}`.
  const objsExtended = await Promise.all(objs.map(async (obj) => {
    const objExtended = {
      ...obj,
      actorName,
      ...globalLogsProps,
    }

    if (process.env.APIFY_IS_AT_HOME) {
      objExtended.__APIFY_ACTOR_ID = process.env.APIFY_ACTOR_ID
      objExtended.__APIFY_ACTOR_RUN_ID = process.env.APIFY_ACTOR_RUN_ID
      objExtended.__APIFY_ACTOR_BUILD_ID = process.env.APIFY_ACTOR_BUILD_ID
      objExtended.__APIFY_ACTOR_BUILD_NUMBER = process.env.APIFY_ACTOR_BUILD_NUMBER
      objExtended.__APIFY_ACTOR_TASK_ID = process.env.APIFY_ACTOR_TASK_ID
      if (process.env.APIFY_DONT_STORE_IN_DATASET !== `true`) {
        // Note: the dataset intentionally receives the raw object, not the
        // extended one.
        await Dataset.pushData(obj)
      }
    }
    return objExtended
  }))

  // Local development: mirror each record into `<actor>.storage/data/<id>.json`.
  if (os.platform() === `darwin`) {
    const cwd = process.cwd()
    const storageDir = path.join(cwd, `${actorName}.storage`)
    if (!fs.existsSync(storageDir)) fs.mkdirSync(storageDir)
    const dataDir = path.join(storageDir, `data`)
    if (!fs.existsSync(dataDir)) fs.mkdirSync(dataDir)
    for (const objExtended of objsExtended) {
      const id = String(objExtended.id ?? objExtended.pid)
      const fileName = `${filenamify(id)}.json`
      const dataFilePath = path.join(dataDir, fileName)
      fs.writeFileSync(dataFilePath, JSON.stringify(objExtended, null, 2))
    }
  }

  if (pgClient) {
    const objsPg = objs.map((obj) => ({
      ...obj,
      shop: actorName,
      scrapedAt: new Date().toISOString().split(`T`)[0],
    }))

    const columns = getColumns(objsPg)
    const values = getValues(objsPg)
    // NOTE(review): values are inlined (single quotes doubled in getValues)
    // rather than bound as parameters — safe only while inputs stay trusted.
    const queryString = `
      INSERT INTO public."${process.env.PG_DATA_TABLE}" (${columns})
      VALUES (${values})
    `
    try {
      const { rowCount } = await pgClient.query(queryString)
      console.log(`[save] saved to database: ${JSON.stringify(rowCount)}`)
    } catch (err) {
      if (err.message.includes(`violates unique constraint`)) console.warn(`PostgresSQL: violates unique constraint`)
      else throw err
    }
  }

  // Normalized storage: product metadata and daily price points in separate tables.
  if (pgClientNormalized) {
    const objsPgData = objs.map((obj) => ({
      shop: actorName,
      pid: obj.pid,
      name: obj.name,
      url: obj.url,
      img: obj.img,
    }))

    const objsPgDataPrice = objs.map((obj) => ({
      shop: actorName,
      pid: obj.pid,
      scrapedAt: new Date().toISOString().split(`T`)[0],
      currentPrice: obj.currentPrice,
      originalPrice: obj.originalPrice,
      inStock: obj.inStock,
    }))

    const queryString = `
      INSERT INTO public."${process.env.PG_DATA_TABLE}" (${getColumns(objsPgData)})
      VALUES (${getValues(objsPgData)})
      ON CONFLICT DO NOTHING
    `
    try {
      const { rowCount } = await pgClientNormalized.query(queryString)
      console.log(`[save] saved to database (data): ${JSON.stringify(rowCount)}`)
    } catch (err) {
      if (err.message.includes(`violates unique constraint`)) console.warn(`PostgresSQL: violates unique constraint`)
      else throw err
    }

    const queryStringPrice = `
      INSERT INTO public."${process.env.PG_DATA_PRICE_TABLE}" (${getColumns(objsPgDataPrice)})
      VALUES (${getValues(objsPgDataPrice)})
      ON CONFLICT DO NOTHING
    `
    try {
      const { rowCount } = await pgClientNormalized.query(queryStringPrice)
      console.log(`[save] saved to database (price): ${JSON.stringify(rowCount)}`)
    } catch (err) {
      if (err.message.includes(`violates unique constraint`)) console.warn(`PostgresSQL: violates unique constraint`)
      else throw err
    }
  }

  if (elasticClient) {
    // TODO: Elastic persistence is not implemented yet.
  }
}
315
// Renders the keys of the first object as a double-quoted, comma-separated
// SQL column list, e.g. `"shop", "pid"`.
function getColumns (objs) {
  const quotedKeys = []
  for (const key of Object.keys(objs[0])) {
    quotedKeys.push(`"${key}"`)
  }
  return quotedKeys.join(`, `)
}
319
// Renders objects as SQL VALUES tuples joined by `), (`: strings are quoted
// with embedded single quotes doubled, null/undefined become NULL, and all
// other values are interpolated as-is.
function getValues (objs) {
  const renderValue = (value) => {
    if (typeof value === `string`) return `'${value.replace(/'/g, `''`)}'`
    if (value === undefined || value === null) return `NULL`
    return value
  }
  const tuples = objs.map((objPg) => Object.values(objPg).map(renderValue).join(`, `))
  return tuples.join(`), (`)
}
329
/**
 * Copies ALL-CAPS keys from the actor input into `process.env`, so env-style
 * options can be passed through the actor's input object.
 *
 * @param {Object|null|undefined} input - raw actor input; tolerates nullish
 */
export function parseEnvFromInput (input) {
  const env = {}
  // FIX: Object.entries iterates own properties only (for...in also walked
  // the prototype chain); `?? {}` keeps the old tolerance of null/undefined.
  for (const [key, value] of Object.entries(input ?? {})) {
    if (key === key.toUpperCase()) env[key] = value
  }
  console.log(`[parseEnvFromInput] ${JSON.stringify(env)}`)
  Object.assign(process.env, env)
}
338
// Truthy when the process appears to run under a debugger: either an
// `--inspect*` flag was passed to Node, or a debug module was preloaded
// (e.g. by an IDE launcher).
const hasInspectFlag = process.execArgv.join().includes(`--inspect`)
const hasDebugPreload = process?._preload_modules?.join(`|`)?.includes(`debug`)
export const isInspect = hasInspectFlag || hasDebugPreload