Migrated fix pipeline to a quick action. by srtab · Pull Request #450 · srtab/daiv · GitHub

Migrated fix pipeline to a quick action. #450


Draft: wants to merge 5 commits into base: main
6 changes: 5 additions & 1 deletion .vscode/settings.json
@@ -16,5 +16,9 @@
"source.fixAll.ruff": "explicit"
}
},
"jupyter.notebookFileRoot": "${workspaceFolder}"
"jupyter.notebookFileRoot": "${workspaceFolder}",
"cursorpyright.analysis.extraPaths": [
"./daiv"
],
"cursorpyright.analysis.typeCheckingMode": "off"
}
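Presumably, the extra analysis path is there so the editor resolves the project's top-level absolute imports from the daiv/ source root; for example, imports used further down in this diff would then be found without a daiv. prefix. Illustrative only, not part of the change:

# With ./daiv on the analysis path, imports like these resolve as top-level packages:
from automation.agents import BaseAgent
from core.config import RepositoryConfig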
8 changes: 3 additions & 5 deletions daiv/automation/agents/base.py
@@ -112,7 +112,7 @@ def get_model_kwargs(

if thinking_level and _kwargs["model"].startswith(CLAUDE_THINKING_MODELS):
max_tokens, thinking_tokens = self._get_anthropic_thinking_tokens(
thinking_level=thinking_level, max_tokens=kwargs.get("max_tokens")
thinking_level=thinking_level, max_tokens=kwargs.get("max_tokens", CLAUDE_MAX_TOKENS)
)
# When using thinking, the temperature needs to be set to 1
_kwargs["temperature"] = 1
@@ -148,7 +148,7 @@ def get_model_kwargs(

if _kwargs["model"].startswith(CLAUDE_THINKING_MODELS):
max_tokens, thinking_tokens = self._get_anthropic_thinking_tokens(
thinking_level=thinking_level, max_tokens=_kwargs.get("max_tokens")
thinking_level=thinking_level, max_tokens=_kwargs.get("max_tokens", CLAUDE_MAX_TOKENS)
)
_kwargs["max_tokens"] = max_tokens
_kwargs["extra_body"] = {"reasoning": {"max_tokens": thinking_tokens}}
@@ -165,9 +165,7 @@ def get_model_kwargs(

return _kwargs

def _get_anthropic_thinking_tokens(
self, *, thinking_level: ThinkingLevel, max_tokens: int = CLAUDE_MAX_TOKENS
) -> tuple[int, int]:
def _get_anthropic_thinking_tokens(self, *, thinking_level: ThinkingLevel, max_tokens: int) -> tuple[int, int]:
"""
Get the thinking tokens and max tokens for the model.
"""
2 changes: 1 addition & 1 deletion daiv/automation/agents/issue_addressor/templates.py
@@ -32,7 +32,7 @@

💡 **Next Steps:**

- ✅ If the plan is good, leave a comment with `@{{ bot_username }} plan execute` to execute the plan.
- ✅ If the plan is good, leave a comment with `{{ approve_plan_command }}` to execute the plan.
- ❌ If the plan doesn't meet your expectations, please **refine the issue description/title** and add more details or examples to help me understand the problem better. I will then refine the plan.
""" # noqa: E501

199 changes: 51 additions & 148 deletions daiv/automation/agents/pipeline_fixer/agent.py
@@ -2,11 +2,10 @@

import logging
import uuid
from typing import Literal, cast
from typing import Literal

from django.utils import timezone

from langchain_core.output_parsers.openai_tools import PydanticToolsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import (
Runnable,
@@ -19,44 +18,21 @@

from automation.agents import BaseAgent
from automation.agents.plan_and_execute import PlanAndExecuteAgent
from automation.agents.plan_and_execute.schemas import ChangeInstructions, Plan
from automation.tools import think
from automation.tools.sandbox import RunSandboxCommandsTool
from automation.tools.toolkits import ReadRepositoryToolkit
from core.config import RepositoryConfig

from .conf import settings
from .prompts import (
command_output_evaluator_human,
same_error_evaluator_human,
same_error_evaluator_system,
troubleshoot_human,
troubleshoot_system,
)
from .schemas import (
CommandOuputEvaluation,
CommandOuputInput,
SameErrorEvaluation,
SameErrorInput,
TroubleshootingDetail,
)
from .prompts import command_output_evaluator_human, pipeline_fixer_human, troubleshoot_human, troubleshoot_system
from .schemas import CommandOuputEvaluation, CommandOuputInput, TroubleshootingDetail
from .state import OverallState, TroubleshootState
from .tools import complete_task

logger = logging.getLogger("daiv.agents")


class SameErrorEvaluator(BaseAgent[Runnable[SameErrorInput, SameErrorEvaluation]]):
"""
Chain for evaluating if two logs are the same error.
"""

async def compile(self) -> Runnable:
return (
ChatPromptTemplate.from_messages([same_error_evaluator_system, same_error_evaluator_human])
| self.get_model(model=settings.SAME_ERROR_MODEL_NAME).bind_tools([SameErrorEvaluation], tool_choice="auto")
| PydanticToolsParser(tools=[SameErrorEvaluation], first_tool_only=True)
).with_config({"run_name": "SameErrorEvaluator"})
MAX_FORMAT_ITERATIONS = 2


class CommandOutputEvaluator(BaseAgent[Runnable[CommandOuputInput, CommandOuputEvaluation]]):
@@ -88,96 +64,17 @@ async def compile(self) -> CompiledStateGraph:
"""
workflow = StateGraph(OverallState)

workflow.add_node("should_try_to_fix", self.should_try_to_fix)
workflow.add_node("troubleshoot", self.troubleshoot)
workflow.add_node("execute_remediation_steps", self.execute_remediation_steps)
workflow.add_node("plan_and_execute", self.plan_and_execute)
workflow.add_node("apply_format_code", self.apply_format_code)

workflow.set_entry_point("should_try_to_fix")
workflow.set_entry_point("troubleshoot")

return workflow.compile(checkpointer=self.checkpointer, store=self.store, name=settings.NAME)

async def should_try_to_fix(
self, state: OverallState, config: RunnableConfig
) -> Command[Literal["troubleshoot", "__end__"]]:
"""
Determine if the agent should try to fix the pipeline issue.

If the agent has reached the maximum number of retries, the fix is abandoned and flagged for manual
intervention. If the previous job logs are not available (first run), the agent will try to fix the pipeline issue.

If the previous job logs are available, the agent will invoke the error log evaluator to determine if the
error is the same as the previous one. This is important to prevent the agent from applying the same fix
over and over.

Args:
state (OverallState): The state of the agent.
config (RunnableConfig): The config to use for the agent.

Returns:
Command[Literal["troubleshoot", "__end__"]]: The next step in the workflow.
"""
if state.get("iteration", 0) >= settings.MAX_ITERATIONS:
logger.warning(
"Max retry iterations reached for pipeline fix on %s[%s] for job %s",
config["configurable"]["source_repo_id"],
config["configurable"]["source_ref"],
config["configurable"]["job_name"],
)
return Command(
goto=END,
update={
"need_manual_fix": True,
"troubleshooting": [
TroubleshootingDetail(
title="Automatic fix failed",
details=(
"Maximum retry iterations reached for automatic pipeline fix. "
"Please review the logs and apply the necessary fixes manually."
),
)
],
},
)

if state.get("previous_job_logs") is None:
# This means that it's the first time the agent is running, so we need to troubleshoot the issue.
return Command(goto="troubleshoot", update={"iteration": state.get("iteration", 0) + 1})

same_error_evaluator = await SameErrorEvaluator().agent
result = await same_error_evaluator.ainvoke({
"log_trace_1": cast("str", state["previous_job_logs"]),
"log_trace_2": state["job_logs"],
})

if result.is_same_error:
logger.warning(
"Not applying pipeline fix on %s[%s] for job %s because it's the same error as the previous one",
config["configurable"]["source_repo_id"],
config["configurable"]["source_ref"],
config["configurable"]["job_name"],
)
return Command(
goto=END,
update={
"need_manual_fix": True,
"troubleshooting": [
TroubleshootingDetail(
title="Automatic fix skipped",
details=(
"Automatic fix skipped because the error is the same as the previous one. "
"Please review the logs and apply the necessary fixes manually."
),
)
],
},
)

return Command(goto="troubleshoot", update={"iteration": state.get("iteration", 0) + 1})

async def troubleshoot(
self, state: OverallState, store: BaseStore, config: RunnableConfig
) -> Command[Literal["execute_remediation_steps", "apply_format_code", "__end__"]]:
) -> Command[Literal["plan_and_execute", "__end__"]]:
"""
Troubleshoot the issue based on the logs from the failed CI/CD pipeline.

@@ -189,16 +86,15 @@ async def troubleshoot(
config (RunnableConfig): The config to use for the agent.

Returns:
Command[Literal["execute_remediation_steps", "apply_format_code", "__end__"]]: The next step in
the workflow.
Command[Literal["plan_and_execute", "__end__"]]: The next step in the workflow.
"""
tools = ReadRepositoryToolkit.create_instance().get_tools()
tools = ReadRepositoryToolkit.create_instance().get_tools() + [complete_task, think]

agent = create_react_agent(
model=self.get_model(
model=settings.TROUBLESHOOTING_MODEL_NAME, thinking_level=settings.TROUBLESHOOTING_THINKING_LEVEL
),
tools=tools + [complete_task, think],
tools=tools,
state_schema=TroubleshootState,
prompt=ChatPromptTemplate.from_messages([
troubleshoot_system,
@@ -223,51 +119,52 @@
goto=END,
update={
"need_manual_fix": True,
"troubleshooting": [
TroubleshootingDetail(
title="Pipeline fix failed",
details=(
"Couldn't fix the pipeline automatically due to an unexpected error. "
"Please review the logs and apply the necessary fixes manually."
),
)
]
+ state.get("troubleshooting", []),
"troubleshooting": state.get(
"troubleshooting",
[
TroubleshootingDetail(
category="other",
details=(
"Couldn't fix the pipeline automatically due to an unexpected error. "
"Please review the logs and apply the necessary fixes manually."
),
)
],
),
},
)

async def execute_remediation_steps(self, state: OverallState, store: BaseStore) -> Command[Literal["__end__"]]:
async def plan_and_execute(
self, state: OverallState, store: BaseStore, config: RunnableConfig
) -> Command[Literal["__end__"]]:
"""
Execute the remediation steps to fix the pipeline issue.
Plan and execute the remediation steps to fix the identified codebase issues.

Args:
state (OverallState): The state of the agent.
store (BaseStore): The store to use for caching.
config (RunnableConfig): The config for the agent.

Returns:
Command[Literal["__end__"]]: The next step in the workflow.
"""
plan = Plan(
changes=[
ChangeInstructions(
file_path=troubleshooting.file_path,
details="\n".join(troubleshooting.remediation_steps),
relevant_files=[troubleshooting.file_path],
)
for troubleshooting in state["troubleshooting"]
]
)

plan_and_execute = await PlanAndExecuteAgent(
store=store, skip_planning=True, skip_approval=True, checkpointer=False
).agent
await plan_and_execute.ainvoke({"plan_tasks": plan.changes})
plan_and_execute = await PlanAndExecuteAgent(store=store, checkpointer=False).agent

await plan_and_execute.ainvoke({
"messages": await pipeline_fixer_human.aformat_messages(
troubleshooting_details=[
troubleshooting_detail
for troubleshooting_detail in state["troubleshooting"]
if troubleshooting_detail.category == "codebase"
]
)
})

return Command(goto=END)

async def apply_format_code(
self, state: OverallState, store: BaseStore, config: RunnableConfig
) -> Command[Literal["execute_remediation_steps", "__end__"]]:
) -> Command[Literal["plan_and_execute", "__end__"]]:
"""
Apply format code to the repository to fix the linting issues in the pipeline.

@@ -277,16 +174,16 @@
config (RunnableConfig): The config for the agent.

Returns:
Command[Literal["execute_remediation_steps", "__end__"]]: The next step in the workflow.
Command[Literal["plan_and_execute", "__end__"]]: The next step in the workflow.
"""
repo_config = RepositoryConfig.get_config(config["configurable"]["source_repo_id"])

if not repo_config.commands.enabled():
logger.info("Format code is disabled for this repository, skipping.")
# If format code is disabled, we need to try to fix the linting issues by executing the remediation steps.
# If format code is disabled, we need to try to fix the linting issues by planning the remediation steps.
# This is less effective than actually formatting the code, but it's better than nothing. For instance,
# linting errors like whitespaces can be challenging to fix by an agent, or even impossible.
return Command(goto="execute_remediation_steps")
return Command(goto="plan_and_execute")

tool_message = await RunSandboxCommandsTool().ainvoke(
{
@@ -313,8 +210,14 @@
command_output_evaluator = await CommandOutputEvaluator().agent
result = await command_output_evaluator.ainvoke({"output": tool_message.artifact[-1].output})

if result.has_errors:
# If there are still errors, we need to try to fix them by executing the remediation steps.
return Command(goto="execute_remediation_steps")
if result.has_errors and state.get("format_iteration", 0) < MAX_FORMAT_ITERATIONS:
# If there are still errors, try to fix them by running the troubleshooting step again.
return Command(
goto="troubleshoot",
update={
"job_logs": tool_message.artifact[-1].output,
"format_iteration": state.get("format_iteration", 0) + 1,
},
)

return Command(goto=END)
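For orientation, here is a minimal sketch of the simplified graph wiring after this change: troubleshoot becomes the entry point, should_try_to_fix and execute_remediation_steps are gone, and routing happens through Command return values rather than static edges. The node bodies, state fields, and routing conditions below are placeholders for illustration, not the project's actual implementation:

from typing import Literal, TypedDict

from langgraph.graph import END, StateGraph
from langgraph.types import Command


class FixerState(TypedDict, total=False):
    job_logs: str
    troubleshooting: list
    format_iteration: int
    need_manual_fix: bool


async def troubleshoot(state: FixerState) -> Command[Literal["plan_and_execute", "__end__"]]:
    # Placeholder: the real node runs a ReAct agent over the failed job logs and collects
    # TroubleshootingDetail entries before deciding where to route next.
    if state.get("job_logs"):
        return Command(goto="plan_and_execute", update={"troubleshooting": []})
    return Command(goto=END, update={"need_manual_fix": True})


async def plan_and_execute(state: FixerState) -> Command[Literal["__end__"]]:
    # Placeholder: the real node hands the codebase-category findings to PlanAndExecuteAgent.
    return Command(goto=END)


async def apply_format_code(state: FixerState) -> Command[Literal["troubleshoot", "plan_and_execute", "__end__"]]:
    # Placeholder: the real node runs the repository's format commands in a sandbox and loops
    # back to troubleshoot while errors remain and MAX_FORMAT_ITERATIONS has not been reached.
    return Command(goto=END)


workflow = StateGraph(FixerState)
workflow.add_node("troubleshoot", troubleshoot)
workflow.add_node("plan_and_execute", plan_and_execute)
workflow.add_node("apply_format_code", apply_format_code)
workflow.set_entry_point("troubleshoot")
graph = workflow.compile()

Because every node returns a Command, no add_edge calls are needed; the Literal annotations document the reachable destinations, which is also why the diff above only adds nodes and sets the entry point.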
8 changes: 2 additions & 6 deletions daiv/automation/agents/pipeline_fixer/conf.py
@@ -13,15 +13,11 @@ class PipelineFixerSettings(BaseSettings):
model_config = SettingsConfigDict(secrets_dir="/run/secrets", env_prefix="PIPELINE_FIXER_")

NAME: str = Field(default="PipelineFixer", description="Name of the pipeline fixer agent.")
MAX_ITERATIONS: int = Field(default=20, description="Maximum number of retry iterations for pipeline fixer.")
SAME_ERROR_MODEL_NAME: ModelName | str = Field(
default=ModelName.GPT_4_1_MINI, description="Model name to be used for same error evaluator."
)
TROUBLESHOOTING_MODEL_NAME: ModelName | str = Field(
default=ModelName.O4_MINI, description="Model name to be used for pipeline fixer."
default=ModelName.CLAUDE_SONNET_4, description="Model name to be used for pipeline fixer."
)
TROUBLESHOOTING_THINKING_LEVEL: ThinkingLevel = Field(
default=ThinkingLevel.HIGH, description="Thinking level to be used for pipeline fixer."
default=ThinkingLevel.MEDIUM, description="Thinking level to be used for pipeline fixer."
)
COMMAND_OUTPUT_MODEL_NAME: ModelName | str = Field(
default=ModelName.GPT_4_1_MINI, description="Model name to be used for command output evaluator."
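As a usage note, and assuming standard pydantic-settings behavior rather than anything shown in this diff: each field above can still be overridden through an environment variable carrying the PIPELINE_FIXER_ prefix (or a file under /run/secrets), so the new model and thinking-level defaults only apply when nothing else is configured. A hypothetical override, with illustrative values only (the import path assumes the ./daiv source root shown earlier):

import os

from automation.agents.pipeline_fixer.conf import PipelineFixerSettings

# Hypothetical values; the exact strings accepted by ModelName and ThinkingLevel are
# defined elsewhere in the codebase and are only assumed here.
os.environ["PIPELINE_FIXER_TROUBLESHOOTING_MODEL_NAME"] = "o4-mini"
os.environ["PIPELINE_FIXER_TROUBLESHOOTING_THINKING_LEVEL"] = "high"

settings = PipelineFixerSettings()  # picks up the PIPELINE_FIXER_* variables at instantiation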