Big Query
Appends a CSV file to a Google BigQuery table. Create service account credentials at https://console.cloud.google.com/, copy & paste the contents of the JSON key file into the value of the 'CREDENTIALS' environment variable, and set it as a secret.
Pricing: Pay per usage
Total users: 3
Monthly users: 25
Runs succeeded: 2 (0%)
Last modified: 3 years ago
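
The actor is configured through its INPUT record and a CREDENTIALS environment variable (see main.js below). A minimal sketch of what the input might look like, with hypothetical store and dataset names:

{
    "storeId": "your-key-value-store-id",
    "tableIdAndStoreKey": "users",
    "datasetName": "apify_results"
}

The 'tableIdAndStoreKey' value is upper-cased to locate the source record in the key-value store and lower-cased to name the BigQuery table; when it is 'users' the actor deduplicates rows by 'username', otherwise by 'referrer'. The 'CREDENTIALS' secret must hold the full service account JSON key, from which the project ID is read.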
Dockerfile
# This is a template for a Dockerfile used to run acts in Actor system.
# The base image name below is set during the act build, based on user settings.
# IMPORTANT: The base image must set a correct working directory, such as /usr/src/app or /home/user
FROM apify/actor-node-basic:v0.21.10

# Second, copy just package.json and package-lock.json since it should be
# the only file that affects "npm install" in the next step, to speed up the build
COPY package*.json ./

# Install NPM packages, skip optional and development dependencies to
# keep the image small. Avoid logging too much and print the dependency
# tree for debugging
RUN npm --quiet set progress=false \
 && npm install --only=prod --no-optional \
 && echo "Installed NPM packages:" \
 && (npm list --all || true) \
 && echo "Node.js version:" \
 && node --version \
 && echo "NPM version:" \
 && npm --version

# Copy source code to container
# Do this in the last step, to have fast build if only the source code changed
COPY . ./

# NOTE: The CMD is already defined by the base image.
# Uncomment this for local node inspector debugging:
# CMD [ "node", "--inspect=0.0.0.0:9229", "main.js" ]
package.json
{ "name": "apify-project", "version": "0.0.1", "description": "", "author": "It's not you it's me", "license": "ISC", "dependencies": { "apify": "0.21.10", "json2csv": "latest", "@google-cloud/bigquery": "latest" }, "scripts": { "start": "node main.js" }}
main.js
// Source file for Hosted source in 'Source type'
const fs = require('fs');
const Apify = require('apify');
const json2csv = require('json2csv'); // eslint-disable-line
const BigQuery = require('@google-cloud/bigquery'); // eslint-disable-line

const { log, error } = console;

let isStoreIdSet = false;

// Read previously stored results under `key` from the key-value store and,
// when new items are passed in, merge and persist them.
async function storeOrGetResults(key, items = [], filterKey) {
    if (!isStoreIdSet || !key) {
        throw new Error(`Error while storing or getting results. Missing ${key ?
            'storeId in store' : 'key value'}.`);
    }

    const { keyValueStores } = Apify.client;
    const record = await keyValueStores.getRecord({ key });
    const storeRecord = record && record.body ? record.body : [];
    let previous = typeof storeRecord === 'string' ? JSON.parse(storeRecord) : storeRecord;

    if (items.length === 0) {
        return { previous };
    }

    // Drop previously stored items that are superseded by the new ones.
    const current = items.slice();
    if (current.length && previous.length && filterKey) {
        const cache = current.reduce((object, item) => (
            Object.assign(object, { [item[filterKey]]: true })
        ), {});
        previous = previous.filter(item => !cache[item[filterKey]]);
    }

    const next = previous.concat(current);
    if (previous.length !== current.length) {
        await keyValueStores.putRecord({
            key,
            body: JSON.stringify(next)
        });
    } else {
        log('No state modifications required.');
    }

    log('Previous results:', previous.length);
    log('Current results:', current.length);
    log('Next results:', next.length);
    return { previous, current, next };
}

// Return the BigQuery dataset with the given name, creating it if it does not exist.
async function createDataset(datasetName, bigquery) {
    const [datasets] = await bigquery.getDatasets();
    const currentDataset = datasets.find(dataset => dataset.id === datasetName);
    if (currentDataset) {
        return currentDataset;
    }
    return bigquery
        .createDataset(datasetName)
        .then((results) => {
            const [dataset] = results;
            log(`Dataset ${dataset.id} created.`);
            return dataset;
        })
        .catch((err) => {
            error('Error while creating dataset:', err);
        });
}

// Return the table with the given ID from the dataset, creating it if it does not exist.
async function getOrCreateTable(dataset, tableId) {
    const [tables] = await dataset.getTables();
    const currentTable = tables.find(({ id }) => id === tableId);
    if (currentTable) {
        log(`Found ${tableId} table.`);
        return currentTable;
    }
    return dataset
        .createTable(tableId)
        .then((results) => {
            const [table] = results;
            log(`Table ${table.id} created.`);
            return table;
        })
        .catch((err) => {
            error('Error while creating table:', err);
        });
}

// Load a local CSV file into the table, letting BigQuery auto-detect the schema.
async function uploadFile(table, filename) {
    let job;
    const config = {
        autodetect: true
    };
    return table
        .load(filename, config)
        .then((data) => {
            [job] = data;
            log(`Job ${job.id} started.`);

            // Wait for the job to finish
            return job;
        })
        .then((metadata) => {
            // Check the job's status for errors
            const errors = metadata.status && metadata.status.errors;
            if (errors && errors.length > 0) {
                throw errors;
            }
        })
        .then(() => {
            log(`Job ${job.id} completed.`);
        })
        .catch((err) => {
            error('Error while uploading file:', err);
        });
}

Apify.main(async () => {
    const input = await Apify.getValue('INPUT');
    const {
        tableIdAndStoreKey,
        datasetName,
        storeId
    } = input;

    // All three input fields are required.
    if (!(datasetName && tableIdAndStoreKey && storeId)) {
        throw new Error('Received invalid input');
    }

    Apify.client.setOptions({ storeId });
    isStoreIdSet = true;

    // The same input value is used as the key-value store record key (upper-case)
    // and as the BigQuery table ID (lower-case).
    const key = tableIdAndStoreKey.toUpperCase();
    const tableId = tableIdAndStoreKey.toLowerCase();

    log('Currently running for:', key);
    const { previous: previousData } = await storeOrGetResults(key);
    if (!previousData.length) {
        throw new Error(`The kv-store record under the '${key}' key is empty.`);
    }
    log(`Items in ${key}:`, previousData.length);

    const credentials = process.env.CREDENTIALS;
    const { project_id: projectId } = JSON.parse(credentials);
    log('Project ID:', projectId);

    // Save the service account key to disk so the BigQuery client can read it.
    const keyFilename = '/credentials.json';
    try {
        fs.writeFileSync(keyFilename, credentials);
    } catch (err) {
        throw new Error('Error while saving credentials.');
    }

    const bigquery = new BigQuery({ projectId, keyFilename });

    log('Getting or creating dataset...');
    const dataset = await createDataset(datasetName, bigquery);
    log('Dataset id:', dataset.id);

    log('Getting or creating table...');
    const table = await getOrCreateTable(dataset, tableId);

    let rows;
    const options = {
        format: 'json',
        gzip: true
    };
    try {
        [rows] = await table.getRows(options);
    } catch (err) {
        throw new Error('Error while extracting rows from table.');
    }
    log('BigQuery table current results:', rows.length);

    // Keep only items that are not yet present in the BigQuery table,
    // deduplicated by 'username' for the users table and by 'referrer' otherwise.
    let memo;
    let data;
    if (tableId === 'users') {
        memo = rows.reduce((cache, { username }) => (
            Object.assign(cache, { [username]: true })
        ), {});
        data = previousData.filter(({ username }) => {
            if (memo[username] || !username) {
                return false;
            }
            memo[username] = true;
            return true;
        });
    } else {
        memo = rows.reduce((cache, { referrer }) => (
            Object.assign(cache, { [referrer]: true })
        ), {});
        data = previousData.filter(({ referrer }) => {
            if (memo[referrer] || !referrer) {
                return false;
            }
            memo[referrer] = true;
            return true;
        });
    }

    if (data.length === 0) {
        log('No new results to insert.');
        log('Done.');
        return null;
    }

    // Convert the new items to CSV, using the keys of the first item as columns.
    let csv;
    const [firstItem] = data;
    const pathToFile = `${tableId}.csv`;
    const fields = Object.keys(firstItem);
    try {
        csv = json2csv({ data, fields });
    } catch (err) {
        error('Error while converting JSON to CSV:', err);
    }

    try {
        fs.writeFileSync(pathToFile, csv);
    } catch (err) {
        throw new Error(`Error while saving CSV file: ${err.message}`);
    }
    log(`File '${pathToFile}' saved.`);

    log(`Adding ${data.length} new results to the '${tableId}' table...`);
    await uploadFile(table, pathToFile);

    log('Done.');
    return null;
});