Tasks

Aggregate Tasks

Aggregate tasks collect and combine results from multiple predecessor tasks, enabling sophisticated data transformation and result synthesis. They serve as collection points in workflows where outputs from different task branches need to be merged, calculated, or restructured.

Overview

Aggregate tasks provide powerful result combination capabilities:

  • Result Consolidation

    Merge outputs from multiple tasks into unified structures

  • Data Transformation

    Apply complex transformations and calculations

  • Conditional Logic

    Use dynamic conditions to shape output based on task results

  • Template Expressions

    Leverage full template power for result manipulation

  • Hierarchical Results

    Access outputs from nested task structures

  • Performance Optimization

    Efficient result processing without additional execution overhead

Task Structure

Basic Aggregate Task

id: combine-results
type: aggregate

outputs:
  # Simple field mapping
  total_count: "{{ add .tasks.task1.output.count .tasks.task2.output.count }}"

  # Complex object construction
  summary:
    user_data: "{{ .tasks.user_analysis.output }}"
    preferences: "{{ .tasks.preference_analysis.output }}"
    recommendations: "{{ .tasks.generate_recommendations.output }}"

  # Conditional outputs
  status: "{{ if gt .tasks.validation.output.errors 0 }}needs_review{{ else }}approved{{ end }}"

  # Array operations
  all_results: "{{ concat .tasks.task1.output.items .tasks.task2.output.items }}"

on_success:
  next: final-processing

Key Features

Combine results from multiple tasks:

id: merge-user-data
type: aggregate

outputs:
  # Merge user profile information
  complete_profile:
    # Basic information
    id: "{{ .tasks.fetch_user.output.id }}"
    name: "{{ .tasks.fetch_user.output.name }}"
    email: "{{ .tasks.fetch_user.output.email }}"

    # Enhanced data from different sources
    preferences: "{{ .tasks.get_preferences.output }}"
    activity_history: "{{ .tasks.get_activity.output }}"
    recommendations: "{{ .tasks.generate_recommendations.output }}"

    # Computed fields
    profile_completeness: "{{ .tasks.analyze_profile.output.completeness_score }}"
    last_updated: "{{ now }}"

  # Merge with conflict resolution
  contact_info:
    primary_email: "{{ .tasks.fetch_user.output.email }}"
    secondary_email: "{{ .tasks.get_preferences.output.backup_email | default .tasks.fetch_user.output.email }}"
    phone: "{{ .tasks.fetch_user.output.phone | default .tasks.get_preferences.output.phone }}"

    # Validation status
    email_verified: "{{ and .tasks.fetch_user.output.email_verified .tasks.validate_email.output.valid }}"
    phone_verified: "{{ .tasks.validate_phone.output.valid | default false }}"

  # Metadata about the aggregation
  aggregation_metadata:
    sources_used: ["fetch_user", "get_preferences", "get_activity", "generate_recommendations"]
    aggregation_time: "{{ now }}"
    data_freshness:
      user_data: "{{ .tasks.fetch_user.output.last_updated }}"
      preferences: "{{ .tasks.get_preferences.output.last_updated }}"
      activity: "{{ .tasks.get_activity.output.last_updated }}"

Access Patterns

Task Output Access

Access results from different task types:

id: access-patterns
type: aggregate

outputs:
  # Basic task outputs
  basic_result: "{{ .tasks.basic_task.output }}"

  # Parallel task outputs
  parallel_results:
    task1: "{{ .tasks.parallel_task.output.task1 }}"
    task2: "{{ .tasks.parallel_task.output.task2 }}"
    all_parallel: "{{ .tasks.parallel_task.output }}"

  # Collection task outputs
  collection_summary:
    processed_items: "{{ .tasks.collection_task.output }}"
    item_count: "{{ len .tasks.collection_task.output }}"
    successful_items: "{{ filter .tasks.collection_task.output 'status' 'eq' 'success' }}"

  # Composite task outputs
  composite_results:
    subtask1: "{{ .tasks.composite_task.output.subtask1 }}"
    subtask2: "{{ .tasks.composite_task.output.subtask2 }}"
    all_subtasks: "{{ .tasks.composite_task.output }}"

  # Nested task access
  nested_data: "{{ .tasks.parent_task.children.child_task.output }}"

Complex Data Structures

Handle complex nested structures:

id: complex-structures
type: aggregate

outputs:
  # Nested object construction
  comprehensive_report:
    executive_summary:
      total_revenue: "{{ .tasks.financial_analysis.output.revenue }}"
      growth_rate: "{{ .tasks.growth_analysis.output.rate }}"
      key_metrics: "{{ .tasks.kpi_analysis.output.metrics }}"

    detailed_analysis:
      financial_breakdown: "{{ .tasks.financial_analysis.output }}"
      market_analysis: "{{ .tasks.market_analysis.output }}"
      competitor_analysis: "{{ .tasks.competitor_analysis.output }}"

    recommendations:
      short_term: "{{ .tasks.recommendations.output.short_term }}"
      long_term: "{{ .tasks.recommendations.output.long_term }}"
      risk_mitigation: "{{ .tasks.risk_analysis.output.mitigation_strategies }}"

    appendices:
      raw_data: "{{ .tasks.data_collection.output }}"
      methodology: "{{ .tasks.analysis_metadata.output.methodology }}"
      assumptions: "{{ .tasks.analysis_metadata.output.assumptions }}"

  # Dynamic structure based on available data
  flexible_output: |
    {{
      $output := dict
      if .tasks.user_data.output
        $output = set $output "user_info" .tasks.user_data.output
      end
      if .tasks.preferences.output
        $output = set $output "preferences" .tasks.preferences.output
      end
      if .tasks.activity.output
        $output = set $output "activity" .tasks.activity.output
      end
      $output
    }}

Best Practices

  • Access Task Results Efficiently

    Use dot notation for simple access, complex expressions for transformations

  • Handle Missing Data

    Always check for null/undefined values before processing

  • Use Conditional Logic

    Implement fallbacks and default values for robust operation

  • Optimize Complex Operations

    Cache frequently accessed values

  • Structure Output Logically

    Organize outputs in a way that's easy for downstream tasks to consume

  • Document Complex Logic

    Use comments in template expressions for complex calculations

References

Aggregate tasks provide powerful capabilities for combining and transforming results from multiple tasks, enabling sophisticated data processing patterns in your Compozy workflows.