Aggregate Tasks
Aggregate tasks collect and combine results from multiple predecessor tasks, enabling sophisticated data transformation and result synthesis. They serve as collection points in workflows where outputs from different task branches need to be merged, calculated, or restructured.
Overview
Aggregate tasks provide powerful result combination capabilities:
- Result Consolidation: Merge outputs from multiple tasks into unified structures
- Data Transformation: Apply complex transformations and calculations
- Conditional Logic: Use dynamic conditions to shape output based on task results
- Template Expressions: Leverage full template power for result manipulation
- Hierarchical Results: Access outputs from nested task structures
- Performance Optimization: Efficient result processing without additional execution overhead
Task Structure
Basic Aggregate Task
id: combine-results
type: aggregate
outputs:
  # Simple field mapping
  total_count: "{{ add .tasks.task1.output.count .tasks.task2.output.count }}"
  # Complex object construction
  summary:
    user_data: "{{ .tasks.user_analysis.output }}"
    preferences: "{{ .tasks.preference_analysis.output }}"
    recommendations: "{{ .tasks.generate_recommendations.output }}"
  # Conditional outputs
  status: "{{ if gt .tasks.validation.output.errors 0 }}needs_review{{ else }}approved{{ end }}"
  # Array operations
  all_results: "{{ concat .tasks.task1.output.items .tasks.task2.output.items }}"
on_success:
  next: final-processing
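A downstream task can read the aggregate's outputs like any other task result. The sketch below assumes the aggregate's `outputs` map is exposed under `.tasks.<id>.output`, and it models `final-processing` as a second aggregate purely for illustration. In standard Go template syntax a hyphenated key such as `combine-results` cannot be reached with dot notation, so the built-in `index` function looks it up by name.

```yaml
# Hypothetical follow-up task; only the template expressions matter here.
id: final-processing
type: aggregate
outputs:
  # `index` fetches the hyphenated task ID by key, then dot notation
  # reads the fields produced by the aggregate above.
  review_required: '{{ eq (index .tasks "combine-results").output.status "needs_review" }}'
  total_count: '{{ (index .tasks "combine-results").output.total_count }}'
```

The values are single-quoted in YAML so the template can contain double-quoted string literals. Task IDs written with underscores (for example `combine_results`) would not need the `index` call.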
Key Features
Combine results from multiple tasks:
id: merge-user-data
type: aggregate
outputs:
  # Merge user profile information
  complete_profile:
    # Basic information
    id: "{{ .tasks.fetch_user.output.id }}"
    name: "{{ .tasks.fetch_user.output.name }}"
    email: "{{ .tasks.fetch_user.output.email }}"
    # Enhanced data from different sources
    preferences: "{{ .tasks.get_preferences.output }}"
    activity_history: "{{ .tasks.get_activity.output }}"
    recommendations: "{{ .tasks.generate_recommendations.output }}"
    # Computed fields
    profile_completeness: "{{ .tasks.analyze_profile.output.completeness_score }}"
    last_updated: "{{ now }}"
  # Merge with conflict resolution
  contact_info:
    primary_email: "{{ .tasks.fetch_user.output.email }}"
    secondary_email: "{{ .tasks.get_preferences.output.backup_email | default .tasks.fetch_user.output.email }}"
    phone: "{{ .tasks.fetch_user.output.phone | default .tasks.get_preferences.output.phone }}"
    # Validation status
    email_verified: "{{ and .tasks.fetch_user.output.email_verified .tasks.validate_email.output.valid }}"
    phone_verified: "{{ .tasks.validate_phone.output.valid | default false }}"
  # Metadata about the aggregation
  aggregation_metadata:
    sources_used: ["fetch_user", "get_preferences", "get_activity", "generate_recommendations"]
    aggregation_time: "{{ now }}"
    data_freshness:
      user_data: "{{ .tasks.fetch_user.output.last_updated }}"
      preferences: "{{ .tasks.get_preferences.output.last_updated }}"
      activity: "{{ .tasks.get_activity.output.last_updated }}"
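When more than two sources can supply the same field, chained `default` calls become hard to read. Assuming the engine exposes the full Sprig function set (the `default` and `add` calls above suggest this, but it is not confirmed here), `coalesce` returns its first non-empty argument. The task `crm_lookup` and the literal `unknown` are hypothetical.

```yaml
id: merge-contact-info
type: aggregate
outputs:
  contact_info:
    # Arguments are checked left to right; the first non-empty value wins.
    phone: '{{ coalesce .tasks.fetch_user.output.phone .tasks.get_preferences.output.phone .tasks.crm_lookup.output.phone "unknown" }}'
```

Listing the sources in priority order keeps the conflict-resolution rule visible in a single expression.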
Access Patterns
Task Output Access
Access results from different task types:
id: access-patterns
type: aggregate
outputs:
  # Basic task outputs
  basic_result: "{{ .tasks.basic_task.output }}"
  # Parallel task outputs
  parallel_results:
    task1: "{{ .tasks.parallel_task.output.task1 }}"
    task2: "{{ .tasks.parallel_task.output.task2 }}"
    all_parallel: "{{ .tasks.parallel_task.output }}"
  # Collection task outputs
  collection_summary:
    processed_items: "{{ .tasks.collection_task.output }}"
    item_count: "{{ len .tasks.collection_task.output }}"
    # String literals in Go templates take double quotes, hence the single-quoted YAML value
    successful_items: '{{ filter .tasks.collection_task.output "status" "eq" "success" }}'
  # Composite task outputs
  composite_results:
    subtask1: "{{ .tasks.composite_task.output.subtask1 }}"
    subtask2: "{{ .tasks.composite_task.output.subtask2 }}"
    all_subtasks: "{{ .tasks.composite_task.output }}"
  # Nested task access
  nested_data: "{{ .tasks.parent_task.children.child_task.output }}"
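Dot notation covers named fields. For positional access into a collection result, Go templates provide the built-in `index` function. This is a minimal sketch, assuming `collection_task` returns an array, as the `len` call above implies. `last` comes from Sprig, and its availability in the engine is an assumption.

```yaml
id: pick-items
type: aggregate
outputs:
  # Zero-based positional access; `index` fails on an empty array,
  # so the `if` guard skips it when there are no items.
  first_item: "{{ if .tasks.collection_task.output }}{{ index .tasks.collection_task.output 0 }}{{ end }}"
  # Sprig's `last` returns the final element of a list.
  last_item: "{{ last .tasks.collection_task.output }}"
```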
Complex Data Structures
Handle complex nested structures:
id: complex-structures
type: aggregate
outputs:
  # Nested object construction
  comprehensive_report:
    executive_summary:
      total_revenue: "{{ .tasks.financial_analysis.output.revenue }}"
      growth_rate: "{{ .tasks.growth_analysis.output.rate }}"
      key_metrics: "{{ .tasks.kpi_analysis.output.metrics }}"
    detailed_analysis:
      financial_breakdown: "{{ .tasks.financial_analysis.output }}"
      market_analysis: "{{ .tasks.market_analysis.output }}"
      competitor_analysis: "{{ .tasks.competitor_analysis.output }}"
    recommendations:
      short_term: "{{ .tasks.recommendations.output.short_term }}"
      long_term: "{{ .tasks.recommendations.output.long_term }}"
      risk_mitigation: "{{ .tasks.risk_analysis.output.mitigation_strategies }}"
    appendices:
      raw_data: "{{ .tasks.data_collection.output }}"
      methodology: "{{ .tasks.analysis_metadata.output.methodology }}"
      assumptions: "{{ .tasks.analysis_metadata.output.assumptions }}"
  # Dynamic structure based on available data.
  # Each template action needs its own {{ }} delimiters.
  flexible_output: |
    {{- $output := dict -}}
    {{- if .tasks.user_data.output -}}
    {{- $output = set $output "user_info" .tasks.user_data.output -}}
    {{- end -}}
    {{- if .tasks.preferences.output -}}
    {{- $output = set $output "preferences" .tasks.preferences.output -}}
    {{- end -}}
    {{- if .tasks.activity.output -}}
    {{- $output = set $output "activity" .tasks.activity.output -}}
    {{- end -}}
    {{- $output -}}
Best Practices
- Access Task Results Efficiently: Use dot notation for simple access and reserve complex expressions for transformations
- Handle Missing Data: Always check for null or undefined values before processing
- Use Conditional Logic: Implement fallbacks and default values for robust operation
- Optimize Complex Operations: Assign frequently accessed values to a template variable instead of repeating the full path
- Structure Output Logically: Organize outputs in a way that's easy for downstream tasks to consume
- Document Complex Logic: Use comments in template expressions for complex calculations
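Several of these practices can be combined in one expression. The sketch below is illustrative only: the `status` field on the `validation` task is hypothetical, and it assumes standard Go template behavior for variables and `{{/* ... */}}` comments, plus Sprig's `default`.

```yaml
id: review-summary
type: aggregate
outputs:
  verdict: |-
    {{- /* Read the status once; fall back to "unknown" when it is missing */ -}}
    {{- $status := .tasks.validation.output.status | default "unknown" -}}
    {{- if eq $status "passed" -}}approved
    {{- else if eq $status "unknown" -}}not_validated
    {{- else -}}needs_review
    {{- end -}}
```

The variable keeps the long path in one place, the `default` call covers a missing value, and the trim markers (`{{-` and `-}}`) keep stray whitespace out of the result.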
References
Aggregate tasks provide powerful capabilities for combining and transforming results from multiple tasks, enabling sophisticated data processing patterns in your Compozy workflows.