Pipeline Development Guide
Learn to build data processing pipelines using the ia_modules execution engine. This guide covers YAML pipeline definitions, custom Step classes, database persistence, error handling, and flow control.
What You Will Build
A complete data processing pipeline using the ia_modules execution engine. You will create custom Step classes, define pipeline flow in YAML, and persist results to the database.
What You Will Learn
- The ia_modules execution engine architecture
- YAML pipeline definition format with real examples
- Creating custom Step classes that extend the base Step
- Database persistence from pipeline steps using FiberApp services
- Error handling and retry policies
- Flow control and step routing
Key Concepts
Pipeline Architecture
- Execution Engine - The ia_modules framework that runs your pipelines
- Step Classes - Python classes that implement individual processing steps
- Flow Control - YAML-defined routing between steps
- Service Injection - Access to FiberApp services within steps
Pipeline Components
- Pipeline YAML definition with execution configuration
- Step modules with async run methods
- Data models for persistence
- Error handling and retry policies
📋 Prerequisites: Your Setup Checklist
Before you begin, you need a fully configured Fiberwise environment. This is the foundation for building any app on the platform.
If any part of your environment is not yet configured, complete the setup guides first. The steps below assume a working Fiberwise installation with the fiber CLI available.
Step 1: Understanding the ia_modules Execution Engine
The ia_modules execution engine is the core framework for running pipelines in Fiberwise. It provides:
- Step orchestration - Manages the execution order of pipeline steps
- Service injection - Provides access to FiberApp services within steps
- Data flow - Passes data between steps automatically
- Error handling - Configurable retry and failure policies
Pipeline Registration in app_manifest.yaml
Pipelines are registered in your app manifest with execution engine configuration:
# app_manifest.yaml
app:
  app_slug: pipeline-test-app
  name: Pipeline Test App (ia_modules - REAL API)
  version: 3.0.7
  category: simple
  description: Pipeline test app
  entryPoint: index.js
  icon: fas fa-calculator
  publisher: FiberWise

pipelines:
  - name: Number Doubler Pipeline
    slug: number-doubler-pipeline
    version: 3.0.0
    description: Simple 2-step pipeline using ia_modules execution engine
    execution_engine: ia_modules
    pipeline_definition: pipelines/pipeline.yaml
    engine_config:
      timeout: 30
      max_retries: 1
      continue_on_error: false
Key Configuration Options
- execution_engine: ia_modules - Specifies the ia_modules framework
- pipeline_definition - Path to the YAML pipeline definition file
- engine_config.timeout - Maximum execution time in seconds
- engine_config.max_retries - Number of retry attempts on failure
- engine_config.continue_on_error - Whether to continue if a step fails
Step 2: YAML Pipeline Definition Format
The pipeline definition file describes the steps, their configuration, execution flow, and error handling. Here is a complete working example:
# pipelines/pipeline.yaml
name: number_doubler_pipeline
version: "2.0.0"
description: Simple 2-step pipeline using ia_modules framework

# Execution configuration
execution:
  engine: ia_modules
  timeout: 30
  max_retries: 1

# Step definitions
steps:
  - id: double_number
    name: Double the Number
    module: steps
    step_class: DoubleNumberStep
    config:
      description: Double a number by multiplying it by 2

  - id: save_result
    name: Save Result to Database
    module: steps
    step_class: SaveResultStep
    config:
      model: test-results
      description: Save the doubled number to database

# Execution flow
flow:
  start_at: double_number
  paths:
    - from: double_number
      to: save_result
      condition:
        type: always
    - from: save_result
      to: end_with_success
      condition:
        type: always

# Error handling
error_handling:
  on_step_failure: stop
  retry_policy:
    max_attempts: 2
    backoff: exponential
Pipeline Definition Sections
Metadata Section
name: number_doubler_pipeline
version: "2.0.0"
description: Simple 2-step pipeline using ia_modules framework
Execution Section
execution:
  engine: ia_modules
  timeout: 30
  max_retries: 1
Steps Section
Each step definition includes:
- id - Unique identifier for the step (used in flow routing)
- name - Human-readable name for logging and display
- module - Python module containing the step class
- step_class - Name of the Step subclass to instantiate
- config - Step-specific configuration options
steps:
  - id: double_number
    name: Double the Number
    module: steps
    step_class: DoubleNumberStep
    config:
      description: Double a number by multiplying it by 2
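The module and step_class fields suggest the engine resolves step classes by dynamic import. A sketch of how such a lookup might work (this is illustrative; the actual ia_modules loader may differ — here a throwaway steps module is registered in sys.modules so the example is self-contained):

```python
import importlib
import sys
import types

# Register a stand-in 'steps' module so the sketch runs without a real package
steps_mod = types.ModuleType("steps")

class DoubleNumberStep:
    pass

steps_mod.DoubleNumberStep = DoubleNumberStep
sys.modules["steps"] = steps_mod

def load_step_class(module: str, step_class: str):
    """Resolve a step class from its module name and class name."""
    return getattr(importlib.import_module(module), step_class)

cls = load_step_class("steps", "DoubleNumberStep")
print(cls.__name__)  # DoubleNumberStep
```

The same pair of strings from the YAML definition is all the engine needs to instantiate each step.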
Step 3: Creating Custom Step Classes
Step classes extend the base Step class from ia_modules.pipeline.core and implement an async run method.
Basic Step Structure
# steps/double_step.py
import logging

from ia_modules.pipeline.core import Step

logger = logging.getLogger(__name__)


class DoubleNumberStep(Step):
    """Step that doubles an input number."""

    async def run(self, data: dict) -> dict:
        """
        Double the input number.

        Args:
            data: Input data containing 'input_number' or 'number'

        Returns:
            dict with input_number and doubled_number
        """
        logger.info(f"[DoubleNumberStep] Starting with input data: {data}")

        # Accept either 'input_number' or 'number' as input
        input_number = data.get("input_number") or data.get("number")
        if input_number is None:
            logger.error("[DoubleNumberStep] Missing required parameter: input_number or number")
            return {
                "success": False,
                "error": "Missing required parameter: input_number or number"
            }

        logger.info(f"[DoubleNumberStep] Extracted input_number: {input_number}")

        # Validate and convert
        try:
            number = int(input_number)
            logger.info(f"[DoubleNumberStep] Converted to integer: {number}")
        except (ValueError, TypeError):
            logger.error(f"[DoubleNumberStep] Invalid number format: {input_number}")
            return {
                "success": False,
                "error": f"input_number must be a number, got: {input_number}"
            }

        # Double it
        doubled = number * 2
        logger.info(f"[DoubleNumberStep] Calculated result: {number} * 2 = {doubled}")

        result = {
            "success": True,
            "input_number": number,
            "doubled_number": doubled
        }
        logger.info(f"[DoubleNumberStep] Returning result: {result}")
        return result
Step Class Requirements
- Extend the Step base class from ia_modules.pipeline.core
- Implement an async run(self, data: dict) -> dict method
- Return a dictionary with results (include a success flag for error handling)
- Use logging for debugging and monitoring
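Those requirements boil down to a small skeleton. The sketch below substitutes a local stand-in for the base class so it runs on its own; in a real app you would import Step from ia_modules.pipeline.core instead, and EchoStep is a hypothetical example name:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class Step:
    """Stand-in for ia_modules.pipeline.core.Step, so this sketch is self-contained."""
    pass

class EchoStep(Step):
    """Minimal step: validate input, do the work, return a dict with a success flag."""

    async def run(self, data: dict) -> dict:
        message = data.get("message")
        if message is None:
            # Clear error message plus success flag, per the requirements above
            return {"success": False, "error": "Missing required parameter: message"}
        logger.info(f"[EchoStep] Echoing: {message}")
        return {"success": True, "message": message}

result = asyncio.run(EchoStep().run({"message": "hello"}))
print(result)  # {'success': True, 'message': 'hello'}
```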
Input Handling Best Practices
- Accept multiple input field names for flexibility (input_number or number)
- Validate required inputs and return clear error messages
- Use type conversion with proper exception handling
- Log input data for debugging
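The "multiple input field names" pattern can be factored into a small helper. first_present is a hypothetical name, not part of ia_modules; note that, unlike the bare `or` chain in DoubleNumberStep, it treats a value of 0 as present:

```python
def first_present(data: dict, *keys, default=None):
    """Return the first non-None value among the given keys, else default."""
    for key in keys:
        value = data.get(key)
        if value is not None:
            return value
    return default

# Accepts either field name, mirroring DoubleNumberStep's input handling
print(first_present({"number": 7}, "input_number", "number"))  # 7
print(first_present({"input_number": 0}, "input_number", "number"))  # 0 (not lost to `or`)
```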
Step 4: Database Persistence from Pipeline Steps
Steps can access FiberApp services through the injected services object. This enables database operations, API calls, and more.
Step with Database Persistence
# steps/save_step.py
import logging

from ia_modules.pipeline.core import Step

logger = logging.getLogger(__name__)


class SaveResultStep(Step):
    """Step that saves the doubled number result to the database."""

    async def run(self, data: dict) -> dict:
        """
        Save the result to the test-results model.

        Args:
            data: Input data from previous step containing input_number and doubled_number

        Returns:
            dict with result_id and success status
        """
        logger.info(f"[SaveResultStep] Starting with input data: {data}")

        # Access FiberApp through services (injected by framework)
        fiber = self.services.get('fiber') if hasattr(self, 'services') and self.services else None
        logger.info(f"[SaveResultStep] FiberApp available: {fiber is not None}")

        input_number = data.get("input_number")
        doubled_number = data.get("doubled_number")
        logger.info(f"[SaveResultStep] Extracted values - input: {input_number}, doubled: {doubled_number}")

        if fiber:
            try:
                logger.info("[SaveResultStep] Attempting to save to database via FiberApp")
                result = await fiber.data.create_item(
                    model_id='test-results',
                    data={
                        'input_number': input_number,
                        'doubled_number': doubled_number
                    }
                )
                logger.info(f"[SaveResultStep] Successfully saved to database, result_id: {result.get('item_id')}")
                return {
                    "success": True,
                    "result_id": result.get('item_id'),
                    "input_number": input_number,
                    "doubled_number": doubled_number
                }
            except Exception as e:
                logger.error(f"[SaveResultStep] Database save failed: {str(e)}", exc_info=True)
                return {
                    "success": False,
                    "error": f"Database error: {str(e)}",
                    "input_number": input_number,
                    "doubled_number": doubled_number
                }

        # No fiber available - return data without saving
        logger.warning("[SaveResultStep] FiberApp not available - skipping database save")
        return {
            "success": True,
            "result_id": None,
            "warning": "Fiber not available - result not saved to database",
            "input_number": input_number,
            "doubled_number": doubled_number
        }
Accessing Injected Services
The ia_modules framework injects services into steps through the self.services dictionary:
# Access FiberApp services
fiber = self.services.get('fiber') if hasattr(self, 'services') and self.services else None

# Use the data service to create records
result = await fiber.data.create_item(
    model_id='test-results',
    data={
        'input_number': input_number,
        'doubled_number': doubled_number
    }
)
Available Services
- fiber.data - Database operations (create_item, get_item, update_item, delete_item)
- fiber.config - Application configuration access
- fiber.auth - Authentication and user context
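To unit-test a step without a live platform, you can inject a stand-in for the fiber service that mimics the one call this guide relies on, create_item. The stub below is hypothetical test scaffolding, not a Fiberwise API; only the create_item call shape (model_id, data, returned item_id) is taken from the example above:

```python
import asyncio
import uuid

class FakeDataService:
    """In-memory stand-in for fiber.data, good enough for step unit tests."""

    def __init__(self):
        self.items = {}

    async def create_item(self, model_id: str, data: dict) -> dict:
        # Mimic the shape used by SaveResultStep: returns a dict with an item_id
        item_id = str(uuid.uuid4())
        self.items[item_id] = {"model_id": model_id, **data}
        return {"item_id": item_id, **data}

class FakeFiber:
    def __init__(self):
        self.data = FakeDataService()

# A step under test would receive this via self.services = {"fiber": FakeFiber()}
fiber = FakeFiber()
result = asyncio.run(fiber.data.create_item(
    model_id="test-results",
    data={"input_number": 5, "doubled_number": 10},
))
print(result["doubled_number"])  # 10
```

Because SaveResultStep already handles the "no fiber" case, the same test can also run the step with no services at all and check for the warning path.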
Data Model Definition
Define the data model in your app_manifest.yaml for database persistence:
# In app_manifest.yaml
app:
  models:
    - description: Test results from pipeline execution
      model_slug: test-results
      name: Test Result
      fields:
        - description: Primary key for the result record
          field_column: result_id
          is_primary_key: true
          name: Result ID
          required: true
          type: uuid
        - field_column: input_number
          name: Input Number
          required: true
          type: integer
        - field_column: doubled_number
          name: Doubled Number
          required: true
          type: integer
        - description: ID of the user who created this result
          field_column: user_id
          is_system_field: true
          name: User ID
          type: string
        - default: CURRENT_TIMESTAMP
          field_column: created_at
          is_system_field: true
          name: Created At
          type: timestamp
Step 5: Error Handling and Retry Policies
The ia_modules framework provides configurable error handling at both the pipeline and step level.
Pipeline-Level Error Handling
# In pipeline.yaml
error_handling:
  on_step_failure: stop       # Options: stop, continue, skip
  retry_policy:
    max_attempts: 2           # Number of retry attempts
    backoff: exponential      # Options: none, linear, exponential
Error Handling Options
- stop - Halt pipeline execution immediately on step failure
- continue - Continue to the next step even if the current step fails
- skip - Skip the failed step and continue with the flow
Retry Backoff Strategies
- none - Retry immediately without delay
- linear - Increase the delay linearly between retries
- exponential - Double the delay between each retry attempt
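These strategies map to simple delay formulas. A sketch of how an engine might compute the wait before each retry attempt (the base delay and exact formulas are assumptions for illustration; ia_modules may use different timings):

```python
def retry_delay(strategy: str, attempt: int, base: float = 1.0) -> float:
    """Delay in seconds before retry number `attempt` (1-based)."""
    if strategy == "none":
        return 0.0
    if strategy == "linear":
        return base * attempt             # 1s, 2s, 3s, ...
    if strategy == "exponential":
        return base * 2 ** (attempt - 1)  # 1s, 2s, 4s, ...
    raise ValueError(f"Unknown backoff strategy: {strategy}")

print([retry_delay("exponential", n) for n in (1, 2, 3)])  # [1.0, 2.0, 4.0]
```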
Step-Level Error Handling
Handle errors gracefully within your step implementation:
async def run(self, data: dict) -> dict:
    # Validate required inputs
    if input_number is None:
        logger.error("[Step] Missing required parameter")
        return {
            "success": False,
            "error": "Missing required parameter: input_number"
        }

    # Handle type conversion errors
    try:
        number = int(input_number)
    except (ValueError, TypeError):
        logger.error(f"[Step] Invalid number format: {input_number}")
        return {
            "success": False,
            "error": f"input_number must be a number, got: {input_number}"
        }

    # Handle database errors
    try:
        result = await fiber.data.create_item(...)
    except Exception as e:
        logger.error(f"[Step] Database save failed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": f"Database error: {str(e)}"
        }

    return {"success": True, ...}
Error Handling Best Practices
- Always return a success flag in step results
- Include descriptive error messages in the error field
- Use logger.error() with exc_info=True for full stack traces
- Gracefully handle missing services (e.g., when fiber is not available)
- Pass through relevant data even on failure for debugging
Step 6: Flow Control and Step Routing
The flow section defines how data moves between steps and supports conditional routing.
Basic Flow Definition
flow:
  start_at: double_number      # First step to execute
  paths:
    - from: double_number      # Source step
      to: save_result          # Destination step
      condition:
        type: always           # Always follow this path
    - from: save_result
      to: end_with_success     # Special terminal state
      condition:
        type: always
Flow Elements
- start_at - The step ID where pipeline execution begins
- paths - Array of routing rules between steps
- from - Source step ID
- to - Destination step ID or terminal state
- condition - When to follow this path
Terminal States
- end_with_success - Pipeline completed successfully
- end_with_failure - Pipeline completed with failure
Condition Types
# Always follow this path
condition:
  type: always

# Follow based on step output
condition:
  type: output_equals
  field: success
  value: true

# Follow on step success
condition:
  type: on_success

# Follow on step failure
condition:
  type: on_failure
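One way to picture how these conditions are evaluated is as a function of the condition dict and the step's output dict. This is an illustrative re-implementation, not the actual ia_modules evaluator (in particular, treating a missing success flag as failure for on_failure is an assumption):

```python
def condition_met(condition: dict, output: dict) -> bool:
    """Decide whether a flow path should be followed, given the source step's output."""
    ctype = condition.get("type", "always")
    if ctype == "always":
        return True
    if ctype == "output_equals":
        return output.get(condition["field"]) == condition["value"]
    if ctype == "on_success":
        return output.get("success") is True
    if ctype == "on_failure":
        return output.get("success") is not True
    raise ValueError(f"Unknown condition type: {ctype}")

output = {"success": True, "doubled_number": 10}
print(condition_met({"type": "output_equals", "field": "success", "value": True}, output))  # True
```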
Conditional Branching Example
flow:
  start_at: validate_input
  paths:
    # On successful validation, proceed to processing
    - from: validate_input
      to: process_data
      condition:
        type: output_equals
        field: success
        value: true

    # On validation failure, go to error handler
    - from: validate_input
      to: handle_error
      condition:
        type: output_equals
        field: success
        value: false

    # Success path continues to save
    - from: process_data
      to: save_result
      condition:
        type: always

    # Save completes the pipeline
    - from: save_result
      to: end_with_success
      condition:
        type: always

    # Error handler ends with failure
    - from: handle_error
      to: end_with_failure
      condition:
        type: always
Step 7: Complete Pipeline Example
Here is the complete working pipeline from the pipeline-test-app:
Project Structure
pipeline-test-app/
  app_manifest.yaml
  index.js
  pipelines/
    pipeline.yaml
  steps/
    __init__.py
    double_step.py
    save_step.py
Running the Pipeline
# Execute the pipeline with input data
fiber pipeline execute number-doubler-pipeline \
  --input '{"input_number": 5}' \
  --verbose
Executing pipeline: number-doubler-pipeline
Input: {"input_number": 5}

[Step 1] double_number: Double the Number
  Input: {"input_number": 5}
  Output: {"success": true, "input_number": 5, "doubled_number": 10}

[Step 2] save_result: Save Result to Database
  Input: {"success": true, "input_number": 5, "doubled_number": 10}
  Output: {"success": true, "result_id": "abc123...", "input_number": 5, "doubled_number": 10}

[SUCCESS] Pipeline completed successfully!

Final result:
{
  "success": true,
  "result_id": "abc123-def456-ghi789",
  "input_number": 5,
  "doubled_number": 10
}
Data Flow Visualization
Input: {"input_number": 5}
          |
          v
+-------------------+
| DoubleNumberStep  |
| - Validates input |
| - Doubles number  |
+-------------------+
          |
          | {"success": true, "input_number": 5, "doubled_number": 10}
          v
+-------------------+
| SaveResultStep    |
| - Gets fiber svc  |
| - Saves to DB     |
+-------------------+
          |
          | {"success": true, "result_id": "...", ...}
          v
   end_with_success
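The data flow above can be pictured as each step receiving the merged output of the previous one. A toy runner under that assumption (the real engine's merge semantics may differ; the step functions and result_id value here are simplified stand-ins, with the database write stubbed out):

```python
import asyncio

async def double_number(data: dict) -> dict:
    return {"success": True,
            "input_number": data["input_number"],
            "doubled_number": data["input_number"] * 2}

async def save_result(data: dict) -> dict:
    # Stand-in for the database write; just tags the record
    return {**data, "result_id": "local-0001"}

async def run_pipeline(data: dict) -> dict:
    for step in (double_number, save_result):
        result = await step(data)
        if not result.get("success"):
            return result          # on_step_failure: stop
        data = {**data, **result}  # pass merged output downstream
    return data

print(asyncio.run(run_pipeline({"input_number": 5})))
```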
Step 8: Testing and Debugging
Logging Configuration
Use Python's logging module for debugging:
import logging

logger = logging.getLogger(__name__)

class MyStep(Step):
    async def run(self, data: dict) -> dict:
        logger.info(f"[MyStep] Starting with input data: {data}")

        # Log important values
        logger.info(f"[MyStep] Extracted value: {value}")

        # Log errors with stack traces
        logger.error(f"[MyStep] Operation failed: {str(e)}", exc_info=True)

        logger.info(f"[MyStep] Returning result: {result}")
        return result
Testing Steps Independently
# test_steps.py
import asyncio

from steps.double_step import DoubleNumberStep


async def test_double_step():
    step = DoubleNumberStep()

    # Test valid input
    result = await step.run({"input_number": 5})
    assert result["success"] is True
    assert result["doubled_number"] == 10

    # Test missing input
    result = await step.run({})
    assert result["success"] is False
    assert "error" in result

    # Test invalid input
    result = await step.run({"input_number": "not a number"})
    assert result["success"] is False

asyncio.run(test_double_step())
Viewing Pipeline Execution History
# List recent pipeline executions
fiber pipeline list-executions --pipeline number-doubler-pipeline
# View specific execution details
fiber pipeline show-execution <execution-id> --verbose
Next Steps
You have learned how to build data processing pipelines using the ia_modules execution engine. From here, try extending the example: add conditional branching with output_equals conditions, route failures through an error-handler step to end_with_failure, or expand the test-results model to persist richer records.
Summary
You have successfully learned how to:
- Configure pipelines in app_manifest.yaml with the ia_modules execution engine
- Define pipeline steps, flow, and error handling in YAML
- Create custom Step classes that extend the base Step class
- Access FiberApp services for database persistence
- Implement proper error handling and retry policies
- Define flow control and conditional routing between steps
You are now ready to build sophisticated data processing pipelines with Fiberwise!