Batch processing is often employed when grappling with large amounts of data. Historically, it has been pervasive in data engineering tasks: building scalable data pipelines, and extracting, transforming, and loading (ETL) data from myriad sources into a central or common data store.
More recently, batch processing has become increasingly common for machine learning training, tuning, and model scoring. Common use cases include time-series forecasting and training hundreds or thousands of models, each on its own batch of data: one model per product SKU, per geographic zone (such as a zip code or a pickup location), or even per customer ID for personalized product recommendations, where each batch holds the data pertinent to that customer.
In a previous blog, we explained why you might train hundreds of thousands of models, why Ray is used for training many models, and how to use different approaches with the Ray Core APIs to accomplish this at scale.
source: https://github.com/justin-hj-kim/NYCtaxi_data_science
In this blog, we demonstrate another example of batch training on the NYC Taxi Dataset, using only Ray Core and stateless Ray tasks. Because Ray tasks run asynchronously and this workload is embarrassingly parallel, we examine two approaches to scaling it with Ray tasks:
Distributed data loading
Centralized data loading
The first approach is distributed data loading: each independent task reads only its own batch into memory, so each batch must fit into the memory available to that task. The second approach is centralized data loading: we load each data partition once, extract each batch per pickup_location_id, and store it in the Ray object store, from which each task fetches its batch via an object reference. This approach trades higher memory usage for shorter reading and training times.
But first, let's peek at our data to understand what relationship we want to establish among the features, what transformation or projection we need, and which features we want to train on.
The NYC dataset contains many columns that are not relevant to the task at hand, so we filter them out and focus only on the columns of interest. For our use case, we want to establish a relationship between the pickup and drop-off locations and the trip duration. Because the relationship between pickup and drop-off locations varies over time, we train a separate model for each pickup location and month combination, treating each combination as a batch, as shown in Figure 1.
It turns out that the data is already partitioned by month and year, so we can use the pickup_location_id column in the dataset to project and group it into the respective data batches. Using these features, we fit three distinct scikit-learn regression models for each batch and choose the one with the best (lowest) MAE score.
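As a rough illustration of that grouping step, the sketch below splits one monthly partition into per-pickup_location_id batches with pandas. The helper name split_by_pickup_location and the tiny DataFrame are hypothetical; the blog's own code extracts each batch with PyArrow filters instead of a pandas groupby.

```python
import pandas as pd


def split_by_pickup_location(df: pd.DataFrame) -> dict:
    """Group one partition into {pickup_location_id: batch DataFrame}."""
    return {
        int(location_id): batch.reset_index(drop=True)
        for location_id, batch in df.groupby("pickup_location_id")
    }


# Hypothetical rows standing in for one monthly file.
month = pd.DataFrame(
    {
        "pickup_location_id": [132, 132, 138, 161],
        "dropoff_location_id": [236, 230, 132, 237],
    }
)

# Each key identifies one training batch; three models are later fit per batch.
batches = split_by_pickup_location(month)
print({location_id: len(batch) for location_id, batch in batches.items()})
```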
We used Anyscale Ray clusters with the configuration described below for this machine learning workload. Anyscale provides an easy way to configure, provision, launch, autoscale, and manage Ray clusters, along with insightful Ray Dashboard metrics to observe your jobs' progress. See the Ray Dashboard figures below for examples.
Data: In all, we use 18 months of data (all of 2018 and six months of 2019), giving us 18 files of ~7M rows each, for a total of ~126M rows.
Anyscale Ray cluster configuration:
64 CPU cores
4-5 m5.2xlarge AWS EC2 instances
124 GiB RAM + 51.38 GiB Node memory
Models:
Three scikit-learn estimators trained and fitted for each pickup_location_id batch:
LinearRegression
DecisionTreeRegressor
DecisionTreeRegressor with splitter="random"
Total models trained: 14100 (three models for each of 4700 pickup_location_id batches)
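The data and model counts above follow from simple arithmetic (the row total is approximate because each file holds roughly 7M rows):

```latex
\begin{aligned}
12 + 6 &= 18 \text{ monthly files} \\
18 \times {\sim}7\,\text{M rows} &\approx 126\,\text{M rows} \\
4700 \text{ batches} \times 3 \text{ estimators} &= 14100 \text{ models}
\end{aligned}
```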
In the first approach, distributed data loading, we divide model training into three modular units of work:
Reading the parquet data
Creating Ray tasks to preprocess, train, and evaluate data batches
Dividing data into batches to spawn a Ray task for each batch
Each unit of work is modularized as a Python function defined in mmd_utils.py.
Using PyArrow's support for Parquet push-down predicates, each task reads a Parquet file and extracts only the batch we want to train and fit the model on: all the rows associated with one pickup_location_id. The read_data() function, called within each Ray task, handles both reading the data and extracting the batch.
```python
import pandas as pd
import pyarrow.parquet as pq


def read_data(file: str, pickup_location_id: int) -> pd.DataFrame:
    """
    Read a Parquet file into a PyArrow table, convert it to pandas, and
    return it as a pandas DataFrame. Because PyArrow supports push-down
    predicates, only the needed columns are read, filtered on
    pickup_location_id.

    Args:
        file: str path to the Parquet file containing the data
        pickup_location_id: int id to filter on

    Returns:
        pandas DataFrame filtered by pickup_location_id, with only the
        requested columns
    """
    return pq.read_table(
        file,
        filters=[("pickup_location_id", "=", pickup_location_id)],
        columns=[
            "pickup_at",
            "dropoff_at",
            "pickup_location_id",
            "dropoff_location_id",
        ],
    ).to_pandas()
```
By splitting the data into batches specific to a pickup_location_id, we avoid loading the entire partition into memory, which prevents OOM errors, and extract only the data we need for each pickup_location_id. Converting the batch to pandas lets us train with scikit-learn estimators.
To compute the trip duration, the value we want to predict, we convert each batch's pickup and drop-off times to a standard datetime format and take their difference. As part of this transformation, we also fill in missing location IDs. Each task applies this transformation to its own batch.
```python
import pandas as pd


def transform_batch(df: pd.DataFrame) -> pd.DataFrame:
    """
    Given a pandas DataFrame, transform the time format of the pickup and
    drop-off times and return the transformed DataFrame. Expressing the
    duration in seconds keeps later arithmetic simple.

    Args:
        df: pandas DataFrame to be transformed

    Returns:
        the transformed pandas DataFrame, with parsed timestamps and the
        trip duration in seconds as an additional column
    """
    df["pickup_at"] = pd.to_datetime(
        df["pickup_at"], format="%Y-%m-%d %H:%M:%S"
    )
    df["dropoff_at"] = pd.to_datetime(
        df["dropoff_at"], format="%Y-%m-%d %H:%M:%S"
    )
    # total_seconds() includes whole days; .dt.seconds would return only the
    # 0-86399 seconds component of each timedelta.
    df["trip_duration"] = (df["dropoff_at"] - df["pickup_at"]).dt.total_seconds()
    # Fill missing location IDs with a sentinel value.
    df["pickup_location_id"] = df["pickup_location_id"].fillna(-1)
    df["dropoff_location_id"] = df["dropoff_location_id"].fillna(-1)
    return df
```
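The choice of timedelta accessor matters for trip_duration. In pandas, .dt.seconds returns only the seconds component of each timedelta (0 to 86,399), while .dt.total_seconds() returns the full duration. The three made-up trips below include one that spans more than a day and one whose drop-off time precedes its pickup time, the kind of dirty record raw taxi data can contain.

```python
import pandas as pd

pickup = pd.to_datetime(
    pd.Series(["2019-01-01 23:50:00", "2019-01-01 10:00:00", "2019-01-01 08:00:00"]),
    format="%Y-%m-%d %H:%M:%S",
)
dropoff = pd.to_datetime(
    pd.Series(["2019-01-03 00:00:10", "2019-01-01 10:15:00", "2019-01-01 07:59:50"]),
    format="%Y-%m-%d %H:%M:%S",
)
delta = dropoff - pickup

# Seconds component only: whole days are dropped and negative deltas wrap around.
seconds_component = delta.dt.seconds
print(seconds_component.tolist())  # [610, 900, 86390]

# Full duration in seconds, as floats.
total_seconds = delta.dt.total_seconds()
print(total_seconds.tolist())  # [87010.0, 900.0, -10.0]
```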
Once the batch is transformed, we use the DataFrame to fit and score each scikit-learn model and calculate the mean absolute error (MAE) on the held-out test set. This gives us a simple regression model that predicts the trip duration from the drop-off location for a given pickup location.
```python
from typing import Tuple

import pandas as pd
import ray
from sklearn.base import BaseEstimator
from sklearn.metrics import mean_absolute_error


@ray.remote
def fit_and_score_sklearn(
    train: pd.DataFrame, test: pd.DataFrame, model: BaseEstimator
) -> Tuple[BaseEstimator, float]:
    """
    A Ray remote task that fits and scores a scikit-learn estimator on the
    supplied train and test data. Each Ray task trains on its own batch of
    data for a single pickup_location_id. The model learns the relationship
    between the drop-off location and the trip duration.

    Args:
        train: pandas DataFrame with training data
        test: pandas DataFrame with test data
        model: scikit-learn BaseEstimator

    Returns:
        a tuple of the fitted model and its corresponding mean absolute error (MAE)
    """
    train_X = train[["dropoff_location_id"]]
    train_y = train["trip_duration"]
    test_X = test[["dropoff_location_id"]]
    test_y = test["trip_duration"]

    # Start training.
    model = model.fit(train_X, train_y)
    pred_y = model.predict(test_X)
    error = round(mean_absolute_error(test_y, pred_y), 3)
    return model, error
```
Finally, we define a train_and_evaluate() Ray task that encapsulates all the logic needed to load a data batch, transform it, split it into train and test sets, and fit and evaluate models on it. Because it returns a tuple containing the file path and location ID used for training, the fitted models can be mapped back to their pickup_location_id for experiment tracking, say with MLflow or Weights & Biases.
To load and transform data, we use the read_data() and transform_batch() functions. For brevity, see mmd_utils.py for the code details. The driver notebook mmd_tasks.ipynb trains the models in incremental batches of three files, up to a total of 18 files, prints the cumulative stats, and plots the training times. An equivalent Python command-line driver, mmd_tasks.py, produces the same results. (See the figures below.)
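Since train_and_evaluate() itself lives in mmd_utils.py, here is a minimal, sequential sketch of the per-batch logic described above (split, fit three estimators, rank by MAE), run without Ray so it can be tried locally. The function name train_and_evaluate_local, the 80/20 split, the random seeds, the file name, and the guard for tiny batches are our assumptions, not the blog's code; in the blog, this logic runs inside a @ray.remote task, and fit_and_score_sklearn() is itself a Ray task.

```python
from typing import List, Tuple

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, clone
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor


def fit_and_score(
    train: pd.DataFrame, test: pd.DataFrame, model: BaseEstimator
) -> Tuple[BaseEstimator, float]:
    # Same logic as fit_and_score_sklearn(), minus @ray.remote. clone() gives each
    # call a fresh, unfitted copy; Ray tasks get a copy because arguments are serialized.
    model = clone(model).fit(train[["dropoff_location_id"]], train["trip_duration"])
    pred = model.predict(test[["dropoff_location_id"]])
    return model, round(mean_absolute_error(test["trip_duration"], pred), 3)


def train_and_evaluate_local(
    batch: pd.DataFrame, file: str, pickup_location_id: int, models: List[BaseEstimator]
) -> Tuple[str, int, List[Tuple[BaseEstimator, float]]]:
    """Sequential sketch of the per-batch work: split, fit each model, rank by MAE."""
    if len(batch) < 5:
        # Hypothetical guard: too few rows for a meaningful train/test split.
        return file, pickup_location_id, []
    train, test = train_test_split(batch, test_size=0.2, random_state=42)
    results = [fit_and_score(train, test, model) for model in models]
    results.sort(key=lambda result: result[1])  # lowest MAE first
    # Returning file and id lets the caller map fitted models back to their batch.
    return file, pickup_location_id, results


models = [
    LinearRegression(),
    DecisionTreeRegressor(random_state=0),
    DecisionTreeRegressor(splitter="random", random_state=0),
]

# Synthetic, already-transformed batch for one pickup location (made-up numbers).
rng = np.random.default_rng(0)
batch = pd.DataFrame({"dropoff_location_id": rng.integers(1, 50, size=200)})
batch["trip_duration"] = 300 + 10 * batch["dropoff_location_id"] + rng.normal(0, 30, size=200)

file, location_id, results = train_and_evaluate_local(batch, "2019-01.parquet", 132, models)
for model, mae in results:
    print(type(model).__name__, mae)
```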
Now, let's consider an optimized approach that works with the same data and workload but reduces training times by using Ray's object store.
This approach assumes two things: 1) you have sufficient memory for the object store and enough memory on each node in the cluster, and 2) you are willing to pay a higher memory cost in exchange for lower execution and training times.
We optimize by loading and processing each partition into memory once, extracting all the relevant batches for each pickup_location_id, and storing the batches in Ray's object store, where tasks access them by reference.
At the heart of this optimization is the read_into_object_store() function. It relies on four techniques: 1) delaying calls to ray.get() until the batch data is actually needed; 2) making the function a generator that returns a ray.ObjectRefGenerator; 3) in the calling function, iterating over that generator and passing each yielded object reference to another remote Ray task, train_and_evaluate_optimized.remote(...); and 4) using the SPREAD scheduling strategy to spread the file-loading tasks across nodes as a safeguard against OOM errors.
```python
import pyarrow.compute as pc
import pyarrow.parquet as pq
import ray


@ray.remote(num_returns="dynamic")
def read_into_object_store(file: str) -> ray.ObjectRefGenerator:
    """
    A Ray task that is a generator: Ray stores each yielded value as a
    separate object in the object store and returns an object reference
    generator. The task reads the table from the file and yields one
    batch per pickup_location_id.

    Args:
        file: str path to the Parquet file

    Yields:
        (pickup_location_id, batch DataFrame) tuples, each stored as a
        separate object in the Ray object store
    """
    # Read the entire single file into memory.
    try:
        locdf = pq.read_table(
            file,
            columns=[
                "pickup_at",
                "dropoff_at",
                "pickup_location_id",
                "dropoff_location_id",
            ],
        )
    except Exception:
        # Yield nothing if the file cannot be read.
        return

    pickup_location_ids = locdf["pickup_location_id"].unique()

    for pickup_location_id in pickup_location_ids:
        # Each (id, batch) tuple is put into the Ray object store as a
        # separate object when it is yielded.
        # Cast the PyArrow scalar to a Python value if needed.
        try:
            pickup_location_id = pickup_location_id.as_py()
        except Exception:
            pass

        yield (
            pickup_location_id,
            locdf.filter(
                pc.equal(locdf["pickup_location_id"], pickup_location_id)
            ).to_pandas(),
        )
```
Together, these techniques keep the batch data in the object store until it is actually needed.
Again, for brevity, we refer you to the relevant functions' code in mmo_utils.py, particularly read_into_object_store() and train_and_evaluate_optimized.remote(...). As with approach 1, there are corresponding command-line and notebook drivers, mmo_tasks.py and mmo_tasks.ipynb, respectively.
With the optimized approach using Ray's object store, the average training time per batch of files is approximately 3-5X faster than with the previous approach as the number of files processed increases. See Figure 4.
In the Ray Dashboard screenshots for approaches 1 and 2 below, you can observe the differences in utilization metrics for each resource, particularly node memory and the object store.
While resources such as the number of tasks, nodes, and CPU cores used are similar across both approaches, approach 1 shows more node memory consumption, hardly any Ray object store usage, and higher CPU utilization, which explains why its training times are longer.
For example, the last batch of all 18 files takes about 158 seconds to train 14100 models for a total of 4700 unique pickup locations.
By contrast, approach 2 shows more Ray object store memory usage, along with node memory spikes, in exchange for significantly lower training times, roughly 3-5X faster. The training time for the last batch of 18 files is only 35 seconds (barely visible in the bar graph above), compared to approach 1's 158 seconds, a speedup of about 4.5X.
Although this second approach was ~5X faster at training all models for all pickup locations, keep one caveat in mind: if the dataset partitions were larger and could not fit into the cluster's memory, we would have to fall back to distributed reading of data, that is, approach 1, after further repartitioning the Parquet data into smaller files that fit into memory.
Both patterns, using only Ray Core, are viable ways to scale the training of many models, each on the data for one value of a specific feature. Which one you choose depends on your use case and the size of your dataset. In either case, we recommend trying both and evaluating which works best given your available cluster resources.
Lastly, if you have a similar Ray use case or story, the Ray Summit 2023 call for proposals (CfP) is open. Submit your Ray story and share it with the growing global Ray community. We also hold monthly Ray Meetups; join us to learn how the community is using Ray and Anyscale to scale machine learning workloads.