BigQuery
  • Updated on 20 Jul 2020
  • 8 minutes to read

One of the key elements of any machine learning solution is figuring out how to run it in production. It might be easy to run the initial PoC, but in operational use new data keeps coming in, and the model needs to take this new training data into account in its predictions.

Many customers use Aito for robotic process automation (RPA). In this context, every time a bot executes a workflow there may be a new entry that can be used to improve the predictions.

With traditional machine learning libraries that produce a static model, the workflow requires engineers to periodically retrain, test and deploy the model. For many RPA projects this adds extra pain, as the infrastructure and pipelines are often not ready for it. It's just too complex.

Aito works in a different way.

There is a better way

Every time you input a new datapoint into Aito's database, it is automatically used as training data for the next predictions. So essentially every time an RPA bot runs and feeds a new datapoint into Aito, it will use the latest training data. Aito's predictions evolve on the fly, without the need for constant retraining.
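
To make this concrete: in Aito a prediction is just a query against the live database, so the newest rows are picked up automatically. The sketch below is purely illustrative and not part of the batch pipeline we build in this tutorial; the table and field names come from the web analytics schema shown later on.

# Illustrative sketch only: predict the likely campaign source for a pricing
# page visit, based on whatever data is currently in the table.
predict_body = {
    "from": "aito_segment_web",
    "where": { "context_page_path": "/pricing" },
    "predict": "context_campaign_source"
}
# POST this body to your instance's /api/v1/_predict endpoint.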

Segment.com and BigQuery

While there are tons of ways to input data into Aito, I wanted to provide an example of how to do it with Python. This tutorial focuses on inputting data from BigQuery into Aito using the Aito Python SDK. The same approach works with any database that has a Python client.

The example dataset here is Aito's own web analytics data, which we orchestrate through Segment.com and which ends up in a BigQuery data warehouse. We use this data, for example, to understand what type of website actions correlate most with events like signing up or becoming an active user.

Again, the contents of the data are not relevant to this example. This tutorial does not focus on the predictions, only on the data wrangling.

Before you begin

Getting an Aito instance

If you want to follow along with this tutorial, you'll have to go and get your own Aito instance from the Aito Console.

  1. Sign in or create an account, if you haven't got one already.
  2. In the Aito Console go to the instances page and click the "Create an instance" button.
  3. Select the instance type you want to create and fill in the needed fields. Sandbox is the free instance type for testing and small projects. Visit our pricing page to learn more about the Aito instance types.
  4. Click "Create instance" and wait for a moment while your instance is created, you will receive an email once your instance is ready.

If you're totally new to Aito, the get started guides might be worth reading before this tutorial.

Accessing instance details

Once your instance has been created, you can access its URL and API keys as follows:

  1. Log in to Aito Console
  2. Click on the instance you created
  3. Go to the overview page. You can copy the API keys after clicking the eye icon.

(Screenshot: instance overview page showing the API URL and keys)

Get the Aito Python SDK

  1. Get the Aito Python SDK if you don't have it yet (you'll need Python 3.6 or higher installed):
pip install aitoai
  2. Configure the CLI using the instance URL and read/write API key:
aito configure

Prerequisites checklist

Here is a checklist of things you need before we can start:

  • You need to have your Aito instance. Free Sandboxes are available, so sign up if you haven't already.
  • Have source data that you want to push into Aito and use for predictions, preferably in BigQuery or some other database. If you just want to play around, the world is full of free datasets. If you have your own Google Analytics account, check the instructions for getting its data into BigQuery. If not, you can use the Google Analytics sample data, but remember that it does not update continuously, so you will need to simulate new entries to test the daily batch uploads.
  • Have the necessary Python packages installed for connecting to your database, as well as the Aito Python SDK. The code examples have been tested with version 0.2.1.
  • Create the schema in Aito for your data; you'll need this as well. You can simply drop an example file in the Aito Console and let the schema be created automatically, or define it yourself with the help of the docs. You can also use the quick-add-table feature of the Aito CLI (which comes with the Aito SDK):
aito quick-add-table --file-format csv --table-name table_name data_file.csv

Review the logic and schema

The logic of the daily batch transfer we seek to achieve is:

  1. Fetch new entries (based on previous max timestamp in Aito) from BigQuery to a dataframe
  2. Upload dataframe to Aito
  3. Keep only a recent snapshot of data in Aito. We clean data points that are older than X days
  4. EXTRA: Run an evaluation to track prediction accuracy, and write the result to a file for monitoring purposes

Clearly this is just one example. Especially when deleting old data, you need to use your own judgement: in some scenarios it's better to keep a long history, while in others predicting from more recent data is faster and serves the purpose.
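
To keep the moving parts clear, here is a bird's-eye sketch of the script we are about to build. The function names are placeholders of my own; the rest of this tutorial fills in each step.

# Placeholder outline; each step is implemented in the sections below.
def run_daily_batch():
    start_ts = latest_timestamp_in_aito()             # 1. previous max timestamp in Aito
    new_rows = fetch_new_rows_from_bigquery(start_ts)
    upload_to_aito(new_rows)                          # 2. append the new training data
    purge_rows_older_than(days=90)                    # 3. keep only a recent snapshot
    log_evaluation_results()                          # 4. extra: track prediction accuracy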

One table schema from our example database is shown below. In real-life scenarios there are multiple tables syncing from BigQuery to Aito; for example, the field anonymous_id links to another table called aito_segment_user.

If you have worked with Google Analytics, this dataset is most likely very familiar to you.

{
  "type": "table",
  "columns": {
    "anonymous_id": {
      "link": "aito_segment_user.id",
      "nullable": false,
      "type": "String"
    },
    "context_campaign_medium": {
      "nullable": true,
      "type": "String"
    },
    "context_campaign_name": {
      "nullable": true,
      "type": "String"
    },
    "context_campaign_source": {
      "nullable": true,
      "type": "String"
    },
    "context_ip": {
      "nullable": false,
      "type": "String"
    },
    "context_locale": {
      "nullable": false,
      "type": "String"
    },
    "context_page_path": {
      "nullable": false,
      "type": "String"
    },
    "context_page_referrer": {
      "nullable": true,
      "type": "String"
    },
    "context_page_title": {
      "nullable": true,
      "type": "String"
    },
    "context_user_agent": {
      "nullable": false,
      "type": "String"
    },
    "id": {
      "nullable": false,
      "type": "String"
    },
    "unix_timestamp": {
      "nullable": false,
      "type": "Int"
    },
    "referrer": {
      "nullable": true,
      "type": "String"
    },
    "search": {
      "nullable": true,
      "type": "Text",
      "analyzer": "en"
    },
    "url": {
      "nullable": false,
      "type": "String"
    }
  }
}

Finally there is some code!

A housekeeping note. We are using pandas_gbq to make queries to BigQuery, but you can do the same with Google's own BigQuery Python Client.

Similarly, we are using GCP Service Accounts to authenticate, but again this is up to you.

import time  # used later when purging old data

import pandas as pd
import pandas_gbq
from google.oauth2 import service_account
from aito.aito_client import AitoClient

# Authenticate to BigQuery with a service account and set the GCP project
pandas_gbq.context.credentials = service_account.Credentials.from_service_account_file('YOUR-OWN-FILE.json')
pandas_gbq.context.project = 'YOUR-OWN-GCP-PROJECT'

# Aito client, created with the instance URL and read/write API key
aito = AitoClient("https://$AITO_INSTANCE_URL", "YOUR-OWN-AITO-API-KEY")
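
If you haven't created the table yet, you can also push the schema shown earlier with this same client. The snippet below is a sketch based on my reading of the Aito API docs: the per-table schema endpoint PUT /api/v1/schema/<table> and the schema file name are assumptions, so verify them for your instance.

# Sketch: create the aito_segment_web table from a saved copy of the schema above.
# The endpoint is an assumption; check the Aito API docs for your version.
import json

with open('aito_segment_web_schema.json') as schema_file:
    table_schema = json.load(schema_file)

aito.request(
    method = 'PUT',
    endpoint = '/api/v1/schema/aito_segment_web',
    query = table_schema
)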

Get data from BigQuery

First, let's check what the latest entry currently in the Aito database is. We use Aito's search endpoint, ordering by descending timestamp (Unix seconds) and limiting to one result. It's like a DIY max function!

body = {
    "from": "aito_segment_web",
    "orderBy": { "$desc": "unix_timestamp" },
    "limit": 1,
    "select": ["unix_timestamp"]
}

batch_start_unixtime = aito.request(
    method = 'POST',
    endpoint = '/api/v1/_search',
    query = body
)['hits'][0]['unix_timestamp']

Pay attention to grab only the timestamp into the variable with ['hits'][0]['unix_timestamp'] instead of the entire JSON response. The value is an integer, which is relevant later on!
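
One caveat worth handling: on the very first run the table is empty and the hits list has no elements. A minimal sketch of a fallback, assuming you then want to fetch the full history, is to default to zero:

# Sketch: fall back to timestamp 0 when the table is still empty.
response = aito.request(
    method = 'POST',
    endpoint = '/api/v1/_search',
    query = body
)
hits = response['hits']
batch_start_unixtime = hits[0]['unix_timestamp'] if hits else 0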

Next, let's fetch the data from BigQuery into a dataframe. It has to match the schema in Aito, so be careful how you construct it. Essentially you want an exact match; otherwise you'll get a steady stream of error messages (a sanity check sketch follows the query below).

sql = """
SELECT
  anonymous_id,
  context_campaign_medium,
  context_campaign_name,
  context_campaign_source,
  context_ip,
  context_locale,
  context_page_path,
  context_page_referrer,
  context_page_title,
  context_user_agent,
  id,
  UNIX_SECONDS(timestamp) as unix_timestamp,
  referrer,
  search,
  url
FROM aito_ai_website.pages
WHERE
  UNIX_SECONDS(timestamp) > {starttime};
""".format(starttime=str(batch_start_unixtime))

df = pandas_gbq.read_gbq(sql)
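
Because the match has to be exact, it can be worth a quick sanity check before uploading. This is an optional sketch of my own, with the expected columns hard-coded from the schema above:

# Optional sanity check: dataframe columns should match the Aito table schema.
expected_columns = {
    "anonymous_id", "context_campaign_medium", "context_campaign_name",
    "context_campaign_source", "context_ip", "context_locale",
    "context_page_path", "context_page_referrer", "context_page_title",
    "context_user_agent", "id", "unix_timestamp", "referrer", "search", "url",
}
missing = expected_columns - set(df.columns)
extra = set(df.columns) - expected_columns
if missing or extra:
    raise ValueError(f"Schema mismatch: missing {missing}, unexpected {extra}")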

Upload to Aito

Now this step is simple! Using the Aito Python client, we'll upload the data in batches. By default the data is split into batches of 1000 rows, but you can adjust the batch size if you want (a manual chunking sketch follows below).

In the command below we declare the destination table to be aito_segment_web and convert the dataframe to a list of dictionaries, oriented by records.

aito.upload_entries(
    "aito_segment_web",
    df.to_dict(orient="records"))

New data uploaded. That was it! Not too bad, right?
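
If you want explicit control over the batch size, one option is to chunk the records yourself before calling upload_entries. This is a sketch of my own rather than a built-in SDK option:

# Sketch: upload in explicit chunks instead of relying on the default batching.
records = df.to_dict(orient="records")
batch_size = 1000
for start in range(0, len(records), batch_size):
    aito.upload_entries("aito_segment_web", records[start:start + batch_size])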

Janitorial duties

Aito is not a data lake. Nor a data warehouse. It's a predictive database. So the intention is not to keep your entire history of data there, but only the data needed for high-quality predictions. It is hard to give a general rule on what the optimal sample is; you'll learn it by quickly testing different datasets and using Aito's evaluate endpoint to measure accuracy.

For demo purposes, we will purge data that is older than 90 days (roughly three months).

cutoff_unix_sec = int(time.time()) - (90 * 24 * 60 * 60) # days * hours * minutes * seconds

del_body = {
    "from": "aito_segment_web",
    "where": { "unix_timestamp" : { "$lt" : cutoff_unix_sec } }
}

aito.request(
    method = 'POST',
    endpoint = '/api/v1/data/_delete',
    query = del_body
)
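
If you want to double-check the purge, you can reuse the same search endpoint with ascending order. This is a sketch; I'm assuming $asc is supported as the counterpart of the $desc operator used earlier.

# Sketch: confirm the oldest remaining row is within the 90-day window.
check_body = {
    "from": "aito_segment_web",
    "orderBy": { "$asc": "unix_timestamp" },
    "limit": 1,
    "select": ["unix_timestamp"]
}
oldest = aito.request(
    method = 'POST',
    endpoint = '/api/v1/_search',
    query = check_body
)['hits'][0]['unix_timestamp']
assert oldest >= cutoff_unix_sec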

Evaluate accuracy with the new dataset

Evaluating prediction accuracy is discussed in this How-to article. It would require going a couple of steps deeper into our prediction targets to make sense, and that is beyond the scope of this tutorial. However, here are some pointers for the curious:

  • Define an evaluate query body.
  • Send it to Aito as a job, as evaluations usually take more time than the normal query timeout of 30 seconds.
  • After getting the results, append the vital statistics like sample size, error rate and accuracy to a file for later examination, roughly as sketched below.
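
For orientation only, here is a heavily hedged sketch of the first two steps. The prediction target, the test split and the jobs endpoint below are all my own assumptions, not part of this tutorial; check the evaluate and jobs sections of the Aito API docs before relying on any of it.

# Sketch only: the prediction target, test split and endpoint are assumptions.
evaluate_body = {
    "test": { "$index": { "$mod": [10, 0] } },    # hold out every 10th row as test data
    "evaluate": {
        "from": "aito_segment_web",
        "where": { "context_page_path": { "$get": "context_page_path" } },
        "match": "context_campaign_source"
    }
}

# Send as an asynchronous job, then poll it and append the returned accuracy
# figures to a log file for monitoring.
job = aito.request(
    method = 'POST',
    endpoint = '/api/v1/jobs/_evaluate',
    query = evaluate_body
)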

Summary

Every developer knows that production requires more than what we did here. In reality you would add error handling, for example, and use a few more lines to keep the code maintainable. You probably have several linked tables (Aito supports inference on relational datasets!), which I chose to ignore to keep things simple. In addition, you would schedule the execution, for example with cron or as a recurring Python script in GCP.

The key takeaway, however, is this: with 15 lines of Python code you keep the machine learning model up to date. Nothing else is needed. This will save some serious pain and suffering compared to managing all the model training and deployment yourself, day in, day out.
