Tutorial - Part #4 - Steps

Steps: Processing Data

After we execute the line python in_corral load we have the iris data loaded in our database and now we want to calculate the mean, minimum and maximum values for sepal_length, sepal_width, petal_length and petal_width in parallel for each species.

Warning

All throughout this tutorial we have used SQLite as our database. SQLite does not support concurrency. Keep in mind this is just an excercise and a real pipeline should use a database like PostgreSQL, MySQL, Oracle or something even more powerful like Hive

A Model for the Statistics

To hold the statistics, we will define a model with the three statistical measures for the four observed properties of the species. It will also hold a reference to the Iris species to which it belong (a relation to the Name table.)

To do so, we add at the end of my_pipeline/models.py, the class

class Statistics(db.Model):

    __tablename__ = 'Statistics'

    id = db.Column(db.Integer, primary_key=True)

    name_id = db.Column(
        db.Integer, db.ForeignKey('Name.id'), nullable=False, unique=True)
    name = db.relationship(
        "Name", backref=db.backref("statistics"), uselist=False)

    mean_sepal_length = db.Column(db.Float, nullable=False)
    mean_sepal_width = db.Column(db.Float, nullable=False)
    mean_petal_length = db.Column(db.Float, nullable=False)
    mean_petal_width = db.Column(db.Float, nullable=False)

    min_sepal_length = db.Column(db.Float, nullable=False)
    min_sepal_width = db.Column(db.Float, nullable=False)
    min_petal_length = db.Column(db.Float, nullable=False)
    min_petal_width = db.Column(db.Float, nullable=False)

    max_sepal_length = db.Column(db.Float, nullable=False)
    max_sepal_width = db.Column(db.Float, nullable=False)
    max_petal_length = db.Column(db.Float, nullable=False)
    max_petal_width = db.Column(db.Float, nullable=False)

    def __repr__(self):
        return "<Statistics of '{}'>".format(self.name.name)

If you have already read our last tutorial, the only differences this model has with the previous ones are the parameters unique=True and userlist=False on the lines where we define the relation. These are used to enforce that each instance of Name has one and only one instance of Statistics.

To create the table we execute once again on the command line python in_corral createdb and only the new table will be crated without changing the shape and form of the previous ones.

The Steps

We will create four steps in the my_pypeline/steps.py module.

#. Step 1: Creating Statistics for each Name

First, uncomment on the import section the line # from . import models; and then edit the class MyStep so that it looks like the following:

class StatisticsCreator(run.Step):

    model = models.Name
    conditions = []

    def process(self, name):
        stats = self.session.query(models.Statistics).filter(
            models.Statistics.name_id==name.id).first()
        if stats is None:
            yield models.Statistics(
                name_id=name.id,
                mean_sepal_length=0., mean_sepal_width=0.,
                mean_petal_length=0., mean_petal_width=0.,
                min_sepal_length=0., min_sepal_width=0.,
                min_petal_length=0., min_petal_width=0.,
                max_sepal_length=0., max_sepal_width=0.,
                max_petal_length=0., max_petal_width=0.)

This step’s goal is to create an instance of Statistics for each different name it finds on the Name table.

Notice that we let the Step know in the variable model that it will be working with unconditioned instances of the model Name. Corral will sequentially send the stored (by the Loader) instances, that meet the conditions (all of the instances in our case).

The process() method receives each instance of Name and if there is no associated instance of Statistic, it will create one with all the values set to 0, yielding back the control to corral (with yield).

#. Step 2: Calculating Statistics for “Iris-Setosa”

If we create a Step SetosaStatistics and we assign to its model variable the class Statistics and we add the conditions:

conditions = [
        models.Statistics.name.has(name="Iris-setosa"),
        models.Statistics.mean_sepal_length==0.]

we will create a step that only calculates the statistics of Iris-setosa if they were not previously calculated (the mean for sepal_length is 0.)

The process() method will be passed by parameter said instance of Statistics. To fill the statistics out, the complete code for this step will be:

