agents.common.models.task_analysis.parallelizationΒΆ

Parallelization analysis for task execution planning.

This module analyzes task dependencies to identify parallelization opportunities, execution phases, join points, and optimal execution strategies.

ClassesΒΆ

ExecutionPhase

Represents a phase in the overall task execution plan.

ExecutionStrategy

Strategies for executing parallel tasks.

JoinPoint

Represents a point where multiple parallel tasks must synchronize.

ParallelGroup

Represents a group of tasks that can execute in parallel.

ParallelizationAnalysis

Complete analysis of parallelization opportunities for a task.

ParallelizationAnalyzer

Analyzer for identifying parallelization opportunities in tasks.

Module ContentsΒΆ

class agents.common.models.task_analysis.parallelization.ExecutionPhase(/, **data)ΒΆ

Bases: pydantic.BaseModel

Represents a phase in the overall task execution plan.

Execution phases organize the task execution into sequential stages, where each phase must complete before the next phase can begin.

Parameters:

data (Any)

phase_numberΒΆ

Sequential phase number

nameΒΆ

Descriptive name for this phase

parallel_groupsΒΆ

Groups of tasks that can run in parallel within this phase

dependenciesΒΆ

What this phase depends on

estimated_duration_minutesΒΆ

Total time for this phase

critical_path_tasksΒΆ

Tasks on the critical path within this phase

Example

phase = ExecutionPhase(
phase_number=1,
name="Data Collection Phase",
parallel_groups=[research_group, survey_group],
estimated_duration_minutes=180
)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

calculate_sequential_duration()ΒΆ

Calculate duration if all tasks ran sequentially.

Returns:

Sequential execution duration in minutes

Return type:

float

get_max_parallelism()ΒΆ

Get maximum number of tasks that can run simultaneously.

Returns:

Maximum parallelism level

Return type:

int

get_parallelization_benefit()ΒΆ

Calculate benefit from parallelization as a ratio.

Returns:

Parallelization benefit (sequential_time / parallel_time)

Return type:

float

get_total_task_count()ΒΆ

Get total number of tasks across all parallel groups.

Returns:

Total task count

Return type:

int

model_configΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class agents.common.models.task_analysis.parallelization.ExecutionStrategyΒΆ

Bases: str, enum.Enum

Strategies for executing parallel tasks.

SEQUENTIALΒΆ

Execute all tasks one after another

MAX_PARALLELΒΆ

Execute as many tasks in parallel as possible

RESOURCE_CONSTRAINEDΒΆ

Parallel execution limited by resource availability

PRIORITY_BASEDΒΆ

Execute high-priority tasks first, parallelize when possible

BALANCEDΒΆ

Balance between parallelization and resource usage

Initialize self. See help(type(self)) for accurate signature.

class agents.common.models.task_analysis.parallelization.JoinPoint(/, **data)ΒΆ

Bases: pydantic.BaseModel

Represents a point where multiple parallel tasks must synchronize.

Join points are critical for understanding where parallel execution must wait for all dependencies to complete before proceeding.

Parameters:

data (Any)

idΒΆ

Unique identifier for this join point

nameΒΆ

Descriptive name for the join point

input_task_idsΒΆ

IDs of tasks that must complete before this join

output_task_idsΒΆ

IDs of tasks that can start after this join

join_typeΒΆ

Type of join operation

estimated_wait_timeΒΆ

Expected time to wait for all inputs

is_critical_pathΒΆ

Whether this join point is on the critical path

Example

join_point = JoinPoint(
id="analysis_join",
name="Combine Analysis Results",
input_task_ids=["data_collection", "background_research"],
output_task_ids=["final_report"],
join_type="synchronous"
)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

get_input_count()ΒΆ

Get the number of input tasks for this join point.

Returns:

Number of input tasks

Return type:

int

get_output_count()ΒΆ

Get the number of output tasks from this join point.

Returns:

Number of output tasks

Return type:

int

is_merge_point()ΒΆ

Check if this is a merge point (multiple inputs, single output).

Returns:

True if multiple inputs merge to single output

Return type:

bool

is_split_point()ΒΆ

Check if this is a split point (single input, multiple outputs).

Returns:

True if single input splits to multiple outputs

Return type:

bool

model_configΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class agents.common.models.task_analysis.parallelization.ParallelGroup(/, **data)ΒΆ

Bases: pydantic.BaseModel

Represents a group of tasks that can execute in parallel.

