ETL

Helper to make ETL pipelines easier.

Our plan

We want to build up functionality to create ETL pipelines for pandas operations. We define classic extract, transform, and load functions that we can string together in a pipeline. We would also like to add functionality to inspect the intermediate DataFrames and some of their key properties.

Our background

We work as Data Science Consultants, mainly with SMEs in manufacturing. We often have to build ETL pipelines from various sources to then use for analytics or ML workloads. The pipelines can get pretty complex with custom business logic; that's why we want a way to quickly show all the steps we take.

import pandas as pd

# Sample manufacturing data
data = {
    'order_id': [101, 102, 103, 104, 105],
    'product': ['Widget A', 'Widget B', 'Widget A', 'Widget C', 'Widget B'],
    'quantity': [50, 30, 75, 20, 45],
    'defects': [2, 1, 3, 0, 2],
    'production_time': [120, 95, 150, 80, 110]  # in minutes
}

df = pd.DataFrame(data)
df
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110

source

get_demo_data


def get_demo_data():

Return the sample manufacturing DataFrame shown above.

df[df["product"] != "Widget C"].groupby(["product"])[["quantity", "defects", "production_time"]].sum()
          quantity  defects  production_time
product                                     
Widget A       125        5              270
Widget B        75        3              205

Solution space exploration

Our preferred approach

For the exploration and discussion with AI on the approaches, see the notebooks in the nbs folder.

So we write a decorator that adds a logging keyword to each function; then we can determine, for each function in the pipeline, whether and how much to log. We want to keep it as simple as possible. The pipeline should just take a df and a list of functions as the steps of the pipeline.

def pipeline(df, steps, vrbs_default=True):
    '''Run steps, a list of (function, kwargs) pairs, sequentially on df.'''
    for func, func_kwargs in steps:
        vrbs = func_kwargs.get("vrbs", vrbs_default)  # per-step override of the default verbosity
        func_kwargs.update({"vrbs": vrbs})
        df = func(df, **func_kwargs)
    return df
def track(func):
    '''Decorator adding a vrbs keyword that prints step info before and after the call.'''
    def wrapper(in_df, vrbs=False, *args, **kwargs):
        if vrbs:
            print(f"\n{'*'*10} Step: {func.__name__} {'*'*10}")
            print(f"\nInput DataFrame shape: {in_df.shape}")
            print(in_df)
        out_df = func(in_df, *args, **kwargs)
        if vrbs:
            print(f"\nOutput DataFrame shape: {out_df.shape}")
            print(out_df)
        return out_df
    return wrapper
@track
def filter_products(df):
    return df[df["product"] != "Widget C"]

@track
def aggregate_by_product(df):
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()

We include a way to let individual transformation steps override the pipeline's vrbs argument. In this example, no information for aggregate_by_product is printed:

steps = [
    (filter_products, {}),
    (aggregate_by_product, {"vrbs": False}),
]
pipeline(df, steps)

********** Step: filter_products **********

Input DataFrame shape: (5, 5)
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110

Output DataFrame shape: (4, 5)
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110
          quantity  defects  production_time
product                                     
Widget A       125        5              270
Widget B        75        3              205

We want to add basic profiling of time and memory.

from datetime import datetime
from time import sleep

start_time = datetime.now()
sleep(3.14)
end_time = datetime.now()
total_time = end_time - start_time
total_time
datetime.timedelta(seconds=3, microseconds=140237)
start_time
datetime.datetime(2026, 1, 21, 15, 36, 55, 877745)
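The timing snippet covers time only; the memory side is still open. A minimal sketch for it, leaning on pandas' own accounting (the helper name `mem_mb` is ours, not part of the pipeline yet):

```python
import pandas as pd

def mem_mb(df: pd.DataFrame) -> float:
    # deep=True also counts the Python objects behind object-dtype columns
    return df.memory_usage(deep=True).sum() / 1024**2

demo = pd.DataFrame({"a": range(1000), "b": ["x"] * 1000})
print(f"{mem_mb(demo):.3f} MB")
```

Recording `mem_mb(in_df)` and `mem_mb(out_df)` inside `track` would give a per-step memory diff analogous to the timing diff.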
def track(func):
    def wrapper(in_df, vrbs=False, *args, **kwargs):
        start_time = datetime.now()
        if vrbs:
            print(f"\n{'*'*10} Step: {func.__name__} {'*'*10}")
            print(f"\nInput DataFrame shape: {in_df.shape}")
            print(f"Start time: {start_time}")
            print(in_df)
        out_df = func(in_df, *args, **kwargs)
        end_time = datetime.now()
        total_time = end_time - start_time
        if vrbs:
            print(f"\nOutput DataFrame shape: {out_df.shape}")
            print(f"End time: {end_time}")
            print(f"Total time: {total_time}")
            print(out_df)
        return out_df
    return wrapper
@track
def filter_products(df):
    return df[df["product"] != "Widget C"]

@track
def aggregate_by_product(df):
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()
steps = [
    (filter_products, {}),
    (aggregate_by_product, {}),
]
_df = pipeline(df, steps)

********** Step: filter_products **********

Input DataFrame shape: (5, 5)
Start time: 2026-01-21 15:36:59.046934
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110

Output DataFrame shape: (4, 5)
End time: 2026-01-21 15:36:59.048467
Total time: 0:00:00.001533
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110

********** Step: aggregate_by_product **********

Input DataFrame shape: (4, 5)
Start time: 2026-01-21 15:36:59.049173
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110

Output DataFrame shape: (2, 3)
End time: 2026-01-21 15:36:59.050753
Total time: 0:00:00.001580
          quantity  defects  production_time
product                                     
Widget A       125        5              270
Widget B        75        3              205

First reflection and next steps

  • right now we are still printing the whole DataFrame. We want to change that to show just the first 3 rows, 5 random rows, and the last 3 rows, maybe with a visual clue that the rows are truncated.

  • we would also like to show the DataFrames to the right of the aggregate info (shape, etc.)

  • include information like df.info()

  • include information like df.describe()

  • include information like diff columns (num cols changed, cols just in before, cols just in after)

  • include information like diff rows (maybe just the number)

  • source code of functions

  • description of step, based on docstring

  • nicer representation maybe in html or display or something like that

  • new wrapper to include assertions and/or sanity checks (property-based, for example the same number of mat_ids)

  • function to generate a pipeline report, in Excel (or something else)
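The truncated-display bullet could be sketched like this, assuming a simple ':' row as the visual truncation clue (the name `truncated_view` and its exact layout are ours):

```python
import pandas as pd

def truncated_view(df: pd.DataFrame, n_edge: int = 3, n_sample: int = 5) -> str:
    # Small frames are shown in full; larger ones as head / random sample / tail,
    # with a row of ':' marking the truncation points.
    if df.shape[0] <= 2 * n_edge + n_sample:
        return df.to_string()
    sep = pd.DataFrame([[":"] * df.shape[1]], columns=df.columns, index=[":"])
    parts = [df.head(n_edge), sep, df.sample(n_sample), sep, df.tail(n_edge)]
    return pd.concat(parts).to_string()
```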

Okay, our first idea after picking the problem back up is to store the information in a dictionary and then create different functions to display it.

from functools import wraps

def track(func):
    @wraps(func)
    def wrapper(in_df, vrbs=False, *args, **kwargs):
        meta_dict = {
            'step_name':func.__name__, # name of the pipeline step
            'in_time':datetime.now(), # time when the pipeline step starts
            'in_df_shape':in_df.shape, # shape of the input dataframe
            'in_df_head':in_df.head(3), # head of the input dataframe
            'in_df_sample':in_df.sample(min(in_df.shape[0], 5)), # sample of the input dataframe
            'in_df_tail':in_df.tail(3), # tail of the input dataframe
        }

        out_df = func(in_df, *args, **kwargs)

        out_time = datetime.now()
        total_time = out_time - meta_dict['in_time']
            
        meta_dict.update({
            'out_time':datetime.now(), # time when the pipeline step stops
            'out_df_shape':out_df.shape, # shape of the output dataframe
            'out_df_head':out_df.head(3), # head of the output dataframe
            'out_df_sample':out_df.sample(min(out_df.shape[0], 5)), # sample of the output dataframe
            'out_df_tail':out_df.tail(3), # tail of the output dataframe
            'total_time':total_time, # difference between in_time and out_time
        })

        if vrbs:
            print('Here we use a function to display the information')
            print(meta_dict)
        
        return out_df
    return wrapper
@track
def filter_products(df):
    return df[df["product"] != "Widget C"]

@track
def aggregate_by_product(df):
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()
steps = [
    (filter_products, {}),
    (aggregate_by_product, {}),
]
_df = pipeline(df, steps)
Here we use a function to display the information
{'step_name': 'filter_products', 'in_time': datetime.datetime(2026, 1, 21, 15, 36, 59, 72286), 'in_df_shape': (5, 5), 'in_df_head':    order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, 'in_df_sample':    order_id   product  quantity  defects  production_time
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
0       101  Widget A        50        2              120
4       105  Widget B        45        2              110
3       104  Widget C        20        0               80, 'in_df_tail':    order_id   product  quantity  defects  production_time
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110, 'out_time': datetime.datetime(2026, 1, 21, 15, 36, 59, 73085), 'out_df_shape': (4, 5), 'out_df_head':    order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, 'out_df_sample':    order_id   product  quantity  defects  production_time
4       105  Widget B        45        2              110
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, 'out_df_tail':    order_id   product  quantity  defects  production_time
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110, 'total_time': datetime.timedelta(microseconds=797)}
Here we use a function to display the information
{'step_name': 'aggregate_by_product', 'in_time': datetime.datetime(2026, 1, 21, 15, 36, 59, 77323), 'in_df_shape': (4, 5), 'in_df_head':    order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, 'in_df_sample':    order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110
1       102  Widget B        30        1               95, 'in_df_tail':    order_id   product  quantity  defects  production_time
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110, 'out_time': datetime.datetime(2026, 1, 21, 15, 36, 59, 78316), 'out_df_shape': (2, 3), 'out_df_head':           quantity  defects  production_time
product                                     
Widget A       125        5              270
Widget B        75        3              205, 'out_df_sample':           quantity  defects  production_time
product                                     
Widget B        75        3              205
Widget A       125        5              270, 'out_df_tail':           quantity  defects  production_time
product                                     
Widget A       125        5              270
Widget B        75        3              205, 'total_time': datetime.timedelta(microseconds=990)}

Display functions

Here we create a sample meta_dict to play around with, and to prototype different functions for displaying the information:

meta_dict = {
    'step_name': 'filter_products',
    'step_description': 'Exclude products which are Widget C',
    'in_time': datetime.now(),
    'in_df_shape': (5, 5),
    'in_df_head': df.head(3),
    'in_df_sample': df.sample(min(df.shape[0], 5)),
    'in_df_tail': df.tail(3),
    'out_time': datetime.now(),
    'out_df_shape': (4, 5),
    'out_df_head': df[df["product"] != "Widget C"].head(3),
    'out_df_sample': df[df["product"] != "Widget C"].sample(4),
    'out_df_tail': df[df["product"] != "Widget C"].tail(3),
    'total_time': timedelta(microseconds=1287)
}

meta_dict
{'step_name': 'filter_products',
 'step_description': 'Exclude products which are Widget C',
 'in_time': datetime.datetime(2026, 1, 21, 15, 36, 59, 86925),
 'in_df_shape': (5, 5),
 'in_df_head':    order_id   product  quantity  defects  production_time
 0       101  Widget A        50        2              120
 1       102  Widget B        30        1               95
 2       103  Widget A        75        3              150,
 'in_df_sample':    order_id   product  quantity  defects  production_time
 2       103  Widget A        75        3              150
 1       102  Widget B        30        1               95
 3       104  Widget C        20        0               80
 4       105  Widget B        45        2              110
 0       101  Widget A        50        2              120,
 'in_df_tail':    order_id   product  quantity  defects  production_time
 2       103  Widget A        75        3              150
 3       104  Widget C        20        0               80
 4       105  Widget B        45        2              110,
 'out_time': datetime.datetime(2026, 1, 21, 15, 36, 59, 87152),
 'out_df_shape': (4, 5),
 'out_df_head':    order_id   product  quantity  defects  production_time
 0       101  Widget A        50        2              120
 1       102  Widget B        30        1               95
 2       103  Widget A        75        3              150,
 'out_df_sample':    order_id   product  quantity  defects  production_time
 4       105  Widget B        45        2              110
 0       101  Widget A        50        2              120
 2       103  Widget A        75        3              150
 1       102  Widget B        30        1               95,
 'out_df_tail':    order_id   product  quantity  defects  production_time
 1       102  Widget B        30        1               95
 2       103  Widget A        75        3              150
 4       105  Widget B        45        2              110,
 'total_time': datetime.timedelta(microseconds=1287)}

source

StepMeta


def StepMeta(
    step_name:str, step_description:str, in_time:datetime, in_df_shape:tuple, in_df_head:DataFrame,
    in_df_sample:DataFrame, in_df_tail:DataFrame, out_time:datetime, out_df_shape:tuple, out_df_head:DataFrame,
    out_df_sample:DataFrame, out_df_tail:DataFrame, total_time:timedelta
)->None:

Metadata collected for a single ETL pipeline step.

step_meta = StepMeta(
    step_name='filter_products',
    step_description='Exclude products which are Widget C',
    in_time=datetime.now(),
    in_df_shape=df.shape,
    in_df_head=df.head(3),
    in_df_sample=df.sample(min(df.shape[0], 5)),
    in_df_tail=df.tail(3),
    out_time=datetime.now(),
    out_df_shape=(4, 5),
    out_df_head=df[df["product"] != "Widget C"].head(3),
    out_df_sample=df[df["product"] != "Widget C"].sample(4),
    out_df_tail=df[df["product"] != "Widget C"].tail(3),
    total_time=timedelta(microseconds=1287)
)
step_meta
StepMeta(step_name='filter_products', step_description='Exclude products which are Widget C', in_time=datetime.datetime(2026, 1, 21, 15, 36, 59, 108163), in_df_shape=(5, 5), in_df_head=   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, in_df_sample=   order_id   product  quantity  defects  production_time
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, in_df_tail=   order_id   product  quantity  defects  production_time
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110, out_time=datetime.datetime(2026, 1, 21, 15, 36, 59, 108396), out_df_shape=(4, 5), out_df_head=   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, out_df_sample=   order_id   product  quantity  defects  production_time
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95, out_df_tail=   order_id   product  quantity  defects  production_time
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110, total_time=datetime.timedelta(microseconds=1287))

Testing the pipeline


source

pipeline


def pipeline(
    df, steps, vrbs_default:bool=True
):

Run a list of (function, kwargs) steps sequentially on a DataFrame, with per-step verbosity control.


source

track


def track(
    func
):

Decorator that collects and optionally prints metadata for a pipeline step.

@track
def filter_products(df):
    '''Exclude products which are Widget C.'''
    return df[df["product"] != "Widget C"]
@track
def aggregate_by_product(df):
    '''aggregate by product and sum columns "quantity", "defects", "production_time"'''
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()
steps = [
    (filter_products, {'vrbs':True}),
    (aggregate_by_product, {}),
]
_df = pipeline(df, steps, vrbs_default=False)
*************** filter_products ***************
'''Exclude products which are Widget C.'''


Total Time: 574 µs

Start: 2026-01-21 15:36:59.369533
  End: 2026-01-21 15:36:59.370107

Input DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
3      104  Widget C       20       0              80
2      103  Widget A       75       3             150
1      102  Widget B       30       1              95
0      101  Widget A       50       2             120
4      105  Widget B       45       2             110
         :         :        :       :               :
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
4      105  Widget B       45       2             110

        Input:   5 rows, 5 cols
                     ↓       ↓
        Diff:   -1 rows, 0 cols
                     ↓       ↓
        Output:  4 rows, 5 cols
        
Output DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
3      104  Widget C       20       0              80
2      103  Widget A       75       3             150
1      102  Widget B       30       1              95
0      101  Widget A       50       2             120
4      105  Widget B       45       2             110
         :         :        :       :               :
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
4      105  Widget B       45       2             110

_df = pipeline(df, steps, vrbs_default=True)
*************** filter_products ***************
'''Exclude products which are Widget C.'''


Total Time: 401 µs

Start: 2026-01-21 15:36:59.380798
  End: 2026-01-21 15:36:59.381199

Input DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
3      104  Widget C       20       0              80
1      102  Widget B       30       1              95
4      105  Widget B       45       2             110
2      103  Widget A       75       3             150
0      101  Widget A       50       2             120
         :         :        :       :               :
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
4      105  Widget B       45       2             110

        Input:   5 rows, 5 cols
                     ↓       ↓
        Diff:   -1 rows, 0 cols
                     ↓       ↓
        Output:  4 rows, 5 cols
        
Output DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
3      104  Widget C       20       0              80
1      102  Widget B       30       1              95
4      105  Widget B       45       2             110
2      103  Widget A       75       3             150
0      101  Widget A       50       2             120
         :         :        :       :               :
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
4      105  Widget B       45       2             110


*************** aggregate_by_product ***************
'''aggregate by product and sum columns "quantity", "defects", "production_time"'''


Total Time: 800 µs

Start: 2026-01-21 15:36:59.386272
  End: 2026-01-21 15:36:59.387072

Input DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
2      103  Widget A       75       3             150
4      105  Widget B       45       2             110
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
         :         :        :       :               :
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
4      105  Widget B       45       2             110

        Input:   4 rows,  5 cols
                     ↓        ↓
        Diff:   -2 rows, -2 cols
                     ↓        ↓
        Output:  2 rows,  3 cols
        
Output DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
2      103  Widget A       75       3             150
4      105  Widget B       45       2             110
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
         :         :        :       :               :
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
4      105  Widget B       45       2             110

Summarize what we did so far

🤖Reply🤖


Here’s a summary of what you’ve built so far:

Goal: A lightweight ETL pipeline framework for pandas that lets you chain transformation steps and inspect intermediate results — aimed at your work as Data Science Consultants in manufacturing.

What’s been built:

  1. pipeline(df, steps, vrbs_default) — Runs a list of (function, kwargs) steps sequentially on a DataFrame, with per-step control over verbosity via the vrbs flag.

  2. @track decorator — Wraps each step function to optionally collect and display metadata (timing, shapes, data samples) using @wraps to preserve docstrings.

  3. StepMeta dataclass — A structured container for all the metadata captured per step: name, description, timing, input/output shapes, head/sample/tail of DataFrames.

  4. Display helper functions — formatting the collected step metadata (truncated DataFrames, timing, shape diff) for output.

Still on the to-do list (from your reflection note):

  • Column truncation for wide DataFrames
  • Side-by-side layout (metadata next to DataFrames)
  • df.info() / df.describe() integration
  • Column diff (added/removed columns)
  • Source code display of step functions
  • HTML/rich display
  • Assertion/sanity-check wrapper
  • Pipeline report generation (e.g. Excel)
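As a closing sketch for one of the open items, the assertion/sanity-check wrapper: a decorator factory that runs property-based checks on a step's output (the name `check` and the example predicates are ours):

```python
from functools import wraps
import pandas as pd

def check(*assertions):
    # Each assertion is a (predicate, message) pair; predicates receive the output df.
    def deco(func):
        @wraps(func)
        def wrapper(df, *args, **kwargs):
            out = func(df, *args, **kwargs)
            for pred, msg in assertions:
                assert pred(out), f"{func.__name__}: {msg}"
            return out
        return wrapper
    return deco

@check((lambda d: d.shape[0] > 0, "output must not be empty"),
       (lambda d: "Widget C" not in d["product"].values, "Widget C must be filtered out"))
def filter_products(df):
    return df[df["product"] != "Widget C"]
```

Such a wrapper composes with @track, so a step can be both logged and validated.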