Pipeline Development Guide

Learn to build data processing pipelines using the ia_modules execution engine. This guide covers YAML pipeline definitions, custom Step classes, database persistence, error handling, and flow control.

30 minutes · Intermediate · Pipeline Development

What You Will Build

A complete data processing pipeline using the ia_modules execution engine. You will create custom Step classes, define pipeline flow in YAML, and persist results to the database.

What You Will Learn

  • The ia_modules execution engine architecture
  • YAML pipeline definition format with real examples
  • Creating custom Step classes that extend the base Step
  • Database persistence from pipeline steps using FiberApp services
  • Error handling and retry policies
  • Flow control and step routing

Key Concepts

Pipeline Architecture

  • Execution Engine - The ia_modules framework that runs your pipelines
  • Step Classes - Python classes that implement individual processing steps
  • Flow Control - YAML-defined routing between steps
  • Service Injection - Access to FiberApp services within steps

Pipeline Components

  • Pipeline YAML definition with execution configuration
  • Step modules with async run methods
  • Data models for persistence
  • Error handling and retry policies

📋 Prerequisites

Before you begin, you need a fully configured Fiberwise environment; this is the foundation for building any app on the platform. If any part of your setup is missing, complete the relevant setup guides before continuing.

Step 1: Understanding the ia_modules Execution Engine

The ia_modules execution engine is the core framework for running pipelines in Fiberwise. It provides:

  • Step orchestration - Manages the execution order of pipeline steps
  • Service injection - Provides access to FiberApp services within steps
  • Data flow - Passes data between steps automatically
  • Error handling - Configurable retry and failure policies
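
Conceptually, the engine instantiates each step, awaits its async run method, and feeds each step's output dict into the next step's input. The loop below is a simplified stand-in sketch of that behavior, not the actual ia_modules internals:

```python
import asyncio


class Step:
    """Stand-in for ia_modules.pipeline.core.Step (illustrative only)."""

    def __init__(self, services=None):
        self.services = services or {}

    async def run(self, data: dict) -> dict:
        raise NotImplementedError


async def run_pipeline(steps, data: dict) -> dict:
    """Call each step's run() in order, feeding each output into the next step."""
    for step in steps:
        data = await step.run(data)
        if not data.get("success", True):
            break  # simplistic stop-on-failure policy
    return data


class AddOne(Step):
    async def run(self, data: dict) -> dict:
        return {"success": True, "value": data["value"] + 1}


# Two chained steps: 1 -> 2 -> 3
result = asyncio.run(run_pipeline([AddOne(), AddOne()], {"value": 1}))
```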

Pipeline Registration in app_manifest.yaml

Pipelines are registered in your app manifest with execution engine configuration:

# app_manifest.yaml
app:
  app_slug: pipeline-test-app
  name: Pipeline Test App (ia_modules - REAL API)
  version: 3.0.7
  category: simple
  description: Pipeline test app
  entryPoint: index.js
  icon: fas fa-calculator
  publisher: FiberWise

pipelines:
- name: Number Doubler Pipeline
  slug: number-doubler-pipeline
  version: 3.0.0
  description: Simple 2-step pipeline using ia_modules execution engine
  execution_engine: ia_modules
  pipeline_definition: pipelines/pipeline.yaml
  engine_config:
    timeout: 30
    max_retries: 1
    continue_on_error: false

Key Configuration Options

  • execution_engine: ia_modules - Specifies the ia_modules framework
  • pipeline_definition - Path to the YAML pipeline definition file
  • engine_config.timeout - Maximum execution time in seconds
  • engine_config.max_retries - Number of retry attempts on failure
  • engine_config.continue_on_error - Whether to continue if a step fails
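
The timeout option caps total execution time. Its effect is roughly that of wrapping the pipeline in asyncio.wait_for, as this illustrative sketch shows (not the engine's actual code):

```python
import asyncio


async def slow_pipeline():
    """Stand-in for a pipeline run that takes 0.2 seconds."""
    await asyncio.sleep(0.2)
    return {"success": True}


async def run_with_timeout(coro, timeout: float) -> dict:
    """Mimic engine_config.timeout: cancel the run if it exceeds the budget."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return {"success": False, "error": f"Pipeline exceeded {timeout}s timeout"}


# A 0.2s pipeline against a 0.05s budget times out:
result = asyncio.run(run_with_timeout(slow_pipeline(), timeout=0.05))
```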

Step 2: YAML Pipeline Definition Format

The pipeline definition file describes the steps, their configuration, execution flow, and error handling. Here is a complete working example:

# pipelines/pipeline.yaml
name: number_doubler_pipeline
version: "2.0.0"
description: Simple 2-step pipeline using ia_modules framework

# Execution configuration
execution:
  engine: ia_modules
  timeout: 30
  max_retries: 1

# Step definitions
steps:
  - id: double_number
    name: Double the Number
    module: steps
    step_class: DoubleNumberStep
    config:
      description: Double a number by multiplying it by 2

  - id: save_result
    name: Save Result to Database
    module: steps
    step_class: SaveResultStep
    config:
      model: test-results
      description: Save the doubled number to database

# Execution flow
flow:
  start_at: double_number

  paths:
    - from: double_number
      to: save_result
      condition:
        type: always

    - from: save_result
      to: end_with_success
      condition:
        type: always

# Error handling
error_handling:
  on_step_failure: stop
  retry_policy:
    max_attempts: 2
    backoff: exponential
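
Because the definition is plain YAML, you can sanity-check it before deploying: every flow path must reference a defined step id or a terminal state, and start_at must be a real step. The sketch below runs those checks on an equivalent Python dict, so it needs no YAML library:

```python
def validate_pipeline(defn: dict) -> list:
    """Return a list of problems found in a parsed pipeline definition."""
    errors = []
    step_ids = {s["id"] for s in defn.get("steps", [])}
    terminals = {"end_with_success", "end_with_failure"}
    flow = defn.get("flow", {})

    if flow.get("start_at") not in step_ids:
        errors.append(f"flow.start_at {flow.get('start_at')!r} is not a defined step")
    for path in flow.get("paths", []):
        if path["from"] not in step_ids:
            errors.append(f"path 'from' references unknown step {path['from']!r}")
        if path["to"] not in step_ids | terminals:
            errors.append(f"path 'to' references unknown step {path['to']!r}")
    return errors


# The number-doubler definition from above, as a parsed dict:
defn = {
    "steps": [{"id": "double_number"}, {"id": "save_result"}],
    "flow": {
        "start_at": "double_number",
        "paths": [
            {"from": "double_number", "to": "save_result"},
            {"from": "save_result", "to": "end_with_success"},
        ],
    },
}
problems = validate_pipeline(defn)  # [] — the definition is consistent
```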

Pipeline Definition Sections

Metadata Section

name: number_doubler_pipeline
version: "2.0.0"
description: Simple 2-step pipeline using ia_modules framework

Execution Section

execution:
  engine: ia_modules
  timeout: 30
  max_retries: 1

Steps Section

Each step definition includes:

  • id - Unique identifier for the step (used in flow routing)
  • name - Human-readable name for logging and display
  • module - Python module containing the step class
  • step_class - Name of the Step subclass to instantiate
  • config - Step-specific configuration options

steps:
  - id: double_number
    name: Double the Number
    module: steps
    step_class: DoubleNumberStep
    config:
      description: Double a number by multiplying it by 2

Step 3: Creating Custom Step Classes

Step classes extend the base Step class from ia_modules.pipeline.core and implement an async run method.

Basic Step Structure

# steps/double_step.py
import logging
from ia_modules.pipeline.core import Step

logger = logging.getLogger(__name__)


class DoubleNumberStep(Step):
    """Step that doubles an input number."""

    async def run(self, data: dict) -> dict:
        """
        Double the input number.

        Args:
            data: Input data containing 'input_number' or 'number'

        Returns:
            dict with input_number and doubled_number
        """
        logger.info(f"[DoubleNumberStep] Starting with input data: {data}")

        # Accept either 'input_number' or 'number' as input
        input_number = data.get("input_number") or data.get("number")

        if input_number is None:
            logger.error("[DoubleNumberStep] Missing required parameter: input_number or number")
            return {
                "success": False,
                "error": "Missing required parameter: input_number or number"
            }

        logger.info(f"[DoubleNumberStep] Extracted input_number: {input_number}")

        # Validate and convert
        try:
            number = int(input_number)
            logger.info(f"[DoubleNumberStep] Converted to integer: {number}")
        except (ValueError, TypeError):
            logger.error(f"[DoubleNumberStep] Invalid number format: {input_number}")
            return {
                "success": False,
                "error": f"input_number must be a number, got: {input_number}"
            }

        # Double it
        doubled = number * 2
        logger.info(f"[DoubleNumberStep] Calculated result: {number} * 2 = {doubled}")

        result = {
            "success": True,
            "input_number": number,
            "doubled_number": doubled
        }
        logger.info(f"[DoubleNumberStep] Returning result: {result}")

        return result

Step Class Requirements

  • Extend the Step base class from ia_modules.pipeline.core
  • Implement an async run(self, data: dict) -> dict method
  • Return a dictionary with results (include success flag for error handling)
  • Use logging for debugging and monitoring

Input Handling Best Practices

  • Accept multiple input field names for flexibility (input_number or number)
  • Validate required inputs and return clear error messages
  • Use type conversion with proper exception handling
  • Log input data for debugging

Step 4: Database Persistence from Pipeline Steps

Steps can access FiberApp services through the injected services object. This enables database operations, API calls, and more.

Step with Database Persistence

# steps/save_step.py
import logging
from ia_modules.pipeline.core import Step

logger = logging.getLogger(__name__)


class SaveResultStep(Step):
    """Step that saves the doubled number result to the database."""

    async def run(self, data: dict) -> dict:
        """
        Save the result to the test-results model.

        Args:
            data: Input data from previous step containing input_number and doubled_number

        Returns:
            dict with result_id and success status
        """
        logger.info(f"[SaveResultStep] Starting with input data: {data}")

        # Access FiberApp through services (injected by framework)
        fiber = self.services.get('fiber') if hasattr(self, 'services') and self.services else None

        logger.info(f"[SaveResultStep] FiberApp available: {fiber is not None}")

        input_number = data.get("input_number")
        doubled_number = data.get("doubled_number")

        logger.info(f"[SaveResultStep] Extracted values - input: {input_number}, doubled: {doubled_number}")

        if fiber:
            try:
                logger.info(f"[SaveResultStep] Attempting to save to database via FiberApp")
                result = await fiber.data.create_item(
                    model_id='test-results',
                    data={
                        'input_number': input_number,
                        'doubled_number': doubled_number
                    }
                )
                logger.info(f"[SaveResultStep] Successfully saved to database, result_id: {result.get('item_id')}")
                return {
                    "success": True,
                    "result_id": result.get('item_id'),
                    "input_number": input_number,
                    "doubled_number": doubled_number
                }
            except Exception as e:
                logger.error(f"[SaveResultStep] Database save failed: {str(e)}", exc_info=True)
                return {
                    "success": False,
                    "error": f"Database error: {str(e)}",
                    "input_number": input_number,
                    "doubled_number": doubled_number
                }

        # No fiber available - return data without saving
        logger.warning("[SaveResultStep] FiberApp not available - skipping database save")
        return {
            "success": True,
            "result_id": None,
            "warning": "Fiber not available - result not saved to database",
            "input_number": input_number,
            "doubled_number": doubled_number
        }

Accessing Injected Services

The ia_modules framework injects services into steps through the self.services dictionary:

# Access FiberApp services
fiber = self.services.get('fiber') if hasattr(self, 'services') and self.services else None

# Use the data service to create records
result = await fiber.data.create_item(
    model_id='test-results',
    data={
        'input_number': input_number,
        'doubled_number': doubled_number
    }
)

Available Services

  • fiber.data - Database operations (create_item, get_item, update_item, delete_item)
  • fiber.config - Application configuration access
  • fiber.auth - Authentication and user context
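
When unit-testing steps outside the platform, an in-memory stand-in for fiber.data lets you exercise the persistence path without a live backend. The method names follow the list above; the exact return shapes of the real service are assumptions here:

```python
import uuid


class FakeDataService:
    """In-memory stand-in for fiber.data (return shapes are assumed)."""

    def __init__(self):
        self._items = {}

    async def create_item(self, model_id: str, data: dict) -> dict:
        item_id = str(uuid.uuid4())
        self._items[item_id] = {"model_id": model_id, **data}
        return {"item_id": item_id, **data}

    async def get_item(self, model_id: str, item_id: str):
        return self._items.get(item_id)


class FakeFiber:
    """Minimal fiber stand-in exposing only the data service."""

    def __init__(self):
        self.data = FakeDataService()


# Inject into a step under test:
#   step.services = {"fiber": FakeFiber()}
```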

Data Model Definition

Define the data model in your app_manifest.yaml for database persistence:

# In app_manifest.yaml
app:
  models:
  - description: Test results from pipeline execution
    model_slug: test-results
    name: Test Result
    fields:
    - description: Primary key for the result record
      field_column: result_id
      is_primary_key: true
      name: Result ID
      required: true
      type: uuid
    - field_column: input_number
      name: Input Number
      required: true
      type: integer
    - field_column: doubled_number
      name: Doubled Number
      required: true
      type: integer
    - description: ID of the user who created this result
      field_column: user_id
      is_system_field: true
      name: User ID
      type: string
    - default: CURRENT_TIMESTAMP
      field_column: created_at
      is_system_field: true
      name: Created At
      type: timestamp
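
As a purely illustrative convenience, you can mirror the model's required fields in a small pre-flight check before calling create_item:

```python
# Required user-supplied fields of the test-results model defined above
# (system fields like result_id and created_at are filled by the platform).
REQUIRED_FIELDS = {"input_number": int, "doubled_number": int}


def check_record(record: dict) -> list:
    """Return problems with a test-results record before sending it to fiber.data."""
    problems = []
    for field, ftype in REQUIRED_FIELDS.items():
        if field not in record:
            problems.append(f"missing required field: {field}")
        elif not isinstance(record[field], ftype):
            problems.append(f"{field} must be {ftype.__name__}")
    return problems


issues = check_record({"input_number": 5, "doubled_number": 10})  # []
```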

Step 5: Error Handling and Retry Policies

The ia_modules framework provides configurable error handling at both the pipeline and step level.

Pipeline-Level Error Handling

# In pipeline.yaml
error_handling:
  on_step_failure: stop      # Options: stop, continue, skip
  retry_policy:
    max_attempts: 2          # Number of retry attempts
    backoff: exponential     # Options: none, linear, exponential

Error Handling Options

  • stop - Halt pipeline execution immediately on step failure
  • continue - Continue to next step even if current step fails
  • skip - Skip the failed step and continue with the flow

Retry Backoff Strategies

  • none - Retry immediately without delay
  • linear - Increase delay linearly between retries
  • exponential - Double the delay between each retry attempt
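
These strategies translate to retry delays roughly as follows; the 1-second base delay here is an assumption, and the actual delays used by ia_modules may differ:

```python
def retry_delay(strategy: str, attempt: int, base: float = 1.0) -> float:
    """Delay in seconds before retry number `attempt` (1-based)."""
    if strategy == "none":
        return 0.0
    if strategy == "linear":
        return base * attempt            # 1s, 2s, 3s, ...
    if strategy == "exponential":
        return base * 2 ** (attempt - 1)  # 1s, 2s, 4s, 8s, ...
    raise ValueError(f"Unknown backoff strategy: {strategy}")


delays = [retry_delay("exponential", n) for n in (1, 2, 3)]  # [1.0, 2.0, 4.0]
```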

Step-Level Error Handling

Handle errors gracefully within your step implementation:

async def run(self, data: dict) -> dict:
    # Validate required inputs
    if input_number is None:
        logger.error("[Step] Missing required parameter")
        return {
            "success": False,
            "error": "Missing required parameter: input_number"
        }

    # Handle type conversion errors
    try:
        number = int(input_number)
    except (ValueError, TypeError):
        logger.error(f"[Step] Invalid number format: {input_number}")
        return {
            "success": False,
            "error": f"input_number must be a number, got: {input_number}"
        }

    # Handle database errors
    try:
        result = await fiber.data.create_item(...)
    except Exception as e:
        logger.error(f"[Step] Database save failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": f"Database error: {str(e)}"
        }

    return {"success": True, ...}

Error Handling Best Practices

  • Always return a success flag in step results
  • Include descriptive error messages in the error field
  • Use logger.error() with exc_info=True for full stack traces
  • Gracefully handle missing services (e.g., when fiber is not available)
  • Pass through relevant data even on failure for debugging
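
These conventions are easy to capture in two tiny helpers so every step returns a consistently shaped dict (a convenience sketch, not part of ia_modules):

```python
def ok(**data) -> dict:
    """Successful step result with a success flag."""
    return {"success": True, **data}


def fail(error: str, **data) -> dict:
    """Failed step result; extra data is passed through for debugging."""
    return {"success": False, "error": error, **data}


# Usage inside a step's run method:
#   return fail("input_number must be a number", input_number=raw_value)
result = ok(input_number=5, doubled_number=10)
```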

Step 6: Flow Control and Step Routing

The flow section defines how data moves between steps and supports conditional routing.

Basic Flow Definition

flow:
  start_at: double_number    # First step to execute

  paths:
    - from: double_number    # Source step
      to: save_result        # Destination step
      condition:
        type: always         # Always follow this path

    - from: save_result
      to: end_with_success   # Special terminal state
      condition:
        type: always

Flow Elements

  • start_at - The step ID where pipeline execution begins
  • paths - Array of routing rules between steps
  • from - Source step ID
  • to - Destination step ID or terminal state
  • condition - When to follow this path

Terminal States

  • end_with_success - Pipeline completed successfully
  • end_with_failure - Pipeline completed with failure

Condition Types

# Always follow this path
condition:
  type: always

# Follow based on step output
condition:
  type: output_equals
  field: success
  value: true

# Follow on step success
condition:
  type: on_success

# Follow on step failure
condition:
  type: on_failure
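
All four types can be implemented in a few lines. In this sketch, on_success and on_failure are assumed to read the step output's success flag:

```python
def condition_met(condition: dict, step_output: dict) -> bool:
    """Decide whether a flow path should be followed for a step's output."""
    ctype = condition.get("type", "always")
    if ctype == "always":
        return True
    if ctype == "output_equals":
        return step_output.get(condition["field"]) == condition["value"]
    if ctype == "on_success":
        return step_output.get("success") is True
    if ctype == "on_failure":
        return step_output.get("success") is False
    raise ValueError(f"Unknown condition type: {ctype}")


output = {"success": True, "doubled_number": 10}
taken = condition_met({"type": "output_equals", "field": "success", "value": True}, output)
```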

Conditional Branching Example

flow:
  start_at: validate_input

  paths:
    # On successful validation, proceed to processing
    - from: validate_input
      to: process_data
      condition:
        type: output_equals
        field: success
        value: true

    # On validation failure, go to error handler
    - from: validate_input
      to: handle_error
      condition:
        type: output_equals
        field: success
        value: false

    # Success path continues to save
    - from: process_data
      to: save_result
      condition:
        type: always

    # Save completes the pipeline
    - from: save_result
      to: end_with_success
      condition:
        type: always

    # Error handler ends with failure
    - from: handle_error
      to: end_with_failure
      condition:
        type: always

Step 7: Complete Pipeline Example

Here is the complete working pipeline from the pipeline-test-app:

Project Structure

pipeline-test-app/
  app_manifest.yaml
  index.js
  pipelines/
    pipeline.yaml
  steps/
    __init__.py
    double_step.py
    save_step.py

Running the Pipeline

# Execute the pipeline with input data
fiber pipeline execute number-doubler-pipeline \
  --input '{"input_number": 5}' \
  --verbose

# Example output:
Executing pipeline: number-doubler-pipeline
Input: {"input_number": 5}

[Step 1] double_number: Double the Number
  Input: {"input_number": 5}
  Output: {"success": true, "input_number": 5, "doubled_number": 10}

[Step 2] save_result: Save Result to Database
  Input: {"success": true, "input_number": 5, "doubled_number": 10}
  Output: {"success": true, "result_id": "abc123...", "input_number": 5, "doubled_number": 10}

[SUCCESS] Pipeline completed successfully!
Final result:
{
  "success": true,
  "result_id": "abc123-def456-ghi789",
  "input_number": 5,
  "doubled_number": 10
}

Data Flow Visualization

Input: {"input_number": 5}
         |
         v
+-------------------+
| DoubleNumberStep  |
| - Validates input |
| - Doubles number  |
+-------------------+
         |
         | {"success": true, "input_number": 5, "doubled_number": 10}
         v
+-------------------+
| SaveResultStep    |
| - Gets fiber svc  |
| - Saves to DB     |
+-------------------+
         |
         | {"success": true, "result_id": "...", ...}
         v
   end_with_success

Step 8: Testing and Debugging

Logging Configuration

Use Python's logging module for debugging:

import logging

logger = logging.getLogger(__name__)

class MyStep(Step):
    async def run(self, data: dict) -> dict:
        logger.info(f"[MyStep] Starting with input data: {data}")

        # Log important values
        logger.info(f"[MyStep] Extracted value: {value}")

        # Log errors with stack traces
        logger.error(f"[MyStep] Operation failed: {str(e)}", exc_info=True)

        logger.info(f"[MyStep] Returning result: {result}")
        return result

Testing Steps Independently

# test_steps.py
import asyncio
from steps.double_step import DoubleNumberStep

async def test_double_step():
    step = DoubleNumberStep()

    # Test valid input
    result = await step.run({"input_number": 5})
    assert result["success"] is True
    assert result["doubled_number"] == 10

    # Test missing input
    result = await step.run({})
    assert result["success"] is False
    assert "error" in result

    # Test invalid input
    result = await step.run({"input_number": "not a number"})
    assert result["success"] is False

asyncio.run(test_double_step())

Viewing Pipeline Execution History

# List recent pipeline executions
fiber pipeline list-executions --pipeline number-doubler-pipeline

# View specific execution details
fiber pipeline show-execution <execution-id> --verbose

Next Steps

You have learned how to build data processing pipelines using the ia_modules execution engine. From here, try extending the example: add a conditional branch (see Step 6), enrich the data model with additional fields, or tune the retry policy for your workload.

Summary

You have successfully learned how to:

  • Configure pipelines in app_manifest.yaml with the ia_modules execution engine
  • Define pipeline steps, flow, and error handling in YAML
  • Create custom Step classes that extend the base Step class
  • Access FiberApp services for database persistence
  • Implement proper error handling and retry policies
  • Define flow control and conditional routing between steps

You are now ready to build sophisticated data processing pipelines with Fiberwise!