Ray Data

In this notebook we are going to explore the following topics:

  • Ray Datasets;

  • the Ray Data API (reading data, adding columns, groupby operations, etc.).

The dataset used in this notebook is in the following dir /leonardo_scratch/fast/tra26_castiel2/data/ray_data.

Introduction to Ray Data

Ray Data is a Ray component built on the Ray Core functionality presented in the notebook 1_ray_core.ipynb. Ray Data provides functions for scalable data processing, batch inference, and distributed training. For an example of video processing, check this.

The basic building block of Ray Data is the Dataset, an abstraction that represents a distributed collection of data.

IMPORTANT NOTE: at the moment, Ray Data does not support being driven through a Ray Client connected to a remote cluster. To work around this limitation, all ray.data operations will be encapsulated inside Ray tasks: the task is sent to the cluster, and the Ray Data operations are executed by the worker nodes.

Imports

from datetime import datetime

import ray
from ray.util.actor_pool import ActorPool
import ray.data

import pandas as pd
import matplotlib.pyplot as plt
/leonardo_work/tra26_castiel2/mviscia1/ray_rag_venv/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm
2026-01-29 08:02:15,848	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.

Environment configuration

ray.init(log_to_driver = False, ignore_reinit_error = True)
print(f"Notebook started {datetime.now()}")
2026-01-29 08:02:37,326	INFO worker.py:1520 -- Using address 10.11.1.34:23707 set in the environment variable RAY_ADDRESS
2026-01-29 08:02:37,327	INFO worker.py:1660 -- Connecting to existing Ray cluster at address: 10.11.1.34:23707...
2026-01-29 08:02:37,336	INFO worker.py:1843 -- Connected to Ray cluster. View the dashboard at http://10.11.1.34:8265 
Notebook started 2026-01-29 08:02:38.866136
resources = ray.cluster_resources()
print(f"Cluster has {resources['CPU']} CPUs, {resources['GPU'] if 'GPU' in resources else 0} GPUs, execution memory {resources['memory'] * 1e-9} GBs, object storage memory {resources['object_store_memory'] * 1e-9} GBs")
Cluster has 8.0 CPUs, 1.0 GPUs, execution memory 361.684707328 GBs, object storage memory 155.007731712 GBs

Ray Data - Datasets

Ray Data supports data ingestion out of the box for the most common data formats (e.g. Parquet, CSV, JSON files, images, binary files), and it can read data from cloud storage.
To speed up reads, place your data in a location readable by all workers: the loading is then parallelized across all available workers.

DATA_PATH = "/leonardo_scratch/fast/tra26_castiel2/data/ray_data"

df = ray.data.read_csv(DATA_PATH).limit(1e6).repartition(int(resources["CPU"])).materialize()
df
2026-01-29 08:02:47,655	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:47,656	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> LimitOperator[limit=1000000.0] -> AllToAllOperator[Repartition]
✔️  Dataset execution finished in 4.19 seconds: 100%|██████████| 1.00M/1.00M [00:04<00:00, 239k row/s]
MaterializedDataset(
   num_blocks=8,
   num_rows=1000000,
   schema={
      human_readable_ts: timestamp[s],
      id: int64,
      state: string,
      transaction_amount: double,
      transaction_category: string,
      unix_timestamp: int64
   }
)

Ray Data - API

In the previous cell one million rows were read in parallel. By default, all operations on datasets are executed lazily; to force ingestion of the dataset we can use the materialize method. This triggers execution of the read_csv call and pins the dataset blocks in the distributed object store. Other operations that trigger data reading include take_batch, write_csv, etc.

Datasets in Ray are divided into blocks that are processed independently. Each block is read into memory as required, making it possible to handle datasets larger than memory by processing blocks in parallel on different workers.

We can obtain information about the dataset using the following commands:

  • .columns(): get a list with the colnames;

  • .count(): get a count of the rows included in the dataset;

  • .size_bytes(): get the size in bytes of the dataset;

  • .num_blocks(): get the number of blocks in which the dataset is scattered.

cols = df.columns()
rows = df.count()
n_bytes = df.size_bytes()  # avoid shadowing the builtin `bytes`
n_blocks = df.num_blocks()
print(f"Dataset has {len(cols)} columns, {rows} rows. Dataset memory occupation is {n_bytes * 1e-9:.2f}GB and is scattered in {n_blocks} blocks.")
Dataset has 6 columns, 1000000 rows. Dataset memory occupation is 0.06GB and is scattered in 8 blocks.

With the take_batch method we can request a batch of dataset rows; this is very useful for debugging.
The batch_format parameter expresses how you want the batch to be returned. Supported batch formats are:

  • pandas;

  • numpy;

  • default (dictionary).

df.take_batch(10, batch_format = "pandas")
2026-01-29 08:02:52,031	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:52,032	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> LimitOperator[limit=10]
✔️  Dataset execution finished in 0.02 seconds: 10.0 row [00:00, 531 row/s]
human_readable_ts id state transaction_amount transaction_category unix_timestamp
0 2011-01-04 12:26:51 445278366 Texas 48.804656 Utilities 1294140411
1 2004-08-20 04:32:17 445278367 Mississippi 67.424309 Gas Station 1092969137
2 1982-03-12 20:27:34 445278368 Wisconsin 29.831828 Electronics 384809254
3 1985-07-16 02:22:11 445278369 Virginia 51.495818 Utilities 490321331
4 2023-02-07 06:43:35 445278370 Ohio 53.365462 Electronics 1675748615
5 1994-12-15 09:16:08 445278371 Maryland 66.733325 Gas Station 787479368
6 1991-03-17 12:30:34 445278372 Pennsylvania 62.746037 Restaurant 669209434
7 2020-01-26 14:08:10 445278373 Maryland 113.996035 Electronics 1580044090
8 1987-07-04 21:31:10 445278374 Texas 66.904820 Restaurant 552425470
9 2018-05-08 22:40:20 445278375 Georgia 80.782422 Supermarket 1525812020

We can drop useless columns with drop_columns.

df.columns()
['human_readable_ts', 'id', 'state', 'transaction_amount', 'transaction_category', 'unix_timestamp']
df_1 = df.drop_columns(["unix_timestamp"])
df_1.take_batch(10, batch_format="pandas")
2026-01-29 08:02:52,370	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:52,370	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(drop_columns)] -> LimitOperator[limit=10]
✔️  Dataset execution finished in 0.08 seconds: 100%|██████████| 10.0/10.0 [00:00<00:00, 117 row/s]
human_readable_ts id state transaction_amount transaction_category
0 2011-01-04 12:26:51 445278366 Texas 48.804656 Utilities
1 2004-08-20 04:32:17 445278367 Mississippi 67.424309 Gas Station
2 1982-03-12 20:27:34 445278368 Wisconsin 29.831828 Electronics
3 1985-07-16 02:22:11 445278369 Virginia 51.495818 Utilities
4 2023-02-07 06:43:35 445278370 Ohio 53.365462 Electronics
5 1994-12-15 09:16:08 445278371 Maryland 66.733325 Gas Station
6 1991-03-17 12:30:34 445278372 Pennsylvania 62.746037 Restaurant
7 2020-01-26 14:08:10 445278373 Maryland 113.996035 Electronics
8 1987-07-04 21:31:10 445278374 Texas 66.904820 Restaurant
9 2018-05-08 22:40:20 445278375 Georgia 80.782422 Supermarket

We can add new columns using the add_column function. Remember that transformations in Ray Data are also executed lazily and are recorded in a computation graph. Actual execution occurs only when a non-delayable operation is triggered (e.g. write_csv, take_batch, materialize).

df_2 = df.add_column("dollars", lambda batch: batch["transaction_amount"].astype(int))
df_2.take_batch(10, batch_format="pandas")
2026-01-29 08:02:52,483	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:52,484	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_column)] -> LimitOperator[limit=10]
✔️  Dataset execution finished in 0.54 seconds: 100%|██████████| 10.0/10.0 [00:00<00:00, 18.3 row/s]
human_readable_ts id state transaction_amount transaction_category unix_timestamp dollars
0 2011-01-04 12:26:51 445278366 Texas 48.804656 Utilities 1294140411 48
1 2004-08-20 04:32:17 445278367 Mississippi 67.424309 Gas Station 1092969137 67
2 1982-03-12 20:27:34 445278368 Wisconsin 29.831828 Electronics 384809254 29
3 1985-07-16 02:22:11 445278369 Virginia 51.495818 Utilities 490321331 51
4 2023-02-07 06:43:35 445278370 Ohio 53.365462 Electronics 1675748615 53
5 1994-12-15 09:16:08 445278371 Maryland 66.733325 Gas Station 787479368 66
6 1991-03-17 12:30:34 445278372 Pennsylvania 62.746037 Restaurant 669209434 62
7 2020-01-26 14:08:10 445278373 Maryland 113.996035 Electronics 1580044090 113
8 1987-07-04 21:31:10 445278374 Texas 66.904820 Restaurant 552425470 66
9 2018-05-08 22:40:20 445278375 Georgia 80.782422 Supermarket 1525812020 80

You can achieve the same result as the previous cell, with more flexibility, using map_batches with a user-defined function.
The function receives a batch of data in the specified format, in this case a pandas DataFrame.

def parse_in_batches(df_batch):
    # Vectorized datetime accessors instead of per-row map calls
    ts = df_batch["human_readable_ts"]
    df_batch["year"] = ts.dt.year
    df_batch["month"] = ts.dt.month
    df_batch["day"] = ts.dt.day
    # Flag transactions that happened on a leap day (Feb 29)
    df_batch["is_leap"] = (df_batch["month"] == 2) & (df_batch["day"] == 29)
    return df_batch

df_3 = df_2.map_batches(parse_in_batches, batch_format = "pandas")
df_3.take_batch(10, batch_format="pandas")
2026-01-29 08:02:53,061	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:53,061	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_column)->MapBatches(parse_in_batches)] -> LimitOperator[limit=10]
✔️  Dataset execution finished in 1.05 seconds: 100%|██████████| 10.0/10.0 [00:01<00:00, 9.52 row/s]
human_readable_ts id state transaction_amount transaction_category unix_timestamp dollars year month day is_leap
0 2011-01-04 12:26:51 445278366 Texas 48.804656 Utilities 1294140411 48 2011 1 4 False
1 2004-08-20 04:32:17 445278367 Mississippi 67.424309 Gas Station 1092969137 67 2004 8 20 False
2 1982-03-12 20:27:34 445278368 Wisconsin 29.831828 Electronics 384809254 29 1982 3 12 False
3 1985-07-16 02:22:11 445278369 Virginia 51.495818 Utilities 490321331 51 1985 7 16 False
4 2023-02-07 06:43:35 445278370 Ohio 53.365462 Electronics 1675748615 53 2023 2 7 False
5 1994-12-15 09:16:08 445278371 Maryland 66.733325 Gas Station 787479368 66 1994 12 15 False
6 1991-03-17 12:30:34 445278372 Pennsylvania 62.746037 Restaurant 669209434 62 1991 3 17 False
7 2020-01-26 14:08:10 445278373 Maryland 113.996035 Electronics 1580044090 113 2020 1 26 False
8 1987-07-04 21:31:10 445278374 Texas 66.904820 Restaurant 552425470 66 1987 7 4 False
9 2018-05-08 22:40:20 445278375 Georgia 80.782422 Supermarket 1525812020 80 2018 5 8 False
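Since the per-batch logic is plain pandas, it can be sanity-checked on a small local DataFrame without touching the cluster. The snippet below redefines the same per-batch logic so it is self-contained; the two sample timestamps are made up:

```python
import pandas as pd


def parse_batch(df_batch: pd.DataFrame) -> pd.DataFrame:
    # Same per-batch logic as parse_in_batches above, redefined here so
    # the snippet runs on its own.
    df_batch["year"] = df_batch["human_readable_ts"].map(lambda x: x.year)
    df_batch["month"] = df_batch["human_readable_ts"].map(lambda x: x.month)
    df_batch["day"] = df_batch["human_readable_ts"].map(lambda x: x.day)
    df_batch["is_leap"] = (df_batch["month"] == 2) & (df_batch["day"] == 29)
    return df_batch


sample = pd.DataFrame(
    {"human_readable_ts": pd.to_datetime(["2020-02-29 12:00:00", "2011-01-04 12:26:51"])}
)
out = parse_batch(sample)
print(out[["year", "month", "day", "is_leap"]])
# Only the 2020-02-29 row is flagged as a leap-day transaction.
```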

We can filter out rows based on one or more conditions, using the filter function or ordinary pandas operations inside map_batches.

def drop_leap(df_batch: pd.DataFrame):
    return df_batch[~df_batch["is_leap"]]

df_4 = df_3.map_batches(drop_leap, batch_format = "pandas").materialize()
df_4
2026-01-29 08:02:54,135	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:54,135	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_column)->MapBatches(parse_in_batches)->MapBatches(drop_leap)]
✔️  Dataset execution finished in 1.10 seconds: 100%|██████████| 999k/999k [00:01<00:00, 903k row/s]
MaterializedDataset(
   num_blocks=8,
   num_rows=999304,
   schema={
      human_readable_ts: datetime64[s],
      id: int64,
      state: object,
      transaction_amount: float64,
      transaction_category: object,
      unix_timestamp: int64,
      dollars: int64,
      year: int64,
      month: int64,
      day: int64,
      is_leap: bool
   }
)
# Total leap tx dropped
df_3.count() - df_4.count()
2026-01-29 08:02:55,265	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:55,266	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_column)->MapBatches(parse_in_batches)] -> AggregateNumRows[AggregateNumRows]
✔️  Dataset execution finished in 0.91 seconds: 1.00 row [00:00, 1.09 row/s]
696
%%time
def get_txs_specific_category(df_batch: pd.DataFrame, category: str):
    # Keep only transactions of the given category that occurred in 1970
    return df_batch[(df_batch["transaction_category"] == category) & (df_batch["year"] == 1970)]

gas_expenses = df_4.map_batches(get_txs_specific_category, batch_format = "pandas", fn_args=["Gas Station"]).materialize().take_all()
gas_expenses = pd.DataFrame(gas_expenses)
gas_expenses.head()
2026-01-29 08:02:56,202	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:02:56,202	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(get_txs_specific_category)]
✔️  Dataset execution finished in 0.04 seconds: 100%|██████████| 3.01k/3.01k [00:00<00:00, 77.0k row/s]
CPU times: user 7.57 s, sys: 138 ms, total: 7.7 s
Wall time: 7.74 s
human_readable_ts id state transaction_amount transaction_category unix_timestamp dollars year month day is_leap
0 1970-05-18 10:46:30 445278450 Virginia 73.834990 Gas Station 11871990 73 1970 5 18 False
1 1970-04-23 14:57:14 445279317 Wisconsin 69.104664 Gas Station 9727034 69 1970 4 23 False
2 1970-04-01 08:58:56 445279906 California 68.527096 Gas Station 7804736 68 1970 4 1 False
3 1970-10-17 00:57:52 445279952 Indiana 78.796898 Gas Station 24969472 78 1970 10 17 False
4 1970-06-04 03:59:47 445280413 Wisconsin 73.422859 Gas Station 13316387 73 1970 6 4 False

We can apply groupby operations to our dataset using the groupby operator. However, consider that each group must fit in the memory of a single node (e.g., all the rows for Alabama in 1970 must fit on a single node).

stats = pd.DataFrame(df_4.groupby(key = ["state", "year"]).count().take_all())
stats.head()
2026-01-29 08:03:03,962	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:03:03,963	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
✔️  Dataset execution finished in 48.26 seconds: 100%|██████████| 2.81k/2.81k [00:48<00:00, 58.1 row/s]

state year count()
0 Alabama 1970 283
1 Alabama 1971 252
2 Alabama 1972 279
3 Alabama 1973 280
4 Alabama 1974 264

After processing the data on the Ray workers, we can move the results to the client and continue working with them locally.

for state in ["Texas", "Wisconsin", "Georgia"]:
    subset = stats[stats["state"] == state]
    plt.plot(subset["year"], subset["count()"], label = state)

plt.legend()
plt.xlabel("Year")
plt.ylabel("Count")
plt.title("Count of transactions over years")
[Figure: line plot titled "Count of transactions over years", transaction counts per year for Texas, Wisconsin, and Georgia]

Similarly to map_batches, we can apply arbitrarily complex functions to groups with map_groups.

# For each year and transaction category, get the state in which
# the first transaction of the year happened
def fist_transaction_of_year(df:pd.DataFrame):
    # Sort the group by date and keep only the earliest row
    df = df.sort_values(by="unix_timestamp")
    return df.iloc[:1, :]
first_of_the_year = df_4.groupby(key=["year", "transaction_category"]).map_groups(fist_transaction_of_year, batch_format = "pandas").take_all()
first_of_the_year = pd.DataFrame(first_of_the_year)
first_of_the_year.head()
2026-01-29 08:03:54,569	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2026-01-29_07-44-12_707538_3508558/logs/ray-data
2026-01-29 08:03:54,570	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(fist_transaction_of_year)]
✔️  Dataset execution finished in 1.03 seconds: 100%|██████████| 330/330 [00:01<00:00, 243 row/s]

human_readable_ts id state transaction_amount transaction_category unix_timestamp dollars year month day is_leap
0 1970-01-01 07:40:29 444992194 Maryland 2.029032 Bar 24029 2 1970 1 1 False
1 1970-01-01 03:31:03 445241344 North Carolina 64.604061 Electronics 9063 64 1970 1 1 False
2 1970-01-01 01:03:52 445081263 Tennessee 56.808570 Gas Station 232 56 1970 1 1 False
3 1970-01-01 01:37:01 445347932 Tennessee 66.079686 Restaurant 2221 66 1970 1 1 False
4 1970-01-01 04:39:12 445472208 California 83.857468 Supermarket 13152 83 1970 1 1 False

Finally, when we are happy with our transformations, we can write the data to disk using a write_{something} method. The same considerations made for read_csv apply here as well.

#df_4.limit(100).write_csv(OUTPUT_PATH)

Exercises

  • Get the sum of all Restaurant transactions for the state of Texas during 1970

  • Get the standard deviation of the Gas Station transactions in Ohio during 1971

  • How would you use an LLM to create a description for the transactions? Try to write some code to accomplish this task following this example.

Release resources

ray.shutdown()