Designing a Real-Time ETA Prediction System Using Kafka, DynamoDB and Rockset

July 8, 2020


Recently we've seen a big increase in the use of on-demand logistics services, such as online shopping and food delivery.

Most of these applications provide near real-time tracking of the ETA once you place an order. Building a scalable, distributed, and real-time ETA prediction system is a tough task, but what if we could simplify its design? We'll break the system into pieces such that each component is responsible for one primary job.

Let's take a look at the components that make up the system.

  1. Delivery driver/rider app - The Android/iOS app installed on a delivery person’s device.
  2. Customer app - The Android/iOS app installed on a customer’s device.
  3. Rockset - The query engine powering all the models and services.
  4. Message queue - Used for transferring data between various components. For this example, we will use Kafka.
  5. Key-value storage - Used for storing orders and parameters for the model. For this example, we will use DynamoDB.



Inputs to the Model

[Figure: high-level architecture connecting the driver app, Kafka, DynamoDB, and Rockset]

Driver Location

To get an accurate ETA estimate, you need the delivery person's position, specifically the latitude and longitude. You can get this information easily from the device's GPS. A call to the device's GPS provider returns the latitude, the longitude, and the accuracy of the location in meters.

You can run a background service in the app that retrieves the GPS coordinates every 10 seconds. The raw coordinates, however, are too fine-grained to make predictions with: exact positions rarely repeat across orders, so there is no history to learn from. To coarsen them, we will be using the concept of a geohash. A geohash is a standardized N-letter hash of a location that represents an area of M sq. miles. N and M are inversely proportional, so a larger N represents a smaller area M. You can refer to this for more info on geohash.

There are plenty of libraries available to convert a latitude-longitude pair to a geohash. Here we'll be using geo by davidmoten to get a 6-7 letter geohash.
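As a quick illustration, here is a minimal sketch of the conversion using that library; the coordinates are made-up values for illustration:

import com.github.davidmoten.geo.GeoHash;

public class GeohashExample {
    public static void main(String[] args) {
        // Hypothetical coordinates returned by the device's GPS provider
        double lat = 12.9716;
        double lng = 77.5946;

        // A 6-letter geohash identifies a cell roughly 1.2 km x 0.6 km,
        // coarse enough to group nearby drivers for ETA lookups
        String geohash = GeoHash.encodeHash(lat, lng, 6);
        System.out.println(geohash); // a Bangalore location hashes to "tdr..."
    }
}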

The service then pushes the geohash, along with the coordinates, to a Kafka topic. Rockset ingests data from this Kafka topic into a collection called locations.
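A minimal sketch of that background service's publish step might look like the following. The broker address, topic name, and payload values are assumptions for illustration; the field names mirror those the queries later in this post expect:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LocationPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Hypothetical location payload; lat, lng, and timestamp are sent as
            // strings here because the Rockset queries below CAST them explicitly
            String payload = "{\"order_id\": \"abc\", \"lat\": \"12.9716\", "
                    + "\"lng\": \"77.5946\", \"geohash\": \"tdr1vu\", "
                    + "\"timestamp\": \"1594224000\"}";
            // Keying by order_id keeps each order's updates in one partition, in order
            producer.send(new ProducerRecord<>("locations", "abc", payload));
        }
    }
}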

Orders

The orders placed by a customer are stored in DynamoDB for further processing. An order generally goes through a life cycle consisting of the following states:

  • CREATED
  • PROCESSING
  • CONFIRMED
  • CANCELED
  • IN TRANSIT
  • DELIVERED

All of the above state changes are updated in DynamoDB along with additional data such as the source location, destination location, order details, etc. Once an order is delivered, the actual time of arrival is also stored in the database.
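As a sketch of what recording a state transition might look like with the AWS SDK for Java v2 (the table and attribute names are assumptions for illustration):

import java.util.Map;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

public class OrderStatusUpdater {
    private final DynamoDbClient dynamo = DynamoDbClient.create();

    // Records the final state transition, e.g. IN TRANSIT -> DELIVERED.
    // On delivery, the actual time of arrival (ta) is stored alongside it.
    public void markDelivered(String orderId, long actualTimeOfArrival) {
        dynamo.updateItem(UpdateItemRequest.builder()
                .tableName("orders") // assumed table name
                .key(Map.of("order_id", AttributeValue.builder().s(orderId).build()))
                .updateExpression("SET order_status = :s, ta = :ta")
                .expressionAttributeValues(Map.of(
                        ":s", AttributeValue.builder().s("DELIVERED").build(),
                        ":ta", AttributeValue.builder().n(Long.toString(actualTimeOfArrival)).build()))
                .build());
    }
}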

Rockset also ingests updates from the DynamoDB orders table into a collection called orders.



ML Model

Exponential Smoothing

From the orders table, we have each order's actual time of arrival, which we will refer to as TA, along with its source and destination. You could take the mean of all TAs whose source is the delivery person's latest location and whose destination is the customer's location, and get an approximate ETA. However, this is not very accurate, as it doesn't account for changing factors such as new construction activity in the area or new, shorter routes to the destination.

To account for such changes, we need a prediction model that is simple, easy to debug, and still has good accuracy.

This is where exponential smoothing comes into play. An exponentially smoothed value is calculated using the formula:

S_t = Alpha * X_t + (1 - Alpha) * S_(t-1)

where

  • S_t => Smoothed value at time t
  • X_t => Actual value at time t
  • Alpha => Smoothing factor

In our context, S_t represents the ETA and X_t represents the most recent actual time of arrival for a source-destination pair in our orders table.

ETA_t = Alpha * TA_t + (1 - Alpha) * ETA_(t-1)
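To make the recurrence concrete, here is a minimal sketch in Java with made-up arrival times. The oldest TA seeds the recursion, which is also how the calculate_ETA query below is structured:

import java.util.List;

public class ExponentialSmoothing {
    // TAs ordered oldest -> newest; the oldest value seeds the recursion
    static double smoothedEta(List<Double> arrivalTimes, double alpha) {
        double eta = arrivalTimes.get(0);
        for (int i = 1; i < arrivalTimes.size(); i++) {
            eta = alpha * arrivalTimes.get(i) + (1 - alpha) * eta;
        }
        return eta;
    }

    public static void main(String[] args) {
        // Four past deliveries for a source-destination pair, in minutes
        List<Double> tas = List.of(22.0, 25.0, 18.0, 20.0);
        // Alpha = 0.7 weights recent trips heavily
        System.out.println(smoothedEta(tas, 0.7)); // ~19.9 minutes
    }
}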

Rockset

The serving layer for the current system needs to satisfy three primary criteria:

  1. Ability to handle millions of writes per minute - Each delivery person's app will be pushing GPS coordinates every 5-10 seconds, and each update leads to a new ETA. A typical large-scale food delivery company has almost 100K delivery persons.
  2. Minimal data fetch latency - For a great UX, the ETA shown in the customer app should update as soon as a new one is available.
  3. Ability to handle schema changes on the fly - We may want to store additional metadata, such as ETA prediction accuracy and model version, in the future. We don't want to create a new data source whenever we add a new field.

Rockset satisfies all of them. It has:

  1. Dynamic Scaling - More resources are added as and when needed to handle large volumes of data.
  2. Distributed Query Processing - Queries are parallelized across multiple nodes to minimize latency.
  3. Schemaless Ingest - Schema changes are supported on the fly.

Rockset has a built-in connector to Apache Kafka. We can use this Kafka connector to ingest location data of the delivery person.

To perform exponential smoothing in Rockset, we create two Query Lambdas. Query Lambdas in Rockset are named, parameterized SQL queries stored in Rockset that can be executed from a dedicated REST endpoint.

  1. calculate_ETA: This Query Lambda expects alpha, source, and destination as parameters and returns an exponentially smoothed ETA. Unrolling the recurrence above over the n orders for a source-destination pair, with TA_1 the most recent actual time of arrival and TA_n the oldest (which seeds the recursion), gives:

ETA = Alpha * (TA_1 + (1 - Alpha) * TA_2 + ... + (1 - Alpha)^(n-1) * TA_n) + (1 - Alpha)^n * TA_n

The query below computes exactly this: the innermost query numbers the orders by recency (idx), the middle query produces each term ta_i * (1 - Alpha)^(idx - 1), and the outermost query sums the terms and adds the seed term POW(1 - Alpha, MAX(idx)) * MIN_BY(ta_i, time_i), i.e. the oldest TA:
SELECT
    (:alpha * SUM(term)) + (POW((1 - :alpha), MAX(idx)) * MIN_BY(ta_i, time_i)) AS ans
FROM
    (
        (
            SELECT
                order_id,
                ta_i,
                (ta_i * POW((1 - :alpha), (idx - 1))) AS term,
                time_i,
                idx
            FROM
                (
                    SELECT
                        order_id,
                        CAST(ta AS int) as ta_i,
                        time_i,
                        ROW_NUMBER() OVER(
                            ORDER BY
                                time_i DESC, order_id ASC
                        ) AS idx
                    FROM
                        commons.orders_fixed
                    WHERE
                        source_geohash = :source
                        AND
                        destination_geohash = :destination
                    ORDER BY
                        time_i DESC, order_id ASC
                ) AS idx
        ) AS terms
    )
  2. calculate_speed: This Query Lambda requires order_id as a parameter and returns the average speed of the delivery person while in transit. It runs the following query:
SELECT
    -- AVG skips the NULL pair produced for the oldest point, which has no previous location
    AVG(ST_DISTANCE(prev_geo, geo) / (ts - prev_ts)) AS speed
FROM
    (
        SELECT
            geo,
            LEAD(geo, 1) OVER(
                ORDER BY
                    ts DESC
            ) AS prev_geo,
            ts,
            LEAD(ts, 1) OVER(
                ORDER BY
                    ts DESC
            ) AS prev_ts
        FROM
            (
                SELECT
                    ST_GEOGPOINT(CAST(lng AS double), CAST(lat AS double)) AS geo,
                    order_id,
                    CAST(timestamp as int) AS ts
                FROM
                    commons.locations
                WHERE
                    order_id = :order_id
            ) AS ts
    ) AS speed



Predict the ETA

[Figure: ETA prediction request flow]

The customer app initiates the request to predict the ETA. It passes the order id in the API call.

The request goes to the query service, which performs the following steps:

  1. Fetch the latest smoothing factors Alpha and Beta from DynamoDB. Here, Alpha is the smoothing parameter and Beta is the weight assigned to the historical ETA when calculating the final ETA (see step 6 for details).
  2. Fetch the destination geohash for the order id.
  3. Fetch the current driver geohash from the locations collection.
  4. Trigger the calculate_ETA Query Lambda in Rockset, passing the smoothing factor Alpha, the driver geohash from step 3 as the source, and the destination geohash from step 2. Let's call the result the historical ETA.
curl --request POST \
--url https://api.rs2.usw2.rockset.com/v1/orgs/self/ws/commons/lambdas/calculateETA/versions/f7d73fb5a786076c \
-H 'Authorization: ApiKey YOUR_ROCKSET_API_KEY' \
-H 'Content-Type: application/json' \
  -d '{
    "parameters": [
      {
        "name": "alpha",
        "type": "float",
        "value": "0.7"
      },
      {
        "name": "destination",
        "type": "string",
        "value": "tdr38d"
      },
      {
        "name": "source",
        "type": "string",
        "value": "tdr706"
      }
    ]
  }'
  5. Trigger the calculate_speed Query Lambda in Rockset with the current order id as the parameter.
curl --request POST \
--url https://api.rs2.usw2.rockset.com/v1/orgs/self/ws/commons/lambdas/calculate_speed/versions/cadaf89cba111c06 \
-H 'Authorization: ApiKey YOUR_ROCKSET_API_KEY' \
-H 'Content-Type: application/json' \
  -d '{
    "parameters": [
      {
        "name": "order_id",
        "type": "string",
        "value": "abc"
      }
    ]
  }'
  6. The query service then calculates the predicted ETA as

Predicted ETA = Beta * (historical ETA) + (1 - Beta) * distance(driver, destination)/speed

The predicted ETA is then returned to the customer app.
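Putting the steps together, the final combination in the query service is just a weighted average. Below is a minimal sketch with made-up numbers; fetching Alpha and Beta from DynamoDB and invoking the two Query Lambdas over REST are assumed to happen upstream:

public class EtaPredictor {

    // historicalEtaSeconds comes from calculate_ETA, speedMetersPerSecond from
    // calculate_speed, and the remaining distance from the driver and
    // destination geohashes; beta comes from the smoothing_parameters table.
    static double predictEtaSeconds(double beta,
                                    double historicalEtaSeconds,
                                    double remainingDistanceMeters,
                                    double speedMetersPerSecond) {
        double liveEtaSeconds = remainingDistanceMeters / speedMetersPerSecond;
        return beta * historicalEtaSeconds + (1 - beta) * liveEtaSeconds;
    }

    public static void main(String[] args) {
        // Worked example: Beta = 0.6, historical ETA of 1200 s (20 min),
        // 3000 m remaining at 5 m/s => live ETA of 600 s (10 min).
        // Predicted ETA = 0.6 * 1200 + 0.4 * 600 = 960 s (16 min).
        System.out.println(predictEtaSeconds(0.6, 1200, 3000, 5));
    }
}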



Feedback Loop

ML models require periodic retraining to keep their predictions accurate. In our scenario, we need to retrain the model to account for changing conditions, such as weather and festivals. This is where the parameter tuning service comes into play.

Parameter Tuning Service

[Figure: parameter tuning service data flow]

Once an ETA is predicted, you can store the predicted ETA and the actual time of arrival in a collection called predictions. The primary motivation to store this data in Rockset instead of another datastore is to create a real-time dashboard for measuring the accuracy of the model. This helps ensure customers never see absurd ETA values in their apps.

The next question is how to determine the smoothing factors Alpha and Beta. To solve this, we create a parameter tuning service, which is just a Flink batch job. We fetch the historical ETAs and TAs for orders from the past 7-30 days and use the differences between them to calculate appropriate Alpha and Beta values. This can be done using a simple model such as logistic regression.

Once the service calculates the Alpha and Beta parameters, they are stored in DynamoDB in a table named smoothing_parameters. The query service fetches the parameters from this table when it receives a request from the customer app. A simplified sketch of the tuning step follows.
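As a deliberately simplified stand-in for that Flink job, the sketch below grid-searches the Beta that minimizes squared prediction error over recent delivered orders; Alpha can be tuned the same way by recomputing the smoothed ETAs for each candidate value. The Sample fields are hypothetical:

import java.util.List;

public class ParameterTuner {

    // One delivered order: the two model inputs recorded at prediction time
    // and the actual time of arrival observed after delivery.
    record Sample(double historicalEta, double liveEta, double actualTa) {}

    // Returns the Beta in [0, 1] that minimizes squared error on the samples.
    // The result would then be written to the smoothing_parameters table.
    static double tuneBeta(List<Sample> samples) {
        double bestBeta = 0.5;
        double bestError = Double.MAX_VALUE;
        for (double beta = 0.0; beta <= 1.0; beta += 0.01) {
            double error = 0.0;
            for (Sample s : samples) {
                double predicted = beta * s.historicalEta() + (1 - beta) * s.liveEta();
                error += Math.pow(predicted - s.actualTa(), 2);
            }
            if (error < bestError) {
                bestError = error;
                bestBeta = beta;
            }
        }
        return bestBeta;
    }
}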

You can retrain the parameter tuning model once a week using the ETA data in the predictions collection.



Conclusion

The architecture is designed to handle more than a million requests per minute while remaining flexible enough to scale the application on the fly. It also allows developers to swap or insert components, such as new features (e.g., weather) or a filter layer to refine the ETA predictions. Here, Rockset helps us solve three primary requirements:

  1. Low-latency complex queries - Rockset allows us to make complicated queries such as exponential smoothing with just an API call. This is done by leveraging Query Lambdas. The Lambdas also support parameters that allow us to query for different locations.
  2. Highly scalable real-time ingestion - If you have approximately 100K drivers on your platform and each of their apps sends a GPS location every 5 seconds, then you are dealing with a throughput of 1.2 million requests per minute. Rockset allows us to query this data within seconds of events occurring.
  3. Data from multiple sources - Rockset allows us to ingest from multiple sources, such as Kafka and DynamoDB, using fully managed connectors that require minimal configuration.




Kartik Khare has been a Data Engineer for 4 years and has also been blogging about deep-dives on Big Data Systems on a personal blog and Medium. He currently works at Walmart Labs where he works on the Realtime ML platforms. Prior to that, he was working for OlaCabs where he was involved in designing realtime surge pricing and recommendation systems.