As a data scientist focusing on developing data products, you naturally want your work to reach its target audience. Suppose, however, that your company does not have a dedicated engineering team for productizing data-science code. One solution is to seek help from other teams, which are surely busy with their own endeavors, and spend months waiting. Alternatively, you could take the initiative and do it yourself. In this article, we take the initiative and schedule the training and application phases of a predictive model using Apache Airflow, Google Compute Engine, and Docker.

Let us first set expectations for what is assumed to be given and what will be attained by the end of this article. It is assumed that a predictive model for supporting business decisions—such as a model for identifying potential churners or a model for estimating the lifetime value of customers—has already been developed. This means that a business problem has already been identified and translated into a concrete question, the data needed for answering the question have already been collected and transformed into a target variable and a set of explanatory variables, and a modeling technique has already been selected and calibrated in order to answer the question by predicting the target variable given the explanatory variables. For the sake of concreteness, the model is assumed to be written in Python. We also assume that the company at hand has chosen Google Cloud Platform as its primary platform, which makes a certain suite of tools readily available.

Our goal is then to schedule the model to run in the cloud via Airflow, Compute Engine, and Docker so that it is periodically retrained (in order to take into account potential fluctuations in the data distribution) and periodically applied (in order to actually make predictions), delivering predictions to the data warehouse in the form of BigQuery for further consumption by other parties.

It is important to note that this article is not a tutorial on any of the aforementioned technologies. The reader is assumed to be familiar with Google Cloud Platform and to have an understanding of Airflow and Docker, as well as to be comfortable with finding out missing details on their own.

Lastly, the code discussed below lives in two repositories: one hosting the predictive model and one hosting the service built around it.

Preparing the model

The suggested structure of the repository hosting the model is as follows:

.
├── configs/
│   ├── application.json
│   └── training.json
├── prediction/
│   ├── __init__.py
│   ├── main.py
│   ├── model.py
│   └── task.py
├── README.md
└── requirements.txt

Here prediction/ is a Python package, and it is likely to contain many more files than the ones listed. The main file is the entry point for command-line invocation, the task module defines the actions that the package is capable of performing, and the model module defines the model.

As alluded to above, the primary job of the main file is to parse command-line arguments, read a configuration file, potentially set up logging and the like, and delegate the rest to the task module. At a later stage, an invocation of an action might look as follows:

python -m prediction.main --action training --config configs/training.json

Here we are passing two arguments: --action and --config. The former specifies the desired action, and the latter supplies additional configuration parameters, such as the location of the training data and the values of the model’s hyperparameters. Keeping all parameters in a separate file, as opposed to hard-coding them, makes the code reusable, and passing them all at once as a single file scales much better than passing each parameter as a separate argument.
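
To make the above concrete, here is a minimal sketch of what prediction/main.py could look like; it is only an illustration, and it assumes that Task receives the parsed configuration in its constructor:

import argparse
import json

from .task import Task


def main():
    # Parse the command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--action', required=True)
    parser.add_argument('--config', required=True)
    arguments = parser.parse_args()
    # Read the configuration file
    with open(arguments.config) as file:
        config = json.load(file)
    # Delegate the requested action to the task module, assuming that Task
    # accepts the configuration in its constructor
    getattr(Task(config), arguments.action)()


if __name__ == '__main__':
    main()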

The task module is conceptually as follows (see the repository for the exact implementation):

class Task:

    def training(self):
        # Read the training data
        # Train the model
        # Save the trained model
        pass

    def application(self):
        # Read the application data
        # Load the trained model
        # Make predictions
        # Save the predictions
        pass

In this example, there are two tasks: training and application. The training task is responsible for fetching the training data, training the model, and saving the result in a predefined location for future usage by the application task. The application task is responsible for fetching the application data (that is, the data the model is supposed to be applied to), loading the trained model produced by the training task, making predictions, and saving them in a predefined location to be picked up for the subsequent delivery to the data warehouse.

Likewise, the model module can be simplified as follows:

class Model:

    def fit(self, data):
        # Estimate the model’s parameters
        pass

    def predict(self, data):
        # Make predictions using the estimated parameters
        pass

It can be seen that the structure presented above makes very few assumptions about the model, which makes it generally applicable. It can also be easily extended to accommodate other actions. For instance, one could have a separate action for testing the model on unseen data.
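
To make this tangible, here is a hypothetical Model that simply delegates to a scikit-learn estimator; it is not the model used in the repository but illustrates how little the interface demands:

from sklearn.linear_model import LogisticRegression


class Model:

    def __init__(self, **hyperparameters):
        # Any estimator exposing fit and predict would work equally well
        self.estimator = LogisticRegression(**hyperparameters)

    def fit(self, data):
        # Estimate the model’s parameters
        self.estimator.fit(data['X'], data['y'])

    def predict(self, data):
        # Make predictions using the estimated parameters
        return self.estimator.predict(data['X'])

Here data is assumed to be a dictionary holding the explanatory variables under 'X' and the target variable under 'y'.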

Having structured the model as shown above, we can now productize it, which is what we discuss next.

Wrapping the model into a service

Now it is time to turn the model into a service. In the scope of this article, a service is a self-sufficient piece of code that can be executed in the cloud upon request. To this end, another repository is created, adhering to the separation-of-concerns design principle. Specifically, by doing so, we avoid mixing the modeling code with the code specific to a particular environment where the model happens to be deployed. The suggested structure of the repository is as follows:

.
├── container/
│   ├── Dockerfile
│   ├── run.sh
│   └── wait.sh
├── service/
│   ├── configs/
│   │   ├── application.json
│   │   └── training.json
│   ├── source/                # the first repository as a submodule
│   └── requirements.txt
├── scheduler/
│   ├── configs/
│   │   ├── application.json
│   │   └── training.json
│   ├── application.py         # a symbolic link to graph.py
│   ├── graph.py
│   └── training.py            # a symbolic link to graph.py
├── Makefile
└── README.md

The container/ folder contains files for building a Docker image for the service. The service/ folder is the service itself, meaning that these files will be present in the container and eventually executed. Lastly, the scheduler/ folder contains files for scheduling the service using Airflow. The last one will be covered in the next section; here we focus on the first two.

Let us start with service/. The first repository (the one discussed in the previous section) is added to this second repository as a Git submodule living in service/source/. This means that the model will essentially be embedded in the service but will conveniently remain an independent entity. At all times, the service contains a reference to a particular state (a particular commit, potentially on a dedicated release branch) of the model, guaranteeing that the desired version of the model is in production. However, when invoking the model from the service, we might want to use a different set of configuration files than the ones present in the first repository. To this end, a service-specific version of the configuration files is created in service/configs/. We might also want to install additional Python dependencies; hence, there is a separate file with requirements.
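
For reference, adding the model as a submodule and pinning it to a particular version might look as follows, with the repository URL being a placeholder:

git submodule add https://github.com/example/example-prediction.git service/source
git -C service/source checkout <the desired commit or release branch>
git commit -am 'Pin the model to the desired version'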

Now it is time to containerize the service code by building a Docker image. The relevant files are gathered in container/. The image is defined in container/Dockerfile and is as follows:

# Use a minimal Python image
FROM python:3.7-slim

# Install Google Cloud SDK as described in
# https://cloud.google.com/sdk/docs/quickstart-debian-ubuntu

# Copy the service directory to the image
COPY service /service
# Copy the run script to the image
COPY container/run.sh /run.sh

# Install Python dependencies specific to the predictive model
RUN pip install --upgrade --requirement /service/source/requirements.txt
# Install Python dependencies specific to the service
RUN pip install --upgrade --requirement /service/requirements.txt

# Set the working directory to be the service directory
WORKDIR /service

# Set the entry point to be the run script
ENTRYPOINT /run.sh

As mentioned earlier, service/ gets copied as is (including service/source with the model), and it will be the working directory inside the container. We also copy container/run.sh, which becomes the entry point of the container; this script is executed whenever a container is launched. Let us take a look at the content of the script (as before, some parts omitted for clarity):

#!/bin/bash

function process_training() {
  # Invoke training
  python -m prediction.main \
    --action ${ACTION} \
    --config configs/${ACTION}.json
  # Set the output location in Cloud Storage
  local output=gs://${NAME}/${VERSION}/${ACTION}/${timestamp}
  # Copy the trained model from the output directory to Cloud Storage
  save output ${output}
}

function process_application() {
  # Find the latest trained model in Cloud Storage
  # Copy the trained model from Cloud Storage to the output directory
  load ${input} output
  # Invoke application
  python -m prediction.main \
    --action ${ACTION} \
    --config configs/${ACTION}.json
  # Set the output location in Cloud Storage
  local output=gs://${NAME}/${VERSION}/${ACTION}/${timestamp}
  # Copy the predictions from the output directory to Cloud Storage
  save output ${output}
  # Set the input file in Cloud Storage
  # Set the output data set and table in BigQuery
  # Ingest the predictions from Cloud Storage into BigQuery
  ingest ${input} ${output} player_id:STRING,label:BOOL
}

function delete() {
  # Delete a Compute Engine instance called "${NAME}-${VERSION}-${ACTION}"
}

function ingest() {
  # Ingest a file from Cloud Storage into a table in BigQuery
}

function load() {
  # Sync the content of a location in Cloud Storage with a local directory
}

function save() {
  # Sync the content of a local directory with a location in Cloud Storage
}

function send() {
  # Write into a Stackdriver log called "${NAME}-${VERSION}-${ACTION}"
}

# Invoke the delete function when the script exits regardless of the reason
trap delete EXIT

# Report a successful start to Stackdriver
send 'Running the action...'
# Invoke the function specified by the ACTION environment variable
process_${ACTION}
# Report a successful completion to Stackdriver
send 'Well done.'

The script expects a number of environment variables to be set upon each container launch, which will be discussed shortly. The primary ones are NAME, VERSION, and ACTION, indicating the name of the service, version of the service, and action to be executed by the service, respectively.

As we shall see below, the above script interacts with several different products on Google Cloud Platform. It might then be surprising that only a handful of variables is passed to the script. The explanation is that the convention-over-configuration design paradigm is followed to a great extent here: apart from a few sensible defaults, all other necessary variables can be derived from the ones given, since certain naming conventions are used throughout the project.

The process_training and process_application functions are responsible for training and application, respectively. It can be seen that they leverage the command-line interface by invoking the main file, which was discussed in the previous section. Since containers are stateless, all artifacts are stored in external storage, which in our case is a bucket in Cloud Storage; this job is delegated to the load and save functions used in both process_training and process_application. In addition, the result of the application action (that is, the predictions) is ingested into a table in BigQuery using Cloud SDK, which can be seen in the ingest function in container/run.sh.
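
To give an idea of what the omitted helpers involve, the load, save, and ingest functions could be as simple as the following sketch based on gsutil and bq; the actual script in the repository is more careful about arguments and error handling:

function load() {
  # Sync the content of a location in Cloud Storage with a local directory
  gsutil -m rsync -r "${1}" "${2}"
}

function save() {
  # Sync the content of a local directory with a location in Cloud Storage
  gsutil -m rsync -r "${1}" "${2}"
}

function ingest() {
  # Ingest a CSV file from Cloud Storage into a table in BigQuery, replacing
  # any previous content and using the schema given as the third argument
  bq load --replace --source_format=CSV "${2}" "${1}" "${3}"
}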

The container communicates with the outside world using Stackdriver via the send function, which writes messages to a log dedicated to the current service run. The most important message is the one indicating a successful completion, which is sent at the very end; we use “Well done” for this purpose. This is the message that will be looked for in order to determine the overall outcome of a service run.
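
The send function, in turn, can be a thin wrapper around Cloud SDK, as in the following sketch:

function send() {
  # Write a message into a Stackdriver log named after the current run
  gcloud logging write "${NAME}-${VERSION}-${ACTION}" "${1}"
}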

Note also that, upon successful or unsuccessful completion, the container deletes its hosting virtual machine, which is achieved by setting a handler (delete) for the EXIT event.
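
Self-deletion also boils down to a single Cloud SDK invocation, provided the container knows the name and zone of its host, which it does thanks to the environment variables passed at launch time (again, only a sketch):

function delete() {
  # Delete the hosting Compute Engine instance without asking for confirmation
  gcloud compute instances delete "${NAME}-${VERSION}-${ACTION}" \
    --zone "${ZONE}" \
    --quiet
}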

Lastly, let us discuss the commands used for building the image and launching the actions. This entails a few lengthy invocations of Cloud SDK, which can be neatly organized in a Makefile:

# The name of the service
name ?= example-prediction-service
# The version of the service
version ?= 2019-00-00

# The name of the project on Google Cloud Platform
project ?= example-cloud-project
# The zone for operations in Compute Engine
zone ?= europe-west1-b
# The address of Container Registry
registry ?= eu.gcr.io

# The name of the Docker image
image := ${name}
# The name of the instance excluding the action
instance := ${name}-${version}

build:
	docker rmi ${image} 2> /dev/null || true
	docker build --file container/Dockerfile --tag ${image} .
	docker tag ${image} ${registry}/${project}/${image}:${version}
	docker push ${registry}/${project}/${image}:${version}

training-start:
	gcloud compute instances create-with-container ${instance}-training \
		--container-image ${registry}/${project}/${image}:${version} \
		--container-env NAME=${name} \
		--container-env VERSION=${version} \
		--container-env ACTION=training \
		--container-env ZONE=${zone} \
		--container-restart-policy never \
		--no-restart-on-failure \
		--machine-type n1-standard-1 \
		--scopes default,bigquery,compute-rw,storage-rw \
		--zone ${zone}

training-wait:
	container/wait.sh instance ${instance}-training ${zone}

training-check:
	container/wait.sh success ${instance}-training

# Similar for application

Here we define one command for building images, namely build, and three commands per action, namely start, wait, and check. In this section, we discuss build and start and leave the last two for the next section, as they are needed specifically for scheduling.

The build command is invoked as follows:

make build

It has to be used each time a new version of the service is to be deployed. The command creates a local Docker image according to the recipe in container/Dockerfile and uploads it to Container Registry, which is Google’s storage for Docker images. For the last operation to succeed, your local Docker has to be configured appropriately, which boils down to the following lines:

gcloud auth login # General authentication for Cloud SDK
gcloud auth configure-docker

Once build has finished successfully, one should be able to see the newly created image in Cloud Console by navigating to Container Registry in the menu to the left. All future versions of the service will be neatly grouped in a separate folder in the registry.

Now that the image is in the cloud, we can create virtual machines running containers with this particular image, which is what the start command is for:

make training-start # Similar for application

Internally, it relies on gcloud compute instances create-with-container, which can be seen in the Makefile listed above. There are a few aspects to note about this command. Apart from selecting the right image and version (--container-image), one has to make sure to set the environment variables mentioned earlier, as they control what the container will be doing once launched. This is achieved by passing a number of --container-env options to create-with-container. Here one can also easily scale the host virtual machine up and down via the --machine-type option. Lastly, it is important to set the --scopes option correctly in order to empower the container to work with BigQuery, Compute Engine, and Cloud Storage.

At this point, we have a few handy commands for working with the service. It is time for scheduling.

Scheduling the service

The goal now is to make both training and application execute periodically, promptly delivering predictions to the data warehouse. Technically, one could keep invoking make training-start and make application-start on their local machine, but this is, of course, neither convenient nor reliable. Instead, we would like to have an autonomous scheduler running in the cloud that would, apart from its primary task of dispatching jobs, manage temporal dependencies between jobs, keep a record of all past and upcoming jobs, and preferably provide a web-based dashboard for monitoring. One such tool is Airflow, and it is the one used in this article.

In Airflow, the work to be performed is expressed as a directed acyclic graph defined using Python. Our job is to create two such graphs: one for training and one for application, each with its own periodicity. At this point, it might seem that each graph should contain only one node calling the start command introduced earlier. However, a more comprehensive solution is to not only start the service but also wait for its termination and check that it successfully executed the corresponding logic. This gives us much better visibility into the life cycle of the service, surfacing various statistics (for instance, the duration and outcome of each run) directly in Airflow.

The above is the reason we have defined two additional commands in Makefile: wait and check. The wait command ensures that the virtual machine reached a terminal state (regardless of the outcome), and the check command ensures that the terminal state was the one expected. This functionality can be implemented in different ways. The approach that we use can be seen in container/wait.sh, which is invoked by both operations from Makefile:

#!/bin/bash

function process_instance() {
  echo 'Waiting for the instance to finish...'
  while true; do
    # Try to read some information about the instance
    # Exit successfully when there is no such instance
    wait
  done
}

function process_success() {
  echo 'Waiting for the success to be reported...'
  while true; do
    # Check if the last entry in Stackdriver contains “Well done”
    # Exit successfully if the phrase is present
    wait
  done
}

function wait() {
  echo 'Waiting...'
  sleep 10
}

# Invoke the function specified by the first command-line argument and forward
# the rest of the arguments to this function
process_${1} ${@:2:10}

The script has two main functions. The process_instance function waits for the virtual machine to finish, and it is currently based on trying to fetch some information about the machine in question using Cloud SDK. Whenever this fetching fails, it is an indication of the machine being shut down and destroyed, which is exactly what is needed in this case. The process_success function waits for the key phrase “Well done” to appear in Stackdriver. However, this message might never appear, and this is the reason process_success has a timeout, unlike process_instance.
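
For illustration, the waiting-with-a-timeout logic in process_success could be organized along the following lines, where the number of attempts is arbitrary, and the actual implementation in the repository might differ:

function process_success() {
  echo 'Waiting for the success to be reported...'
  local name=${1} attempt
  for attempt in $(seq 1 60); do
    # Check if the latest entry in the Stackdriver log contains “Well done”
    gcloud logging read "logName:${name}" --limit 1 --format 'value(textPayload)' \
      | grep --quiet 'Well done' && exit 0
    wait
  done
  # Give up after the allotted number of attempts
  exit 1
}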

All right, there are now three commands to schedule in sequence: start, wait, and check. For instance, for training, the exact command sequence is the following:

make training-start
make training-wait
make training-check

We need to create two separate Python files defining two separate Airflow graphs; however, the graphs will be almost identical except for the triggering interval and the prefix of the start, wait, and check commands. It then makes sense to keep the varying parts in separate configuration files and use the exact same code for constructing the graphs, adhering to the do-not-repeat-yourself design principle. The scheduler/configs/ folder contains the configuration files suggested, and scheduler/graph.py is the Python script creating a graph:

import datetime
import json
import os

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def configure():
    # Extract the directory containing the current file
    path = os.path.dirname(__file__)
    # Extract the name of the current file without its extension
    name = os.path.splitext(os.path.basename(__file__))[0]
    # Load the configuration file corresponding to the extracted name
    config = os.path.join(path, 'configs', name + '.json')
    config = json.loads(open(config).read())
    return config


def construct(config):

    def _construct_graph(start_date, default_args=None, **options):
        start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        return DAG(default_args=default_args, start_date=start_date, **options)

    def _construct_task(graph, name, code):
        return BashOperator(task_id=name, bash_command=code, dag=graph)

    # Construct an empty graph
    graph = _construct_graph(**config['graph'])
    # Construct the specified tasks
    tasks = [_construct_task(graph, **task) for task in config['tasks']]
    tasks = dict([(task.task_id, task) for task in tasks])
    # Enforce the specified dependencies between the tasks
    for child, parent in config['dependencies']:
        tasks[parent].set_downstream(tasks[child])
    return graph


try:
    # Load an appropriate configuration file and construct a graph accordingly
    graph = construct(configure())
except FileNotFoundError:
    # Exit without errors in case the current file has no configuration file
    pass

The script receives no arguments and instead tries to find a suitable configuration file based on its own name, which can be seen in the configure function. Then scheduler/training.py and scheduler/application.py can simply be symbolic links to scheduler/graph.py, avoiding any code repetition. When they are read by Airflow, each one will have its own name and hence will load its own configuration file from scheduler/configs/, if there is one.
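
Creating the two symbolic links is a one-time operation performed inside scheduler/:

ln -s graph.py training.py
ln -s graph.py application.py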

For instance, for training, scheduler/configs/training.json is as follows:

{
  "graph": {
    "dag_id": "example-prediction-service-training",
    "schedule_interval": "0 0 1 * *",
    "start_date": "2019-07-01"
  },
  "tasks": [
    {
      "name": "start",
      "code": "make -C '${ROOT}/..' training-start"
    },
    {
      "name": "wait",
      "code": "make -C '${ROOT}/..' training-wait"
    },
    {
      "name": "check",
      "code": "make -C '${ROOT}/..' training-check"
    }
  ],
  "dependencies": [
    ["wait", "start"],
    ["check", "wait"]
  ]
}

Each configuration file contains three main sections: graph, tasks, and dependencies. The first section prescribes the desired start date, periodicity, and other parameters specific to the graph itself. In this example, the graph is triggered on the first day of every month at midnight (0 0 1 * *), which might be a reasonable frequency for retraining the model. The second section defines what commands should be executed. It can be seen that there is one task for each of the three actions. The -C '${ROOT}/..' part is needed in order to ensure that the right Makefile is used, which is taken care of in scheduler/graph.py. Lastly, the third section dictates the order of execution by enforcing dependencies. In this case, we are saying that wait depends on (should be executed after) start, and that check depends on wait, forming a chain of tasks.
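
As for ${ROOT}, one possible way of resolving it, which might differ from what the repository does, is to substitute the directory of the current file into each command when the tasks are constructed, that is, to use a variation of the _construct_task function from scheduler/graph.py along these lines:

    def _construct_task(graph, name, code):
        # Replace ${ROOT} with the directory containing the current file
        # (scheduler/) so that make -C '${ROOT}/..' points at the root of the
        # repository, where Makefile resides, regardless of where Airflow
        # happens to execute the command
        root = os.path.dirname(os.path.abspath(__file__))
        code = code.replace('${ROOT}', root)
        return BashOperator(task_id=name, bash_command=code, dag=graph)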

At this point, the graphs are considered to be complete. In order to make Airflow aware of them, the repository can simply be cloned into the dags directory of Airflow.

Lastly, Airflow itself can live on a separate instance in Compute Engine. Alternatively, Cloud Composer, the managed Airflow offering on Google Cloud Platform, can be used for this purpose.

Conclusion

Having reached this point, our predictive model is up and running in the cloud in an autonomous fashion, delivering predictions to the data warehouse to act upon. The data warehouse is certainly not the end of the journey, but we stop here and save the discussion for another time.

Although the presented workflow gets the job done, it has its own limitations and weaknesses, which one has to be aware of. The most prominent one is the communication between a Docker container running inside a virtual machine and the scheduler, Airflow. Busy waiting for a virtual machine in Compute Engine to shut down and for Stackdriver to deliver a certain message is arguably not the most reliable solution. There is also a certain overhead associated with starting a virtual machine in Compute Engine, downloading an image from Container Registry, and launching a container. Furthermore, this approach is not suitable for online prediction, as the service does not expose any API for other services to integrate with; its job is to periodically make batch predictions.

If you have any suggestions for improving the workflow or simply would like to share your thoughts, please leave a comment below or send an e-mail. Also feel free to create an issue or open a pull request on GitHub. Any feedback is very much appreciated!

Follow-up

Since its publication, the workflow presented in this article has been significantly simplified. More specifically, on July 16, 2019, it became possible to execute arbitrary Docker images on Google AI Platform. The platform takes care of the whole life cycle of the container, obviating the need for any wait scripts and ad-hoc communication mechanisms via Stackdriver. Refer to “How to run serverless batch jobs on Google Cloud” by Lak Lakshmanan for further details.