class SetosaStatistics(run.Step):

    model = models.Statistics
    conditions = [
        models.Statistics.name.has(name="Iris-setosa"),
        models.Statistics.mean_sepal_length==0.]

    def process(self, stats):
        sepal_length, sepal_width, petal_length, petal_width = [], [], [], []
        for obs in stats.name.observations:
            sepal_length.append(obs.sepal_length)
            sepal_width.append(obs.sepal_width)
            petal_length.append(obs.petal_length)
            petal_width.append(obs.petal_width)

        stats.mean_sepal_length = sum(sepal_length) / len(sepal_length)
        stats.mean_sepal_width = sum(sepal_width) / len(sepal_width)
        stats.mean_petal_length = sum(petal_length) / len(petal_length)
        stats.mean_petal_width = sum(petal_width) / len(petal_width)

        stats.min_sepal_length = min(sepal_length)
        stats.min_sepal_width = min(sepal_width)
        stats.min_petal_length = min(petal_length)
        stats.min_petal_width = min(petal_width)

        stats.max_sepal_length = max(sepal_length)
        stats.max_sepal_width = max(sepal_width)
        stats.max_petal_length = max(petal_length)
        stats.max_petal_width = max(petal_width)

#. Step 3 and 4: Calculating Statistics for “Iris-Virginica” and “Iris-Versicolor”

The last two steps are exactly the same as the previous ones, except for the variables model and conditions.

class VersicolorStatistics(run.Step):

    model = models.Statistics
    conditions = [
        models.Statistics.name.has(name="Iris-versicolor"),
        models.Statistics.mean_sepal_length==0.]

    def process(self, stats):
        # SAME CODE AS SetosaStatistics.process


class VirginicaStatistics(run.Step):

    model = models.Statistics
    conditions = [
        models.Statistics.name.has(name="Iris-virginica"),
        models.Statistics.mean_sepal_length==0.]

    def process(self, stats):
        # SAME CODE AS SetosaStatistics.process

#. Step 5: Add the new steps to settings.STEPS

The last piece is to make your pipeline aware of the new steps. For this, you need to add the full python path to the STEPS list inside the settings.py file.

# Pipeline processor steps
STEPS = [
    "my_pipeline.steps.StatisticsCreator",
    "my_pipeline.steps.SetosaStatistics",
    "my_pipeline.steps.VirginicaStatistics",
    "my_pipeline.steps.VersicolorStatistics"]

Finally you can inspect the registered steps with the lssteps command

$python in_corral.py lssteps
+----------------------+---------+---------+
|      Step Class      | Process | Groups  |
+======================+=========+=========+
| SetosaStatistics     | 1       | default |
| StatisticsCreator    | 1       | default |
| VersicolorStatistics | 1       | default |
| VirginicaStatistics  | 1       | default |
+----------------------+---------+---------+
  TOTAL PROCESSES: 4
  DEBUG PROCESS: Enabled

Also note that (by default) every step is on the default group.

Note

The command python in_corral groups shows all available groups in steps and alerts.

Running The Steps

The main command to run the corral steps is run.

when you execute python in_corral run all the steps are executed asynchronous. If for some particular case you need to run the steps sequentially (in the same order of settings.STEPS) you can add the --sync flag.

Warning

By design, SQLite is not capable to serve as a multiprocess database, so it is highly recommended to run the steps with the --sync flag.

Here is a run example output

