agents.common.models.task_analysis.parallelization
Parallelization analysis for task execution planning.
This module analyzes task dependencies to identify parallelization opportunities, execution phases, join points, and optimal execution strategies.
Classes
ExecutionPhase | Represents a phase in the overall task execution plan.
ExecutionStrategy | Strategies for executing parallel tasks.
JoinPoint | Represents a point where multiple parallel tasks must synchronize.
ParallelGroup | Represents a group of tasks that can execute in parallel.
ParallelizationAnalysis | Complete analysis of parallelization opportunities for a task.
ParallelizationAnalyzer | Analyzer for identifying parallelization opportunities in tasks.
Module Contents
- class agents.common.models.task_analysis.parallelization.ExecutionPhase(/, **data)
Bases: pydantic.BaseModel
Represents a phase in the overall task execution plan.
Execution phases organize task execution into sequential stages, where each phase must complete before the next phase can begin.
- Parameters:
data (Any)
- phase_number
Sequential phase number
- name
Descriptive name for this phase
- parallel_groups
Groups of tasks that can run in parallel within this phase
- dependencies
What this phase depends on
- estimated_duration_minutes
Total time for this phase
- critical_path_tasks
Tasks on the critical path within this phase
Example
    phase = ExecutionPhase(
        phase_number=1,
        name="Data Collection Phase",
        parallel_groups=[research_group, survey_group],
        estimated_duration_minutes=180,
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- calculate_sequential_duration()
Calculate duration if all tasks ran sequentially.
- Returns:
Sequential execution duration in minutes
- Return type:
- get_max_parallelism()
Get maximum number of tasks that can run simultaneously.
- Returns:
Maximum parallelism level
- Return type:
- get_parallelization_benefit()
Calculate benefit from parallelization as a ratio.
- Returns:
Parallelization benefit (sequential_time / parallel_time)
- Return type:
- get_total_task_count()
Get total number of tasks across all parallel groups.
- Returns:
Total task count
- Return type:
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
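The ratio documented for get_parallelization_benefit() (sequential_time / parallel_time) can be illustrated with a small standalone sketch. This is not the library's implementation: the helper name parallelization_benefit, the per-task durations, and the zero-duration guard are all assumptions made for illustration.

```python
def parallelization_benefit(sequential_minutes: float, parallel_minutes: float) -> float:
    """Return sequential_time / parallel_time (hypothetical helper)."""
    if parallel_minutes <= 0:
        # Assumption: a zero-length phase is reported as "no benefit".
        return 1.0
    return sequential_minutes / parallel_minutes


# Hypothetical task durations in minutes. Run back to back they take the sum;
# run all at once they are bounded by the longest task.
durations = [60, 90, 180]
benefit = parallelization_benefit(sum(durations), max(durations))
print(f"{benefit:.2f}x")
```

A value above 1.0 means the phase finishes faster in parallel than sequentially; a value of exactly 1.0 means parallel execution gains nothing.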
- class agents.common.models.task_analysis.parallelization.ExecutionStrategy
Strategies for executing parallel tasks.
- SEQUENTIAL
Execute all tasks one after another
- MAX_PARALLEL
Execute as many tasks in parallel as possible
- RESOURCE_CONSTRAINED
Parallel execution limited by resource availability
- PRIORITY_BASED
Execute high-priority tasks first, parallelize when possible
- BALANCED
Balance between parallelization and resource usage
- class agents.common.models.task_analysis.parallelization.JoinPoint(/, **data)
Bases: pydantic.BaseModel
Represents a point where multiple parallel tasks must synchronize.
Join points are critical for understanding where parallel execution must wait for all dependencies to complete before proceeding.
- Parameters:
data (Any)
- id
Unique identifier for this join point
- name
Descriptive name for the join point
- input_task_ids
IDs of tasks that must complete before this join
- output_task_ids
IDs of tasks that can start after this join
- join_type
Type of join operation
- estimated_wait_time
Expected time to wait for all inputs
- is_critical_path
Whether this join point is on the critical path
Example
    join_point = JoinPoint(
        id="analysis_join",
        name="Combine Analysis Results",
        input_task_ids=["data_collection", "background_research"],
        output_task_ids=["final_report"],
        join_type="synchronous",
    )
- get_input_count()
Get the number of input tasks for this join point.
- Returns:
Number of input tasks
- Return type:
- get_output_count()
Get the number of output tasks from this join point.
- Returns:
Number of output tasks
- Return type:
- is_merge_point()
Check if this is a merge point (multiple inputs, single output).
- Returns:
True if multiple inputs merge to single output
- Return type:
- is_split_point()
Check if this is a split point (single input, multiple outputs).
- Returns:
True if single input splits to multiple outputs
- Return type:
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
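The merge and split checks described above depend only on how many input and output task IDs a join point has. The sketch below shows that rule on a stand-in dataclass; JoinSketch is a hypothetical name and is not the JoinPoint model itself, whose actual implementation may differ.

```python
from dataclasses import dataclass, field


@dataclass
class JoinSketch:
    """Stand-in holding only the two ID lists (not the real JoinPoint)."""

    input_task_ids: list[str] = field(default_factory=list)
    output_task_ids: list[str] = field(default_factory=list)

    def is_merge_point(self) -> bool:
        # Multiple inputs converge on a single output.
        return len(self.input_task_ids) > 1 and len(self.output_task_ids) == 1

    def is_split_point(self) -> bool:
        # A single input fans out to multiple outputs.
        return len(self.input_task_ids) == 1 and len(self.output_task_ids) > 1


join = JoinSketch(
    input_task_ids=["data_collection", "background_research"],
    output_task_ids=["final_report"],
)
print(join.is_merge_point(), join.is_split_point())
```

Under this reading, a join with several inputs and several outputs is neither a merge point nor a split point.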
- class agents.common.models.task_analysis.parallelization.ParallelGroup(/, **data)
Bases: pydantic.BaseModel
Represents a group of tasks that can execute in parallel.
Parallel groups identify sets of tasks that have no blocking dependencies between them and can therefore run simultaneously.
- Parameters:
data (Any)
- group_id
Unique identifier for this parallel group
- task_ids
IDs of tasks in this parallel group
- estimated_duration_minutes
Time for the longest task in the group
- resource_requirements
Combined resource requirements
- can_be_interleaved
Whether tasks can be interleaved or must run fully parallel
- priority
Priority level for this group
- phase
Execution phase this group belongs to
Example
    parallel_group = ParallelGroup(
        group_id="research_phase",
        task_ids=["web_research", "library_research", "expert_interviews"],
        estimated_duration_minutes=120,
        resource_requirements={"researchers": 3, "internet": True},
    )
- calculate_actual_duration(sequential_duration)
Calculate actual duration considering parallelization.
- get_task_count()
Get the number of tasks in this parallel group.
- Returns:
Number of tasks in the group
- Return type:
- get_theoretical_speedup()
Calculate theoretical speedup from parallelization.
- Returns:
Theoretical speedup factor
- Return type:
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
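One plausible reading of "theoretical speedup" follows from estimated_duration_minutes being the time for the longest task: the group's sequential time is the sum of its task durations, and its parallel time is the longest one. The sketch below works under that assumption; the function name and the per-task durations are hypothetical, and get_theoretical_speedup() may compute the value differently.

```python
def theoretical_speedup(task_minutes: dict[str, float]) -> float:
    """Sum of task durations divided by the longest one (assumed formula)."""
    if not task_minutes:
        return 1.0  # Assumption: an empty group has no speedup.
    longest = max(task_minutes.values())
    if longest <= 0:
        return 1.0  # Avoid dividing by zero for zero-length tasks.
    return sum(task_minutes.values()) / longest


# Hypothetical per-task estimates; the longest (120) matches the
# estimated_duration_minutes used in the ParallelGroup example.
research = {"web_research": 120, "library_research": 90, "expert_interviews": 60}
print(theoretical_speedup(research))
```

The result can never exceed the number of tasks in the group, and it reaches that bound only when all tasks take equally long.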
- class agents.common.models.task_analysis.parallelization.ParallelizationAnalysis(/, **data)
Bases: pydantic.BaseModel
Complete analysis of parallelization opportunities for a task.
This is the main result of parallelization analysis, containing all the information needed to optimize task execution.
- Parameters:
data (Any)
- execution_phases
Sequential phases of execution
- parallel_groups
All identified parallel groups
- join_points
Critical synchronization points
- critical_path
Tasks on the critical path
- execution_strategy
Recommended execution strategy
- estimated_speedup
Expected speedup from parallelization
- resource_requirements
Peak resource requirements
- bottlenecks
Identified bottlenecks and constraints
Example
    analysis = ParallelizationAnalysis(
        execution_phases=[phase1, phase2, phase3],
        parallel_groups=[group1, group2],
        join_points=[join1, join2],
        critical_path=["task_1", "task_3", "task_5"],
        estimated_speedup=2.5,
    )
- calculate_time_savings()
Calculate time savings from parallelization.
- Returns:
Time savings in minutes
- Return type:
- get_critical_path_duration()
Get duration of the critical path.
- Returns:
Critical path duration in minutes
- Return type:
- get_efficiency_percentage()
Get parallelization efficiency as a percentage.
- Returns:
Efficiency percentage (0-100)
- Return type:
- get_max_parallelism()
Get maximum parallelism across all phases.
- Returns:
Maximum number of tasks that can run simultaneously
- Return type:
- get_total_phases()
Get total number of execution phases.
- Returns:
Number of phases
- Return type:
- is_worth_parallelizing(min_speedup=1.2)
Determine if parallelization is worthwhile.
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
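The summary metrics above can be related to each other with some simple arithmetic. The sketch below is illustrative only. It assumes that time savings are the sequential time minus the sequential time divided by the speedup, that efficiency is the classic speedup-per-worker ratio capped at 100, and that parallelization is "worthwhile" when the estimated speedup reaches min_speedup. The page does not state these formulas, and the function names are hypothetical.

```python
def time_savings(sequential_minutes: float, estimated_speedup: float) -> float:
    """Minutes saved, assuming parallel_time = sequential_time / speedup."""
    if estimated_speedup <= 0:
        raise ValueError("estimated_speedup must be positive")
    return sequential_minutes - sequential_minutes / estimated_speedup


def efficiency_percentage(estimated_speedup: float, max_parallelism: int) -> float:
    """Speedup per parallel slot as a percentage, clamped to 0-100 (assumed)."""
    if max_parallelism <= 0:
        return 0.0
    return max(0.0, min(100.0, estimated_speedup / max_parallelism * 100.0))


def is_worth_parallelizing(estimated_speedup: float, min_speedup: float = 1.2) -> bool:
    """Assumed rule: worthwhile once the speedup reaches the threshold."""
    return estimated_speedup >= min_speedup


# Hypothetical numbers: 300 sequential minutes, 2.5x speedup, 4 parallel slots.
print(time_savings(300, 2.5))
print(efficiency_percentage(2.5, 4))
print(is_worth_parallelizing(2.5))
```

The default threshold of 1.2 mirrors the documented min_speedup default; a speedup below it is treated here as too small to justify the added coordination.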
- class agents.common.models.task_analysis.parallelization.ParallelizationAnalyzer(/, **data)
Bases: pydantic.BaseModel
Analyzer for identifying parallelization opportunities in tasks.
This class analyzes task dependencies to identify parallelization strategies, execution phases, and resource requirements.
- Parameters:
data (Any)
- max_parallel_tasks
Maximum number of tasks to run in parallel
- resource_constraints
Resource limitations that affect parallelization
- prefer_balanced_groups
Whether to prefer balanced parallel groups
- include_coordination_overhead
Whether to include coordination overhead
Example
    analyzer = ParallelizationAnalyzer(
        max_parallel_tasks=8,
        resource_constraints={"cpu_cores": 4, "memory_gb": 16},
    )
    analysis = analyzer.analyze_task(complex_task)
    print(f"Recommended speedup: {analysis.estimated_speedup:.1f}x")
- analyze_task(task)
Analyze a task for parallelization opportunities.
- Parameters:
task (haive.agents.common.models.task_analysis.base.Task) – Task to analyze
- Returns:
Complete parallelization analysis
- Return type:
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
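How analyze_task() derives phases internally is not documented on this page. As a rough illustration of the general idea, the sketch below groups task IDs into sequential phases with the standard library's graphlib.TopologicalSorter, so that each phase contains only tasks whose prerequisites finished in earlier phases. The function plan_phases and the dependency mapping are hypothetical stand-ins and do not use the Task model.

```python
from graphlib import TopologicalSorter


def plan_phases(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group task IDs into phases; each phase depends only on earlier ones."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # Raises graphlib.CycleError on circular dependencies.
    phases: list[list[str]] = []
    while sorter.is_active():
        # Tasks whose prerequisites are all done can run in parallel.
        ready = sorted(sorter.get_ready())
        phases.append(ready)
        sorter.done(*ready)
    return phases


# Hypothetical mapping: task ID -> IDs of the tasks it waits for.
deps = {
    "data_collection": set(),
    "background_research": set(),
    "analysis": {"data_collection", "background_research"},
    "final_report": {"analysis"},
}
for number, tasks in enumerate(plan_phases(deps), start=1):
    print(number, tasks)
```

Each inner list corresponds loosely to a parallel group, and the boundary between consecutive lists plays the role of a join point.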