Parallel groups identify sets of tasks that have no blocking dependencies between them and can therefore run simultaneously.

Parameters:

data (Any)

group_idΒΆ

Unique identifier for this parallel group

task_idsΒΆ

IDs of tasks in this parallel group

estimated_duration_minutesΒΆ

Time for the longest task in the group

resource_requirementsΒΆ

Combined resource requirements

can_be_interleavedΒΆ

Whether tasks can be interleaved or must run fully parallel

priorityΒΆ

Priority level for this group

phaseΒΆ

Execution phase this group belongs to

Example

parallel_group = ParallelGroup(
group_id="research_phase",
task_ids=["web_research", "library_research", "expert_interviews"],
estimated_duration_minutes=120,
resource_requirements={"researchers": 3, "internet": True}
)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

calculate_actual_duration(sequential_duration)ΒΆ

Calculate actual duration considering parallelization.

Parameters:

sequential_duration (float) – Duration if tasks ran sequentially

Returns:

Actual duration with parallelization

Return type:

float

get_task_count()ΒΆ

Get the number of tasks in this parallel group.

Returns:

Number of tasks in the group

Return type:

int

get_theoretical_speedup()ΒΆ

Calculate theoretical speedup from parallelization.

Returns:

Theoretical speedup factor

Return type:

float

model_configΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class agents.common.models.task_analysis.parallelization.ParallelizationAnalysis(/, **data)ΒΆ

Bases: pydantic.BaseModel

Complete analysis of parallelization opportunities for a task.

This is the main result of parallelization analysis, containing all the information needed to optimize task execution.

Parameters:

data (Any)

execution_phasesΒΆ

Sequential phases of execution

parallel_groupsΒΆ

All identified parallel groups

join_pointsΒΆ

Critical synchronization points

critical_pathΒΆ

Tasks on the critical path

execution_strategyΒΆ

Recommended execution strategy

estimated_speedupΒΆ

Expected speedup from parallelization

resource_requirementsΒΆ

Peak resource requirements

bottlenecksΒΆ

Identified bottlenecks and constraints

Example

analysis = ParallelizationAnalysis(
execution_phases=[phase1, phase2, phase3],
parallel_groups=[group1, group2],
join_points=[join1, join2],
critical_path=["task_1", "task_3", "task_5"],
estimated_speedup=2.5
)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

calculate_time_savings()ΒΆ

Calculate time savings from parallelization.

Returns:

Time savings in minutes

Return type:

float

get_critical_path_duration()ΒΆ

Get duration of the critical path.

Returns:

Critical path duration in minutes

Return type:

float

get_efficiency_percentage()ΒΆ

Get parallelization efficiency as a percentage.

Returns:

Efficiency percentage (0-100)

Return type:

float

get_max_parallelism()ΒΆ

Get maximum parallelism across all phases.

Returns:

Maximum number of tasks that can run simultaneously

Return type:

int

get_total_phases()ΒΆ

Get total number of execution phases.

Returns:

Number of phases

Return type:

int

is_worth_parallelizing(min_speedup=1.2)ΒΆ

Determine if parallelization is worthwhile.

Parameters:

min_speedup (float) – Minimum speedup required to justify parallelization

Returns:

True if parallelization provides sufficient benefit

Return type:

bool

model_configΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class agents.common.models.task_analysis.parallelization.ParallelizationAnalyzer(/, **data)ΒΆ

Bases: pydantic.BaseModel

Analyzer for identifying parallelization opportunities in tasks.

This class performs sophisticated analysis of task dependencies to identify optimal parallelization strategies, execution phases, and resource requirements.

Parameters:

data (Any)

max_parallel_tasksΒΆ

Maximum number of tasks to run in parallel

resource_constraintsΒΆ

Resource limitations that affect parallelization

prefer_balanced_groupsΒΆ

Whether to prefer balanced parallel groups

include_coordination_overheadΒΆ

Whether to include coordination overhead

Example

analyzer = ParallelizationAnalyzer(
max_parallel_tasks=8,
resource_constraints={"cpu_cores": 4, "memory_gb": 16}
)

analysis = analyzer.analyze_task(complex_task)
print(f"Recommended speedup: {analysis.estimated_speedup:.1f}x")

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

analyze_task(task)ΒΆ

Analyze a task for parallelization opportunities.

Parameters:

task (haive.agents.common.models.task_analysis.base.Task) – Task to analyze

Returns:

Complete parallelization analysis

Return type:

ParallelizationAnalysis

model_configΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].