Big Query
juansgaitan/big-query
Appends a CSV file to a Google BigQuery table. Create service account credentials in the Google Cloud Console (https://console.cloud.google.com/), copy the contents of the downloaded JSON key file into the value of the CREDENTIALS environment variable and set it as a secret.
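For reference, the main.js source below reads its configuration from the actor's INPUT record. A minimal sketch of the expected input, with placeholder values (the dataset name, the shared table/store key and the key-value store ID are illustrative):

{
  "tableIdAndStoreKey": "users",
  "datasetName": "my_dataset",
  "storeId": "<your-key-value-store-id>"
}

The tableIdAndStoreKey value is lowercased to name the BigQuery table and uppercased to pick the record in the key-value store identified by storeId.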
Dockerfile
# This is a template for a Dockerfile used to run actors in the Apify Actor system.
# The base image name below is set during the actor build, based on user settings.
# IMPORTANT: The base image must set a correct working directory, such as /usr/src/app or /home/user
FROM apify/actor-node-basic:v0.21.10

# Copy just package.json and package-lock.json since they should be the only
# files that affect "npm install" in the next step, to speed up the build
COPY package*.json ./

# Install NPM packages, skip optional and development dependencies to
# keep the image small. Avoid logging too much and print the dependency
# tree for debugging
RUN npm --quiet set progress=false \
    && npm install --only=prod --no-optional \
    && echo "Installed NPM packages:" \
    && (npm list --all || true) \
    && echo "Node.js version:" \
    && node --version \
    && echo "NPM version:" \
    && npm --version

# Copy source code to container
# Do this in the last step, to have fast build if only the source code changed
COPY . ./

# NOTE: The CMD is already defined by the base image.
# Uncomment this for local node inspector debugging:
# CMD [ "node", "--inspect=0.0.0.0:9229", "main.js" ]
package.json
{
  "name": "apify-project",
  "version": "0.0.1",
  "description": "",
  "author": "It's not you it's me",
  "license": "ISC",
  "dependencies": {
    "apify": "0.21.10",
    "json2csv": "latest",
    "@google-cloud/bigquery": "latest"
  },
  "scripts": {
    "start": "node main.js"
  }
}
main.js
// Source file for Hosted source in 'Source type'
const fs = require('fs');
const Apify = require('apify');
// NOTE: the two requires below target the json2csv v3 callable API and the
// pre-2.0 @google-cloud/bigquery export; newer releases export { parse } and
// { BigQuery } instead, so pinning compatible versions is safer than "latest".
const json2csv = require('json2csv'); // eslint-disable-line
const BigQuery = require('@google-cloud/bigquery'); // eslint-disable-line

const { log, error } = console;

let isStoreIdSet = false;

// Reads the previously stored results under `key`; when new items are passed in,
// drops previous entries that share `filterKey` with a current item, appends the
// current items and writes the combined list back to the key-value store.
async function storeOrGetResults(key, items = [], filterKey) {
  if (!isStoreIdSet || !key) {
    throw new Error(`Error while storing or getting results. Missing ${key ?
      'storeId in store' : 'key value'}.`);
  }

  const { keyValueStores } = Apify.client;
  const record = await keyValueStores.getRecord({ key });
  const storeRecord = record && record.body ? record.body : [];
  let previous = typeof storeRecord === 'string' ? JSON.parse(storeRecord) : storeRecord;

  if (items.length === 0) {
    return { previous };
  }

  const current = items.slice();
  if (current.length && previous.length && filterKey) {
    const cache = current.reduce((object, item) => (
      Object.assign(object, { [item[filterKey]]: true })
    ), {});
    previous = previous.filter(item => !cache[item[filterKey]]);
  }

  const next = previous.concat(current);
  if (previous.length !== current.length) {
    await keyValueStores.putRecord({
      key,
      body: JSON.stringify(next)
    });
  } else {
    log('No state modifications required.');
  }

  log('Previous results:', previous.length);
  log('Current results:', current.length);
  log('Next results:', next.length);
  return { previous, current, next };
}

async function createDataset(datasetName, bigquery) {
  const [datasets] = await bigquery.getDatasets();
  const currentDataset = datasets.find(dataset => dataset.id === datasetName);
  if (currentDataset) {
    return currentDataset;
  }
  return bigquery
    .createDataset(datasetName)
    .then((results) => {
      const [dataset] = results;
      log(`Dataset ${dataset.id} created.`);
      return dataset;
    })
    .catch((err) => {
      error('Error while creating dataset:', err);
    });
}

async function getOrCreateTable(dataset, tableId) {
  const [tables] = await dataset.getTables();
  const currentTable = tables.find(({ id }) => id === tableId);
  if (currentTable) {
    log(`Found ${tableId} table.`);
    return currentTable;
  }
  return dataset
    .createTable(tableId)
    .then((results) => {
      const [table] = results;
      log(`Table ${table.id} created.`);
      return table;
    })
    .catch((err) => {
      error('Error while creating table:', err);
    });
}

async function uploadFile(table, filename) {
  let job;
  const config = {
    autodetect: true
  };
  return table
    .load(filename, config)
    .then((data) => {
      [job] = data;
      log(`Job ${job.id} started.`);

      // Wait for the job to finish
      return job;
    })
    .then((metadata) => {
      // Check the job's status for errors
      const errors = metadata.status && metadata.status.errors;
      if (errors && errors.length > 0) {
        throw errors;
      }
    })
    .then(() => {
      log(`Job ${job.id} completed.`);
    })
    .catch((err) => {
      error('Error while uploading file:', err);
    });
}

Apify.main(async () => {
  const input = await Apify.getValue('INPUT');
  const {
    tableIdAndStoreKey,
    datasetName,
    storeId
  } = input;

  // All three input fields are required below, so reject the run if any is missing.
  if (!(datasetName && tableIdAndStoreKey && storeId)) {
    throw new Error('Received invalid input');
  }

  Apify.client.setOptions({ storeId });
  isStoreIdSet = true;

  const key = tableIdAndStoreKey.toUpperCase();
  const tableId = tableIdAndStoreKey.toLowerCase();

  log('Currently running for:', key);
  const { previous: previousData } = await storeOrGetResults(key);
  if (!previousData.length) {
    throw new Error(`The kv-store record under the ${key} key is empty.`);
  }
  log(`Items in ${key}:`, previousData.length);

  const credentials = process.env.CREDENTIALS;
  const { project_id: projectId } = JSON.parse(credentials);
  log('Project ID:', projectId);

  const keyFilename = '/credentials.json';
  try {
    fs.writeFileSync(keyFilename, credentials);
  } catch (err) {
    throw new Error(`Error while saving credentials: ${err}`);
  }

  const bigquery = new BigQuery({ projectId, keyFilename });

  log('Getting or creating dataset...');
  const dataset = await createDataset(datasetName, bigquery);
  log('Dataset id:', dataset.id);

  log('Getting or creating table...');
  const table = await getOrCreateTable(dataset, tableId);

  let rows;
  const options = {
    format: 'json',
    gzip: true
  };
  try {
    [rows] = await table.getRows(options);
  } catch (err) {
    throw new Error('Error while extracting rows from table.');
  }
  log('BigQuery table current results:', rows.length);

  // Keep only items that are not already in the table, de-duplicating on
  // 'username' for the users table and on 'referrer' otherwise.
  let memo;
  let data;
  if (tableId === 'users') {
    memo = rows.reduce((cache, { username }) => (
      Object.assign(cache, { [username]: true })
    ), {});
    data = previousData.filter(({ username }) => {
      if (memo[username] || !username) {
        return false;
      }
      memo[username] = true;
      return true;
    });
  } else {
    memo = rows.reduce((cache, { referrer }) => (
      Object.assign(cache, { [referrer]: true })
    ), {});
    data = previousData.filter(({ referrer }) => {
      if (memo[referrer] || !referrer) {
        return false;
      }
      memo[referrer] = true;
      return true;
    });
  }

  if (data.length === 0) {
    log('No new results to insert.');
    log('Done.');
    return null;
  }

  let csv;
  const [firstItem] = data;
  const pathToFile = `${tableId}.csv`;
  const fields = Object.keys(firstItem);
  try {
    csv = json2csv({ data, fields });
  } catch (err) {
    error('Error while converting JSON to CSV:', err);
  }

  try {
    fs.writeFileSync(pathToFile, csv);
  } catch (err) {
    throw new Error(`Error while saving CSV file: ${err}`);
  }
  log(`File '${pathToFile}' saved.`);

  log(`Adding ${data.length} new results to the '${tableId}' table...`);
  await uploadFile(table, pathToFile);

  log('Done.');
  return null;
});
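As a follow-up check (not part of the actor), here is a minimal sketch of how the upload could be verified with the same @google-cloud/bigquery client. It assumes a current release of the library (which exports { BigQuery }), a local copy of the service account key, and placeholder project, dataset and table names matching the input above.

// verify-upload.js - hypothetical helper, not part of the actor above.
// Counts the rows in the target table after a run. The project ID, dataset
// and table names below are placeholders.
const { BigQuery } = require('@google-cloud/bigquery');

async function countRows() {
  const bigquery = new BigQuery({
    projectId: 'my-project-id',
    keyFilename: '/credentials.json'
  });
  const [rows] = await bigquery.query({
    query: 'SELECT COUNT(*) AS total FROM `my_dataset.users`'
  });
  console.log('Rows currently in the table:', rows[0].total);
}

countRows().catch(console.error);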