We want to build up some functionality to create ETL pipelines for pandas operations. We define the classic extract, transform, and load functions, which we can string together in a pipeline. We would also like to add functionality to inspect the intermediate DataFrames and some of their key properties.
Our background
We work as Data Science Consultants, mainly with SMEs in manufacturing. We often have to build ETL pipelines from various sources, which we then use for analytics or ML workloads. The pipelines can get pretty complex with custom business logic, which is why we want a way to quickly show all the steps we take.
For the exploration and discussion with AI on the approaches, see the notebooks in the nbs folder.
So we write a decorator that adds a logging keyword to each function; then we can determine for each function in the pipeline what to log and how much. We want to keep it as simple as possible. The pipeline should just take a df and a list of functions as the steps of the pipeline.
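A minimal sketch of such a runner, matching the `pipeline(df, steps, vrbs_default=...)` call and the `(function, kwargs)` step convention used later in these notes (the exact override mechanics are our assumption, not the original code):

```python
def pipeline(df, steps, vrbs_default=False):
    """Run each step on the DataFrame in order.

    Each step is either a plain function or a (function, kwargs) tuple,
    so individual steps can override the pipeline-wide vrbs flag.
    """
    for step in steps:
        func, kwargs = step if isinstance(step, tuple) else (step, {})
        kwargs = {"vrbs": vrbs_default, **kwargs}  # per-step kwargs win
        df = func(df, **kwargs)
    return df
```

Note that every step function then needs to accept a `vrbs` keyword (or swallow it via `**kwargs`), which is exactly what the decorator below provides.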
We include a way to override the pipeline-wide vrbs argument for specific transformation steps. In this example, no information for aggregate_by_product is printed:
********** Step: filter_products **********
Input DataFrame shape: (5, 5)
Start time: 2026-01-21 15:36:59.046934
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
3 104 Widget C 20 0 80
4 105 Widget B 45 2 110
Output DataFrame shape: (4, 5)
End time: 2026-01-21 15:36:59.048467
Total time: 0:00:00.001533
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
4 105 Widget B 45 2 110
********** Step: aggregate_by_product **********
Input DataFrame shape: (4, 5)
Start time: 2026-01-21 15:36:59.049173
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
4 105 Widget B 45 2 110
Output DataFrame shape: (2, 3)
End time: 2026-01-21 15:36:59.050753
Total time: 0:00:00.001580
quantity defects production_time
product
Widget A 125 5 270
Widget B 75 3 205
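Assuming the `(function, kwargs)` tuple convention for per-step overrides, a call producing behaviour like the log above could be set up as follows. The sample data is taken from the printed frames; the minimal runner and the bare (undecorated) step functions are reconstructions for illustration, not the original code:

```python
import pandas as pd

def pipeline(df, steps, vrbs_default=False):
    # Minimal runner: each step is a function or a (function, kwargs) tuple.
    for step in steps:
        func, kwargs = step if isinstance(step, tuple) else (step, {})
        kwargs = {"vrbs": vrbs_default, **kwargs}
        df = func(df, **kwargs)
    return df

def filter_products(df, vrbs=False):
    if vrbs:
        print(f"{'*' * 10} Step: filter_products {'*' * 10}")
    return df[df["product"] != "Widget C"]

def aggregate_by_product(df, vrbs=False):
    if vrbs:
        print(f"{'*' * 10} Step: aggregate_by_product {'*' * 10}")
    return df.groupby("product")[["quantity", "defects", "production_time"]].sum()

df = pd.DataFrame({
    "order_id": [101, 102, 103, 104, 105],
    "product": ["Widget A", "Widget B", "Widget A", "Widget C", "Widget B"],
    "quantity": [50, 30, 75, 20, 45],
    "defects": [2, 1, 3, 0, 2],
    "production_time": [120, 95, 150, 80, 110],
})

# Logging is on by default but switched off for the aggregation step.
steps = [filter_products, (aggregate_by_product, {"vrbs": False})]
result = pipeline(df, steps, vrbs_default=True)
```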
First reflection and next steps
right now we are still printing the whole DataFrame. We want to change that to show just the first 3 rows, 5 random rows, and the last 3 rows, maybe with a visual clue that the rows are truncated.
we would also like to show the DataFrames to the right of the aggregate info (shape, etc.)
include information like df.info()
include information like df.describe()
include information like diff columns (num cols changed, cols just in before, cols just in after)
include information like diff rows (maybe just the number)
source code of functions
description of step, based on docstring
nicer representation maybe in html or display or something like that
new wrapper to include assertions and/or sanity checks (property-based, for example the same number of mat_ids)
function generate pipeline report, in excel (or something else)
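The truncated display from the first item above could be sketched like this (the name `truncated_view` and the marker row as the visual clue are our invention):

```python
import pandas as pd

def truncated_view(df, n_edge=3, n_sample=5, marker=":"):
    """First n_edge rows, up to n_sample random middle rows, and the last
    n_edge rows, separated by marker rows as a visual truncation clue."""
    if len(df) <= 2 * n_edge + n_sample:
        return df  # small enough to show in full
    sep = pd.DataFrame([[marker] * df.shape[1]], columns=df.columns, index=[marker])
    middle = df.iloc[n_edge:-n_edge].sample(n_sample)
    return pd.concat([df.head(n_edge), sep, middle, sep, df.tail(n_edge)])
```

Returning a DataFrame (instead of printing) keeps the helper reusable for the HTML representation mentioned below.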
Okay, our first idea after picking the problem back up is to store the information in a dictionary and then create different functions to display it.
```python
from functools import wraps
from datetime import datetime

def track(func):
    @wraps(func)
    def wrapper(in_df, vrbs=False, *args, **kwargs):
        meta_dict = {
            'step_name': func.__name__,  # name of the pipeline step
            'in_time': datetime.now(),   # time when the pipeline step starts
            'in_df_shape': in_df.shape,  # shape of the input dataframe
            'in_df_head': in_df.head(3), # head of the input dataframe
            'in_df_sample': in_df.sample(min(in_df.shape[0], 5)),  # sample of the input dataframe
            'in_df_tail': in_df.tail(3), # tail of the input dataframe
        }
        out_df = func(in_df, *args, **kwargs)
        out_time = datetime.now()
        total_time = out_time - meta_dict['in_time']
        meta_dict.update({
            'out_time': out_time,          # time when the pipeline step stops
            'out_df_shape': out_df.shape,  # shape of the output dataframe
            'out_df_head': out_df.head(3), # head of the output dataframe
            'out_df_sample': out_df.sample(min(out_df.shape[0], 5)),  # sample of the output dataframe
            'out_df_tail': out_df.tail(3), # tail of the output dataframe
            'total_time': total_time,      # difference between in_time and out_time
        })
        if vrbs:
            print('Here we use a function to display the information')
            print(meta_dict)
        return out_df
    return wrapper
```
We want to build two functions: 1. a display function for use within our wrapper, which only uses the meta_dict, and 2. a display function that just takes one df and gets the head, sample, and tail from there.
In the future we might want to turn this into one function, but for now we don't want to store the whole df in meta_dict.
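A sketch of the two display helpers: `display_meta` touches only keys that the `track` wrapper above already stores, while `display_df` works on any DataFrame. The exact formatting here is placeholder, not the version that produced the output below:

```python
def display_df(df, n_edge=3, n_sample=5):
    """Print head, a random sample, and tail of a single DataFrame."""
    print(df.head(n_edge))
    print(df.sample(min(len(df), n_sample)))
    print(df.tail(n_edge))

def display_meta(meta_dict):
    """Print step information using only what is stored in meta_dict."""
    print(f"{'*' * 10} Step: {meta_dict['step_name']} {'*' * 10}")
    print(f"Input DataFrame shape: {meta_dict['in_df_shape']}")
    print(meta_dict["in_df_head"])
    print(f"Output DataFrame shape: {meta_dict['out_df_shape']}")
    print(meta_dict["out_df_head"])
    print(f"Total time: {meta_dict['total_time']}")
```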
```python
@track
def filter_products(df):
    '''Exclude products which are Widget C.'''
    return df[df["product"] != "Widget C"]
```
```python
@track
def aggregate_by_product(df):
    '''aggregate by product and sum columns "quantity", "defects", "production_time"'''
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()
```
*************** filter_products ***************
'''Exclude products which are Widget C.'''
Total Time: 574 µs
Start: 2026-01-21 15:36:59.369533
End: 2026-01-21 15:36:59.370107
Input DataFrame:
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
: : : : :
3 104 Widget C 20 0 80
2 103 Widget A 75 3 150
1 102 Widget B 30 1 95
0 101 Widget A 50 2 120
4 105 Widget B 45 2 110
: : : : :
2 103 Widget A 75 3 150
3 104 Widget C 20 0 80
4 105 Widget B 45 2 110
Input: 5 rows, 5 cols
↓ ↓
Diff: -1 rows, 0 cols
↓ ↓
Output: 4 rows, 5 cols
Output DataFrame:
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
: : : : :
3 104 Widget C 20 0 80
2 103 Widget A 75 3 150
1 102 Widget B 30 1 95
0 101 Widget A 50 2 120
4 105 Widget B 45 2 110
: : : : :
2 103 Widget A 75 3 150
3 104 Widget C 20 0 80
4 105 Widget B 45 2 110
_df = pipeline(df, steps, vrbs_default=True)
*************** filter_products ***************
'''Exclude products which are Widget C.'''
Total Time: 401 µs
Start: 2026-01-21 15:36:59.380798
End: 2026-01-21 15:36:59.381199
Input DataFrame:
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
: : : : :
3 104 Widget C 20 0 80
1 102 Widget B 30 1 95
4 105 Widget B 45 2 110
2 103 Widget A 75 3 150
0 101 Widget A 50 2 120
: : : : :
2 103 Widget A 75 3 150
3 104 Widget C 20 0 80
4 105 Widget B 45 2 110
Input: 5 rows, 5 cols
↓ ↓
Diff: -1 rows, 0 cols
↓ ↓
Output: 4 rows, 5 cols
Output DataFrame:
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
: : : : :
3 104 Widget C 20 0 80
1 102 Widget B 30 1 95
4 105 Widget B 45 2 110
2 103 Widget A 75 3 150
0 101 Widget A 50 2 120
: : : : :
2 103 Widget A 75 3 150
3 104 Widget C 20 0 80
4 105 Widget B 45 2 110
*************** aggregate_by_product ***************
'''aggregate by product and sum columns "quantity", "defects", "production_time"'''
Total Time: 800 µs
Start: 2026-01-21 15:36:59.386272
End: 2026-01-21 15:36:59.387072
Input DataFrame:
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
: : : : :
2 103 Widget A 75 3 150
4 105 Widget B 45 2 110
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
: : : : :
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
4 105 Widget B 45 2 110
Input: 4 rows, 5 cols
↓ ↓
Diff: -2 rows, -2 cols
↓ ↓
Output: 2 rows, 3 cols
Output DataFrame:
order_id product quantity defects production_time
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
: : : : :
2 103 Widget A 75 3 150
4 105 Widget B 45 2 110
0 101 Widget A 50 2 120
1 102 Widget B 30 1 95
: : : : :
1 102 Widget B 30 1 95
2 103 Widget A 75 3 150
4 105 Widget B 45 2 110
Summarize what we did so far
🤖Reply🤖
🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠🧠
Here’s a summary of what you’ve built so far:
Goal: A lightweight ETL pipeline framework for pandas that lets you chain transformation steps and inspect intermediate results — aimed at your work as Data Science Consultants in manufacturing.
What’s been built:
pipeline(df, steps, vrbs_default) — Runs a list of (function, kwargs) steps sequentially on a DataFrame, with per-step control over verbosity via the vrbs flag.
@track decorator — Wraps each step function to optionally collect and display metadata (timing, shapes, data samples) using @wraps to preserve docstrings.
StepMeta dataclass — A structured container for all the metadata captured per step: name, description, timing, input/output shapes, head/sample/tail of DataFrames.
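The summary mentions a StepMeta dataclass that has not been shown in these notes yet; a sketch of what it might look like, with field names inferred from the meta_dict keys used in `track`:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

import pandas as pd

@dataclass
class StepMeta:
    """Structured container for the metadata captured per pipeline step."""
    step_name: str
    description: str  # taken from the step's docstring
    in_time: datetime
    out_time: datetime
    total_time: timedelta
    in_df_shape: tuple
    out_df_shape: tuple
    in_df_head: pd.DataFrame
    in_df_sample: pd.DataFrame
    in_df_tail: pd.DataFrame
    out_df_head: pd.DataFrame
    out_df_sample: pd.DataFrame
    out_df_tail: pd.DataFrame
```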