
Cloud Functions + BigQuery = Data Feed Automation

The efficiencies of automation and hosted solutions are compelling for many enterprises.

Google Cloud Functions constitute an event-driven serverless compute platform that gives you the ability to run your code in the cloud without worrying about the underlying infrastructure.

Cloud Functions provide the following fundamental advantages:

  • In most cases when you want to run any kind of enterprise code for computing or operational functions, you need to have a virtual machine hosting and running your code. Cloud Functions avoid the effort and complexity of maintaining your own virtual machines.
  • Hosting your own infrastructure can be costly relative to the benefit, especially if you only need to run the code a few times a day. Cloud Functions, on the other hand, are ephemeral: they spin up and back down on demand, so you only pay for the time it takes your code to run.
  • You can configure Cloud Functions to fire in response to events in the environment, reducing or eliminating the need for manual activation.
  • My favorite part is that you can now write your code in Python 3 (beta) and, of course, JavaScript (Node.js).

The Use Case

In our use case we’re going to use the event-based trigger in Cloud Functions to:

  1. fire up a function once the GA 360 BigQuery export creates the ga_sessions_YYYYMMDD table for the day before
  2. then use this table to generate some custom reports
  3. then export these reports to CSV files in a Cloud Storage bucket

Why not Dataprep?

So, why not just use Dataprep to run BigQuery views and schedule a Dataflow job to fire at 12:05 am after the GA table is created, then save the results to Cloud Storage with no coding (except the SQL queries, of course)? The issue is that if the GA BigQuery export is delayed for any reason, the table will not exist yet, the BigQuery views will fail, and your job will fail with them. Using a Stackdriver trigger is a more failsafe approach. The trigger only fires once the table is created, which eliminates the timing dependency and ensures that the Cloud Function finds the table when it executes the queries.

But can Cloud Functions be triggered by BigQuery?

Actually, no: Cloud Functions can’t be triggered directly by BigQuery. Still, there is a way to work around this using Stackdriver and Pub/Sub (which is one of the supported trigger sources for Cloud Functions). Our use case combines several products from the GCP family: Stackdriver, Pub/Sub, Cloud Functions (Python), Cloud Storage and, of course, BigQuery (not necessarily in that order).

It looks like a lot of work!

Well, it may look like there are a lot of products used here, but almost all the work is done in Cloud Functions and BigQuery. The rest of the products just help close the loop with minimal configuration.

Putting it all to work

So, let’s list all the steps needed to get the job done, then get into details one by one:

  1. Create a new Pub/Sub topic to set as the sink for the Stackdriver export.
  2. Set up a Stackdriver Logging export, in which we define a filter that monitors the BigQuery logs and fires when the GA table is created.
  3. Write the BigQuery queries we need to extract the required reports.
  4. Create a new Cloud Function and choose the Pub/Sub topic we created in Step #1 as its trigger.
  5. Write Python code for the Cloud Function to run these queries and save the results into Pandas dataframes.
  6. Finally, write the dataframes into CSV files in Cloud Storage.

Automated CSV export to Cloud Storage

We’ll not go deep into the functionality of each GCP product. We’ll just review what we need to configure to have our use case working.

The help docs provide additional discussion on the Google Cloud Platform components used in this solution.

Pub/Sub & Stackdriver

First we create a new Pub/Sub topic (which can be done implicitly while creating the Stackdriver export). Then we make use of the fact that BigQuery logs are active by default: we define an export in Stackdriver with a filter that matches only the log entry written when the GA table is created in BigQuery, and we configure the export to send the body of this log entry (which is in JSON format) to our Pub/Sub sink.
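The filter itself is not reproduced in this post, so the following is only a sketch of what it might look like. It assumes the log entries follow the legacy BigQuery audit log layout; the field paths, the `jobservice.jobcompleted` method name and the helper name `build_ga_export_filter` are assumptions to verify against your own log entries in the Stackdriver Logs Viewer.

```python
def build_ga_export_filter(table_prefix="ga_sessions_"):
    """Build a Stackdriver Logging filter for completed GA load jobs.

    Assumption: field paths follow the legacy BigQuery audit log format.
    Check them against a real log entry before relying on this filter.
    """
    table_field = (
        "protoPayload.serviceData.jobCompletedEvent.job."
        "jobConfiguration.load.destinationTable.tableId"
    )
    clauses = [
        'resource.type="bigquery_resource"',
        'protoPayload.methodName="jobservice.jobcompleted"',
        # ":" is the "has" (substring match) operator in the filter language.
        '{}:"{}"'.format(table_field, table_prefix),
        # Ignore the intraday tables, which are rewritten during the day.
        'NOT {}:"{}intraday"'.format(table_field, table_prefix),
    ]
    return " AND ".join(clauses)


if __name__ == "__main__":
    print(build_ga_export_filter())
```

The resulting string can be pasted into the export's filter box. Excluding the intraday tables keeps the trigger from firing before the daily table is final.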

Preparing the BigQuery queries

In this step we prepare the BigQuery queries that will produce the needed reports. Without going into too much detail about how to write BigQuery queries, we’ll use the query below. It retrieves all sessions from the day before that included an Add to Cart ecommerce action, along with the details of the products involved.

    SELECT
      CONCAT(fullVisitorId,'.', CAST(visitId AS string)) AS sessionId,
      hit.page.pageTitle AS pageTitle,
      CONCAT(hit.page.hostname, hit.page.pagePath) AS pageURL,
      hit.page.hostname AS hostname,
      product.productSKU AS productSKU,
      product.v2ProductName AS productName,
      product.v2ProductCategory AS productCategory,
      product.productPrice/1000000 AS productPrice,
      product.productQuantity AS productQuantity
    FROM
      `..ga_sessions_*`,
      UNNEST(hits) AS hit,
      UNNEST(hit.product) AS product
    WHERE
      hit.eCommerceAction.action_type = '3'
      AND _TABLE_SUFFIX = FORMAT_DATETIME("%Y%m%d",
        DATETIME_ADD(CURRENT_DATETIME(),INTERVAL -1 DAY))

Query retrieving product data for previous day’s sessions that included Add to Cart ecommerce action in Google Analytics.
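To make the `_TABLE_SUFFIX` condition concrete, here is a small Python sketch of the same date arithmetic. The helper names `ga_table_suffix` and `ga_table_name` are hypothetical, and the date is passed in explicitly so the result does not depend on the clock or time zone of the machine running it.

```python
from datetime import date, timedelta


def ga_table_suffix(today):
    """Return the YYYYMMDD suffix of the table for the day before `today`."""
    return (today - timedelta(days=1)).strftime("%Y%m%d")


def ga_table_name(today):
    """Return the name of the daily GA export table for the day before `today`."""
    return "ga_sessions_" + ga_table_suffix(today)


if __name__ == "__main__":
    print(ga_table_name(date(2019, 3, 1)))  # ga_sessions_20190228
```

Computing the name outside the query like this can be handy for logging, or for checking that the table named in the triggering log entry is the one the query expects.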

Creating the Cloud Functions

Now, on to creating our Cloud Function. We need to define the function name, the memory to allocate, the trigger (Pub/Sub in our case) and the topic (our topic name). For the runtime we’ll use Python 3 (which is currently in beta). There is also the option to use Node.js, but we’ll stick to Python for now.

Configuring the Cloud Function

Python & Cloud Storage

Below is a snippet of the Cloud Function Python code that runs the BigQuery query and exports the results to a CSV file in a Cloud Storage bucket.

    from google.cloud import bigquery
    from google.cloud import storage


    def export_to_gcs(event, context):
        # Pub/Sub-triggered functions are called with (event, context);
        # neither argument is used here.
        # BQ query to get add-to-cart sessions (triple-quoted so it can span lines)
        QUERY = """
            SELECT
              CONCAT(fullVisitorId,'.', CAST(visitId AS string)) AS sessionId,
              hit.page.pageTitle AS pageTitle,
              CONCAT(hit.page.hostname, hit.page.pagePath) AS pageURL,
              hit.page.hostname AS hostname,
              product.productSKU AS productSKU,
              product.v2ProductName AS productName,
              product.v2ProductCategory AS productCategory,
              product.productPrice/1000000 AS productPrice,
              product.productQuantity AS productQuantity
            FROM
              `..ga_sessions_*`,
              UNNEST(hits) AS hit,
              UNNEST(hit.product) AS product
            WHERE
              hit.eCommerceAction.action_type = '3'
              AND _TABLE_SUFFIX = FORMAT_DATETIME('%Y%m%d',DATETIME_ADD(CURRENT_DATETIME(),INTERVAL -1 DAY))
        """
        bq_client = bigquery.Client()
        query_job = bq_client.query(QUERY)  # API request
        rows_df = query_job.result().to_dataframe()  # Waits for query to finish
        storage_client = storage.Client()
        bucket = storage_client.get_bucket('BucketName')
        blob = bucket.blob('Add_to_Cart.csv')
        # to_csv() with no path returns the CSV as a string, which is uploaded as-is
        blob.upload_from_string(
            rows_df.to_csv(sep=';', index=False, encoding='utf-8'),
            content_type='application/octet-stream')

Cloud Function python code, executed when the function is triggered

Here, we are using google.cloud.bigquery and google.cloud.storage packages to:

  1. connect to BigQuery to run the query
  2. save the results into a pandas dataframe
  3. connect to Cloud Storage to save the dataframe to a CSV file.

The final step is to set our Python function export_to_gcs() as “Function to execute” when the Cloud Function is triggered.
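When the function fires, the log entry exported by Stackdriver arrives as base64-encoded JSON in the `data` field of the Pub/Sub event. The sketch below shows one way to read the created table's name from it, for example to confirm which table triggered the run. The helper name `table_id_from_event` is hypothetical, and the JSON path assumes the legacy BigQuery audit log layout, so check it against a real log entry.

```python
import base64
import json


def table_id_from_event(event):
    """Extract the destination table ID from a Pub/Sub-delivered log entry.

    Assumption: the entry follows the legacy BigQuery audit log layout.
    """
    # Pub/Sub delivers the message body base64-encoded under "data".
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    load_config = (
        payload["protoPayload"]["serviceData"]["jobCompletedEvent"]
        ["job"]["jobConfiguration"]["load"]
    )
    return load_config["destinationTable"]["tableId"]


if __name__ == "__main__":
    # Hand-built sample entry for local testing; not a real log record.
    sample_entry = {
        "protoPayload": {"serviceData": {"jobCompletedEvent": {"job": {
            "jobConfiguration": {"load": {"destinationTable": {
                "tableId": "ga_sessions_20190228"}}}}}}}
    }
    sample_event = {
        "data": base64.b64encode(json.dumps(sample_entry).encode("utf-8"))
    }
    print(table_id_from_event(sample_event))  # ga_sessions_20190228
```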

How much data can this handle?

Suppose our data runs to millions of records. We can always increase the memory allocated to our Cloud Function, up to 2GB, but this comes at a higher price. And what if even 2GB is not enough?

Another workaround for this is not using Pandas to save query results. Instead, we can save the results to a BigQuery intermediate table, then export this table directly to Cloud Storage, letting BigQuery do all the heavy lifting for us.

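Below are two functions to do so. The save_to_bq_table() function runs a query and saves the results to a BigQuery table. Here we set allow_large_results = True so that the job does not fail if the result set is huge. (This flag only matters for legacy SQL queries; standard SQL queries ignore it, and what allows large results there is setting the destination table.)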

    from google.cloud import bigquery


    def save_to_bq_table():
        # dataset_id is assumed to be defined elsewhere (e.g. a module-level constant)
        bq_client = bigquery.Client()
        # Save the data to an intermediate table, then export it to GCS
        query = "##Query with millions of records results##"
        job_config = bigquery.QueryJobConfig()
        # Set the destination table
        table_ref = bq_client.dataset(dataset_id).table('TableID')
        job_config.destination = table_ref
        job_config.allow_large_results = True
        # Start the query, passing in the extra configuration.
        query_job = bq_client.query(
            query,
            location='US',  # Location must match that of the source table
            job_config=job_config)  # API request - starts the query
        query_job.result()  # Waits for the query to finish

Function to save query results to a BigQuery intermediate table.

The export_bq_table() function exports the table to one or more CSV files in Cloud Storage, then deletes the table.

    def export_bq_table():
        # dataset_id, project_id and tableId are assumed to be defined elsewhere
        client = bigquery.Client()
        destination_uri = 'gs://{}/{}'.format('BucketName', 'ExportFileName_*.csv')
        dataset_ref = client.dataset(dataset_id, project=project_id)
        table_ref = dataset_ref.table(tableId)
        extract_job = client.extract_table(
            table_ref,
            destination_uri,
            location='US')  # API request; location must match that of the source table
        extract_job.result()  # Waits for job to complete.
        client.delete_table(table_ref)  # API request

Function to export the BigQuery intermediate table to Cloud Storage and delete the table.

The exported files are limited to 1GB each, so adding an asterisk * somewhere in the file name in the URI makes BigQuery generate multiple files with incrementing file names: ExportFileName_000000000000.csv, ExportFileName_000000000001.csv, ExportFileName_000000000002.csv and so on.
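As an illustration of the naming scheme, the sketch below lists the object names to expect for a given wildcard URI. The helper `shard_uris` is hypothetical and runs purely locally. The number of files BigQuery writes is only known after the export finishes, so `shard_count` is an input here.

```python
def shard_uris(uri_pattern, shard_count):
    """List the URIs a single-wildcard export pattern expands to.

    The "*" is replaced by a 12-digit, zero-padded sequence number
    starting at 0, mirroring the file names described above.
    """
    if uri_pattern.count("*") != 1:
        raise ValueError("uri_pattern must contain exactly one '*'")
    return [
        uri_pattern.replace("*", "{:012d}".format(index))
        for index in range(shard_count)
    ]


if __name__ == "__main__":
    for uri in shard_uris("gs://BucketName/ExportFileName_*.csv", 3):
        print(uri)
```

A downstream job could use such a list to check that every expected file arrived, or simply list the bucket by the `ExportFileName_` prefix instead.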

Summary

Although Cloud Functions can’t be used for complex transformations (that is a task for Dataflow), they are a very powerful tool that can be used alongside other GCP products to automate quick tasks with little coding effort. They can be used for exporting data from BigQuery, writing data from Cloud Storage into BigQuery once files land in a Cloud Storage bucket, reacting to a specific HTTP request, monitoring Pub/Sub topics to parse and process different messages, and much more.

Other use cases

Some other use cases of Google Cloud Functions include:

Mahmoud Taha

Data Engineering Team Lead

Mahmoud is all about facing challenges: the harder the challenge, the sweeter the victory. After receiving his bachelor's degree in Computer Science and a diploma in Software Development, he worked as a software developer. He later switched careers to Business Intelligence and Data Analysis, where he found his passion: working with data, analyzing it and revealing hidden insights to help companies make better business decisions.
