Big Query

juansgaitan/big-query
Appends a CSV file to a Google BigQuery table. Create a service account key ("Service credentials") in the Google Cloud Console at https://console.cloud.google.com/, copy and paste the contents of the downloaded JSON key file into the value of the 'CREDENTIALS' environment variable, and set it as a secret.
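
The actor reads its configuration from the INPUT record of its default key-value store, while the service account JSON goes into the CREDENTIALS environment variable (stored as a secret). A minimal input sketch with placeholder values, not a definitive schema, could look like this:

{
  "storeId": "<ID of the key-value store holding the source data>",
  "datasetName": "my_dataset",
  "tableIdAndStoreKey": "users"
}

The tableIdAndStoreKey value is upper-cased to locate the record inside the store identified by storeId, and lower-cased to name the BigQuery table created inside datasetName.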

Dockerfile

# This is a template for a Dockerfile used to run acts in Actor system.
# The base image name below is set during the act build, based on user settings.
# IMPORTANT: The base image must set a correct working directory, such as /usr/src/app or /home/user
FROM apify/actor-node-basic:v0.21.10

# Second, copy just package.json and package-lock.json since they should be
# the only files that affect "npm install" in the next step, to speed up the build
COPY package*.json ./

# Install NPM packages, skip optional and development dependencies to
# keep the image small. Avoid logging too much and print the dependency
# tree for debugging
RUN npm --quiet set progress=false \
 && npm install --only=prod --no-optional \
 && echo "Installed NPM packages:" \
 && (npm list --all || true) \
 && echo "Node.js version:" \
 && node --version \
 && echo "NPM version:" \
 && npm --version

# Copy source code to container
# Do this in the last step, to have fast build if only the source code changed
COPY . ./

# NOTE: The CMD is already defined by the base image.
# Uncomment this for local node inspector debugging:
# CMD [ "node", "--inspect=0.0.0.0:9229", "main.js" ]

package.json

{
    "name": "apify-project",
    "version": "0.0.1",
    "description": "",
    "author": "It's not you it's me",
    "license": "ISC",
    "dependencies": {
        "apify": "0.21.10",
        "json2csv": "^3.11.0",
        "@google-cloud/bigquery": "^1.0.0"
    },
    "scripts": {
        "start": "node main.js"
    }
}

main.js

// Source file for Hosted source in 'Source type'
const fs = require('fs');
const Apify = require('apify');
const json2csv = require('json2csv'); // eslint-disable-line
const BigQuery = require('@google-cloud/bigquery'); // eslint-disable-line

const { log, error } = console;

let isStoreIdSet = false;
async function storeOrGetResults(key, items = [], filterKey) {
  if (!isStoreIdSet || !key) {
    throw new Error(`Error while storing or getting results. Missing ${key ?
      'storeId in store' : 'key value'}.`);
  }

  const { keyValueStores } = Apify.client;
  const record = await keyValueStores.getRecord({ key });
  const storeRecord = record && record.body ? record.body : [];
  let previous = typeof storeRecord === 'string' ? JSON.parse(storeRecord) : storeRecord;

  if (items.length === 0) {
    return { previous };
  }

  const current = items.slice();
  if (current.length && previous.length && filterKey) {
    const cache = current.reduce((object, item) => (
      Object.assign(object, { [item[filterKey]]: true })
    ), {});
    previous = previous.filter(item => !cache[item[filterKey]]);
  }

  const next = previous.concat(current);
  if (previous.length !== current.length) {
    await keyValueStores.putRecord({
      key,
      body: JSON.stringify(next)
    });
  } else {
    log('No state modifications required.');
  }

  log('Previous results:', previous.length);
  log('Current results:', current.length);
  log('Next results:', next.length);
  return { previous, current, next };
}

async function createDataset(datasetName, bigquery) {
  const [datasets] = await bigquery.getDatasets();
  const currentDataset = datasets.find(dataset => dataset.id === datasetName);
  if (currentDataset) {
    return currentDataset;
  }
  return bigquery
    .createDataset(datasetName)
    .then((results) => {
      const [dataset] = results;
      log(`Dataset ${dataset.id} created.`);
      return dataset;
    })
    .catch((err) => {
      error('Error while creating dataset:', err);
    });
}

async function getOrCreateTable(dataset, tableId) {
  const [tables] = await dataset.getTables();
  const currentTable = tables.find(({ id }) => id === tableId);
  if (currentTable) {
    log(`Found ${tableId} table.`);
    return currentTable;
  }
  return dataset
    .createTable(tableId)
    .then((results) => {
      const [table] = results;
      log(`Table ${table.id} created.`);
      return table;
    })
    .catch((err) => {
      error('Error while creating table:', err);
    });
}
async function uploadFile(table, filename) {
  let job;
  const config = {
    autodetect: true
  };
  return table
    .load(filename, config)
    .then((data) => {
      [job] = data;
      log(`Job ${job.id} started.`);

      // Wait for the job to finish
      return job.promise();
    })
    .then(() => job.getMetadata())
    .then(([metadata]) => {
      // Check the job's status for errors
      const errors = metadata.status && metadata.status.errors;
      if (errors && errors.length > 0) {
        throw errors;
      }
    })
    .then(() => {
      log(`Job ${job.id} completed.`);
    })
    .catch((err) => {
      error('Error while uploading file:', err);
    });
}

Apify.main(async () => {
  const input = await Apify.getValue('INPUT');
  const {
    tableIdAndStoreKey,
    datasetName,
    storeId
  } = input;

  if (!tableIdAndStoreKey || !datasetName || !storeId) {
    throw new Error('Received invalid input');
  }

  Apify.client.setOptions({ storeId });
  isStoreIdSet = true;

  const key = tableIdAndStoreKey.toUpperCase();
  const tableId = tableIdAndStoreKey.toLowerCase();

  log('Currently running for:', key);
  const { previous: previousData } = await storeOrGetResults(key);
  if (!previousData.length) {
    throw new Error(`The kv-store under the ${key} is empty.`);
  }
  log(`Items in ${key}:`, previousData.length);

  const credentials = process.env.CREDENTIALS;
  const { project_id: projectId } = JSON.parse(credentials);
  log('Project ID:', projectId);

  const keyFilename = '/credentials.json';
  try {
    fs.writeFileSync(keyFilename, credentials);
  } catch (err) {
    throw new Error('Error while saving credentials.');
  }

  const bigquery = new BigQuery({ projectId, keyFilename });

  log('Getting or creating dataset...');
  const dataset = await createDataset(datasetName, bigquery);
  log('Dataset id:', dataset.id);

  log('Getting or creating table...');
  const table = await getOrCreateTable(dataset, tableId);

  let rows;
  const options = {
    format: 'json',
    gzip: true
  };
  try {
    [rows] = await table.getRows(options);
  } catch (err) {
    throw new Error('Error while extracting rows from table.');
  }
  log('BigQuery table current results:', rows.length);

  let memo;
  let data;
  if (tableId === 'users') {
    memo = rows.reduce((cache, { username }) => (
      Object.assign(cache, { [username]: true })
    ), {});
    data = previousData.filter(({ username }) => {
      if (memo[username] || !username) {
        return false;
      }
      memo[username] = true;
      return true;
    });
  } else {
    memo = rows.reduce((cache, { referrer }) => (
      Object.assign(cache, { [referrer]: true })
    ), {});
    data = previousData.filter(({ referrer }) => {
      if (memo[referrer] || !referrer) {
        return false;
      }
      memo[referrer] = true;
      return true;
    });
  }

  if (data.length === 0) {
    log('No new results to insert.');
    log('Done.');
    return null;
  }

  let csv;
  const [firstItem] = data;
  const pathToFile = `${tableId}.csv`;
  const fields = Object.keys(firstItem);
  try {
    csv = json2csv({ data, fields });
  } catch (err) {
    error('Error while converting JSON to CSV:', err);
  }

  try {
    fs.writeFileSync(pathToFile, csv);
  } catch (err) {
    throw new Error(`Error while saving CSV file: ${err.message}`);
  }
  log(`File '${pathToFile}' saved.`);

  log(`Adding ${data.length} new results to the '${tableId}' table...`);
  await uploadFile(table, pathToFile);

  log('Done.');
  return null;
});
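
For reference, the record stored under the upper-cased key is expected to be a JSON array of flat objects: the keys of the first object become the CSV columns, and the username field (for the users table) or the referrer field (for any other table) is used to skip rows already present in BigQuery. A hypothetical record, with illustrative field names, might look like this:

[
  { "username": "alice", "followers": 120 },
  { "username": "bob", "followers": 45 }
]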