High level architectural diagram for a GCP ETL pipeline of currency rates into BigQuery

Automate a daily ETL of Currency Rates into BigQuery

Step by Step Guide – how to automate adding currency rates to BigQuery via a basic ETL flow.

1. Sign up for the free plan from OpenExchangeRates

Generate an ‘App ID’. This will let you get the latest rates 1000 times per month for free.
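The rates come from the latest.json endpoint. As a quick sketch of what the request looks like (the App ID below is a placeholder, substitute your own):

```python
from urllib.parse import urlencode

# Placeholder App ID - substitute the one from your OpenExchangeRates account
APP_ID = "YOUR-APP-ID"

# Build the request URL for the latest.json endpoint
params = {"app_id": APP_ID, "base": "USD"}
url = "https://openexchangerates.org/api/latest.json?" + urlencode(params)
print(url)
```

The response is a JSON object containing a "base" currency, a "timestamp", and a "rates" map of currency code to rate.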

2. Create a Dataset in BigQuery called ‘currency_rates’ to hold the data

Create a dataset called currency_rates in BigQuery

3. Create a BigQuery Table called “openexchangerates_responses” to hold all the API responses

We will need a “data” column to hold the JSON from the response as a string (for us to parse later), and a “timestamp” column so we know when it was added.
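If you prefer DDL over the console, a statement along these lines (assuming the dataset name from step 2, and a TIMESTAMP column for the insert time) creates the table:

```sql
CREATE TABLE IF NOT EXISTS currency_rates.openexchangerates_responses (
  timestamp TIMESTAMP,
  data STRING
);
```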

create a table called openexchangerates_responses to hold all the API responses

4. Create a Cloud Function called ‘get-latest-currency-rates’ to call the API and save the JSON response to BigQuery

We’ll write it in Python 3.8, most Data Engineers’ language of choice, and have it triggered by “HTTP” so Cloud Scheduler can call it directly.

Create a Cloud Function to call the API and insert the response into BigQuery

main.py – be sure to update all the variables to use your own GCP Project ID and App ID from your OpenExchangeRates account.

import requests
import json
import time
import os

from google.cloud import bigquery

# Set any default values for these variables if they are not found from Environment variables
PROJECT_ID = os.environ.get("GCP_PROJECT", "YOUR-GCP-PROJECT")
OPENEXCHANGERATES_KEY = os.environ.get("OPENEXCHANGERATES_KEY", "YOUR-OPENEXCHANGERATE_APP-ID")
REGIONAL_ENDPOINT = os.environ.get("REGIONAL_ENDPOINT", "us-central1")
DATASET_ID = os.environ.get("DATASET_ID", "currency_rates")
TABLE_NAME = os.environ.get("TABLE_NAME", "openexchangerates_responses")
BASE_CURRENCY = os.environ.get("BASE_CURRENCY", "USD")

def main(request):

    latest_response = get_latest_currency_rates()
    write_to_bq(latest_response)
    return "Success"

def get_latest_currency_rates():
    
    params = {'app_id': OPENEXCHANGERATES_KEY, 'base': BASE_CURRENCY}
    response = requests.get("https://openexchangerates.org/api/latest.json", params=params)
    print(response.json())
    return response.json()

def write_to_bq(response):

    # Instantiate a client
    bigquery_client = bigquery.Client(project=PROJECT_ID)

    # Build the fully qualified table ID: project.dataset.table
    # (Client.dataset() is deprecated in google-cloud-bigquery 2.x)
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}"
    table = bigquery_client.get_table(table_id)

    # Get the current timestamp so we know how fresh the data is
    timestamp = time.time()
    # Ensure the response is stored as a string, not JSON
    rows_to_insert = [{"timestamp": timestamp, "data": json.dumps(response)}]

    errors = bigquery_client.insert_rows(table, rows_to_insert)  # API request
    print(errors)
    assert errors == []
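For reference, this is the shape of the row the function streams into BigQuery (toy values, not real rates):

```python
import json
import time

# A trimmed-down stand-in for an OpenExchangeRates response
sample_response = {"base": "USD", "rates": {"AED": 3.67, "GBP": 0.72}}

# The response is serialised to a string so it fits the "data" STRING column
row = {"timestamp": time.time(), "data": json.dumps(sample_response)}

# The stored string round-trips back to the original JSON
assert json.loads(row["data"]) == sample_response
```

Storing the raw response as a string keeps the table schema stable even if OpenExchangeRates adds fields; all parsing is deferred to the view in step 6.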

requirements.txt

google-cloud-bigquery==2.13.0
Flask==1.0.2

5. Test the Function, which will add the first row to BigQuery

Create a Cloud Function to call the API and insert the response into BigQuery

6. Create a view in the BigQuery dataset to pull the Latest Currency Rates from the last inserted API response

In the BigQuery query console, add this SQL, then click “Save → Save View”

WITH latest_response AS (
    SELECT
        timestamp,
        JSON_EXTRACT(data, '$.rates') AS rates
    FROM
    (
        SELECT
            timestamp,
            data,
            -- Add a row number, ordered by timestamp, to enable filtering to the latest response
            ROW_NUMBER() OVER (ORDER BY timestamp DESC) AS rowNumber
        -- Update this if you have used a different dataset or table name
        FROM currency_rates.openexchangerates_responses responses
    )
    WHERE
        -- Swap out for your own currency, assuming you updated the API request too
        JSON_EXTRACT_SCALAR(data, '$.base') = "USD"
        -- Get the latest row
        AND rowNumber = 1

), parsed_data AS (

    SELECT
        *,
        SPLIT(pair, ':') AS currencyKeyValues
    FROM
    (
        SELECT
            timestamp,
            "USD" AS base_currency,
            REGEXP_EXTRACT_ALL(rates, r'"[^"]+":\d+\.?\d*') AS pair
        FROM latest_response
    )
    CROSS JOIN UNNEST(pair) AS pair
)

SELECT
    timestamp AS last_updated,
    base_currency,
    -- Strip the quotes the regex leaves around the currency code
    REPLACE(currencyKeyValues[OFFSET(0)], '"', '') AS currency,
    currencyKeyValues[OFFSET(1)] AS rate
FROM parsed_data
Saving the query as a view so we can always get the latest Currency Rates
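If the string-wrangling in the view looks opaque, here is the same extraction mirrored in Python on a toy response (same regex as the SQL):

```python
import json
import re

# Toy stored row: the "data" column holds the response as a compact JSON string
data = json.dumps({"base": "USD", "rates": {"AED": 3.67, "GBP": 0.72}},
                  separators=(",", ":"))

# JSON_EXTRACT(data, '$.rates') - pull out the rates object as a string
rates = json.dumps(json.loads(data)["rates"], separators=(",", ":"))

# REGEXP_EXTRACT_ALL(rates, r'"[^"]+":\d+\.?\d*') - one "code":rate pair per match
pairs = re.findall(r'"[^"]+":\d+\.?\d*', rates)

# SPLIT(pair, ':') then take offsets 0 and 1, stripping the quotes
parsed = {p.split(":")[0].strip('"'): float(p.split(":")[1]) for p in pairs}
print(parsed)  # {'AED': 3.67, 'GBP': 0.72}
```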

7. Test the data

Query the View we created to ensure you have a row for each currency.

SELECT * FROM currency_rates.latest_currency_rates
Example of the result you should see from testing the view

For extra points, you can test it against a dataset of the price of bread in each currency from around the world.

8. Set up Cloud Scheduler to run daily

Get the URL to trigger the Cloud Function we made earlier

How to get the URL of the Cloud Function

Use it to create a Cloud Scheduler job that runs every night at midnight (cron schedule “0 0 * * *”)
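If you prefer the CLI to the console, a job along these lines should work (the job name and URL below are placeholders; use the trigger URL from the previous step):

```shell
gcloud scheduler jobs create http get-latest-currency-rates-daily \
  --schedule="0 0 * * *" \
  --uri="https://us-central1-YOUR-GCP-PROJECT.cloudfunctions.net/get-latest-currency-rates" \
  --http-method=GET
```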

How to create a Cloud Scheduler Job to call the Cloud Function every night

9. Optional extras

  • Change the BASE_CURRENCY from USD to your own, e.g. “GBP”. This will need to be done in both the Cloud Function and the BigQuery View
  • Define the infrastructure in Terraform. This will automatically create the Dataset, Table, Views and Functions, and you can automate the deployment via a CI/CD pipeline
  • Use this data when making reports and sharing them in your Report API account

Here is the GitHub repo with the full code, and the same tutorial, written for Terraform, can be found here.

If you have any issues with the guide, or if the GCP Console has changed so the images need updating then please just let me know. [email protected]