A framework for building and running simple data engineering pipelines in Python.

In data science and data engineering you constantly hear the term “data pipeline”. But the term has many meanings, and people are often referring to very specific tools or packages depending on their own background and needs. There are pipelines for pretty much everything. In Python alone, Luigi, Airflow, scikit-learn pipelines, and pandas pipes come to mind straight away, and this article does a good job of helping you understand what is out there.

It can be quite confusing, especially if you want a simple, agnostic pipeline that you can customize for your specific needs, with no bells and whistles and no lock-in to particular libraries. That is where PyDuct comes in. It is for the data engineer who just wants to get things done in an ordered and repeatable way.

PyDuct is a simple data pipeline that automates a chain of transformations performed on some data.

PyDuct data pipelines are a great way of introducing automation, reproducibility, structure, and flow to your data engineering projects.



What is it?

PyDuct transformation pipelines link user-defined transformation functions together into a TransformationPipe. The key feature of PyDuct is that the data source passed in can be almost anything you like, e.g. a pandas DataFrame, a GeoPandas GeoDataFrame, an Iris data cube, or a NumPy array. As long as your transformation steps accept and return the same type of object, PyDuct will work for you.
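The "same object in, same object out" rule is easy to check before you wire a step into a pipe. Below is a minimal sketch using only the standard library. The helper check_step_contract is hypothetical and not part of PyDuct, and the toy steps work on a plain list of numbers rather than a dataframe.

```python
from typing import Any, Callable


def check_step_contract(func: Callable[..., Any], data: Any, **params: Any) -> Any:
    """Hypothetical helper (not part of PyDuct): run one step and confirm
    it returns the same type of object it was given (subclasses allowed)."""
    result = func(data, **params)
    if not isinstance(result, type(data)):
        raise TypeError(
            f"{func.__name__} took {type(data).__name__} "
            f"but returned {type(result).__name__}"
        )
    return result


def drop_negatives(values: list, **kwargs) -> list:
    """Toy step: returns a list for a list, so it satisfies the contract."""
    return [v for v in values if v >= 0]


def total(values: list, **kwargs) -> float:
    """Toy step that breaks the contract by collapsing the list to a number."""
    return float(sum(values))


print(check_step_contract(drop_negatives, [3, -1, 4]))  # prints [3, 4]
```

Calling check_step_contract(total, [1, 2]) raises a TypeError, because that step hands back a float where the next step would expect a list.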

[Figure: pypipe architecture diagram]

Install

pip install pyduct

How to use

The TransformationPipe class accepts a list of transformation steps, 'steps', which are applied sequentially. Each step is a tuple containing a name and a function; the function is applied to the input DataObject and returns a transformed DataObject. A step can also contain an optional third element: a dictionary of parameters to be passed to your transformation function.
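To make the step format concrete, here is a minimal sketch of how a sequential pipe could consume (name, function) and (name, function, params) tuples. MiniPipe is a hypothetical stand-in written for illustration, not PyDuct's actual TransformationPipe. It assumes the parameter dictionary is unpacked into keyword arguments, which is consistent with the spatialCrop example further down reading its settings from **kwargs.

```python
from typing import Any, Callable, Dict, List, Tuple, Union

# A step is (name, function) or (name, function, params)
Step = Union[
    Tuple[str, Callable[..., Any]],
    Tuple[str, Callable[..., Any], Dict[str, Any]],
]


class MiniPipe:
    """Hypothetical stand-in for a sequential transformation pipe.

    Written for illustration only; this is not PyDuct's TransformationPipe.
    """

    def __init__(self, steps: List[Step]):
        self.steps = list(steps)

    def evaluate(self, data: Any) -> Any:
        for step in self.steps:
            name, func = step[0], step[1]
            # The third element is optional; default to no parameters
            params = step[2] if len(step) > 2 else {}
            try:
                # Assumption: params are passed as keyword arguments
                data = func(data, **params)
            except Exception as exc:
                # Use the step's name to report where the chain broke
                raise RuntimeError(f"step {name!r} failed") from exc
        return data


def scale(values, **kwargs):
    factor = kwargs.get("factor", 1)
    return [v * factor for v in values]


def keep_above(values, **kwargs):
    threshold = kwargs.get("threshold", 0)
    return [v for v in values if v > threshold]


def ascending(values, **kwargs):
    return sorted(values)


pipe = MiniPipe(steps=[
    ("scale", scale, {"factor": 10}),
    ("filter", keep_above, {"threshold": 15}),
    ("sort", ascending),  # no parameter dictionary
])
print(pipe.evaluate([3, 1, 2]))  # prints [20, 30]
```

Passing parameters as keyword arguments keeps every step function callable on its own, e.g. scale([1, 2], factor=3), so individual steps can be tested outside the pipe.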

To use PyDuct you need two things: a DataObject and a set of transformation steps.

DataObject

In this very simplified example we will use a geopandas.GeoDataFrame as our input DataObject. To build it, we load an example dataset from Kaggle on the global distribution of volcano eruptions (https://www.kaggle.com/datasets/texasdave/volcano-eruptions), which we have stored in the repo for this package as 'volcano_data_2010.csv'.

import pandas
import geopandas

Load the data and put it into a geopandas.GeoDataFrame:

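# Load the Kaggle volcano eruption data stored in the repo
df1 = pandas.read_csv('../test_data/volcano_data_2010.csv')
# Keep only relevant columns (a list selects several columns at once)
df = df1.loc[:, ["Year", "Name", "Country", "Latitude", "Longitude", "Type"]]
# Create point geometries from longitude (x) and latitude (y)
geometry = geopandas.points_from_xy(df.Longitude, df.Latitude)
# Tag the points as WGS 84 longitude/latitude (EPSG:4326), the CRS spatialCrop uses for its bounding box
geo_df = geopandas.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
geo_df.head()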
     Year                    Name           Country  Latitude  Longitude             Type                     geometry
0    2010              Tungurahua           Ecuador    -1.467    -78.442    Stratovolcano   POINT (-78.44200 -1.46700)
1    2010        Eyjafjallajokull           Iceland    63.630    -19.620    Stratovolcano   POINT (-19.62000 63.63000)
2    2010                  Pacaya         Guatemala    14.381    -90.601  Complex volcano   POINT (-90.60100 14.38100)
3    2010                 Sarigan     United States    16.708    145.780    Stratovolcano   POINT (145.78000 16.70800)
4    2010  Karangetang [Api Siau]         Indonesia     2.780    125.480    Stratovolcano    POINT (125.48000 2.78000)

Steps

Just as an example, we will define a single transformation step that spatially subsets the data to the Australian region (longitude 80 to 180, latitude -50 to 0). Yes, I know this is an unrealistic example, but it is just here to show you how to implement pipelines.

Now we write our transformation function. Keep in mind that the function must take our DataObject as input and return a transformed DataObject; in this example, that is a geopandas.GeoDataFrame.

from shapely.geometry import box


def spatialCrop(gdf: geopandas.GeoDataFrame, **kwargs) -> geopandas.GeoDataFrame:
    """
    This function will apply a spatial limit to a GeoDataFrame based on user-defined limits.
    ----------
    parameters:
        gdf (geopandas.GeoDataFrame): an input GeoDataFrame
        kwargs (dict): parameters,
            - boundingBox (list): an iterable (lon_min, lat_min, lon_max, lat_max) of the specified region.
    Output:
        limited_gdf (geopandas.GeoDataFrame): GeoDataFrame that is spatially limited to the boundingBox.
    """
    # No bounding box given: pass the data through unchanged
    if "boundingBox" not in kwargs:
        return gdf

    lon_min, lat_min, lon_max, lat_max = kwargs["boundingBox"]
    # Just an example, so we are doing naughty things with the CRS: we simply assume
    # plain WGS 84 longitude/latitude (EPSG:4326)... look away here...
    bounding = geopandas.GeoDataFrame(
        {
            'limit': ['bounding box'],
            'geometry': [box(lon_min, lat_min, lon_max, lat_max)]
        },
        crs="EPSG:4326")
    # Inner spatial join: keep only the points that intersect the box
    # ('predicate' replaces the deprecated 'op' argument)
    limited_gdf = geopandas.sjoin(gdf, bounding, predicate='intersects', how='inner')
    # Drop the helper columns added by the join
    limited_gdf = limited_gdf.drop(columns=['index_right', 'limit'])

    return limited_gdf
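For point data, the "intersects" test against a box reduces to a coordinate range check, with points lying exactly on the edge counted as inside (a point touching a polygon's boundary intersects it). The sketch below reproduces that logic on plain (name, longitude, latitude) records using only the standard library, which can help when sanity-checking what spatialCrop should keep. in_bounding_box and crop_records are hypothetical names for illustration, and the records are a few rows copied from the volcano table above. Like the GeoDataFrame version, it assumes plain longitude/latitude and does not handle boxes that cross the antimeridian.

```python
from typing import Iterable, List, Sequence, Tuple

Record = Tuple[str, float, float]  # (name, longitude, latitude)


def in_bounding_box(lon: float, lat: float, bbox: Sequence[float]) -> bool:
    """True if the point is inside or on the edge of (lon_min, lat_min, lon_max, lat_max)."""
    lon_min, lat_min, lon_max, lat_max = bbox
    return lon_min <= lon <= lon_max and lat_min <= lat <= lat_max


def crop_records(records: Iterable[Record], bbox: Sequence[float]) -> List[Record]:
    """Keep only the records whose point falls within bbox, preserving order."""
    return [r for r in records if in_bounding_box(r[1], r[2], bbox)]


volcanoes = [
    ("Tungurahua", -78.442, -1.467),
    ("Merapi", 110.442, -7.542),
    ("Sarigan", 145.780, 16.708),
    ("Okataina", 176.500, -38.120),
]
print(crop_records(volcanoes, [80, -50, 180, 0]))  # keeps Merapi and Okataina
```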

Define a PyDuct Pipe

Now that we have a step function and some data, we can define our transformation pipeline:

pipe = TransformationPipe(steps=[
    ('refine region', spatialCrop, {"boundingBox": [80, -50, 180, 0]})
])
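Because steps are plain tuples, a slip such as passing the bounding box as a bare list instead of a parameter dictionary only surfaces once the pipe runs. A small check like the one below can catch it earlier. validate_steps is a hypothetical helper, not part of PyDuct, and it assumes the step format described in How to use (a name, a callable, and an optional dictionary of parameters); spatial_crop_stub is a placeholder so the sketch runs without geopandas.

```python
from typing import Any, Sequence


def validate_steps(steps: Sequence[Any]) -> None:
    """Hypothetical check (not part of PyDuct) for (name, func[, params]) tuples."""
    for i, step in enumerate(steps):
        if not isinstance(step, tuple) or len(step) not in (2, 3):
            raise ValueError(f"step {i} must be a (name, func[, params]) tuple")
        name, func = step[0], step[1]
        if not isinstance(name, str):
            raise ValueError(f"step {i}: name must be a str, got {type(name).__name__}")
        if not callable(func):
            raise ValueError(f"step {name!r}: {func!r} is not callable")
        if len(step) == 3 and not isinstance(step[2], dict):
            raise ValueError(f"step {name!r}: params must be a dict")


def spatial_crop_stub(data, **kwargs):
    """Placeholder standing in for spatialCrop in this sketch."""
    return data


# Passes silently: name, callable, and a parameter dictionary
validate_steps([("refine region", spatial_crop_stub, {"boundingBox": [80, -50, 180, 0]})])
```

Passing ("refine region", spatial_crop_stub, [80, -50, 180, 0]) instead raises a ValueError, because the parameters are not wrapped in a dictionary.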

Evaluate your PyDuct Pipe

This is where things get interesting: we can now call evaluate on our pipe and watch the magic happen.

Input data:

geo_df
     Year                    Name           Country  Latitude  Longitude             Type                     geometry
0    2010              Tungurahua           Ecuador    -1.467    -78.442    Stratovolcano   POINT (-78.44200 -1.46700)
1    2010        Eyjafjallajokull           Iceland    63.630    -19.620    Stratovolcano   POINT (-19.62000 63.63000)
2    2010                  Pacaya         Guatemala    14.381    -90.601  Complex volcano   POINT (-90.60100 14.38100)
3    2010                 Sarigan     United States    16.708    145.780    Stratovolcano   POINT (145.78000 16.70800)
4    2010  Karangetang [Api Siau]         Indonesia     2.780    125.480    Stratovolcano    POINT (125.48000 2.78000)
...   ...                     ...               ...       ...        ...              ...                          ...
58   2018                 Kilauea     United States    19.425   -155.292   Shield volcano  POINT (-155.29200 19.42500)
59   2018                 Kadovar  Papua New Guinea    -3.620    144.620    Stratovolcano   POINT (144.62000 -3.62000)
60   2018                    Ijen         Indonesia    -8.058    114.242    Stratovolcano   POINT (114.24200 -8.05800)
61   2018                 Kilauea     United States    19.425   -155.292   Shield volcano  POINT (-155.29200 19.42500)
62   2018                    Aoba           Vanuatu   -15.400    167.830   Shield volcano  POINT (167.83000 -15.40000)

63 rows × 7 columns

Evaluation:

transformed_geo_df = pipe.evaluate(geo_df)

Transformed data:

transformed_geo_df
     Year                    Name           Country  Latitude  Longitude             Type                     geometry
6    2010                  Merapi         Indonesia    -7.542    110.442    Stratovolcano   POINT (110.44200 -7.54200)
8    2010         Tengger Caldera         Indonesia    -7.942    112.950    Stratovolcano   POINT (112.95000 -7.94200)
9    2011                  Merapi         Indonesia    -7.542    110.442    Stratovolcano   POINT (110.44200 -7.54200)
22   2013                  Merapi         Indonesia    -7.542    110.442    Stratovolcano   POINT (110.44200 -7.54200)
23   2013                 Paluweh         Indonesia    -8.320    121.708    Stratovolcano   POINT (121.70800 -8.32000)
25   2013                 Paluweh         Indonesia    -8.320    121.708    Stratovolcano   POINT (121.70800 -8.32000)
29   2013                Okataina       New Zealand   -38.120    176.500        Lava dome  POINT (176.50000 -38.12000)
31   2014                   Kelut         Indonesia    -7.930    112.308    Stratovolcano   POINT (112.30800 -7.93000)
39   2015                   Manam  Papua New Guinea    -4.100    145.061    Stratovolcano   POINT (145.06100 -4.10000)
41   2015                Okataina       New Zealand   -38.120    176.500        Lava dome  POINT (176.50000 -38.12000)
45   2016                 Rinjani         Indonesia    -8.420    116.470    Stratovolcano   POINT (116.47000 -8.42000)
50   2017      Dieng Volc Complex         Indonesia    -7.200    109.920  Complex volcano   POINT (109.92000 -7.20000)
52   2017                    Aoba           Vanuatu   -15.400    167.830   Shield volcano  POINT (167.83000 -15.40000)
53   2017                  Merapi         Indonesia    -7.542    110.442    Stratovolcano   POINT (110.44200 -7.54200)
55   2018                 Kadovar  Papua New Guinea    -3.620    144.620    Stratovolcano   POINT (144.62000 -3.62000)
59   2018                 Kadovar  Papua New Guinea    -3.620    144.620    Stratovolcano   POINT (144.62000 -3.62000)
60   2018                    Ijen         Indonesia    -8.058    114.242    Stratovolcano   POINT (114.24200 -8.05800)
62   2018                    Aoba           Vanuatu   -15.400    167.830   Shield volcano  POINT (167.83000 -15.40000)

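The power of this approach lies in its reproducibility and scalability: the same pipe, with the same steps and parameters, can be evaluated on new data at any time, and more steps can be added to its list as your processing grows.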

Credits

  • Logo art from "Vecteezy.com"
  • Demo data from "Kaggle.com"