

Dynamic A/B testing for machine learning models with Amazon SageMaker MLOps projects




In this post, you learn how to create an MLOps project to automate the deployment of an Amazon SageMaker endpoint with multiple production variants for A/B testing. You also deploy a general-purpose API and testing infrastructure that includes a multi-armed bandit experiment framework. This testing infrastructure automatically shifts traffic to the best-performing model over time based on user feedback.

Amazon SageMaker MLOps projects are a new capability recently released with Amazon SageMaker Pipelines, the first purpose-built, easy-to-use, continuous integration and continuous delivery (CI/CD) service for ML. The MLOps project template provisions the initial setup required for a complete end-to-end MLOps system, including model building, training, and deployment, and can be customized to support your organization’s requirements.

When deploying new models to production, it’s often a good idea to gradually roll out new models to users as part of an A/B testing experiment in which you monitor a high-level metric, such as click-through rate or conversion rate, to measure if your new model is an improvement over your previously deployed production variant.

This post contains three sections:

  • An introduction to A/B testing design to provide a high-level overview of the concepts and algorithms
  • Hands-on steps for operationalizing an A/B testing deployment pipeline
  • A simulation of an A/B test against your deployed machine learning (ML) model variants

A/B testing for machine learning

In the context of ML, performing A/B testing on the new model and the old model with production traffic can be an effective final step after offline evaluation because in real life many things influence user behavior and metrics. By randomizing which users are in which group (A or B), you minimize the chances that other factors, like mobile vs. desktop, drive your metrics.

Offline evaluation is a data science practice of holding out a test dataset during ML training to evaluate how effective your model is at performing predictions on unseen data. Data scientists iterate on the evaluation process to improve performance, tuning the data preparation and model parameters, and using visualizations such as an ROC curve to measure a binary classifier’s ability to correctly separate true from false positive predictions.

Before deploying this model to all users, it’s a good idea to run this new or “challenger” model side-by-side with an existing “champion” model in an A/B test to find empirical evidence of the impact this new model has on your business metrics, such as click-through rate, conversion rate, or revenue. By collecting real-time feedback as your model is running, you can optimize how traffic is distributed between the champion and challenger models over the period of the test, which can often run for several weeks.

When you’re confident that this new challenger model is outperforming your previous model, you can deploy this new model to all users, and begin the process again.

When should you test?

After you identify a clear business metric that you can measure based on regular user feedback, you should consider A/B testing when a change to your dataset or model definition occurs. The following table summarizes some examples.

Different Dataset | Different Model
Dataset has been updated to include the latest fresh data | You’re trying a different algorithm architecture
Dataset has been cleaned, normalized, or scaled differently | You’re experimenting with different hyperparameters
Dataset has been resampled to remove bias or adjust for minority class imbalance | You’re using transfer learning to fine-tune a pre-trained model

Use case: Recommending helpful reviews

In this post, you build a review helpfulness binary classifier trained on the Amazon Customer Reviews Dataset using the SageMaker BlazingText algorithm. The following screenshot shows the product page for the Amazon Echo Show 5, which has 937 reviews with an average star rating of 4.5 out of 5. The page also shows a top positive and top critical review.

Users looking to buy an Echo Show (or any product on Amazon) provide feedback on reviews that are helpful, and we use this data to train an ML classification model that can then identify the most helpful reviews based on just the free text. This allows us to surface the best new reviews from the comments while we wait for users to validate our selection with their feedback.

Apply A/B testing

In this section, we deep dive into A/B testing. For more details on how these algorithms are implemented in Python, check out the source code in the GitHub repo.

For an A/B test to be considered successful, you need to perform a statistical analysis of the metrics gathered from the test to determine if there is a statistically significant result. This analysis is based on the significance level you set for the experiment; a 5% significance level is considered the industry standard. For example, a significance level of 0.05 indicates a 5% risk of concluding that a difference exists when there is no actual difference. A lower significance level means that we need stronger evidence for a statistically significant result. For more information about statistical significance, see A Refresher on Statistical Significance.

Traditional A/B testing runs for a defined period based on the number of users necessary to reach a statistically significant result. Tools such as Evan Miller’s Awesome A/B Tools can help you determine how large your sample size needs to be. During this initial period of exploration, you evaluate whether your new model variant is going to challenge the current champion, sending traffic to the less effective variant until the test is complete, which in this case is not until week 5 (as shown in the following graph).
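To make the sample size idea concrete, the following is a minimal sketch that uses the standard normal approximation for comparing two conversion rates. It is not necessarily the calculation used by the tool mentioned above; the function name, the default 80% power, and the example rates are assumptions chosen for illustration.

```python
import math
from statistics import NormalDist


def sample_size_per_variant(p_base, p_new, alpha=0.05, power=0.8):
    """Approximate users needed per variant to detect p_base -> p_new.

    Hypothetical helper: two-sided test at significance level `alpha`
    with the given statistical power, using a normal approximation.
    """
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # critical value for the test
    z_power = NormalDist().inv_cdf(power)          # quantile for the desired power
    variance = p_base * (1 - p_base) + p_new * (1 - p_new)
    n = (z_alpha + z_power) ** 2 * variance / (p_new - p_base) ** 2
    return math.ceil(n)


# Example: detecting an uplift from a 10% to a 12% conversion rate.
print(sample_size_per_variant(0.10, 0.12))
```

Smaller expected improvements require considerably more users per variant, which is why a fixed-split test can run for several weeks.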

Multi-armed bandit testing is dynamic, and includes a gradual change from exploration to exploitation over the duration of the test, sending more traffic to the challenger variant that is delivering the highest reward as defined by your conversion metric (as shown in the following graph). This reduces the traffic being sent to the less effective variant over the lifetime of the test.

A/B testing strategies

For an A/B test to be effective, users need to be assigned to a particular model variant for the duration of the experiment.

We start assignment using a random distribution based on the initial model weights while we collect user feedback as invocation and conversion metrics. We can use reward probability estimates based on the user feedback we collect to exploit the best-performing model variants with either the simple Epsilon-Greedy bandit strategy, or a more sophisticated strategy such as upper confidence bound or Thompson sampling.


Epsilon-Greedy

The simple ε-greedy algorithm selects the best variant most of the time, but occasionally explores at random:

  • If the ε parameter is 0.1 then 10% of the time, we choose a model variant at random. The other 90% of the time, we choose the variant that has the highest expectation of reward. In the following example, this leads us to choosing the better-performing Challenger 1 model more often.
  • When the ε parameter is 0.2, we explore 20% of the time, introducing more chance of selecting the poorer-performing variant, which is why the reward rate is lower overall, as shown with the yellow line.
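The ε-greedy rule can be sketched in a few lines of Python. This is an illustrative version, not the implementation from the GitHub repo; the function name is hypothetical, and the metric field names are borrowed from the stats output shown later in this post.

```python
import random


def epsilon_greedy(variants, epsilon=0.1, rng=random):
    """Pick a variant name from a list of per-variant metric dicts.

    Each dict is assumed to hold 'variant_name', 'invocation_count'
    and 'reward_sum'.
    """
    if rng.random() < epsilon:
        # Explore: choose any variant uniformly at random.
        return rng.choice(variants)["variant_name"]

    def reward_rate(v):
        # Untried variants get a rate of 0.0 to avoid dividing by zero.
        n = v["invocation_count"]
        return v["reward_sum"] / n if n else 0.0

    # Exploit: choose the variant with the highest observed reward rate.
    return max(variants, key=reward_rate)["variant_name"]


metrics = [
    {"variant_name": "Champion1", "invocation_count": 100, "reward_sum": 40.0},
    {"variant_name": "Challenger1", "invocation_count": 100, "reward_sum": 55.0},
]
print(epsilon_greedy(metrics, epsilon=0.1))
```

With ε set to 0.1, roughly 90% of calls return the variant with the highest reward rate, and the rest are random picks.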

Upper confidence bound

The upper confidence bound (UCB) algorithm introduces uncertainty around variants by keeping track of how many times a variant is explored:

  • For each variant i, we record the average reward μi and the number of times we tried it, ni; t is the total number of invocations across all variants. The UCB1 algorithm selects the variant with the highest value of $\mu_i + \sqrt{\frac{2 \ln t}{n_i}}$.

  • We explore variants with high uncertainty that are infrequently selected. In the following example, the Challenger 1 variant is more likely to be selected at the end of week 1 due to a higher upper confidence bound of 0.7.
  • In week 2 as uncertainty levels drop, we exploit the variants with the highest mean plus uncertainty, which puts the Challenger 2 variant ahead with an upper confidence bound of 0.55.
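As a rough illustration, the UCB1 selection rule (average reward plus an exploration bonus of the square root of 2 ln t / n_i) can be written as follows. This is a sketch under assumptions rather than the repo's implementation: the function name is hypothetical, and the metric field names mirror the stats output shown later in this post.

```python
import math


def ucb1(variants):
    """Return the name of the variant with the highest UCB1 score."""
    # Try every variant at least once so that n_i is never zero.
    for v in variants:
        if v["invocation_count"] == 0:
            return v["variant_name"]

    # t is the total number of invocations across all variants.
    t = sum(v["invocation_count"] for v in variants)

    def score(v):
        n_i = v["invocation_count"]
        mean = v["reward_sum"] / n_i
        # The bonus shrinks as a variant is tried more often.
        return mean + math.sqrt(2 * math.log(t) / n_i)

    return max(variants, key=score)["variant_name"]


metrics = [
    {"variant_name": "Champion1", "invocation_count": 1000, "reward_sum": 500.0},
    {"variant_name": "Challenger1", "invocation_count": 10, "reward_sum": 4.0},
]
print(ucb1(metrics))
```

In this example the rarely tried Challenger1 is selected despite its lower average reward, because its uncertainty bonus is much larger.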

Thompson sampling

Thompson sampling estimates the uncertainty around variants by using beta probability distributions:

  • Beta probability distributions are defined by parameters α and β, which correspond to helpful and not helpful reviews, respectively.
  • We explore variants by randomly sampling a distribution for each variant, from which we select the variant with the highest sampled value. In the following initial example, we select Challenger 1 with a value of 5.
  • As we capture user feedback, we update the α and β values, which adjusts the shape of the distributions. As you can see, the distribution for Challenger 2 has skewed right by week 3 as its number of helpful reviews has increased.
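The sampling step can be sketched with Python's standard library. This is a minimal illustration, not the repo's implementation: the function name is hypothetical, rewards are assumed to be 0 or 1 per invocation, and a uniform Beta(1, 1) prior is assumed for every variant.

```python
import random


def thompson_sampling(variants, rng=random):
    """Sample a Beta distribution per variant and return the best name."""
    best_name, best_sample = None, -1.0
    for v in variants:
        # alpha counts rewards (helpful) and beta counts non-rewards
        # (not helpful), each offset by 1 for the assumed uniform prior.
        alpha = 1 + v["reward_sum"]
        beta = 1 + v["invocation_count"] - v["reward_sum"]
        sample = rng.betavariate(alpha, beta)
        if sample > best_sample:
            best_name, best_sample = v["variant_name"], sample
    return best_name


metrics = [
    {"variant_name": "Champion1", "invocation_count": 50, "reward_sum": 20},
    {"variant_name": "Challenger1", "invocation_count": 50, "reward_sum": 28},
]
print(thompson_sampling(metrics))
```

Because the choice is driven by random samples, a variant with few observations still wins occasionally, which provides exploration without a separate ε parameter.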

In the next section, we explore the solution for implementing A/B testing with SageMaker.

Solution overview

The following diagram illustrates the architecture for our solution.

To build this solution in Amazon SageMaker Studio, we use the AWS Cloud Development Kit (AWS CDK). If you’re new to AWS CDK, we recommend that you start your journey with the AWS CDK Workshop for Python. For more information, see AWS CDK Reference Documentation.

The following are the high-level steps to deploy this solution:

  1. Publish a SageMaker MLOps Project template in the AWS Service Catalog.
  2. Deploy an Amazon API Gateway and testing infrastructure.
  3. Create a new project in Studio.

Then you can train and deploy ML models for A/B testing in the sample notebook provided.

Getting started

Amazon SageMaker MLOps project templates are defined as AWS CloudFormation templates and published via the AWS Service Catalog. These are made available to data scientists via Studio, an IDE for ML. To configure Studio in your account, complete the following steps:

  1. Prepare your Studio domain.
  2. Enable SageMaker project templates and SageMaker JumpStart for this account and Studio users.
  3. Open Studio.
  4. In the Launcher, under Utilities and files, choose System terminal.

  5. Clone the GitHub repository in this new terminal:

git clone <repository URL>
cd amazon-sagemaker-ab-testing-pipeline

For instructions on installation prerequisites and how to configure permissions required for AWS CDK, see the GitHub repo README file.

  6. When your environment is set up, confirm that you can list the AWS CDK stacks using the following command:

cdk list

If the stacks are listed, you’re ready to move on to the next steps.

Publish the SageMaker MLOps project template

In this step, you create a portfolio and product to provision a custom SageMaker MLOps project template in the AWS Service Catalog and configure it so you can launch the project from within your Studio domain.

Run the following command to deploy the MLOps project template, passing the required ExecutionRoleArn parameter:

cdk deploy ab-testing-service-catalog --parameters ExecutionRoleArn=<execution-role-arn>

If you don’t have an AWS Identity and Access Management (IAM) execution role available, go to the SageMaker console and choose Amazon SageMaker Studio. In the Studio Summary section, locate the attribute Execution role. Search for the name of this role in IAM to copy the ARN.

AWS CDK lists the changes and asks you to confirm you wish to deploy these. Enter y for yes.

This stack uploads the deployment pipeline code to Amazon Simple Storage Service (Amazon S3) and returns the outputs CodeCommitSeedBucket and CodeCommitSeedKey, which you need when creating the Studio project.

The MLOps project template creates a deployment pipeline that is triggered when a new model is approved, as shown in the following diagram.

The following is the sequence of events that occurs when a model is approved in the SageMaker model registry:

  1. The data scientist commits a configuration file to AWS CodeCommit that includes the stage and A/B testing strategy.
  2. The data scientist approves the model in the SageMaker model registry.
  3. An Amazon CloudWatch model approved event starts the AWS CodeBuild job.
  4. CodeBuild pulls the latest source from CodeCommit.
  5. CodeBuild queries the SageMaker model registry for the latest champion and challenger models.
  6. CodeBuild outputs the CloudFormation stack to deploy the SageMaker endpoint.

After an endpoint is in service, it’s available to be served by the API, as you see in the next section.

Deploy the API and testing infrastructure

In this step, you deploy an API Gateway and supporting resources to enable dynamic A/B testing of any SageMaker endpoint that has multiple production variants.

Run the following command to deploy the API and testing infrastructure with optional configuration:

cdk deploy ab-testing-api -c stage_name=dev

This stack outputs an ApiEndpoint URL, which you provide to the A/B Testing sample notebook.

The API provides a wrapper around SageMaker to dynamically invoke the model variant assigned to a user.

The following workflow starts when a SageMaker endpoint status changes to InService.

This workflow includes the following steps:

  1. A CloudWatch endpoint change event triggers the AWS Lambda register handler.
  2. The register handler queries the SageMaker endpoint to get model variant weights and configuration.
  3. The register handler updates an Amazon DynamoDB metrics table to set variant weights and clear any metrics for the endpoint.

The app is now available to make an inference invocation against this endpoint.

  1. The app sends an invocation request to the API specifying the user, endpoint name, and inference payload.
  2. The API handler attempts to get the assigned variant from the DynamoDB assignment table for the given user and endpoint. If no variant is found, metrics for the endpoint are retrieved from the DynamoDB metrics table and passed to the bandit algorithm. The bandit algorithm returns a new variant for the user, which is written back to the DynamoDB assignment table.
  3. The API handler invokes the target variant on the SageMaker endpoint with the inference payload.
  4. The API handler logs the invocation to Amazon Kinesis Data Firehose.
  5. The API handler returns the invocation response to the app, including the assigned user variant.
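The lookup-then-assign behavior in step 2 is what keeps a user on the same variant for the whole experiment. The following sketch illustrates the idea with an in-memory dictionary standing in for the DynamoDB assignment table and a weighted random choice standing in for the bandit algorithm; all names here are hypothetical and not taken from the solution's source code.

```python
import random

# Stand-in for the DynamoDB assignment table, keyed by (user, endpoint).
assignments = {}


def get_or_assign_variant(user_id, endpoint_name, variant_weights, rng=random):
    """Return the user's existing variant, or assign one and store it."""
    key = (str(user_id), endpoint_name)
    if key not in assignments:
        names = list(variant_weights)
        weights = [variant_weights[name] for name in names]
        # A weighted random choice stands in for the bandit algorithm.
        assignments[key] = rng.choices(names, weights=weights, k=1)[0]
    return assignments[key]


weights = {"Champion1": 1.0, "Challenger1": 1.0}
first = get_or_assign_variant(42, "my-endpoint-dev", weights)
second = get_or_assign_variant(42, "my-endpoint-dev", weights)
print(first, second)
```

Repeated calls for the same user and endpoint return the stored variant, so later changes to the weights only affect users who have not yet been assigned.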

After a successful conversion action from the user, the app calls the API to update metrics.

  1. The app sends a conversion request to the API specifying the user and endpoint name.
  2. The API handler logs the conversion event for the user and variant to Kinesis Data Firehose.

Periodically, events are delivered to Amazon S3 as a series of JSON lines.

  1. The Firehose delivery stream delivers its events to Amazon S3.
  2. The put to Amazon S3 triggers the Lambda metrics handler passing the source object.
  3. The metrics handler reads the S3 object, groups events by endpoint, and updates metrics in the DynamoDB metrics table.
  4. The metrics handler publishes metrics to CloudWatch.
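To illustrate the grouping step, the following sketch aggregates a batch of JSON lines into per-variant counters. The event schema used here (the type, endpoint_name, variant_name, and reward fields) is an assumption made for this example and may differ from the format the solution actually writes to Kinesis Data Firehose.

```python
import json
from collections import defaultdict


def aggregate_events(json_lines):
    """Group events by (endpoint, variant) and tally simple metrics."""
    metrics = defaultdict(
        lambda: {"invocation_count": 0, "conversion_count": 0, "reward_sum": 0.0}
    )
    for line in json_lines.splitlines():
        if not line.strip():
            continue  # skip blank lines between records
        event = json.loads(line)
        m = metrics[(event["endpoint_name"], event["variant_name"])]
        if event["type"] == "invocation":
            m["invocation_count"] += 1
        elif event["type"] == "conversion":
            m["conversion_count"] += 1
            # Default to a reward of 1.0 when the event carries none.
            m["reward_sum"] += float(event.get("reward", 1.0))
    return dict(metrics)


batch = "\n".join([
    '{"type": "invocation", "endpoint_name": "ep", "variant_name": "Champion1"}',
    '{"type": "invocation", "endpoint_name": "ep", "variant_name": "Challenger1"}',
    '{"type": "conversion", "endpoint_name": "ep", "variant_name": "Challenger1"}',
])
print(aggregate_events(batch))
```

The resulting counters correspond to the invocation_count, conversion_count, and reward_sum values that the bandit strategies consume.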

With the MLOps template published and our API infrastructure ready, we can continue.

Create a new project in Studio

When your MLOps project template is registered in the AWS Service Catalog, you can create a project using your new template.

  1. Switch back to the Launcher in Studio.
  2. Choose New project in the ML tasks and components section.

On the Create project page, SageMaker templates is chosen by default. This option lists the built-in templates. However, you should use the template you published for the A/B testing deployment pipeline.

  3. Choose Organization templates.
  4. Choose A/B Testing Deployment Pipeline.
  5. Choose Select project template.

You may need to refresh the Studio IDE to see the latest organization templates.

  6. In the Project details section, for Name, enter ab-testing-pipeline.

The project name must have 32 characters or fewer.

  7. In the Project template parameters, for StageName, leave the default dev.
  8. For CodeCommitSeedBucket, enter the CodeCommitSeedBucket output from the ab-testing-service-catalog stack.
  9. For CodeCommitSeedKey, enter the CodeCommitSeedKey output from the ab-testing-service-catalog stack.
  10. Choose Create project.

Provisioning the project takes a few minutes. In the meantime, we can move on to training our models.

Train and deploy ML models for A/B testing

In the following section, you learn how to train, deploy, and simulate a test against our A/B testing pipeline using the mab-reviews-helpfulness.ipynb sample notebook from the GitHub repository.

Start by browsing to the amazon-sagemaker-ab-testing-pipeline folder you cloned from GitHub, navigate to the notebook directory, and open the Studio notebook named mab-reviews-helpfulness.ipynb.

This notebook takes you through several steps:

  1. Prepare your data.
  2. Run a SageMaker pipeline.
  3. Run your tuning job.
  4. Test the endpoint.
  5. Run your A/B testing simulation.
  6. Determine the winning model.

Data preparation

In this step, we download the electronics reviews from the Amazon Customer Reviews dataset. This dataset contains star_rating as well as helpful_votes and total_votes fields for each review.

We perform some feature engineering to calculate a helpful score for all reviews with at least five total votes:

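# Keep only reviews with at least five total votes
df_reviews = df_reviews[df_reviews['total_votes'] >= 5].copy()
# Helpful score is the fraction of votes that marked the review as helpful
df_reviews['helpful_score'] = df_reviews['helpful_votes'] / df_reviews['total_votes']
# Bucket star ratings: 1-2 Negative, 3 Neutral, 4-5 Positive
df_reviews['sentiment'] = pd.cut(df_reviews['star_rating'], bins=[0, 2, 3, 6], labels=['Negative', 'Neutral', 'Positive'])
df_reviews.describe()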

The following visualization of the results shows that a helpful score threshold of 0.8 aligns well with high-rated products.

We use this as a target variable to create our binary classifier:

df_reviews['is_helpful'] = (df_reviews['helpful_score'] > 0.80)

We then split the data into training, test, and validation datasets, transform the training data into the format required for the SageMaker BlazingText algorithm, and upload the text files to Amazon S3.
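In supervised mode, BlazingText expects one example per line, with the label prefixed by __label__ and followed by the space-separated tokens of the text. The following sketch shows one way to produce such lines; the helper name, the 1 and 0 label values, and the simple lowercase whitespace tokenization are assumptions for illustration and may differ from what the sample notebook does.

```python
def to_blazingtext_line(is_helpful, review_text):
    """Format one review as a BlazingText supervised training line."""
    # Label values 1/0 are hypothetical; any consistent label names work.
    label = "__label__1" if is_helpful else "__label__0"
    # Lowercase and collapse all whitespace (including newlines) to spaces.
    tokens = review_text.lower().split()
    return f"{label} {' '.join(tokens)}"


print(to_blazingtext_line(True, "Great  sound\nquality"))
print(to_blazingtext_line(False, "Stopped working after a week"))
```

Each review must end up on a single line, which is why embedded newlines are collapsed before writing the training file.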

Run a SageMaker pipeline

In this step, we create a SageMaker pipeline to train and register our model with the deployment project we created.

  1. Edit the notebook cell to update <> with your project name, for example ab-testing-pipeline:

project_name = '<>'  # << Update with A/B testing deployment project

In the next series of cells, you define a SageMaker pipeline that has two steps: training the model and registering the model.

  2. Continue running the cells and choose Start Pipeline.

The pipeline takes a few minutes to run.

  3. Navigate back to your project, which should now be created, and choose the Pipelines tab.

  4. Choose the pipeline to get the list of Executions.
  5. Choose the pipeline run to see the graph visualization that includes the TrainModel and RegisterModel steps.

  6. Return to the notebook to approve the latest model package as our initial champion model:

champion_model_group = f"{project_name}-champion"

# Get the latest champion model package
packages = sm_client.list_model_packages(
    ModelPackageGroupName=champion_model_group,
    SortBy='CreationTime',
    SortOrder='Descending',
    MaxResults=1)['ModelPackageSummaryList']

# Approve the model
for package in packages:
    latest_model_package_arn = package['ModelPackageArn']
    model_package_version = latest_model_package_arn.split('/')[-1]
    if package['ModelApprovalStatus'] == 'PendingManualApproval':
        print(f"Approving Champion Version: {model_package_version}")
        model_package_update_response = sm_client.update_model_package(
            ModelPackageArn=latest_model_package_arn,
            ModelApprovalStatus="Approved",
        )
    else:
        print(f"Champion Version: {model_package_version} approved")

You can also manually approve models in Studio via the Model Groups tab in the project page.

Run a tuning job

In this next step of the notebook, we run a SageMaker tuning job to improve on this initial model for our A/B test. The notebook is configured to run a total of nine jobs with three in parallel. This process takes approximately 30 minutes to complete.

When this is complete, we can list these training jobs sorted by accuracy and see the hyperparameters identified in the best-performing training job.

These metrics are also visible on the Experiments tab of the SageMaker project, where you can view and compare results.

If we’re happy with the performance, we can register and approve this model in our challenger model group.

best_estimator = tuner.best_estimator()
challenger_model_group = f"{project_name}-challenger"

# Register the best model and approve it in the challenger model group
model_package = best_estimator.register(
    content_types=["text/plain"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=challenger_model_group,
    approval_status="Approved"
)
model_package_version = model_package.model_package_arn.split('/')[-1]
print(f"Registered and Approved Challenger Version: {model_package_version}")

This triggers the A/B testing deployment pipeline, which creates a multi-variant SageMaker endpoint with the latest champion model and the challenger model you just approved. The production variant name is prefixed with champion or challenger and followed by the version number (for example, Challenger1).

The deployment pipeline reads a JSON configuration to determine settings to provision the endpoint:

  • stage_name – The stage suffix for the SageMaker endpoint (for example, dev)
  • instance_count – The number of instances to deploy per variant
  • instance_type – The type of instance to deploy per variant
  • challenger_variant_count – The number of challenger models to deploy
  • strategy – The algorithm strategy for selecting user model variants (WeightedSampling, EpsilonGreedy, UCB1, or ThompsonSampling)
  • epsilon – The epsilon parameter used by the EpsilonGreedy strategy
  • warmup – The number of invocations to warm up with WeightedSampling before applying the strategy

The CodeBuild stage in the deployment pipeline queries the SageMaker model registry for the latest N challenger models created after the top champion model. You also have the option of specifying the explicit champion_variant_config and challenger_variant_config model versions and variant_name as well as overriding the instance_count and instance_type, as demonstrated in the following example production config file:

{
  "stage_name": "prod",
  "strategy": "EpsilonGreedy",
  "warmup": 100,
  "epsilon": 0.1,
  "instance_count": 2,
  "instance_type": "ml.c5.large",
  "champion_variant_config": {
    "model_package_version": 1,
    "variant_name": "Champion",
    "instance_count": 3,
    "instance_type": "ml.m5.xlarge"
  },
  "challenger_variant_config": [
    {
      "model_package_version": 1,
      "variant_name": "Challenger1",
      "instance_type": "ml.c5.xlarge"
    },
    {
      "model_package_version": 2,
      "variant_name": "Challenger2",
      "instance_count": 1
    }
  ]
}

The CodeBuild stage of the pipeline uses AWS CDK to create the CloudFormation template.

After the pipeline is complete, you can see an InService entry for the endpoint.

Test the endpoint

With our new multi-variant endpoint in service, we can perform an offline evaluation with our test set and calculate an overall accuracy for each of these models (your results might vary slightly):

variant_name
Challenger1    0.667159
Champion1      0.653592
dtype: float64

Because this is a binary classifier for review helpfulness, we can use a confusion matrix to visualize the number of times our model correctly predicted the review as helpful (true positive) or not helpful (true negative), which shows slight improvement in both instances for our Challenger1 model.

Run the A/B testing simulation

In this step, we use the same test data to run an A/B testing simulation.

We first edit the notebook cell to update rest_api with the ApiEndpoint output from AWS CloudFormation:

rest_api = 'https://<>.execute-api.<>'  # << Update this with Rest API output

We define a few Python functions to interface with the API, which pass a user_id and endpoint_name to the invocation endpoint. We also provide a wrapper for the conversion and stats endpoints:

import json
import os
import requests

def api_invocation(user_id, text_array):
    payload = {
        "user_id": str(user_id),
        "endpoint_name": endpoint_name,
        "content_type": "application/json",
        "data": json.dumps({"instances": text_array, "configuration": {"k": 1}}),
    }
    rest_url = os.path.join(rest_api, 'invocation')
    # Send the payload as a JSON body (an HTTP POST is assumed here)
    r = requests.post(rest_url, data=json.dumps(payload))
    return r.json()

def api_conversion(payload):
    rest_url = os.path.join(rest_api, 'conversion')
    r = requests.post(rest_url, data=json.dumps(payload))
    return r.json()

def api_stats():
    payload = {
        "endpoint_name": endpoint_name,
    }
    rest_url = os.path.join(rest_api, 'stats')
    r = requests.post(rest_url, data=json.dumps(payload))
    return r.json()

The stats endpoint returns the variant metrics for a given endpoint_name, which are all zero to start our test:

{
  'endpoint_name': 'sagemaker-ab-testing-pipeline-dev',
  'variant_metrics': [
    {'endpoint_name': 'sagemaker-ab-testing-pipeline-dev',
     'variant_name': 'Champion1',
     'initial_variant_weight': 1.0,
     'invocation_count': 0,
     'conversion_count': 0,
     'reward_sum': 0.0},
    {'endpoint_name': 'sagemaker-ab-testing-pipeline-dev',
     'variant_name': 'Challenger1',
     'initial_variant_weight': 1.0,
     'invocation_count': 0,
     'conversion_count': 0,
     'reward_sum': 0.0}
  ],
  'strategy': 'ThompsonSampling',
  'epsilon': 0.1,
  'warmup': 0
}

Next, we split our test review dataset into batches of 20 and invoke the API with a new user on each batch. The API allocates a variant for each user based on the bandit algorithm strategy, and returns this along with the strategy selected in the response. This process takes a few minutes to complete, as indicated by the progress bar.

Our results show that for the first half of the test, the WeightedSampling strategy is selected. This is because metrics are updated periodically via the Firehose delivery stream, and the bandit algorithms require reward feedback before they can suggest which variant to assign for a given user.

variant_name  strategy
Challenger1   ThompsonSampling    302
              WeightedSampling    186
Champion1     ThompsonSampling     47
              WeightedSampling    177
Name: is_helpful_prediction, dtype: int64

When we plot the cumulative reward for the variants Champion1 and Challenger1, we can see that in the beginning they’re both increasing at a steady rate.

But when our bandit algorithm starts to use the reward rate, we start to favor the Challenger1 model variant because it’s outperforming Champion1.

If we visualize the beta probability distribution over time with these batches, we can see the shape of the Challenger1 distribution starts to become taller and skinnier as it gets more helpful vs. not helpful reviews. When randomly sampling from these distributions with the Thompson sampling strategy, we’re more likely to select the Challenger1 variant as it continues to separate from the original Champion1 model.

Determine the winning model

When setting up an A/B testing experiment, you need to do the following:

  • Identify the business metric that you want to improve, such as click-through rate or conversion rate
  • Identify the expected performance improvement for that metric (for example, 5%)

Multi-armed bandits provide the opportunity to determine the winner early when the following conditions are met:

  • The experiment has received regular traffic
  • The experiment has run for sufficient time to cancel out any periodicity (for example, 2 weeks)
  • The new variant has exceeded the expected performance improvement

In our simulation, we can calculate a conversion rate improvement for Challenger1 over Champion1. Assuming a normal distribution, we evaluate whether this is a statistically significant result with a 95% confidence interval. In our case, the results were too close to call, so the recommendation is to continue running this test.
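One common way to perform this kind of check is to compute a confidence interval for the difference between the two conversion rates using a normal approximation. The sketch below is an illustration under that assumption, not the notebook's exact calculation; the function name and the example counts are hypothetical.

```python
import math
from statistics import NormalDist


def conversion_uplift(conv_a, n_a, conv_b, n_b, confidence=0.95):
    """Return (difference, (lower, upper), significant) for rate B minus rate A."""
    p_a, p_b = conv_a / n_a, conv_b / n_b
    diff = p_b - p_a
    # Standard error of the difference between two independent proportions.
    se = math.sqrt(p_a * (1 - p_a) / n_a + p_b * (1 - p_b) / n_b)
    z = NormalDist().inv_cdf(0.5 + confidence / 2)
    lower, upper = diff - z * se, diff + z * se
    # Significant only if the interval excludes zero.
    significant = lower > 0 or upper < 0
    return diff, (lower, upper), significant


# Hypothetical counts: champion 100/1000 conversions, challenger 105/1000.
diff, interval, significant = conversion_uplift(100, 1000, 105, 1000)
print(f"uplift={diff:.3f}, interval=({interval[0]:.3f}, {interval[1]:.3f}), significant={significant}")
```

When the interval contains zero, as it does for these example counts, the result is too close to call and the test should keep running.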

Clean up

To clean up all the resources you provisioned in this example, complete the following steps in your terminal, using the AWS Command Line Interface (AWS CLI):

  1. Delete the CloudFormation stack created to provision the SageMaker endpoint:

aws cloudformation delete-stack --stack-name sagemaker-<>-deploy-<>

  2. Empty the S3 bucket containing the artifacts output from the A/B testing deployment pipeline:

aws s3 rm --recursive s3://sagemaker-<>-artifact-<>-<>

  3. Delete the project, which removes the CloudFormation stack that created the deployment pipeline:

aws sagemaker delete-project --project-name <>

  4. Delete the AWS Service Catalog project template:

cdk destroy ab-testing-service-catalog

  5. Finally, delete the API and testing infrastructure:

cdk destroy ab-testing-api

Conclusion and next steps

In this post, you learned how to apply A/B testing to ML models. You used the AWS CDK to publish a custom MLOps template, and deployed a general purpose API and testing infrastructure to simulate A/B testing for recommending helpful reviews.

Because you assigned users to model variants using the Thompson sampling multi-armed bandit strategy, you reduced the volume of traffic sent to the poor-performing variant in the A/B test and reduced the time to call a winner.

As a next step, try using the A/B testing deployment pipeline for your own models in production. The source code is available on the GitHub repo.

About the Author

Julian Bright is a Sr. AI/ML Specialist Solutions Architect based out of Melbourne, Australia. Julian works as part of the global AWS machine learning team and is passionate about helping customers realise their AI and ML journey through MLOps. In his spare time he loves running around after his kids, playing soccer, and getting outdoors.




Build a GNN-based real-time fraud detection solution using Amazon SageMaker, Amazon Neptune, and the Deep Graph Library





Fraudulent activities severely impact many industries, such as e-commerce, social media, and financial services. Fraud can cause significant losses for businesses and consumers. American consumers reported losing more than $5.8 billion to fraud in 2021, up more than 70% over 2020. Many techniques have been used to detect fraudsters, including rule-based filters, anomaly detection, and machine learning (ML) models.

In real-world data, entities often involve rich relationships with other entities. Such a graph structure can provide valuable information for anomaly detection. For example, in the following figure, users are connected via shared entities such as Wi-Fi IDs, physical locations, and phone numbers. Due to the large number of unique values of these entities, like phone numbers, it’s difficult to use them in the traditional feature-based models—for example, one-hot encoding all phone numbers wouldn’t be viable. But such relationships could help predict whether a user is a fraudster. If a user has shared several entities with a known fraudster, the user is more likely a fraudster.

Recently, graph neural networks (GNNs) have become a popular method for fraud detection. GNN models can combine both graph structure and attributes of nodes or edges, such as users or transactions, to learn meaningful representations that distinguish malicious users and events from legitimate ones. This capability is crucial for detecting fraud where fraudsters collude to hide their abnormal features but leave some traces of relations.

Current GNN solutions mainly rely on offline batch training and inference mode, which detect fraudsters after malicious events have happened and losses have occurred. However, catching fraudulent users and activities in real time is crucial for preventing losses. This is particularly true in business cases where there is only one chance to prevent fraudulent activities. For example, in some e-commerce platforms, account registration is wide open. Fraudsters can behave maliciously just once with an account and never use the same account again.

Predicting fraudsters in real time is important. Building such a solution, however, is challenging. Because GNNs are still new to the industry, there are limited online resources on converting GNN models from batch serving to real-time serving. Additionally, it’s challenging to construct a streaming data pipeline that can feed incoming events to a GNN real-time serving API. To the best of the authors’ knowledge, no reference architectures and examples are available for GNN-based real-time inference solutions as of this writing.

To help developers apply GNNs to real-time fraud detection, this post shows how to use Amazon Neptune, Amazon SageMaker, and the Deep Graph Library (DGL), among other AWS services, to construct an end-to-end solution for real-time fraud detection using GNN models.

We focus on four tasks:

  • Processing a tabular transaction dataset into a heterogeneous graph dataset
  • Training a GNN model using SageMaker
  • Deploying the trained GNN models as a SageMaker endpoint
  • Demonstrating real-time inference for incoming transactions

This post extends the previous work in Detecting fraud in heterogeneous networks using Amazon SageMaker and Deep Graph Library, which focuses on the first two tasks. You can refer to that post for more details on heterogeneous graphs, GNNs, and semi-supervised training of GNNs.

Businesses looking for a fully-managed AWS AI service for fraud detection can also use Amazon Fraud Detector, which makes it easy to identify potentially fraudulent online activities, such as the creation of fake accounts or online payment fraud.

Solution overview

This solution contains two major parts.

The first part is a pipeline that processes the data, trains GNN models, and deploys the trained models. It uses AWS Glue to process the transaction data, and saves the processed data to both Amazon Neptune and Amazon Simple Storage Service (Amazon S3). Then, a SageMaker training job is triggered to train a GNN model on the data saved in Amazon S3 to predict whether a transaction is fraudulent. The trained model along with other assets are saved back to Amazon S3 upon the completion of the training job. Finally, the saved model is deployed as a SageMaker endpoint. The pipeline is orchestrated by AWS Step Functions, as shown in the following figure.

The second part of the solution implements real-time fraudulent transaction detection. It starts from a RESTful API that queries the graph database in Neptune to extract the subgraph related to an incoming transaction. It also has a web portal that can simulate business activities, generating online transactions with both fraudulent and legitimate ones. The web portal provides a live visualization of the fraud detection. This part uses Amazon CloudFront, AWS Amplify, AWS AppSync, Amazon API Gateway, Step Functions, and Amazon DocumentDB to rapidly build the web application. The following diagram illustrates the real-time inference process and web portal.

The implementation of this solution, along with an AWS CloudFormation template that can launch the architecture in your AWS account, is publicly available through the following GitHub repo.

Data processing

In this section, we briefly describe how to process an example dataset and convert it from raw tables into a graph with relations identified among different columns.

This solution uses the same dataset, the IEEE-CIS fraud dataset, as the previous post Detecting fraud in heterogeneous networks using Amazon SageMaker and Deep Graph Library. Therefore, the basic principle of the data processing is the same. In brief, the fraud dataset includes a transactions table and an identities table, with nearly 500,000 anonymized transaction records along with contextual information (for example, devices used in transactions). Some transactions have a binary label indicating whether the transaction is fraudulent. Our task is to predict which unlabeled transactions are fraudulent and which are legitimate.

The following figure illustrates the general process of how to convert the IEEE tables into a heterogeneous graph. We first extract two columns from each table. One column is always the transaction ID column, where we set each unique TransactionID as one node. The other column is picked from the categorical columns, such as the ProductCD and id_03 columns, where each unique category is set as a node. If a TransactionID and a unique category appear in the same row, we connect them with one edge. This way, we convert two columns in a table into one bipartite graph. Then we combine those bipartite graphs along the TransactionID nodes, where the same TransactionID nodes are merged into one unique node. After this step, we have a heterogeneous graph built from bipartite graphs.
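The conversion described above can be illustrated with a small in-memory sketch. Only the column names TransactionID, ProductCD, and id_03 come from the post; the rows, the dictionary layout, and the helper names are assumptions for illustration and don't reflect the solution's AWS Glue code.

```python
# Hypothetical rows; the values are made up.
rows = [
    {"TransactionID": 1, "ProductCD": "W", "id_03": "0.0"},
    {"TransactionID": 2, "ProductCD": "W", "id_03": "1.0"},
    {"TransactionID": 3, "ProductCD": "C", "id_03": "0.0"},
]


def bipartite_edges(rows, column):
    """Return (TransactionID, category) edges for one categorical column."""
    return [
        (row["TransactionID"], row[column])
        for row in rows
        if row.get(column) is not None
    ]


def build_hetero_graph(rows, columns):
    """Combine one bipartite edge list per column into a heterogeneous graph."""
    graph = {"nodes": {"TransactionID": set()}, "edges": {}}
    for column in columns:
        edges = bipartite_edges(rows, column)
        graph["edges"][("TransactionID", column)] = edges
        graph["nodes"][column] = {category for _, category in edges}
        # The same TransactionID in several bipartite graphs becomes one node.
        graph["nodes"]["TransactionID"].update(tid for tid, _ in edges)
    return graph


graph = build_hetero_graph(rows, ["ProductCD", "id_03"])
print(graph["nodes"])
```

Each categorical column contributes its own node type and edge type, while the TransactionID node set is shared. That sharing is what makes the result a single heterogeneous graph rather than a collection of separate bipartite graphs.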

For the rest of the columns that aren’t used to build the graph, we join them together as the feature of the TransactionID nodes. TransactionID values that have the isFraud values are used as the label for model training. Based on this heterogeneous graph, our task becomes a node classification task of the TransactionID nodes. For more details on preparing the graph data for training GNNs, refer to the Feature extraction and Constructing the graph sections of the previous blog post.

The code used in this solution is available in src/scripts/. You can also experiment with data processing through the Jupyter notebook src/sagemaker/01.FD_SL_Process_IEEE-CIS_Dataset.ipynb.

Instead of manually processing the data, as done in the previous post, this solution uses a fully automatic pipeline orchestrated by Step Functions and AWS Glue that supports processing huge datasets in parallel via Apache Spark. The Step Functions workflow is written in AWS Cloud Development Kit (AWS CDK). The following is a code snippet to create this workflow:

import { Duration } from 'aws-cdk-lib';
import { IntegrationPattern } from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke, GlueStartJobRun } from 'aws-cdk-lib/aws-stepfunctions-tasks';

// Normalize the input parameters with a Lambda function.
const parametersNormalizeTask = new LambdaInvoke(this, 'Parameters normalize', {
  lambdaFunction: parametersNormalizeFn,
  integrationPattern: IntegrationPattern.REQUEST_RESPONSE,
});
…
// Run the AWS Glue ETL job and wait for it to complete.
const dataProcessTask = new GlueStartJobRun(this, 'Data Process', {
  integrationPattern: IntegrationPattern.RUN_JOB,
  glueJobName: etlConstruct.jobName,
  timeout: Duration.hours(5),
  resultPath: '$.dataProcessOutput',
});
…
// Chain the tasks into the workflow definition.
const definition = parametersNormalizeTask
  .next(dataIngestTask)
  .next(dataCatalogCrawlerTask)
  .next(dataProcessTask)
  .next(hyperParaTask)
  .next(trainingJobTask)
  .next(runLoadGraphDataTask)
  .next(modelRepackagingTask)
  .next(createModelTask)
  .next(createEndpointConfigTask)
  .next(checkEndpointTask)
  .next(endpointChoice);

Besides constructing the graph data for GNN model training, this workflow also batch loads the graph data into Neptune to conduct real-time inference later on. This batch data loading process is demonstrated in the following code snippet:

from neptune_python_utils.endpoints import Endpoints
from neptune_python_utils.bulkload import BulkLoad
…
# Bulk load the processed graph data into Neptune.
bulkload = BulkLoad(
    source=targetDataPath,
    endpoints=endpoints,
    role=args.neptune_iam_role_arn,
    region=args.region,
    update_single_cardinality_properties=True,
    fail_on_error=True)

load_status = bulkload.load_async()
status, json = load_status.status(details=True, errors=True)
load_status.wait()

GNN model training

After the graph data for model training is saved in Amazon S3, a SageMaker training job, which is only charged when the training job is running, is triggered to start the GNN model training process in the Bring Your Own Container (BYOC) mode. It allows you to pack your model training scripts and dependencies in a Docker image, which it uses to create SageMaker training instances. The BYOC method could save significant effort in setting up the training environment. In src/sagemaker/02.FD_SL_Build_Training_Container_Test_Local.ipynb, you can find details of the GNN model training.

Docker image

The first part of the Jupyter notebook file is the training Docker image generation (see the following code snippet):

!aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin
image_name = 'fraud-detection-with-gnn-on-dgl/training'
!docker build -t $image_name ./FD_SL_DGL/gnn_fraud_detection_dgl

We used a PyTorch-based image for the model training. The Deep Graph Library (DGL) and other dependencies are installed when building the Docker image. The GNN model code in the src/sagemaker/FD_SL_DGL/gnn_fraud_detection_dgl folder is copied to the image as well.

Because we process the transaction data into a heterogeneous graph, in this solution we choose the Relational Graph Convolutional Network (RGCN) model, which is specifically designed for heterogeneous graphs. Our RGCN model can train learnable embeddings for the nodes in heterogeneous graphs. Then, the learned embeddings are used as inputs of a fully connected layer for predicting the node labels.


To train the GNN, we need to define a few hyperparameters before the training process, such as the file names of the graph constructed, the number of layers of GNN models, the training epochs, the optimizer, the optimization parameters, and more. See the following code for a subset of the configurations:

edges = ",".join(map(lambda x: x.split("/")[-1], [file for file in processed_files if "relation" in file]))
params = {'nodes': 'features.csv',
          'edges': edges,
          'labels': 'tags.csv',
          'embedding-size': 64,
          'n-layers': 2,
          'n-epochs': 10,
          'optimizer': 'adam',
          'lr': 1e-2}

For more information about all the hyperparameters and their default values, see the src/sagemaker/FD_SL_DGL/gnn_fraud_detection_dgl folder.

Model training with SageMaker

After the customized container Docker image is built, we use the preprocessed data to train our GNN model with the hyperparameters we defined. The training job uses the DGL, with PyTorch as the backend deep learning framework, to construct and train the GNN. SageMaker makes it easy to train GNN models with the customized Docker image, which is an input argument of the SageMaker estimator. For more information about training GNNs with the DGL on SageMaker, see Train a Deep Graph Network.

The SageMaker Python SDK uses Estimator to encapsulate training on SageMaker, which runs SageMaker-compatible custom Docker containers, enabling you to run your own ML algorithms by using the SageMaker Python SDK. The following code snippet demonstrates training the model with SageMaker (either in a local environment or cloud instances):

import boto3
from sagemaker.estimator import Estimator
from time import strftime, gmtime
from sagemaker.local import LocalSession

localSageMakerSession = LocalSession(boto_session=boto3.session.Session(region_name=current_region))
estimator = Estimator(image_uri=image_name,
                      role=sagemaker_exec_role,
                      instance_count=1,
                      instance_type='local',
                      hyperparameters=params,
                      output_path=output_path,
                      sagemaker_session=localSageMakerSession)

training_job_name = "{}-{}".format('GNN-FD-SL-DGL-Train', strftime("%Y-%m-%d-%H-%M-%S", gmtime()))
print(training_job_name)

# Start the training job with the processed data as the 'train' channel.
estimator.fit({'train': processed_data}, job_name=training_job_name)

After training, the GNN model’s performance on the test set is displayed as in the following output. The RGCN model can normally achieve around 0.87 AUC and more than 95% accuracy. For a comparison of the RGCN model with other ML models, refer to the Results section of the previous blog post.

Epoch 00099 | Time(s) 7.9413 | Loss 0.1023 | f1 0.3745
Metrics
Confusion Matrix:
                    labels positive  labels negative
predicted positive  4343             576
predicted negative  13494            454019
f1: 0.3817, precision: 0.8829, recall: 0.2435, acc: 0.9702, roc: 0.8704, pr: 0.4782, ap: 0.4782
Finished Model training
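The precision, recall, F1, and accuracy values in this output follow directly from the confusion matrix. The following sketch recomputes them; the classification_metrics helper is illustrative and is not taken from the training code.

```python
def classification_metrics(tp, fp, fn, tn):
    """Compute standard binary classification metrics from confusion matrix counts."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    return {"precision": precision, "recall": recall, "f1": f1, "acc": accuracy}


# Counts read from the confusion matrix above:
# predicted positive row -> TP=4343, FP=576; predicted negative row -> FN=13494, TN=454019.
metrics = classification_metrics(tp=4343, fp=576, fn=13494, tn=454019)
print({name: round(value, 4) for name, value in metrics.items()})
```

The high accuracy mostly reflects the large number of legitimate transactions. The recall of about 0.24 shows that many fraudulent transactions are still missed at this decision threshold, which is why AUC is the more informative headline metric for this imbalanced dataset.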

Upon the completion of model training, SageMaker packs the trained model along with other assets, including the trained node embeddings, into a ZIP file and then uploads it to a specified S3 location. Next, we discuss the deployment of the trained model for real-time fraud detection.

GNN model deployment

SageMaker makes the deployment of trained ML models simple. In this stage, we use the SageMaker PyTorchModel class to deploy the trained model, because our DGL model depends on PyTorch as the backend framework. You can find the deployment code in the src/sagemaker/03.FD_SL_Endpoint_Deployment.ipynb file.

Besides the trained model file and assets, SageMaker requires an entry point file for the deployment of a customized model. The entry point file is run and stored in the memory of an inference endpoint instance to respond to the inference request. In our case, the entry point file is the file in the src/sagemaker/FD_SL_DGL/code folder, which performs four major functions:

  • Receive requests and parse contents of requests to obtain the to-be-predicted nodes and their associated data
  • Convert the data to a DGL heterogeneous graph as input for the RGCN model
  • Perform the real-time inference via the trained RGCN model
  • Return the prediction results to the requester

Following SageMaker conventions, the first two functions are implemented in the input_fn method. See the following code (for simplicity, we omit some comments):

def input_fn(request_body, request_content_type='application/json'):
    # --------------------- receive request --------------------- #
    input_data = json.loads(request_body)

    subgraph_dict = input_data['graph']
    n_feats = input_data['n_feats']
    target_id = input_data['target_id']

    # Rebuild the DGL heterogeneous graph from the request contents.
    graph, new_n_feats, new_pred_target_id = recreate_graph_data(subgraph_dict, n_feats, target_id)

    return (graph, new_n_feats, new_pred_target_id)

The constructed DGL graph and features are then passed to the predict_fn method to fulfill the third function. predict_fn takes two input arguments: the outputs of input_fn and the trained model. See the following code:

def predict_fn(input_data, model):
    # --------------------- Inference --------------------- #
    graph, new_n_feats, new_pred_target_id = input_data

    # Gradients aren't needed for inference.
    with th.no_grad():
        logits = model(graph, new_n_feats)
        res = logits[new_pred_target_id].cpu().detach().numpy()

    return res[1]

The model used in predict_fn is created by the model_fn method when the endpoint is called the first time. The function model_fn loads the saved model file and associated assets from the model_dir argument and the SageMaker model folder. See the following code:

def model_fn(model_dir):
    # ------------------ Loading model ------------------ #
    ntype_dict, etypes, in_size, hidden_size, out_size, n_layers, embedding_size = \
        initialize_arguments(os.path.join(BASE_PATH, 'metadata.pkl'))

    rgcn_model = HeteroRGCN(ntype_dict, etypes, in_size, hidden_size, out_size, n_layers, embedding_size)

    stat_dict = th.load('model.pth')
    rgcn_model.load_state_dict(stat_dict)

    return rgcn_model

The output of the predict_fn method is a list of two numbers, indicating the logits for class 0 and class 1, where 0 means legitimate and 1 means fraudulent. SageMaker takes this list and passes it to an inner method called output_fn to complete the final function.
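When a probability is more convenient than raw logits, the two values can be passed through a softmax. The following is a minimal sketch; the fraud_probability helper and the example logits are hypothetical and are not taken from the solution's entry point file.

```python
import math


def fraud_probability(logits):
    """Softmax over [legitimate_logit, fraudulent_logit]; returns P(fraudulent)."""
    # Subtract the maximum logit first for numerical stability.
    peak = max(logits)
    exps = [math.exp(value - peak) for value in logits]
    return exps[1] / sum(exps)


# Hypothetical logits: class 0 (legitimate) scores higher than class 1 (fraudulent).
print(fraud_probability([2.0, 0.0]))
```

Because softmax is monotonic in the difference between the two logits, comparing the probability with a threshold is equivalent to comparing that difference with a corresponding cutoff.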

To deploy our GNN model, we first wrap the GNN model into a SageMaker PyTorchModel class with the entry point file and other parameters (the path of the saved ZIP file, the PyTorch framework version, the Python version, and so on). Then we call its deploy method with instance settings. See the following code:

env = {
    'SAGEMAKER_MODEL_SERVER_WORKERS': '1'
}

print(f'Use model {repackged_model_path}')

sagemakerSession = sm.session.Session(boto3.session.Session(region_name=current_region))
fd_sl_model = PyTorchModel(model_data=repackged_model_path,
                           role=sagemaker_exec_role,
                           entry_point='./FD_SL_DGL/code/',
                           framework_version='1.6.0',
                           py_version='py3',
                           predictor_cls=JSONPredictor,
                           env=env,
                           sagemaker_session=sagemakerSession)

fd_sl_predictor = fd_sl_model.deploy(instance_type='ml.c5.4xlarge',
                                     initial_instance_count=1,)

The preceding procedures and code snippets demonstrate how to deploy your GNN model as an online inference endpoint from a Jupyter notebook. However, for production, we recommend using the previously mentioned MLOps pipeline orchestrated by Step Functions for the entire workflow, including processing data, training the model, and deploying an inference endpoint. The entire pipeline is implemented by an AWS CDK application, which can be easily replicated in different Regions and accounts.

Real-time inference

When a new transaction arrives, to perform real-time prediction, we need to complete four steps:

  1. Node and edge insertion – Extract the transaction’s information such as the TransactionID and ProductCD as nodes and edges, and insert the new nodes into the existing graph data stored at the Neptune database.
  2. Subgraph extraction – Set the to-be-predicted transaction node as the center node, and extract an n-hop subgraph according to the GNN model’s input requirements.
  3. Feature extraction – For the nodes and edges in the subgraph, extract their associated features.
  4. Call the inference endpoint – Pack the subgraph and features into the contents of a request, then send the request to the inference endpoint.
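To make the subgraph extraction step concrete, the following sketch collects the n-hop neighborhood of a center node with a breadth-first search over an in-memory adjacency dictionary. This is only an illustration of the idea: the solution itself runs graph queries against Neptune, and the node IDs and the n_hop_subgraph helper here are made up.

```python
from collections import deque


def n_hop_subgraph(adjacency, center, n_hops):
    """Return the set of nodes within n_hops edges of center (breadth-first search)."""
    visited = {center}
    frontier = deque([(center, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == n_hops:
            continue  # Don't expand beyond the requested number of hops.
        for neighbor in adjacency.get(node, ()):
            if neighbor not in visited:
                visited.add(neighbor)
                frontier.append((neighbor, depth + 1))
    return visited


# Hypothetical graph: transactions (t-*) linked through a card and an IP address.
adjacency = {
    "t-1": ["card-A"],
    "card-A": ["t-1", "t-2"],
    "t-2": ["card-A", "ip-X"],
    "ip-X": ["t-2", "t-3"],
    "t-3": ["ip-X"],
}
print(sorted(n_hop_subgraph(adjacency, "t-1", 2)))
```

The number of hops would typically match the number of GNN layers, because each layer aggregates information from one additional hop of neighbors.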

In this solution, we implement a RESTful API to achieve the real-time fraud prediction described in the preceding steps. See the following pseudo-code for real-time predictions. The full implementation is in the complete source code file.

For prediction in real time, the first three steps require low latency. Therefore, a graph database is an optimal choice for these tasks, particularly for the subgraph extraction, which can be achieved efficiently with graph database queries. The underlying functions that support the pseudo-code are based on Neptune’s Gremlin queries.

def handler(event, context):

    graph_input = GraphModelClient(endpoints)

    # Step 1: node and edge insertion
    trans_dict, identity_dict, target_id, transaction_value_cols, union_li_cols = \
        load_data_from_event(event, transactions_id_cols, transactions_cat_cols, dummied_col)
    graph_input.insert_new_transaction_vertex_and_edge(trans_dict, identity_dict, target_id, vertex_type='Transaction')

    # Step 2: subgraph extraction
    subgraph_dict, transaction_embed_value_dict = \
        graph_input.query_target_subgraph(target_id, trans_dict, transaction_value_cols, union_li_cols, dummied_col)

    # Step 3 & 4: feature extraction & call the inference endpoint
    transaction_id = int(target_id[(target_id.find('-')+1):])
    pred_prob = invoke_endpoint_with_idx(endpointname=ENDPOINT_NAME,
                                         target_id=transaction_id,
                                         subgraph_dict=subgraph_dict,
                                         n_feats=transaction_embed_value_dict)

    function_res = {
        'id': event['transaction_data'][0]['TransactionID'],
        'flag': pred_prob > MODEL_BTW,
        'pred_prob': pred_prob
    }

    return function_res
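The request sent in the final step has to match what input_fn expects, namely a JSON body with the keys graph, n_feats, and target_id. The following sketch shows one way to assemble such a body. The helper names and the contents of the example dictionaries are assumptions; the real layout of the subgraph and feature dictionaries is defined by the solution's source code.

```python
import json


def parse_transaction_id(target_id):
    """Mirror the handler's parsing: keep the integer after the first '-'."""
    return int(target_id[target_id.find("-") + 1:])


def build_request_body(subgraph_dict, n_feats, target_id):
    """Serialize the fields that input_fn reads from the request."""
    return json.dumps({"graph": subgraph_dict, "n_feats": n_feats, "target_id": target_id})


# Hypothetical contents, for illustration only.
body = build_request_body(
    subgraph_dict={"hypothetical_edge_type": [[0, 1]]},
    n_feats={"hypothetical_node_type": [[0.1, 0.2]]},
    target_id=parse_transaction_id("t-2987000"),
)
print(body)
```

Keeping the payload keys identical on both sides is what lets the handler and the endpoint evolve independently, as long as the contract stays the same.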

One caveat about real-time fraud detection using GNNs is the GNN inference mode. To fulfill real-time inference, we need to convert the GNN model inference from transductive mode to inductive mode. GNN models in transductive inference mode can’t make predictions for newly appeared nodes and edges, whereas in inductive mode, GNN models can handle new nodes and edges. A demonstration of the difference between transductive and inductive mode is shown in the following figure.

In transductive mode, predicted nodes and edges coexist with labeled nodes and edges during training. Models identify them before inference, and they could be inferred in training. Models in inductive mode are trained on the training graph but need to predict unseen nodes (those in red dotted circles on the right) with their associated neighbors, which might be new nodes, like the gray triangle node on the right.

Our RGCN model is trained and tested in transductive mode. It has access to all nodes in training, and also trained an embedding for each featureless node, such as IP address and card types. In the testing stage, the RGCN model uses these embeddings as node features to predict nodes in the test set. When we do real-time inference, however, some of the newly added featureless nodes have no such embeddings because they’re not in the training graph. One way to tackle this issue is to assign the mean of all embeddings in the same node type to the new nodes. In this solution, we adopt this method.
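The mean-embedding fallback can be sketched as follows. The embedding values, node IDs, and helper names are hypothetical, and the solution's own implementation may differ in detail.

```python
def mean_embedding(embeddings):
    """Element-wise mean of a non-empty list of equal-length embedding vectors."""
    count = len(embeddings)
    return [sum(values) / count for values in zip(*embeddings)]


def embedding_for(node_id, trained):
    """Use the trained embedding if the node was in the training graph, else the type mean."""
    if node_id in trained:
        return trained[node_id]
    return mean_embedding(list(trained.values()))


# Hypothetical trained embeddings for one node type (for example, IP addresses).
trained_ip_embeddings = {"ip-1": [1.0, 0.0], "ip-2": [3.0, 2.0]}
print(embedding_for("ip-new", trained_ip_embeddings))
```

The mean is computed per node type, so a new IP address node gets the average IP address embedding rather than an average over unrelated node types.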

In addition, this solution provides a web portal (as seen in the following screenshot) to demonstrate real-time fraud predictions from business operators’ perspectives. It can generate simulated online transactions and provide a live visualization of detected fraudulent transactions.

Clean up

When you’re finished exploring the solution, you can clean up the resources to avoid incurring charges.


In this post, we showed how to build a GNN-based real-time fraud detection solution using SageMaker, Neptune, and the DGL. This solution has three major advantages:

  • It has good performance in terms of prediction accuracy and AUC metrics
  • It can perform real-time inference via a streaming MLOps pipeline and SageMaker endpoints
  • It automates the total deployment process with the provided CloudFormation template so that interested developers can easily test this solution with custom data in their account

For more details about the solution, see the GitHub repo.

After you deploy this solution, we recommend customizing the data processing code to fit your own data format and modifying the real-time inference mechanism while keeping the GNN model unchanged. Note that we split the real-time inference into four steps without further optimization of the latency. These four steps take a few seconds to get a prediction on the demo dataset. We believe that optimizing the Neptune graph data schema design and queries for subgraph and feature extraction can significantly reduce the inference latency.

About the authors

Jian Zhang is an applied scientist who has been using machine learning techniques to help customers solve various problems, such as fraud detection, decoration image generation, and more. He has successfully developed graph-based machine learning, particularly graph neural network, solutions for customers in China, USA, and Singapore. As an enlightener of AWS’s graph capabilities, Zhang has given many public presentations about the GNN, the Deep Graph Library (DGL), Amazon Neptune, and other AWS services.

Mengxin Zhu is a manager of Solutions Architects at AWS, with a focus on designing and developing reusable AWS solutions. He has been engaged in software development for many years and has been responsible for several startup teams of various sizes. He also is an advocate of open-source software and was an Eclipse Committer.

Haozhu Wang is a research scientist at Amazon ML Solutions Lab, where he co-leads the Reinforcement Learning Vertical. He helps customers build advanced machine learning solutions with the latest research on graph learning, natural language processing, reinforcement learning, and AutoML. Haozhu received his PhD in Electrical and Computer Engineering from the University of Michigan.


New – AWS Private 5G – Build Your Own Private Mobile Network

Back in the mid-1990s, I had a young family and 5 or 6 PCs in the basement. One day my son Stephen and I bought a single box that contained a bunch of 3COM network cards, a hub, some drivers, and some cables, and spent a pleasant weekend setting up our first home LAN.

Introducing AWS Private 5G
Today I would like to introduce you to AWS Private 5G, the modern, corporate version of that very powerful box of hardware and software. This cool new service lets you design and deploy your own private mobile network in a matter of days. It is easy to install, operate, and scale, and does not require any specialized expertise. You can use the network to communicate with the sensors & actuators in your smart factory, or to provide better connectivity for handheld devices, scanners, and tablets for process automation.

The private mobile network makes use of CBRS spectrum. It supports 4G LTE (Long Term Evolution) today, and will support 5G in the future, both of which give you a consistent, predictable level of throughput with ultra low latency. You get long range coverage, indoors and out, and fine-grained access control.

AWS Private 5G runs on AWS-managed infrastructure. It is self-service and API-driven, and can scale with respect to geographic coverage, device count, and overall throughput. It also works nicely with other parts of AWS, and lets you use AWS Identity and Access Management (IAM) to control access to both devices and applications.

Getting Started with AWS Private 5G
To get started, I visit the AWS Private 5G Console and click Create network:

I assign a name to my network (JeffCell) and to my site (JeffSite) and click Create network:

The network and the site are created right away. Now I click Create order:

I fill in the shipping address, agree to the pricing (more on that later), and click Create order:

Then I await delivery, and click Acknowledge order to proceed:

The package includes a radio unit and ten SIM cards. The radio unit requires AC power and wired access to the public Internet, along with basic networking (IPv4 and DHCP).

When the order arrives, I click Acknowledge order and confirm that I have received the desired radio unit and SIMs. Then I engage a Certified Professional Installer (CPI) to set it up. As part of the installation process, the installer will enter the latitude, longitude, and elevation of my site.

Things to Know
Here are a couple of important things to know about AWS Private 5G:

Partners – Planning and deploying a private wireless network can be complex and not every enterprise will have the tools to do this work on their own. In addition, CBRS spectrum in the United States requires Certified Professional Installation (CPI) of radios. To address these needs, we are building an ecosystem of partners that can provide customers with radio planning, installation, CPI certification, and implementation of customer use cases. You can access these partners from the AWS Private 5G Console and work with them through the AWS Marketplace.

Deployment Options – In the demo above, I showed you the cloud-based deployment option, which is designed for testing and evaluation purposes, for time-limited deployments, and for deployments that do not use the network in latency-sensitive ways. With this option, the AWS Private 5G Mobile Core runs within a specific AWS Region. We are also working to enable on-premises hosting of the Mobile Core on a Private 5G compute appliance.

CLI and API Access – I can also use the create-network, create-network-site, and acknowledge-order-receipt commands to set up my AWS Private 5G network from the command line. I still need to use the console to place my equipment order.

Scaling and Expansion – Each network supports one radio unit that can provide up to 150 Mbps of throughput spread across up to 100 SIMs. We are working to add support for multiple radio units and a greater number of SIM cards per network.

Regions and Locations – We are launching AWS Private 5G in the US East (Ohio), US East (N. Virginia), and US West (Oregon) Regions, and are working to make the service available outside of the United States in the near future.

Pricing – Each radio unit is billed at $10 per hour, with a 60 day minimum.
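As a quick back-of-the-envelope check, the minimum commitment implied by this pricing can be worked out as follows. This sketch assumes that the 60-day minimum is billed as 60 full 24-hour days at the hourly rate. That reading is an assumption, and the radio_unit_charge helper is made up for illustration.

```python
HOURLY_RATE_USD = 10  # per radio unit, from the pricing above
MINIMUM_DAYS = 60     # minimum term, from the pricing above


def radio_unit_charge(hours_used):
    """Charge for one radio unit, assuming the minimum is billed as 60 full days."""
    minimum_hours = MINIMUM_DAYS * 24
    return HOURLY_RATE_USD * max(hours_used, minimum_hours)


print(radio_unit_charge(0))     # minimum commitment under the stated assumption
print(radio_unit_charge(2000))  # usage beyond the minimum term
```

Under that assumption, the minimum commitment per radio unit is 10 × 24 × 60 = 14,400 USD.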

To learn more, read about AWS Private 5G.



Build an air quality anomaly detector using Amazon Lookout for Metrics

Today, air pollution is a familiar environmental issue that creates severe respiratory and heart conditions, which pose serious health threats. Acid rain, depletion of the ozone layer, and global warming are also adverse consequences of air pollution. There is a need for intelligent monitoring and automation in order to prevent severe health issues and in extreme cases life-threatening situations. Air quality is measured using the concentration of pollutants in the air. Identifying symptoms early and controlling the pollutant level before it’s dangerous is crucial. The process of identifying the air quality and the anomaly in the weight of pollutants, and quickly diagnosing the root cause, is difficult, costly, and error-prone.

The process of applying AI and machine learning (ML)-based solutions to find data anomalies involves a lot of complexity in ingesting, curating, and preparing data in the right format and then optimizing and maintaining the effectiveness of these ML models over long periods of time. This has been one of the barriers to quickly implementing and scaling the adoption of ML capabilities.

This post shows you how to use an integrated solution with Amazon Lookout for Metrics and Amazon Kinesis Data Firehose to break these barriers by quickly and easily ingesting streaming data, and subsequently detecting anomalies in the key performance indicators of your interest.

Lookout for Metrics automatically detects and diagnoses anomalies (outliers from the norm) in business and operational data. It’s a fully managed ML service that uses specialized ML models to detect anomalies based on the characteristics of your data. For example, trends and seasonality are two characteristics of time series metrics in which threshold-based anomaly detection doesn’t work. Trends are continuous variations (increases or decreases) in a metric’s value. On the other hand, seasonality is periodic patterns that occur in a system, usually rising above a baseline and then decreasing again. You don’t need ML experience to use Lookout for Metrics.
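The following toy example illustrates why a fixed threshold struggles with a trending metric. It is not how Lookout for Metrics works internally; the series, the threshold, and both helper functions are made up, and the trend line is assumed to be known rather than learned.

```python
def fixed_threshold_anomalies(series, threshold):
    """Flag every index whose value exceeds a fixed threshold."""
    return [i for i, value in enumerate(series) if value > threshold]


def detrended_anomalies(series, slope, intercept, tolerance):
    """Flag indices whose value deviates from a known linear trend by more than tolerance."""
    return [
        i for i, value in enumerate(series)
        if abs(value - (slope * i + intercept)) > tolerance
    ]


# A steadily rising metric (10, 12, 14, ...) with a single spike at index 5.
series = [10 + 2 * i for i in range(10)]
series[5] += 15

print(fixed_threshold_anomalies(series, threshold=21))
print(detrended_anomalies(series, slope=2, intercept=10, tolerance=5))
```

The fixed threshold flags the spike but also every later point, simply because the metric keeps rising. Comparing each value with the expected trend isolates the spike alone.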

We demonstrate a common air quality monitoring scenario, in which we detect anomalies in the pollutant concentration in the air. By the end of this post, you’ll learn how to use these managed services from AWS to help prevent health issues and global warming. You can apply this solution to other use cases for better environment management, such as detecting anomalies in water quality, land quality, and power consumption patterns, to name a few.

Solution overview

The architecture consists of three functional blocks:

  • Wireless sensors placed at strategic locations to sense the concentration level of carbon monoxide (CO), sulfur dioxide (SO2), and nitrogen dioxide (NO2) in the air
  • Streaming data ingestion and storage
  • Anomaly detection and notification

The solution provides a fully automated data path from the sensors all the way to a notification being raised to the user. You can also interact with the solution using the Lookout for Metrics UI in order to analyze the identified anomalies.

The following diagram illustrates our solution architecture.


You need the following prerequisites before you can proceed with the solution. For this post, we use the us-east-1 Region.

  1. Download the Python script and data file from the GitHub repo.
  2. Open the live_data.csv file in your preferred editor and replace the dates with today’s and tomorrow’s dates. For example, if today’s date is July 8, 2022, then replace 2022-03-25 with 2022-07-08. Keep the format the same. This is required to simulate sensor data for the current date using the IoT simulator script.
  3. Create an Amazon Simple Storage Service (Amazon S3) bucket and a folder named air-quality. Create a subfolder inside air-quality named historical. For instructions, see Creating a folder.
  4. Upload the live_data.csv file to the root of the S3 bucket and historical_data.json to the historical folder.
  5. Create an AWS Cloud9 development environment, which we use to run the Python simulator program to create sensor data for this solution.
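If you prefer not to edit the dates by hand, the date replacement in step 2 can be scripted. The following is a minimal sketch, assuming the file contains dates in the yyyy-MM-dd format mentioned above. The `shift_dates` helper and the sample rows (including their column order) are hypothetical.

```python
import re
from datetime import date

DATE_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}")


def shift_dates(text, today):
    """Map the earliest date found in the text to `today`, keeping day offsets.

    If the file holds two consecutive days, they become today and tomorrow.
    """
    found = sorted({date.fromisoformat(d) for d in DATE_PATTERN.findall(text)})
    if not found:
        return text
    first = found[0]

    def replace(match):
        original = date.fromisoformat(match.group(0))
        return (today + (original - first)).isoformat()

    return DATE_PATTERN.sub(replace, text)


# Hypothetical rows; the real column layout of live_data.csv may differ.
sample = (
    "2022-03-25 23:55,B-101,2.6,62,57\n"
    "2022-03-26 00:00,B-101,3.9,60,73\n"
)
print(shift_dates(sample, date(2022, 7, 8)))
```

To apply it to the real file, read live_data.csv into a string, pass it through `shift_dates` with `date.today()`, and write the result back before uploading.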

Ingest and transform data using AWS IoT Core and Kinesis Data Firehose

We use a Kinesis Data Firehose delivery stream to ingest the streaming data from AWS IoT Core and deliver it to Amazon S3. Complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. For Source, choose Direct PUT.
  3. For Destination, choose Amazon S3.
  4. For Delivery stream name, enter a name for your delivery stream.
  5. For S3 bucket, enter the bucket you created as a prerequisite.
  6. Enter values for S3 bucket prefix and S3 bucket error output prefix. A key point to note is the custom prefix configured for the Amazon S3 destination. This prefix pattern makes sure that the data is created in the S3 bucket according to the prefix hierarchy expected by Lookout for Metrics. (More on this later in this post.) For more information about custom prefixes, see Custom Prefixes for Amazon S3 Objects.
  7. For Buffer interval, enter 60.
  8. Choose Create or update IAM role.
  9. Choose Create delivery stream.

    Now we configure AWS IoT Core and run the air quality simulator program.
  10. On the AWS IoT Core console, create an AWS IoT policy called admin.
  11. In the navigation pane under Message Routing, choose Rules.
  12. Choose Create rule.
  13. Create a rule with the Kinesis Data Firehose (firehose) action.
    This sends data from an MQTT message to a Kinesis Data Firehose delivery stream.
  14. Choose Create.
  15. Create an AWS IoT thing with name Test-Thing and attach the policy you created.
  16. Download the device certificate, public key, private key, and root CA certificate for AWS IoT Core.
  17. Save each of the downloaded files to the certificates subdirectory that you created earlier.
  18. Upload the Python script to the iot-test-publish folder.
  19. On the AWS IoT Core console, in the navigation pane, choose Settings.
  20. Under Custom endpoint, copy the endpoint.
    This AWS IoT Core custom endpoint URL is personal to your AWS account and Region.
  21. Replace customEndpointUrl with your AWS IoT Core custom endpoint URL, certificates with the name of the certificate, and Your_S3_Bucket_Name with your S3 bucket name.
    Next, you install pip and the AWS IoT SDK for Python.
  22. Log in to AWS Cloud9 and create a working directory in your development environment. For example: aq-iot-publish.
  23. Create a subdirectory for certificates in your new working directory. For example: certificates.
  24. Install the AWS IoT SDK for Python v2 by running the following from the command line.
  25. To test the data pipeline, run the following command:
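As a rough illustration of what a simulator does before publishing, the following sketch turns CSV rows into JSON payloads. The column names are assumed to match the record fields shown in this post, and `rows_to_payloads` is a hypothetical helper. The actual script in the GitHub repo may differ, and publishing over MQTT with the AWS IoT SDK is intentionally left out.

```python
import csv
import io
import json


def rows_to_payloads(csv_text):
    """Convert CSV rows into JSON strings, one payload per sensor reading."""
    reader = csv.DictReader(io.StringIO(csv_text))
    payloads = []
    for row in reader:
        record = {
            "TIMESTAMP": row["TIMESTAMP"],
            "LOCATION_ID": row["LOCATION_ID"],
            # Pollutant readings are sent as numbers, not strings.
            "CO": float(row["CO"]),
            "SO2": float(row["SO2"]),
            "NO2": float(row["NO2"]),
        }
        payloads.append(json.dumps(record))
    return payloads


# Assumed header row; check it against the real live_data.csv.
sample_csv = (
    "TIMESTAMP,LOCATION_ID,CO,SO2,NO2\n"
    "2022-03-20 00:00,B-101,2.6,62,57\n"
)
for payload in rows_to_payloads(sample_csv):
    print(payload)
```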

You can see the payload in the following screenshot.

Finally, the data is delivered to the specified S3 bucket in the prefix structure.
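The delivery stream settings from the console steps can also be expressed as a request for the Firehose `create_delivery_stream` API. The following sketch only builds the request dictionary. The exact prefix pattern used in this post isn't shown, so the day/minute layout, the error prefix, the 5 MB buffer size, and the helper names are all assumptions for illustration.

```python
from datetime import datetime

# Hypothetical layout: one folder per day and one subfolder per arrival minute.
FIREHOSE_PREFIX = "air-quality/!{timestamp:yyyyMMdd}/!{timestamp:HHmm}/"
ERROR_PREFIX = "air-quality-errors/!{firehose:error-output-type}/"


def delivery_stream_request(stream_name, bucket_arn, role_arn):
    """Build keyword arguments for the Firehose create_delivery_stream call."""
    return {
        "DeliveryStreamName": stream_name,
        "DeliveryStreamType": "DirectPut",  # matches Source = Direct PUT
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": role_arn,
            "BucketARN": bucket_arn,
            "Prefix": FIREHOSE_PREFIX,
            "ErrorOutputPrefix": ERROR_PREFIX,
            # Buffer interval of 60 seconds, as in the console steps.
            "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 5},
        },
    }


def resolved_prefix(arrival_time):
    """Show the S3 prefix the assumed pattern yields for a UTC arrival time."""
    return arrival_time.strftime("air-quality/%Y%m%d/%H%M/")


request = delivery_stream_request(
    "air-quality-stream",                      # hypothetical stream name
    "arn:aws:s3:::my-air-quality-bucket",      # hypothetical bucket ARN
    "arn:aws:iam::123456789012:role/firehose", # hypothetical role ARN
)
print(resolved_prefix(datetime(2022, 3, 20, 0, 5)))
```

With boto3 installed and credentials configured, the dictionary could be passed as `boto3.client("firehose").create_delivery_stream(**request)`.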

The data of the files is as follows:

  • {"TIMESTAMP":"2022-03-20 00:00","LOCATION_ID":"B-101","CO":2.6,"SO2":62,"NO2":57}
  • {"TIMESTAMP":"2022-03-20 00:05","LOCATION_ID":"B-101","CO":3.9,"SO2":60,"NO2":73}

The timestamps show that each file contains data for 5-minute intervals.
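You can verify the spacing of delivered records with a few lines of Python. This is a minimal sketch, assuming the records are available as JSON Lines text with the fields shown above. `parse_records` and `has_fixed_interval` are hypothetical helper names.

```python
import json
from datetime import datetime, timedelta


def parse_records(json_lines):
    """Parse JSON Lines text and convert TIMESTAMP strings to datetimes."""
    records = []
    for line in json_lines.splitlines():
        if line.strip():
            record = json.loads(line)
            record["TIMESTAMP"] = datetime.strptime(
                record["TIMESTAMP"], "%Y-%m-%d %H:%M"
            )
            records.append(record)
    return records


def has_fixed_interval(records, minutes=5):
    """Return True if consecutive timestamps are exactly `minutes` apart."""
    stamps = sorted(r["TIMESTAMP"] for r in records)
    step = timedelta(minutes=minutes)
    return all(b - a == step for a, b in zip(stamps, stamps[1:]))


sample = (
    '{"TIMESTAMP":"2022-03-20 00:00","LOCATION_ID":"B-101","CO":2.6,"SO2":62,"NO2":57}\n'
    '{"TIMESTAMP":"2022-03-20 00:05","LOCATION_ID":"B-101","CO":3.9,"SO2":60,"NO2":73}\n'
)
records = parse_records(sample)
print(has_fixed_interval(records))
```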

With minimal code, we have now ingested the sensor data, created an input stream from the ingested data, and stored the data in an S3 bucket based on the requirements for Lookout for Metrics.

In the following sections, we take a deeper look at the constructs within Lookout for Metrics, and how easy it is to configure these concepts using the Lookout for Metrics console.

Create a detector

A detector is a Lookout for Metrics resource that monitors a dataset and identifies anomalies at a predefined frequency. Detectors use ML to find patterns in data and distinguish between expected variations in data and legitimate anomalies. To improve its performance, a detector learns more about your data over time.

In our use case, the detector analyzes data from the sensor every 5 minutes.

To create the detector, navigate to the Lookout for Metrics console and choose Create detector. Provide the name and description (optional) for the detector, along with the interval of 5 minutes.

Your data is encrypted by default with a key that AWS owns and manages for you. You can also choose a different encryption key instead of the default one.
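The same detector settings can be expressed as a request for the Lookout for Metrics `create_anomaly_detector` API. The following sketch only builds the request dictionary. The detector name, description, and key ARN are hypothetical values.

```python
def detector_request(name, description=None, kms_key_arn=None):
    """Build keyword arguments for the create_anomaly_detector call."""
    request = {
        "AnomalyDetectorName": name,
        # PT5M is the ISO 8601 duration for a 5-minute interval.
        "AnomalyDetectorConfig": {"AnomalyDetectorFrequency": "PT5M"},
    }
    if description:
        request["AnomalyDetectorDescription"] = description
    if kms_key_arn:
        # Only set this to use your own key instead of the default one.
        request["KmsKeyArn"] = kms_key_arn
    return request


request = detector_request(
    "air-quality-detector",  # hypothetical detector name
    description="Detects anomalies in pollutant concentration",
)
print(request)
```

With boto3 installed, the dictionary could be passed as `boto3.client("lookoutmetrics").create_anomaly_detector(**request)`.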

Now let’s point this detector to the data that you want it to run anomaly detection on.

Create a dataset

A dataset tells the detector where to find your data and which metrics to analyze for anomalies. To create a dataset, complete the following steps:

  1. On the Amazon Lookout for Metrics console, navigate to your detector.
  2. Choose Add a dataset.
  3. For Name, enter a name (for example, air-quality-dataset).
  4. For Datasource, choose your data source (for this post, Amazon S3).
  5. For Detector mode, select your mode (for this post, Continuous).

With Amazon S3, you can create a detector in two modes:

    • Backtest – This mode is used to find anomalies in historical data. It needs all records to be consolidated in a single file.
    • Continuous – This mode is used to detect anomalies in live data. We use this mode with our use case because we want to detect anomalies as we receive air pollutant data from the air monitoring sensor.
  6. Enter the S3 path for the live S3 folder and path pattern.
  7. For Datasource interval, choose 5 minute intervals. If you have historical data from which the detector can learn patterns, you can provide it during this configuration. The data is expected to be in the same format that you use to perform a backtest. Providing historical data speeds up the ML model training process. If this isn’t available, the continuous detector waits for sufficient data to be available before making inferences.
  8. For this post, we already have historical data, so select Use historical data.
  9. Enter the S3 path of historical_data.json.
  10. For File format, select JSON lines.

At this point, Lookout for Metrics accesses the data source and validates whether it can parse the data. If the parsing is successful, it gives you a “Validation successful” message and takes you to the next page, where you configure measures, dimensions, and timestamps.

Configure measures, dimensions, and timestamps

Measures define KPIs that you want to track anomalies for. You can add up to five measures per detector. The fields that are used to create KPIs from your source data must be of numeric format. Currently, KPIs can be defined by aggregating records within the time interval using SUM or AVERAGE.

Dimensions give you the ability to slice and dice your data by defining categories or segments. This allows you to track anomalies for a subset of the whole set of data for which a particular measure is applicable.

In our use case, we add three measures, each calculating the AVG of the records seen in the 5-minute interval, and only one dimension, the location ID for which the pollutant concentration is measured.
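To make the aggregation concrete, the following sketch averages one measure per 5-minute interval and per location. It illustrates the idea of an AVG measure sliced by a dimension and is not the service's implementation. The helper name and the sample readings are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def average_by_interval(records, measure, minutes=5):
    """Average one measure per (interval start, LOCATION_ID) bucket."""
    buckets = defaultdict(list)
    for record in records:
        ts = datetime.strptime(record["TIMESTAMP"], "%Y-%m-%d %H:%M")
        # Round the timestamp down to the start of its interval.
        start = ts.replace(
            minute=ts.minute - ts.minute % minutes, second=0, microsecond=0
        )
        buckets[(start, record["LOCATION_ID"])].append(record[measure])
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}


# Made-up readings: two fall into the 00:00 interval, one into 00:05.
readings = [
    {"TIMESTAMP": "2022-03-20 00:00", "LOCATION_ID": "B-101", "CO": 2.0},
    {"TIMESTAMP": "2022-03-20 00:03", "LOCATION_ID": "B-101", "CO": 4.0},
    {"TIMESTAMP": "2022-03-20 00:05", "LOCATION_ID": "B-101", "CO": 3.9},
]
for (start, location), value in average_by_interval(readings, "CO").items():
    print(start, location, value)
```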

Every record in the dataset must have a timestamp. The following configuration allows you to choose the field that represents the timestamp value and also the format of the timestamp.
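The dataset, measures, dimension, and timestamp settings map onto a single request for the Lookout for Metrics `create_metric_set` API. The following sketch builds that request as a dictionary. The field names come from the records shown in this post, while the S3 path template, the ARNs, and the helper name are assumptions that you should adapt to your own bucket layout.

```python
def metric_set_request(detector_arn, role_arn, bucket):
    """Build keyword arguments for the create_metric_set call."""
    return {
        "AnomalyDetectorArn": detector_arn,
        "MetricSetName": "air-quality-dataset",
        "MetricSetFrequency": "PT5M",  # 5-minute datasource interval
        # Three measures, each averaged within the interval.
        "MetricList": [
            {"MetricName": name, "AggregationFunction": "AVG"}
            for name in ("CO", "SO2", "NO2")
        ],
        "DimensionList": ["LOCATION_ID"],
        "TimestampColumn": {
            "ColumnName": "TIMESTAMP",
            "ColumnFormat": "yyyy-MM-dd HH:mm",
        },
        "MetricSource": {
            "S3SourceConfig": {
                "RoleArn": role_arn,
                # Assumed path template; it must match your Firehose prefix.
                "TemplatedPathList": [
                    "s3://" + bucket + "/air-quality/{{yyyyMMdd}}/{{HHmm}}/"
                ],
                "HistoricalDataPathList": [
                    "s3://" + bucket + "/air-quality/historical/historical_data.json"
                ],
                "FileFormatDescriptor": {
                    "JsonFormatDescriptor": {
                        "FileCompression": "NONE",
                        "Charset": "UTF-8",
                    }
                },
            }
        },
    }


request = metric_set_request(
    "arn:aws:lookoutmetrics:us-east-1:123456789012:AnomalyDetector:example",  # hypothetical
    "arn:aws:iam::123456789012:role/lookout-s3-access",                       # hypothetical
    "my-air-quality-bucket",                                                   # hypothetical
)
print([m["MetricName"] for m in request["MetricList"]])
```

With boto3 installed, the dictionary could be passed as `boto3.client("lookoutmetrics").create_metric_set(**request)`.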

The next page allows you to review all the details you added and then save and activate the detector.

The detector then begins learning from the data streaming into the data source. At this stage, the status of the detector changes to Initializing.

It’s important to note the minimum amount of data that is required before Lookout for Metrics can start detecting anomalies. For more information about requirements and limits, see Lookout for Metrics quotas.

With minimal configuration, you have created your detector, pointed it at a dataset, and defined the metrics that you want Lookout for Metrics to find anomalies in.

Visualize anomalies

Lookout for Metrics provides a rich UI experience for users who want to use the AWS Management Console to analyze the anomalies being detected. It also provides the capability to query the anomalies via APIs.

Let’s look at an example anomaly detected from our air quality data use case. The following screenshot shows an anomaly detected in CO concentration in the air at the designated time and date with a severity score of 93. It also shows the percentage contribution of the dimension towards the anomaly. In this case, 100% contribution comes from the location ID B-101 dimension.

Create alerts

Lookout for Metrics allows you to send alerts using a variety of channels. You can configure the anomaly severity score threshold at which the alerts must be triggered.

In our use case, we configure alerts to be sent to an Amazon Simple Notification Service (Amazon SNS) channel, which in turn sends an SMS. The following screenshots show the configuration details.
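An SNS alert like this one can also be described as a request for the Lookout for Metrics `create_alert` API. The following sketch builds the request dictionary and validates the severity threshold. The alert name, ARNs, and the threshold value of 70 are hypothetical, because this post doesn't state the threshold that was used.

```python
def alert_request(detector_arn, sns_topic_arn, role_arn, threshold):
    """Build keyword arguments for the create_alert call."""
    if not 0 <= threshold <= 100:
        raise ValueError("severity threshold must be between 0 and 100")
    return {
        "AlertName": "air-quality-alert",  # hypothetical alert name
        "AlertSensitivityThreshold": threshold,
        "AnomalyDetectorArn": detector_arn,
        "Action": {
            "SNSConfiguration": {
                "RoleArn": role_arn,
                "SnsTopicArn": sns_topic_arn,
            }
        },
    }


request = alert_request(
    "arn:aws:lookoutmetrics:us-east-1:123456789012:AnomalyDetector:example",  # hypothetical
    "arn:aws:sns:us-east-1:123456789012:air-quality-alerts",                  # hypothetical
    "arn:aws:iam::123456789012:role/lookout-sns-publish",                     # hypothetical
    threshold=70,
)
print(request["AlertSensitivityThreshold"])
```

With boto3 installed, the dictionary could be passed as `boto3.client("lookoutmetrics").create_alert(**request)`.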

You can also use an alert to trigger automations using AWS Lambda functions in order to drive API-driven operations on AWS IoT Core.


In this post, we showed you how easy it is to use Lookout for Metrics and Kinesis Data Firehose to remove the undifferentiated heavy lifting involved in managing the end-to-end lifecycle of building ML-powered anomaly detection applications. This solution can help you accelerate your ability to find anomalies in key business metrics and allow you to focus your efforts on growing and improving your business.

We encourage you to learn more by visiting the Amazon Lookout for Metrics Developer Guide and try out the end-to-end solution enabled by these services with a dataset relevant to your business KPIs.

About the author

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.




Copyright © 2021 Today's Digital.