$ python in_corral.py run --sync
[INFO] Executing step '<class 'my_pipeline.steps.SetosaStatistics'>' #1
[INFO] SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
[INFO] ()
[INFO] SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
[INFO] ()
[INFO] BEGIN (implicit)
[INFO] SELECT "Statistics".id AS "Statistics_id", "Statistics".name_id AS "Statistics_name_id", "Statistics".mean_sepal_length AS "Statistics_mean_sepal_length", "Statistics".mean_sepal_width AS "Statistics_mean_sepal_width", "Statistics".mean_petal_length AS "Statistics_mean_petal_length", "Statistics".mean_petal_width AS "Statistics_mean_petal_width", "Statistics".min_sepal_length AS "Statistics_min_sepal_length", "Statistics".min_sepal_width AS "Statistics_min_sepal_width", "Statistics".min_petal_length AS "Statistics_min_petal_length", "Statistics".min_petal_width AS "Statistics_min_petal_width", "Statistics".max_sepal_length AS "Statistics_max_sepal_length", "Statistics".max_sepal_width AS "Statistics_max_sepal_width", "Statistics".max_petal_length AS "Statistics_max_petal_length", "Statistics".max_petal_width AS "Statistics_max_petal_width"
FROM "Statistics"
WHERE (EXISTS (SELECT 1
FROM "Name"
WHERE "Name".id = "Statistics".name_id AND "Name".name = ?)) AND "Statistics".mean_sepal_length = ?
[INFO] ('Iris-setosa', 0.0)
[INFO] COMMIT
[INFO] Done Step '<class 'pipeline.steps.SetosaStatistics'>' #1
[INFO] Executing step '<class 'pipeline.steps.StatisticsCreator'>' #1
[INFO] BEGIN (implicit)
[INFO] SELECT "Name".id AS "Name_id", "Name".name AS "Name_name"
FROM "Name"
...

Selective Steps Runs By Name and Groups

In some cases it is useful to run only a single or a group of steps.

Run by Name

You can run a single step by using the --steps|-s flag followed by the class-names of the steps you want to run.

$ python in_corral.py run --steps SetosaStatistics VersicolorStatistics
[INFO] Executing step '<class 'irispl.steps.SetosaStatistics'>' #1
[INFO] Executing step '<class 'irispl.steps.VersicolorStatistics'>' #1
...

Run by Groups

One of the most important concepts with Corral steps is the notion of groups.

Certain steps can be grouped together by adding a groups attribute to a Step class. For example, if we want to add the tree statistics calculators steps to a statistics group, we’d write:

class SetosaStatistics(run.Step):
    model = models.Statistics
    conditions = [
        models.Statistics.name.has(name="Iris-versicolor"),
        models.Statistics.mean_sepal_length==0.]
    groups = ["default", "statistics"]

    ...


class VersicolorStatistics(run.Step):

    model = models.Statistics
    conditions = [
        models.Statistics.name.has(name="Iris-versicolor"),
        models.Statistics.mean_sepal_length==0.]
    groups = ["default", "statistics"]

    ...


class VirginicaStatistics(run.Step):

    model = models.Statistics
    conditions = [
        models.Statistics.name.has(name="Iris-virginica"),
        models.Statistics.mean_sepal_length==0.]
    groups = ["default", "statistics"]

You can check the changes on the column Groups by running lssteps again

$ python in_corral.py lssteps
+----------------------+---------+--------------------+
|      Step Class      | Process |       Groups       |
+======================+=========+====================+
| SetosaStatistics     | 1       | default:statistics |
| StatisticsCreator    | 1       | default            |
| VersicolorStatistics | 1       | default:statistics |
| VirginicaStatistics  | 1       | default:statistics |
+----------------------+---------+--------------------+
  TOTAL PROCESSES: 4
  DEBUG PROCESS: Enabled

You can also list only the steps of a particular group with the --groups|-g flag

$ python in_corral.py lssteps -g statistics
+----------------------+---------+--------------------+
|      Step Class      | Process |       Groups       |
+======================+=========+====================+
| SetosaStatistics     | 1       | default:statistics |
| VersicolorStatistics | 1       | default:statistics |
| VirginicaStatistics  | 1       | default:statistics |
+----------------------+---------+--------------------+
  TOTAL PROCESSES: 3
  DEBUG PROCESS: Enabled

Finally, you can run the group of your choice with the --step-groups|--sg flag on the run command

$ python in_corral.py run -sg statistics
[INFO] Executing step '<class 'irispl.steps.SetosaStatistics'>' #1
[INFO] Executing step '<class 'irispl.steps.VersicolorStatistics'>' #1
[INFO] Executing step '<class 'irispl.steps.VirginicaStatistics'>' #1
...

As you can see, the StatisticsCreator step didn’t run.