diff --git a/platoon/episode/loop.py b/platoon/episode/loop.py index 917b4a4..5b721fa 100644 --- a/platoon/episode/loop.py +++ b/platoon/episode/loop.py @@ -14,16 +14,53 @@ finish_message, ) +CLEANUP_TIMEOUT = 180 # seconds to allow each close() call before giving up +# from openhands.sdk.conversation import ConversationExecutionStatus +# from platoon.utils.openhands_utils import is_finished +# def agent_finished(obs): +# if obs.conversation_state.execution_status in [ +# ConversationExecutionStatus.FINISHED, +# ConversationExecutionStatus.STUCK, +# ConversationExecutionStatus.ERROR +# ]: +# return True +# return False + # NOTE: This function should be called using asyncio.create_task() to make sure edits to contextvars do not leak to parent context -async def run_episode(agent: Agent, env: Env, verbose: bool = False, timeout: int = 300) -> Trajectory: +async def run_episode(agent: Agent, env: Env, verbose: bool = True, timeout: int = 300) -> Trajectory: + curr = "pre reset" try: step_count = 0 set_context_vars(agent, env) + print("waiting for env.reset()", flush=True) obs = await env.reset() + curr = "reset done" + # while True: + # import time + # time.sleep(10000000) while not halt_episode(obs): + # if agent_finished(obs): + # print("OpenHands Finished -- waiting for agent.act() to complete", flush=True) + print("waiting for agent.act()", flush=True) + curr = "before act" action = await asyncio.wait_for(agent.act(obs), timeout=timeout) + # if agent_finished(obs): + # print("OpenHands Finished -- waiting for env.step() to complete", flush=True) + print("waiting for env.step()", flush=True) + curr = "before step" obs = await asyncio.wait_for(env.step(action), timeout=timeout) + print("completed env.step()", flush=True) + # if agent_finished(obs): + # print("OpenHands Finished -- env.step() completed", flush=True) + # if not is_finished(obs): + # print(f"WARNING: Conversation execution status is {obs.conversation_state.execution_status} but is_finished() returned False", flush=True) step_count += 1 + except asyncio.CancelledError: + # Task was cancelled by parent (e.g. rollout timeout via wait_for). + # Catch it so the finally block can run normally without re-cancellation. + error_message.set(f"Episode cancelled at step {step_count} (likely rollout timeout)") + if verbose: + print(f"Episode cancelled at step {step_count}", flush=True) except Exception as e: tb_summary = traceback.extract_tb(e.__traceback__) origin = "" @@ -40,13 +77,30 @@ async def run_episode(agent: Agent, env: Env, verbose: bool = False, timeout: in print(detailed_msg) error_message.set(detailed_msg) finally: - await agent.close() - await env.close() + # Cleanup with bounded timeouts so a blocking close() can't stall the process. + # Use asyncio.shield() so that CancelledError from a parent wait_for() + # doesn't prevent cleanup from running. + # for label, closeable in [("agent", agent), ("env", env)]: + # try: + # await asyncio.shield( + # asyncio.wait_for(closeable.close(), timeout=CLEANUP_TIMEOUT) + # ) + # except asyncio.CancelledError: + # print(f"Warning: {label}.close() was cancelled, cleanup may be incomplete", flush=True) + # except asyncio.TimeoutError: + # print(f"Warning: {label}.close() timed out after {CLEANUP_TIMEOUT}s, skipping", flush=True) + # except Exception as e: + # print(f"Warning: {label}.close() raised {e}, skipping", flush=True) # Finalize trajectory and emit a finish event to sinks traj_collection = current_trajectory_collection.get() traj = current_trajectory.get() traj.error_message = error_message.get() traj.finish_message = finish_message.get() + print(f"Current state: {curr}", flush=True) + if traj.finish_message is None: + traj.finish_message = f"Episode finished without a finish message: {curr}" + # if traj.error_message is None: + # traj.error_message = "Rollout finished without an error or finish message" # TODO: We could move out trajectory finish logic (adding up rewards, setting finish message, etc.) from env logic to here. traj_collection.finish_trajectory(traj.id) return traj diff --git a/platoon/train/areal/patches.py b/platoon/train/areal/patches.py index 56b66cf..a774bf7 100644 --- a/platoon/train/areal/patches.py +++ b/platoon/train/areal/patches.py @@ -208,6 +208,14 @@ async def patched_create( # Convert messages to prompt format tools_val = tools if not is_omitted(tools) else None if self.chat_template_type == "hf": + for message in messages_list: + if isinstance(message["content"], list): + new_content = "".join( + item.get("text", "") + for item in message["content"] + if isinstance(item, dict) and item.get("type") == "text" + ) + message["content"] = new_content prompt_token_ids = self.tokenizer.apply_chat_template( messages_list, tools=tools_val, diff --git a/platoon/train/areal/rl.py b/platoon/train/areal/rl.py index 8789c69..56ce665 100644 --- a/platoon/train/areal/rl.py +++ b/platoon/train/areal/rl.py @@ -128,7 +128,7 @@ def __init__( self.ref.initialize(None, self.ft_spec) # Setup proxy server - self.llm_client = ArealOpenAI(engine=self.rollout, tokenizer=self.tokenizer) + self.llm_client = ArealOpenAI(engine=self.rollout, tokenizer=self.tokenizer, tool_call_parser="qwen25") free_port = find_free_ports(1)[0] self.proxy_server = ProxyServer(free_port, client=self.llm_client) self.proxy_server.start(wait_until_ready=True) diff --git a/platoon/utils/openhands_utils.py b/platoon/utils/openhands_utils.py index d39f3ba..ee086f8 100644 --- a/platoon/utils/openhands_utils.py +++ b/platoon/utils/openhands_utils.py @@ -1,60 +1,170 @@ from typing import Sequence from openhands.sdk.event import ActionEvent, AgentErrorEvent, Event, EventID, MessageEvent -from openhands.sdk.conversation.base import ConversationStateProtocol -from openhands.sdk.conversation.state import ConversationExecutionStatus +from openhands.sdk.tool.builtins.finish import FinishAction +from openhands.sdk.event.conversation_error import ConversationErrorEvent +from openhands.sdk.conversation import ConversationExecutionStatus from platoon.openhands.types import OpenHandsObservation +from collections import defaultdict +def _is_terminal_status(conversation_state) -> bool: + """Return True if the conversation execution status is a terminal state.""" + return conversation_state.execution_status in ( + ConversationExecutionStatus.FINISHED, + ConversationExecutionStatus.STUCK, + ConversationExecutionStatus.ERROR, + ) + def is_action(event: Event) -> bool: return isinstance(event, ActionEvent) \ or (isinstance(event, MessageEvent) and event.source == "agent") -# TODO: Logic can probably be simplified now, by looking at changes in llm_response_id. Anytime llm_response_id changes, we can consider it as a new action. -def get_actions_for_last_obs(observation: OpenHandsObservation, require_same_llm_call_id: bool = False) -> list[Event]: - """Collect Event(s) we consider as actions that immediately follow a past ObservationEvent and are - fully observed by a subsequent ObservationBaseEvent referencing them. - """ + +def group_actions(events: Sequence[Event]): + """Build a map of llm_response_id -> list of ActionEvent IDs.""" + batches: dict[EventID, list[EventID]] = defaultdict(list) + action_id_to_response_id: dict[EventID, EventID] = {} + tool_call_id_to_action_id = {} + action_id_to_tool_call_id = {} + + for event in events: + if isinstance(event, ActionEvent) or (isinstance(event, MessageEvent) and event.source == "agent"): + llm_response_id = event.llm_response_id + batches[llm_response_id].append(event.id) + action_id_to_response_id[event.id] = llm_response_id + if isinstance(event, ActionEvent) and event.tool_call_id is not None: + tool_call_id_to_action_id[event.tool_call_id] = event.id + action_id_to_tool_call_id[event.id] = event.tool_call_id + + return batches, action_id_to_response_id, tool_call_id_to_action_id, action_id_to_tool_call_id + +def get_actions_for_last_obs(observation: OpenHandsObservation, require_same_llm_call_id: bool = True) -> list[Event]: + """Collect all Actions between the last observation.last_step_observation_id and the most recent observation, ensuring that all these Actions have a corresponding observation except for messages and finish actions from agent.""" + # from openhands.sdk.context.view import ActionBatch events = observation.conversation_state.events new_actions: list[Event] = list() - new_actions_candidates: list[Event] = list() seen_action_ids: set[EventID] = set() at_least_one_future_obs_seen = False at_least_one_future_error_event_seen = False + # action_batch = ActionBatch.from_events(events) + batches, action_id_to_response_id, tool_call_id_to_action_id, action_id_to_tool_call_id = group_actions(events) for event in reversed(events): + # Only consider events after the last observed event if event.id == observation.last_step_observation_id: break + if not is_action(event): - new_actions.clear() + new_actions.clear() # clear all accumulated actions till now if a non-action event happened before them at_least_one_future_obs_seen = True if hasattr(event, "action_id"): - seen_action_ids.add(event.action_id) - if isinstance(event, AgentErrorEvent): + seen_action_ids.add(event.action_id) #always true for observation events + + if isinstance(event, AgentErrorEvent) and event.tool_call_id is not None and event.tool_call_id in tool_call_id_to_action_id: + # If we see an agent error event that references a tool call id, we should consider the corresponding action as having a future observation, since agent error events are a type of observation event that LLM would see and react to and AgentErrorEvents don't terminate agent loop. + seen_action_ids.add(tool_call_id_to_action_id[event.tool_call_id]) + + if isinstance(event, ConversationErrorEvent): at_least_one_future_error_event_seen = True continue else: new_actions.append(event) - new_actions_candidates.append(event) + if isinstance(event, MessageEvent) and event.source == "agent": + seen_action_ids.add(event.id) + at_least_one_future_obs_seen = True + elif isinstance(event, ActionEvent) and event.source == "agent" and ( + isinstance(event.action, FinishAction) + or _is_terminal_status(observation.conversation_state) + ): + # The agent submitted a terminal action (built-in FinishAction or + # a custom tool that set execution_status to FINISHED/STUCK/ERROR, + # e.g. LocalizationFinishAction). Treat this the same as a + # message: mark it as "seen" so the downstream validation logic + # doesn't clear it for lacking a corresponding observation. + seen_action_ids.add(event.id) + at_least_one_future_obs_seen = True + + if len(new_actions) == 0: + return new_actions + last_event_seen = new_actions[0].id if new_actions else None - last_event_seen = new_actions[-1].id if new_actions else None if not is_finished(observation, last_event_seen=last_event_seen) and not at_least_one_future_error_event_seen: for action in new_actions: - if isinstance(action, ActionEvent) and action.id not in seen_action_ids: + if action.id not in seen_action_ids: + print(f"Clearing new_actions due to action event that has not been observed in a future observation: {action.id} {action.kind}", flush=True) new_actions.clear() break if not at_least_one_future_obs_seen: new_actions.clear() - + if require_same_llm_call_id and new_actions: llm_call_id = new_actions[0].llm_response_id if any(action.llm_response_id != llm_call_id for action in new_actions): raise ValueError("Detected at least two actions in a step with differing llm_response_id. " "This is unexpected and can lead to undefined behavior.") + if len(new_actions) != len(batches[llm_call_id]): + print("Warning: The number of new actions detected does not match the number of actions in the batch for the corresponding llm_response_id. This could indicate that some actions are not being properly observed or that there are unexpected events in the conversation history.", flush=True) return list(reversed(new_actions)) + +# # TODO: Logic can probably be simplified now, by looking at changes in llm_response_id. Anytime llm_response_id changes, we can consider it as a new action. +# def get_actions_for_last_obs(observation: OpenHandsObservation, require_same_llm_call_id: bool = False) -> list[Event]: +# """Collect Event(s) we consider as actions that immediately follow a past ObservationEvent and are +# fully observed by a subsequent ObservationBaseEvent referencing them. +# """ +# events = observation.conversation_state.events +# new_actions: list[Event] = list() +# seen_action_ids: set[EventID] = set() +# at_least_one_future_obs_seen = False +# at_least_one_future_error_event_seen = False +# for event in reversed(events): +# if event.id == observation.last_step_observation_id: +# break +# if not is_action(event): +# new_actions.clear() +# at_least_one_future_obs_seen = True +# if hasattr(event, "action_id"): +# seen_action_ids.add(event.action_id) +# # NOTE: Do we need to handle ConversationErrorEvent here? +# if isinstance(event, AgentErrorEvent) or isinstance(event, ConversationErrorEvent): +# at_least_one_future_error_event_seen = True +# continue +# else: +# new_actions.append(event) +# if isinstance(event, MessageEvent) and event.source == "agent": +# # print("IMPORTANT: agent has finished rollout") +# seen_action_ids.add(event.id) +# at_least_one_future_obs_seen = True +# elif isinstance(event, ActionEvent) and event.source == "agent" and isinstance(event.action, FinishAction): +# # print("IMPORTANT: agent has finished rollout") +# seen_action_ids.add(event.id) + + +# if len(new_actions) == 0: +# return new_actions +# last_event_seen = new_actions[-1].id if new_actions else None +# if not is_finished(observation, last_event_seen=last_event_seen) and not at_least_one_future_error_event_seen: +# for action in new_actions: +# if isinstance(action, ActionEvent) and action.id not in seen_action_ids: +# print(f"Clearing new_actions due to action event that has not been observed in a future observation: {action.id} {action.kind}", flush=True) +# new_actions.clear() +# break + +# if not at_least_one_future_obs_seen: +# new_actions.clear() + +# if require_same_llm_call_id and new_actions: +# llm_call_id = new_actions[0].llm_response_id +# if any(action.llm_response_id != llm_call_id for action in new_actions): +# raise ValueError("Detected at least two actions in a step with differing llm_response_id. " +# "This is unexpected and can lead to undefined behavior.") + +# return list(reversed(new_actions)) + + def get_obs_for_last_action(observation: OpenHandsObservation) -> list[Event]: """Collect event(s) that immediately follow a past ActionEvent and are fully observed by a subsequent ObservationBaseEvent referencing them. @@ -62,30 +172,65 @@ def get_obs_for_last_action(observation: OpenHandsObservation) -> list[Event]: events = observation.conversation_state.events new_obs: list[Event] = list() at_least_one_future_action_seen = False + future_action_seen = None for event in reversed(events): if event.id == observation.last_step_action_id: + # print(f"Found matching last_step_action_id: {event.id} {observation.last_step_action_id}") break if is_action(event): at_least_one_future_action_seen = True + future_action_seen = event + # print(f"Clearing new_obs due to action event: {event.kind} {[ev.kind for ev in new_obs]}") new_obs.clear() continue else: new_obs.append(event) # If not at least one future action seen and if this obs is not the final one, empty the list. - last_event_seen = new_obs[-1].id if new_obs else None - if not at_least_one_future_action_seen and not is_finished(observation, last_event_seen=last_event_seen): + oh_conversation_finished = _is_terminal_status(observation.conversation_state) + if oh_conversation_finished and len(new_obs) == 0: + print(f"Conversation is finished and no new obs seen, returning empty obs list.") + return [] + if len(new_obs) == 0: + return new_obs + # last_event_seen = new_obs[0].id + + # Check if conversation has finished WITHOUT calling is_finished (to avoid circular dependency) + conversation_state = observation.conversation_state + oh_conversation_finished = _is_terminal_status(conversation_state) + + if not at_least_one_future_action_seen and not oh_conversation_finished: new_obs.clear() + # state_info = {'at_least_one_future_action_seen': at_least_one_future_action_seen, + # 'oh_conversation_finished': oh_conversation_finished, + # 'last_step_action_id': observation.last_step_action_id, + # 'last_event_seen': last_event_seen, + # 'future_action_seen': future_action_seen + # } + + # if len(new_obs) > 0: + # print(f"get_obs_for_last_action returning obs: {[ev.kind for ev in reversed(new_obs)]} oh_conversation_finished: {oh_conversation_finished}") + return list(reversed(new_obs)) def is_finished(observation: OpenHandsObservation, last_event_seen: EventID | None = None) -> bool: conversation_state = observation.conversation_state - oh_conversation_finished = conversation_state.agent_status == ConversationExecutionStatus.FINISHED \ - or conversation_state.agent_status == ConversationExecutionStatus.STUCK \ - or conversation_state.agent_status == ConversationExecutionStatus.ERROR + oh_conversation_finished = _is_terminal_status(conversation_state) last_event_id = conversation_state.events[-1].id - platoon_episode_caught_up = last_event_id in (observation.last_step_action_id, observation.last_step_observation_id, last_event_seen) + assert last_event_id is not None, "Last event in conversation must have a non-None ID" + valid_ids = [event_id for event_id in [observation.last_step_action_id, observation.last_step_observation_id, last_event_seen] if event_id is not None] + platoon_episode_caught_up = last_event_id in valid_ids + if oh_conversation_finished and platoon_episode_caught_up: + try: + print(f"is_finished: conversation finished with status {conversation_state.execution_status}, last_event_id: {last_event_id}, valid_ids: {valid_ids}, last_event_seen: {conversation_state.events[-1].kind}") + except Exception as e: + print(f"is_finished: conversation finished with status {conversation_state.execution_status}, last_event_id: {last_event_id}, valid_ids: {valid_ids}, unable to print last event kind due to error: {e}") + # if oh_conversation_finished and not platoon_episode_caught_up: + # try: + # print(f"is_finished: conversation finished with status {conversation_state.execution_status}, but platoon episode not caught up, last_event_id: {last_event_id}, valid_ids: {valid_ids}, last_event_seen: {conversation_state.events[-1].kind}", flush=True) + # except Exception as e: + # print(f"is_finished: conversation finished with status {conversation_state.execution_status}, but platoon episode not caught up, last_event_id: {last_event_id}, valid_ids: {valid_ids}, unable to print last event kind due to error: {e}", flush=True) return oh_conversation_finished and platoon_episode_caught_up \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/__init__.py b/plugins/codescout/platoon/codescout/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/codescout/platoon/codescout/custom_agent.py b/plugins/codescout/platoon/codescout/custom_agent.py new file mode 100644 index 0000000..3772015 --- /dev/null +++ b/plugins/codescout/platoon/codescout/custom_agent.py @@ -0,0 +1,121 @@ +import json +import re +from concurrent.futures import ThreadPoolExecutor +from openhands.sdk.agent.utils import ( + make_llm_completion, + prepare_llm_messages, +) +from openhands.sdk.conversation import ( + ConversationCallbackType, + ConversationState, + ConversationTokenCallbackType, + LocalConversation, +) +from openhands.sdk.conversation.state import ConversationExecutionStatus +from openhands.sdk.event import ( + ActionEvent, + MessageEvent, + UserRejectObservation, +) +from openhands.sdk.event.condenser import Condensation, CondensationRequest +from openhands.sdk.llm import ( + Message, + TextContent, +) +from openhands.sdk.llm.exceptions import ( + FunctionCallValidationError, + LLMContextWindowExceedError, +) +from openhands.sdk.logger import get_logger +from openhands.sdk.observability.laminar import ( + maybe_init_laminar, + observe, + should_enable_observability +) + +from openhands.sdk import Agent +from openhands.sdk.event import ( + ActionEvent, + AgentErrorEvent, + MessageEvent, + ObservationEvent, +) +from openhands.sdk.mcp import create_mcp_tools +from openhands.sdk.observability.utils import extract_action_name +from openhands.sdk.tool import Observation +from openhands.sdk.tool.builtins import FinishTool +from platoon.codescout.localization_finish import LocalizationFinishTool +from openhands.sdk.tool import BUILT_IN_TOOLS, Tool, ToolDefinition, resolve_tool +from typing import TYPE_CHECKING, Any +if TYPE_CHECKING: + from openhands.sdk.conversation import ConversationState, LocalConversation + from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, + ) +logger = get_logger(__name__) +maybe_init_laminar() + + +class CustomAgent(Agent): + + def _initialize(self, state: "ConversationState"): + """Create an AgentBase instance from an AgentSpec.""" + + if self._tools: + logger.warning("Agent already initialized; skipping re-initialization.") + return + + tools: list[ToolDefinition] = [] + + # Use ThreadPoolExecutor to parallelize tool resolution + with ThreadPoolExecutor(max_workers=4) as executor: + futures = [] + + # Submit tool resolution tasks + for tool_spec in self.tools: + future = executor.submit(resolve_tool, tool_spec, state) + futures.append(future) + + # Submit MCP tools creation if configured + if self.mcp_config: + future = executor.submit(create_mcp_tools, self.mcp_config, 30) + futures.append(future) + + # Collect results as they complete + for future in futures: + result = future.result() + tools.extend(result) + + logger.info( + f"Loaded {len(tools)} tools from spec: {[tool.name for tool in tools]}" + ) + if self.filter_tools_regex: + pattern = re.compile(self.filter_tools_regex) + tools = [tool for tool in tools if pattern.match(tool.name)] + logger.info( + f"Filtered to {len(tools)} tools after applying regex filter: " + f"{[tool.name for tool in tools]}", + ) + + # Do not include built-in tools; not subject to filtering + # Instantiate built-in tools using their .create() method + # for tool_class in BUILT_IN_TOOLS: + # tools.extend(tool_class.create(state)) + + # Check tool types + for tool in tools: + if not isinstance(tool, ToolDefinition): + raise ValueError( + f"Tool {tool} is not an instance of 'ToolDefinition'. " + f"Got type: {type(tool)}" + ) + + # Check name duplicates + tool_names = [tool.name for tool in tools] + if len(tool_names) != len(set(tool_names)): + duplicates = set(name for name in tool_names if tool_names.count(name) > 1) + raise ValueError(f"Duplicate tool names found: {duplicates}") + + # Store tools in a dict for easy access + self._tools = {tool.name: tool for tool in tools} \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/custom_tools/__init__.py b/plugins/codescout/platoon/codescout/custom_tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/codescout/platoon/codescout/custom_tools/localization_finish.py b/plugins/codescout/platoon/codescout/custom_tools/localization_finish.py new file mode 100644 index 0000000..93373f2 --- /dev/null +++ b/plugins/codescout/platoon/codescout/custom_tools/localization_finish.py @@ -0,0 +1,211 @@ +"""Custom finish tool for code localization tasks. + +This tool allows the agent to submit localization results in a structured format where: +- File path is required +- Class name is optional +- Function name is optional +""" + +import sys + +# This module lives at two importable paths simultaneously: +# - "platoon.codescout.custom_tools.localization_finish" (via the platoon package) +# - "custom_tools.localization_finish" (via the .pth entry for plugins/codescout) +# +# Python treats these as two separate modules and will load the file twice, +# producing two distinct class objects that both claim __module__ = +# "custom_tools.localization_finish" after our __module__ override. +# Pydantic then sees two Action/Observation subclasses with the same name +# and raises "Duplicate class definition". +# +# Fix: whichever path loads us first, immediately register both keys in +# sys.modules pointing to the SAME module object, so the second import +# finds the cached version instead of re-executing the file. +_SHORT = "custom_tools.localization_finish" +_LONG = "platoon.codescout.custom_tools.localization_finish" +_PLUGINS = "plugins.codescout.platoon.codescout.custom_tools.localization_finish" +_SHORT_PKG = "custom_tools" +_LONG_PKG = "platoon.codescout.custom_tools" + +_this_module = sys.modules[__name__] +if _SHORT not in sys.modules: + sys.modules[_SHORT] = _this_module +if _LONG not in sys.modules: + sys.modules[_LONG] = _this_module +if _PLUGINS not in sys.modules: + sys.modules[_PLUGINS] = _this_module +if _SHORT_PKG not in sys.modules and _LONG_PKG in sys.modules: + sys.modules[_SHORT_PKG] = sys.modules[_LONG_PKG] + +import json +from typing import TYPE_CHECKING +from collections.abc import Sequence + +from pydantic import BaseModel, Field, computed_field +from rich.text import Text + +from openhands.sdk import ( + Action, + Observation, + ToolDefinition +) +from openhands.sdk.tool import ToolExecutor, ToolAnnotations, register_tool + +from openhands.sdk.conversation.state import ConversationExecutionStatus + +if TYPE_CHECKING: + from openhands.sdk.conversation.base import BaseConversation + +class CodeLocation(BaseModel): + """A single code location with optional class and function.""" + + file: str = Field(description="Path to the file (required)") + class_name: str | None = Field(default=None, description="Class name (optional)") + function_name: str | None = Field(default=None, description="Function/method name (optional)") + +class LocalizationFinishAction(Action): + """Action for submitting final localization results.""" + + locations: list[CodeLocation] = Field( + description="""List of code locations to modify. Each location in this list must have: +- file: Path to the file relative to the repository root (required) +- class_name: Class name (optional, omit for changes to imports, global variables, and global functions) +- function_name: Function/method name (optional, omit for changes that edit parts of a file outside of any particular function) +""" + ) + + @property + def visualize(self) -> Text: + """Return Rich Text representation of this action.""" + content = Text() + content.append("Submitting localization results:\n", style="bold blue") + content.append(f"Found {len(self.locations)} location(s):\n", style="green") + for i, loc in enumerate(self.locations, 1): + content.append(f" {i}. {loc.file}", style="cyan") + if loc.class_name: + content.append(f" → {loc.class_name}", style="yellow") + if loc.function_name: + content.append(f".{loc.function_name}", style="magenta") + content.append("\n") + return content + +# Override __module__ so the client-side pydantic discriminator registry uses the +# same module path as the server container ("custom_tools.localization_finish"). +# This prevents a "Duplicate class definition" error when the server sends events +# back and the client tries to deserialize them — both sides see the same name. +LocalizationFinishAction.__module__ = "custom_tools.localization_finish" + + +class LocalizationFinishObservation(Observation): + """Observation returned after submitting localization results. No observation is needed since the agent will exit after this action.""" + + @property + def visualize(self) -> Text: + """Return an empty Text representation since the message is in the action.""" + return Text() + +LocalizationFinishObservation.__module__ = "custom_tools.localization_finish" + +def locations_to_dict_list(locations: list[CodeLocation]) -> list[dict]: + """Convert CodeLocation objects to dictionary format. + + Args: + locations: List of CodeLocation objects + + Returns: + List of dictionaries with 'file', 'class_name', 'function_name' keys + """ + return [ + { + "file": loc.file, + "class_name": loc.class_name, + "function_name": loc.function_name, + } + for loc in locations + ] + +class LocalizationFinishExecutor(ToolExecutor): + def __call__( + self, + action: LocalizationFinishAction, + conversation: "BaseConversation | None" = None, # noqa: ARG002 + ) -> LocalizationFinishObservation: + try: + loc_dict = locations_to_dict_list(action.locations) + text = json.dumps(loc_dict, indent=2) + conversation.state.execution_status = ConversationExecutionStatus.FINISHED + return LocalizationFinishObservation.from_text(text=text) + except Exception as _: + return LocalizationFinishObservation.from_text(text="") + +TOOL_DESCRIPTION = """Submit your final code localization results. + +Use this tool when you have identified all relevant files, classes, and functions that need to be modified to address the issue described in the problem statement. + +Provide a structured list of locations. Each location must have: +- file: Path to the file relative to the root of the repository (required) +- class_name: Class name (optional) +- function_name: Function/method name (optional) + +You must submit a list of locations that require modification and for each location you must follow the below rules in your output: +1. If the required modifications belong to a specific function that belongs to a class, provide the file path, class name, and function name. +2. If the required modification belongs to a function that is not part of any class, provide the file path and function name. +3. If the required modification does not belong to any specific class or a function (e.g. global variables, imports, new class, new global function etc.), it is sufficient to provide only the file path. +4. If the required modification belongs to a class (e.g. adding a new method to a class, changing the class inheritance), provide the file path and class name. If you are modifying the __init__ method of a class, you should provide the function name as well. + +IMPORTANT: +1. If multiple different edits need to be edited in the same file, you should create separate entries for each edit, specifying the same file path but different class/function names as applicable. Each entry should compulsorily include the file path. +2. Do NOT include duplicate entries in your output for which the file, class, and function names are all identical. +3. Ensure that the file paths are accurate and relative to the root of the repository without any leading "./" or "/". All locations must be valid and exist in the codebase and this applies to class and function names as well. +4. Aim for high precision (all returned locations are relevant) and high recall (no relevant locations missed). +5. The agent will terminate its execution after you call this tool. +""" + +class LocalizationFinishTool(ToolDefinition[LocalizationFinishAction, LocalizationFinishObservation]): + """Tool for submitting final localization results.""" + + """Tool for submitting final code localization results.""" + + @classmethod + def create( + cls, + conv_state, # noqa: ARG003 + **params + ) -> Sequence["LocalizationFinishTool"]: + """Create LocalizationFinishTool instance. + + Args: + conv_state: Conversation state (provides workspace info) + workspace_dir: Optional workspace directory override + **params: Additional parameters + + Returns: + A sequence containing a single LocalizationFinishTool instance. + """ + if params: + raise ValueError("LocalizationFinishTool doesn't accept parameters") + + return [ + cls( + # name="localization_finish", + action_type=LocalizationFinishAction, + observation_type=LocalizationFinishObservation, + description=TOOL_DESCRIPTION, + executor=LocalizationFinishExecutor(), + annotations=ToolAnnotations( + title="localization_finish", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), + ) + ] + +# Override __module__ on all classes so both client and server use the same +# module path ("custom_tools.localization_finish") in every registry: +# - the pydantic discriminator registry (prevents "Duplicate class definition") +# - the tool registry _MODULE_QUALNAMES (tells the server what to import) +LocalizationFinishTool.__module__ = "custom_tools.localization_finish" + +register_tool("LocalizationFinishTool", LocalizationFinishTool) \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/env.py b/plugins/codescout/platoon/codescout/env.py new file mode 100644 index 0000000..b73aa6c --- /dev/null +++ b/plugins/codescout/platoon/codescout/env.py @@ -0,0 +1,196 @@ +from pathlib import Path +from typing import List, Tuple +from platoon.utils.openhands_utils import is_finished +from platoon.episode.context import current_trajectory_collection, current_trajectory, finish_message, error_message + +from openhands.sdk import get_logger +from platoon.envs.base import Task +from openhands.sdk.agent import AgentBase +from openhands.sdk.workspace import BaseWorkspace +from openhands.sdk.conversation import Conversation, BaseConversation, get_agent_final_response +#TODO: check below imports +from platoon.openhands.env import OpenHandsEnv +from platoon.openhands.types import OpenHandsObservation, OpenHandsAction, OpenHandsTrajectoryStep +import threading +import asyncio +from platoon.utils.openhands_utils import is_finished +from openhands.sdk.event import ActionEvent +from platoon.codescout.custom_tools.localization_finish import LocalizationFinishAction + +def get_structured_locations(events): + """Extract structured locations from LocalizationFinishAction in events. + Args: + events: List of conversation events to search through. + Returns: + List of location dicts with 'file', 'class', 'function' keys, or None if not found. + """ + # Find the last LocalizationFinishAction + cnt = [1 for event in events if isinstance(event, ActionEvent) and event.source == "agent" and isinstance(event.action, LocalizationFinishAction)] + cnt = sum(cnt) + if cnt != 1: # the localization finish tool must be called exactly once. + return None + for event in reversed(events): + if ( + isinstance(event, ActionEvent) + and event.source == "agent" + and isinstance(event.action, LocalizationFinishAction) + ): + # Extract structured locations from the action + locations = [] + for loc in event.action.locations: + locations.append({ + "file": loc.file, + "class_name": loc.class_name, + "function_name": loc.function_name, + }) + return locations + return None + +def parse_structured_outputs(structured_locations: List[dict]) -> Tuple[List[str], List[str], List[str]]: + """ + Process structured location outputs and extract files, modules, and entities. + + Args: + structured_locations: List of dicts with 'file', 'class_name', 'function_name' keys + Returns: + Tuple of (all_found_files, all_found_modules, all_found_entities) where each is a list of strs + + Example structured input format: + [ + {'file': 'path/to/file1.py', 'class_name': 'MyClass', 'function_name': 'my_method'}, + {'file': 'path/to/file2.py', 'class_name': None, 'function_name': 'standalone_function'}, + {'file': 'path/to/file1.py', 'class_name': None, 'function_name': 'global_function'}, + {'file': 'path/to/file2.py', 'class_name': 'AnotherClass', 'function_name': None}, + {'file': 'path/to/file3.py', 'class_name': None, 'function_name': None} + ] + + Example output: + [['path/to/file1.py', 'path/to/file2.py', 'path/to/file3.py'], ['path/to/file1.py:MyClass', 'path/to/file2.py:AnotherClass', 'path/to/file1.py:global_function', 'path/to/file2.py:standalone_function'], ['path/to/file1.py:MyClass.my_method', 'path/to/file2.py:standalone_function', 'path/to/file1.py:global_function', 'path/to/file2.py:AnotherClass']] + """ + + # Strict sanity check: if there are duplicates in the output, return an empty output so that it is penalized with 0 reward? + all_found_files = [] + all_found_modules = [] + all_found_entities = [] + + found_empty_filename = False + # found_duplictes = False + + for location in structured_locations: + file_path = location.get("file", None) + class_name = location.get("class_name", None) + function_name = location.get("function_name", None) + + #NOTE: Ideally the case of file_path being None should raise an error from the agent-sdk but adding here for safety + if file_path is None or file_path.strip() == "": + found_empty_filename = True + break + + all_found_files.append(file_path) + + module = None + if class_name: + module = f"{file_path}:{class_name}" + elif function_name: + module = f"{file_path}:{function_name}" + + if module: + all_found_modules.append(module) + + entity = None + if class_name and function_name: + entity = f"{file_path}:{class_name}.{function_name}" + elif function_name: + entity = f"{file_path}:{function_name}" + + if entity: + all_found_entities.append(entity) + if found_empty_filename: + return [], [], [] + all_found_files = list(set(all_found_files)) + all_found_modules = list(set(all_found_modules)) + all_found_entities = list(set(all_found_entities)) + return all_found_files, all_found_modules, all_found_entities + +def compute_file_f1_score(predicted_files, true_files, beta=1.0): + pred, true = set(predicted_files), set(true_files) + if not true: + return 0.0 # return 0 reward if ground truth is empty + tp = len(pred & true) + precision = tp / len(pred) if pred else 0.0 + recall = tp / len(true) if true else 0.0 + return (1 + beta**2) * (precision * recall) / (beta**2 * precision + recall) if (precision + recall) > 0 else 0.0 + +def multilevel_localization_f1_reward( + instance: dict, + structured_locations: list[dict] | None = None, + file_level_weight: float=1.0, + module_level_weight: float=1.0, + entity_level_weight: float=1.0, +): + + if structured_locations is None: + return 0, { + "multilevel_localization_f1_reward": 0, + "file_reward": 0, + "module_reward": 0, + "entity_reward": 0, + } + + gt_files = [] + gt_modules = [] + gt_entities = [] + reward = 0 + + for change in instance.get("file_changes", []): + if "file" in change: + gt_files.append(change["file"]) + if "changes" in change: + edited_modules = change["changes"].get("edited_modules", []) + edited_modules = [] if edited_modules is None else edited_modules + for module in edited_modules: + gt_modules.append(module) + + edited_entities = change["changes"].get("edited_entities", []) + edited_entities = [] if edited_entities is None else edited_entities + for entity in edited_entities: + gt_entities.append(entity) + gt_files = set(gt_files) + gt_modules = set(gt_modules) + gt_entities = set(gt_entities) + + if structured_locations is not None: + predicted_files, predicted_modules, predicted_entities = parse_structured_outputs(structured_locations) + else: + predicted_files, predicted_modules, predicted_entities = get_simple_results_from_raw_outputs(final_message) + + file_f1_score = compute_file_f1_score(predicted_files, gt_files) + module_f1_score = compute_file_f1_score(predicted_modules, gt_modules) + entity_f1_score = compute_file_f1_score(predicted_entities, gt_entities) + + reward = ( + file_f1_score * file_level_weight + + module_f1_score * module_level_weight + + entity_f1_score * entity_level_weight + ) + + return reward, { + "multilevel_localization_f1_reward": reward, + "file_reward": file_f1_score, + "module_reward": module_f1_score, + "entity_reward": entity_f1_score, + } + +class CodeScoutEnv(OpenHandsEnv): + async def evaluate(self) -> tuple[float, dict]: + if not is_finished(await self.observe()): + return 0, {} + + structured_locations = get_structured_locations(self._conversation.state.events) + + if structured_locations is None: + return 0, {} + + instance: dict = self.task.misc + reward, metadata = multilevel_localization_f1_reward(instance, structured_locations) + return reward, metadata \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/prompts/system_prompt.j2 b/plugins/codescout/platoon/codescout/prompts/system_prompt.j2 new file mode 100644 index 0000000..8beba27 --- /dev/null +++ b/plugins/codescout/platoon/codescout/prompts/system_prompt.j2 @@ -0,0 +1,82 @@ +You are a specialized code localization agent. Your sole objective is to identify and return the files in the codebase that are relevant to the user's query. +You are given access to the codebase in a linux file system. + +## PRIMARY DIRECTIVE +- Find relevant files, do NOT answer the user's query directly +- Prioritize precision: every file you return should be relevant +- You have up to 4 turns to explore and return your answer + +## TOOL USAGE REQUIREMENTS + +### bash tool (REQUIRED for search) +- You MUST use the bash tool to search and explore the codebase +- Execute bash commands like: rg, grep, find, ls, cat, head, tail, sed +- Use parallel tool calls: invoke bash tool up to 5 times concurrently in a single turn +- NEVER exceed 5 parallel tool calls per turn +- Common patterns: + * `rg "pattern" -t py` - search for code patterns + * `rg --files | grep "keyword"` - find files by name + * `cat path/to/file.py` - read file contents + * `find . -name "*.py" -type f` - locate files by extension + * `wc -l path/to/file.py` - count lines in a file + * `sed -n '1,100p' path/to/file.py` - read lines 1-100 of a file + * `head -n 100 path/to/file.py` - read first 100 lines + * `tail -n 100 path/to/file.py` - read last 100 lines + +### Reading Files (CRITICAL for context management) +- NEVER read entire large files with `cat` - this will blow up your context window +- ALWAYS check file size first: `wc -l path/to/file.py` +- For files > 100 lines, read in chunks: + * Use `sed -n '1,100p' file.py` to read lines 1-100 + * Use `sed -n '101,200p' file.py` to read lines 101-200 + * Continue with subsequent ranges as needed (201-300, 301-400, etc.) +- Strategic reading approach: + * Read the first 50-100 lines to see imports and initial structure + * Use `rg` to find specific patterns and their line numbers + * Read targeted line ranges around matches using `sed -n 'START,ENDp'` + * Only read additional chunks if the initial sections are relevant + +### Submitting Your Answer (REQUIRED) + +When you have identified all relevant locations, you MUST use the `localization_finish` tool to submit your results. + +**When to include what:** +1. If the required modifications belong to a specific function that belongs to a class, provide the file path, class name, and function name. +2. If the required modification belongs to a function that is not part of any class, provide the file path and function name. +3. If the required modification does not belong to any specific class or a function (e.g. global variables, imports, new class, new global function etc.), it is sufficient to provide only the file path. +4. If the required modification belongs to a class (e.g. adding a new method to a class), provide the file path and class name. + +## SEARCH STRATEGY + +1. **Initial Exploration**: Cast a wide net + - Search for keywords, function names, class names + - Check file names and directory structure + - Use up to 3 parallel bash calls to explore multiple angles + - Check file sizes with `wc -l` before reading + - Read promising files in chunks (lines 1-100) to verify relevance + +2. **Deep Dive**: Follow the most promising leads + - Use up to 3 parallel bash calls to investigate further + - Read files in chunks to confirm they address the query + - Use `rg` with line numbers to locate specific code, then read those ranges + - Start eliminating false positives + +3. **Final Verification**: Confirm your location list and terminate execution by calling the `localization_finish` tool + +## CRITICAL RULES +- NEVER exceed 5 parallel bash tool calls in a single turn +- ALWAYS use the `localization_finish` tool after identifying all relevant locations +- ALWAYS use bash tool to search (do not guess file locations) +- NEVER read entire large files - always read in chunks (100-line ranges) +- Check file size with `wc -l` before reading +- Read file contents in chunks to verify relevance before including them +- Return file paths as they appear in the repository. Do not begin the path with "./" +- Aim for high precision (all files relevant) and high recall (no relevant files missed) +- Class and function names are OPTIONAL - only include when changes are at that level + +## EXAMPLE OUTPUT BEHAVIOUR +Here are some examples of how to format your output when calling the `localization_finish` tool: +- src/parsers/parser.py requires changes to imports, a function parse_data which belongs to the class DataParser, and another function __str__ inside the same class. This should be represented as three separate entries: one with just the file path and one each for the two functions parse_data and __str__ with file path, class name, and function name. +- src/user.py requires changes to a global function get_user outside of any class. This should be represented as a single entry with file path and function name. +- utils/visualizer.py requires adding new function visualize inside the class Visualizer. This should be represented as a single entry with file path and class name. +- utils/configs/default_config.py requires adding a new global function and a new class. This should be represented as a single entry with just the file path. Do NOT include class or function names for this file since multiple implementations might be possible with different function names. \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/prompts/user_prompt.j2 b/plugins/codescout/platoon/codescout/prompts/user_prompt.j2 new file mode 100644 index 0000000..b898c29 --- /dev/null +++ b/plugins/codescout/platoon/codescout/prompts/user_prompt.j2 @@ -0,0 +1,14 @@ +I have access to a python code repository in the directory {{ working_dir }} . Consider the following issue description: + + +{{ instance.problem_statement }} + + +Act as a code search agent and localize the specific files, classes or functions of code that need modification to resolve the issue in . + +NOTE: You do not need to solve the issue, all you need to do is localize relevant code from the repository. Your output will be used to guide another agent to solve the issue. + +IMPORTANT: Your output MUST follow the below rules: +1. The final output must be a tool call to the "localization_finish" tool containing relevant code locations. +2. The locations of the file path must be RELATIVE to the {{ working_dir }} directory WITHOUT any leading "./" in the output. +3. Only include those locations in your output that need modification to resolve the issue in . Do NOT include any locations that do not need modification. \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/rollout.py b/plugins/codescout/platoon/codescout/rollout.py new file mode 100644 index 0000000..60ca00d --- /dev/null +++ b/plugins/codescout/platoon/codescout/rollout.py @@ -0,0 +1,471 @@ +import os +from jinja2 import Environment, FileSystemLoader +import asyncio +from platoon.envs.base import Task +from platoon.codescout.env import CodeScoutEnv +from platoon.utils.llm_client import LLMClient +import subprocess +from pathlib import Path +from openhands.sdk import LLM, get_logger, Agent, AgentBase, Tool +from openhands.workspace import DockerWorkspace, APIRemoteWorkspace, ApptainerWorkspace + + +from platoon.episode.trajectory import TrajectoryCollection +from platoon.config_defs import RolloutConfig +from openhands.sdk.workspace import BaseWorkspace +from openhands.tools.preset import get_default_agent +from platoon.episode.loop import run_episode +from platoon.episode.context import current_trajectory_collection +from pydantic import SecretStr +from platoon.visualization.event_sinks import JsonlFileSink +from platoon.codescout.tasks import EVAL_AGENT_SERVER_IMAGE, SDK_SHORT_SHA, ENV_SETUP_COMMANDS, SYSTEM_PROMPT_FILENAME, USER_PROMPT_FILENAME, APPTAINER_CACHE_DIR +from platoon.openhands.agent import OpenHandsAgent +import platform +import uuid +from openhands.sdk.tool import Tool, register_tool +from openhands.tools.terminal import TerminalTool +from platoon.codescout.custom_tools.localization_finish import LocalizationFinishTool # noqa: F401 - registers the tool with the correct module qualname + +logger = get_logger(__name__) + +import signal +import subprocess +import threading +from typing import Callable +def _patched_start_container(self: ApptainerWorkspace) -> None: + """Patched start: ensure the child runs in a new session/process group.""" + # Prepare environment variables + env_args: list[str] = [] + for key in self.forward_env: + if key in os.environ: + env_args += ["--env", f"{key}={os.environ[key]}"] + + # Prepare bind mounts + bind_args: list[str] = [] + if self.mount_dir: + mount_path = "/workspace" + bind_args += ["--bind", f"{self.mount_dir}:{mount_path}"] + + # Build container options + container_opts: list[str] = [] + if self.use_fakeroot: + container_opts.append("--fakeroot") + if self.enable_docker_compat: + container_opts.append("--compat") + if self.disable_mount_locations: + for loc in self.disable_mount_locations: + container_opts += ["--no-mount", loc] + + server_cmd = [ + "apptainer", + "run", + *container_opts, + *env_args, + *bind_args, + self._sif_path, + "--host", + "0.0.0.0", + "--port", + str(self.host_port), + ] + + self._process = subprocess.Popen( + server_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + start_new_session=True, + ) + + if self.detach_logs: + self._logs_thread = threading.Thread(target=self._stream_logs, daemon=True) + self._logs_thread.start() + + +def _patched_cleanup(self: ApptainerWorkspace) -> None: + """Patched cleanup: terminate the full process group to avoid orphans.""" + if getattr(self, "_instance_name", None): + self._stop_logs.set() + if self._logs_thread and self._logs_thread.is_alive(): + self._logs_thread.join(timeout=2) + + if self._process: + try: + pgid = os.getpgid(self._process.pid) + os.killpg(pgid, signal.SIGTERM) + self._process.wait(timeout=5) + except Exception: + try: + pgid = os.getpgid(self._process.pid) + os.killpg(pgid, signal.SIGKILL) + self._process.wait(timeout=2) + except Exception: + pass + + self._process = None + self._instance_name = None + + + +# ApptainerWorkspace._wait_for_health has a hard-coded default of 120s. +# That is too short when the SIF cache is cold or the agent server is slow +# to start. Patch it to default to 600s instead. +_orig_wait_for_health = ApptainerWorkspace._wait_for_health +def _patched_wait_for_health(self, timeout: float = 600.0) -> None: + return _orig_wait_for_health(self, timeout=timeout) +ApptainerWorkspace._wait_for_health = _patched_wait_for_health # type: ignore[method-assign] +ApptainerWorkspace._start_container = _patched_start_container # type: ignore[method-assign] +ApptainerWorkspace.cleanup = _patched_cleanup # type: ignore[method-assign] + + +def prepare_workspace(instance: dict): + uuid_str = str(uuid.uuid4())[:8] + workspace = Path(f"/tmp/testbed/{uuid_str}/") + instance_id: str = instance["instance_id"] + repo_name: str = instance["repo"] + patch: str = instance["patch"] + + instance_dir_name = f"{repo_name.replace('/', '_')}_{instance_id}" + instance_path = workspace / instance_dir_name + + if instance_path.exists(): + print(f" ✓ Instance {instance_id} already exists") + return True, instance_path + + try: + # Clone the repository + subprocess.run( + [ + "git", + "clone", + f"https://github.com/{repo_name}.git", + str(instance_path), + ], + check=True, + capture_output=True, + text=True, + ) + subprocess.run( + ["git", "-C", str(instance_path), "apply"], + input=patch, + check=True, + capture_output=True, + text=True, + ) + return True, instance_path + except subprocess.CalledProcessError as e: + print(f" ✗ Error cloning {instance_id}: {e.stderr}") + return False, None +def detect_platform(): + """Detects the correct platform string.""" + machine = platform.machine().lower() + if "arm" in machine or "aarch64" in machine: + return "linux/arm64" + return "linux/amd64" + +def prepare_workspace2(instance: dict): + uuid_str = str(uuid.uuid4())[:8] + workspace = Path(f"/tmp/testbed/{uuid_str}/") + instance_id: str = instance["instance_id"] + repo_name: str = instance["repo"] + patch: str = instance["patch"] + + instance_dir_name = f"{repo_name.replace('/', '_')}_{instance_id}" + instance_path = workspace / instance_dir_name + + # use the openhands agent server image and then setup env manually + workspace = ApptainerWorkspace( + server_image="docker.io/adityasoni8/eval-agent-server:de65ac5-custom-base-image_tag_latest-source", #TODO + working_dir=str(instance_path), + platform=detect_platform(), + cache_dir=os.environ.get("APPTAINER_CACHEDIR", APPTAINER_CACHE_DIR), + detach_logs=True + ) + + def _run(cmd: str, timeout: float = 120.0) -> None: + """Run a command inside the workspace, raising on failure or timeout. + + httpx.ReadTimeout bubbles out of execute_command with the unhelpful + message 'timed out'. This wrapper catches *any* exception and + re-raises with the command text so we can identify the culprit. + """ + try: + result = workspace.execute_command(cmd, timeout=timeout) + except Exception as exc: + raise RuntimeError(f"Command raised {type(exc).__name__}: {exc}\n cmd: {cmd}") from exc + if result.exit_code != 0: + raise RuntimeError( + f"Command failed (exit {result.exit_code}): {result.stderr}\n cmd: {cmd}" + ) + + # _run("curl -LO https://github.com/BurntSushi/ripgrep/releases/download/14.1.1/ripgrep_14.1.1-1_amd64.deb", timeout=120.0) + # try: + # _run("sudo dpkg -i ripgrep_14.1.1-1_amd64.deb", timeout=180.0) + # except Exception as e: + # pass + + # clone repository inside workspace + try: + _run(f"git clone https://@github.com/{repo_name}.git {str(instance_path)}", timeout=120.0) + _run(f"cd {str(instance_path)} && git apply <<'EOF'\n{patch}\nEOF", timeout=120.0) + except Exception as e: + raise RuntimeError(f"Error preparing workspace for instance {instance_id}: {e}") + return True, instance_path, workspace + +def get_instruction( + instance: dict, + prompt_path: str, + workspace_path: str, +) -> str: + """Generate instruction for the agent.""" + # Set up Jinja2 environment + prompts_dir = os.path.dirname(prompt_path) + template_name = os.path.basename(prompt_path) + env = Environment(loader=FileSystemLoader(prompts_dir)) + template = env.get_template(template_name) + + # Prepare context for rendering + context = { + "instance": instance, + "working_dir": workspace_path, + } + + # Render the instruction + instruction = template.render(context) + return instruction + + +def prepare_llm(config: RolloutConfig) -> LLM: + model_name = config.model_name + temperature = 1.0 + if not model_name.startswith("openai/") and not model_name.startswith("litellm_proxy/"): + model_name = "openai/" + model_name + + llm=LLM( + usage_id="agent", + model=model_name, + base_url=config.model_endpoint, + api_key="sk-xxx", + temperature=temperature, + litellm_extra_body={ + # "return_token_ids": True, + "include_stop_str_in_output": False, + "chat_template_kwargs": { + # "add_generation_prompt": True, + "enable_thinking": False + } + } + ) + return llm + +def prepare_agent(llm: LLM, system_prompt_path: str) -> AgentBase: + # TODO: make tools configurable via instance/env vars or config + # current behaviour: uses default tools without browser + + # register_tool(LocalizationFinishTool.name, LocalizationFinishTool) + tools = [ + Tool(name=TerminalTool.name), + # Tool(name="localization_finish"), + ] + tools.append(Tool(name="LocalizationFinishTool")) + from openhands.sdk.agent import Agent + import types + agent = Agent( + llm=llm, + tools=tools, + system_prompt_filename="/app/prompts_codescout/system_prompt.j2", + # system_prompt_filename=system_prompt_path, + include_default_tools=[] + ) + # def _initialize(self, state: "ConversationState"): + # """Create an AgentBase instance from an AgentSpec.""" + + # if self._tools: + # logger.warning("Agent already initialized; skipping re-initialization.") + # return + + # tools: list[ToolDefinition] = [] + + # # Use ThreadPoolExecutor to parallelize tool resolution + # with ThreadPoolExecutor(max_workers=4) as executor: + # futures = [] + + # # Submit tool resolution tasks + # for tool_spec in self.tools: + # future = executor.submit(resolve_tool, tool_spec, state) + # futures.append(future) + + # # Submit MCP tools creation if configured + # if self.mcp_config: + # future = executor.submit(create_mcp_tools, self.mcp_config, 30) + # futures.append(future) + + # # Collect results as they complete + # for future in futures: + # result = future.result() + # tools.extend(result) + + # logger.info( + # f"Loaded {len(tools)} tools from spec: {[tool.name for tool in tools]}" + # ) + # if self.filter_tools_regex: + # pattern = re.compile(self.filter_tools_regex) + # tools = [tool for tool in tools if pattern.match(tool.name)] + # logger.info( + # f"Filtered to {len(tools)} tools after applying regex filter: " + # f"{[tool.name for tool in tools]}", + # ) + + # # Do not include built-in tools; not subject to filtering + # # Instantiate built-in tools using their .create() method + # # for tool_class in BUILT_IN_TOOLS: + # # tools.extend(tool_class.create(state)) + + # # Check tool types + # for tool in tools: + # if not isinstance(tool, ToolDefinition): + # raise ValueError( + # f"Tool {tool} is not an instance of 'ToolDefinition'. " + # f"Got type: {type(tool)}" + # ) + + # # Check name duplicates + # tool_names = [tool.name for tool in tools] + # if len(tool_names) != len(set(tool_names)): + # duplicates = set(name for name in tool_names if tool_names.count(name) > 1) + # raise ValueError(f"Duplicate tool names found: {duplicates}") + + # # Store tools in a dict for easy access + # self._tools = {tool.name: tool for tool in tools} + + # agent._initialize = types.MethodType(_initialize, agent) + + return agent + +async def cleanup_resources(agent, env): + if env is not None: + await env.close() + env = None + +async def run_rollout(task: Task, config: RolloutConfig) -> dict | TrajectoryCollection: + agent = env = agent_wrapper_platoon = None + WORKSPACE_SETUP_TIMEOUT = 1200 # 20 minutes max for workspace setup + try: + """ + Steps: + 1. Create a new workspace (apptainer/remote/local), openhands agent, and initialize env + 2. Create trajectory collection and register event handlers + """ + if config.verbose: + print(f"[run_rollout] Process {os.getpid()}: Starting rollout for task {task.id}", flush=True) + instance: dict = task.misc + try: + loop = asyncio.get_event_loop() + status, working_dir, workspace = await loop.run_in_executor( + None, # Uses default ThreadPoolExecutor + prepare_workspace2, + instance + ) + except Exception as e: + raise RuntimeError( + f"Workspace setup failed for task {task.id}: {e}" + ) + if not status or working_dir is None: + raise RuntimeError(f"Workspace setup failed for task {task.id}") + + user_prompt_filename = USER_PROMPT_FILENAME + system_prompt_filename = SYSTEM_PROMPT_FILENAME + prompt_dir = (Path(__file__).parent / "prompts").resolve() + user_prompt_path = prompt_dir / user_prompt_filename + system_prompt_path = prompt_dir / system_prompt_filename + assert user_prompt_path.exists(), f"User prompt path {user_prompt_path} not found" + assert system_prompt_path.exists(), f"System prompt path {system_prompt_path} not found" + input_message = get_instruction(instance, str(user_prompt_path), str(working_dir)) + + task.goal = input_message + task.max_steps = config.max_steps if config.max_steps is not None else 6 + + llm: LLM = prepare_llm(config) + agent: AgentBase = prepare_agent(llm, str(system_prompt_path)) + agent_wrapper_platoon: OpenHandsAgent = OpenHandsAgent() + # env: CodeScoutEnv = CodeScoutEnv(task=task, agent=agent, workspace=str(working_dir)) + env: CodeScoutEnv = CodeScoutEnv(task=task, agent=agent, workspace=workspace) + + traj_collection = TrajectoryCollection() + current_trajectory_collection.set(traj_collection) + + events_path = os.path.join( + config.output_dir, + "events", + f"events_{task.id}_{traj_collection.id}.jsonl" + ) + + traj_collection.register_event_handlers( + JsonlFileSink( + events_path, + collection_id=traj_collection.id, + process_id=os.getpid() + ) + ) + + rollout_task = asyncio.create_task(run_episode(agent_wrapper_platoon, env, timeout=300)) + try: + # Apply a hard timeout to the entire rollout, not just individual steps + _ = await asyncio.wait_for(rollout_task, timeout=330) + except asyncio.TimeoutError: + if config.verbose: + print(f"Process {os.getpid()}: Rollout timed out for task {task.id}", flush=True) + # The task should already be cancelled by wait_for, but let's be explicit + raise + except Exception as e: + if config.verbose: + print(f"Process {os.getpid()}: Rollout failed for task {task.id}: {e}", flush=True) + raise + + try: + # if working_dir.exists(): + # os.system(f"rm -rf {str(working_dir)}") + # logger.info(f"Removed workspace {str(working_dir)}") + print("cleaning up resources", flush=True) + await asyncio.wait_for(cleanup_resources(agent_wrapper_platoon, env), timeout=60) + print("cleanup complete", flush=True) + # asyncio.create_task(cleanup_resources(agent_wrapper_platoon, env), timeout=) + except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as _: + pass + if config.return_dict: + return current_trajectory_collection.get().to_dict() + else: + return current_trajectory_collection.get() + except Exception as e: + if config.verbose: + print(f"Error running rollout for task {task.id}: {e}", flush=True) + print("cleaning up resources", flush=True) + try: + await asyncio.wait_for(cleanup_resources(agent_wrapper_platoon, env), timeout=60) + except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as cleanup_exc: + pass + print("cleanup complete", flush=True) + raise + finally: + # pass + # Safety-net cleanup: ensure env is closed even if run_episode never ran + # (e.g. error during workspace setup after env was created). + # env.close() is idempotent — if run_episode already called it, + # self._conversation will be None and this is a no-op. + print("cleaning up resources", flush=True) + try: + await asyncio.wait_for(cleanup_resources(agent_wrapper_platoon, env), timeout=60) + except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as cleanup_exc: + pass + print("cleanup complete", flush=True) + + # if env is not None: + # try: + # await asyncio.wait_for(env.close(), timeout=30) + # except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as e: + # print(f"Warning: safety-net env.close() in run_rollout: {type(e).__name__}: {e}", flush=True) + # if agent_wrapper_platoon is not None: + # try: + # await asyncio.wait_for(agent_wrapper_platoon.close(), timeout=10) + # except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as e: + # print(f"Warning: safety-net agent.close() in run_rollout: {type(e).__name__}: {e}", flush=True) diff --git a/plugins/codescout/platoon/codescout/tasks.py b/plugins/codescout/platoon/codescout/tasks.py new file mode 100644 index 0000000..059390e --- /dev/null +++ b/plugins/codescout/platoon/codescout/tasks.py @@ -0,0 +1,52 @@ +from platoon.envs.base import Task +from typing import Dict, Optional +import numpy as np +from datasets import load_dataset + +EVAL_AGENT_SERVER_IMAGE = "docker.io/adityasoni8/eval-agent-server" +SDK_SHORT_SHA = "b498a69" +ENV_SETUP_COMMANDS = ["export PIP_CACHE_DIR=~/.cache/pip"] +SYSTEM_PROMPT_FILENAME = "system_prompt.j2" +USER_PROMPT_FILENAME = "user_prompt.j2" +APPTAINER_CACHE_DIR = "/scratch/apptainer_cache" + +data_loaded: bool = False +train_data_map: Optional[Dict[str, Task]] = {} +val_data_map: Optional[Dict[str, Task]] = {} + +def create_task_from_instance(x: dict) -> Task: + task = Task( + id=x['instance_id'], + misc=x, + ) + return task + +def load_data(): + global data_loaded, train_data_map, val_data_map + if data_loaded: + return train_data_map, val_data_map + + dataset = load_dataset("adityasoni17/SWE-smith-py-code-search", split='train').to_pandas() + np.random.seed(42) + split_indices = np.random.rand(len(dataset)) < 0.9 + train_df = dataset.iloc[split_indices] + val_df = dataset.iloc[~split_indices] + for _, row in train_df.iterrows(): + if len(row["problem_statement"]) > 0: #NOTE: optionally filter training instances by repo or other criteria here if needed + train_data_map[row['instance_id']] = create_task_from_instance(row.to_dict()) + for _, row in val_df.iterrows(): + if len(row["problem_statement"]) > 0: #NOTE: optionally filter validation instances by repo or other criteria here if needed + val_data_map[row['instance_id']] = create_task_from_instance(row.to_dict()) + data_loaded = True + print(f"Loaded {len(train_data_map)} training instances and {len(val_data_map)} validation instances.", flush=True) + return train_data_map, val_data_map + +def get_task(task_id: str) -> Task: + load_data() + global train_data_map, val_data_map + if task_id in train_data_map: + return train_data_map[task_id] + elif task_id in val_data_map: + return val_data_map[task_id] + else: + raise ValueError(f"Task ID {task_id} not found in training or validation data.") \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/temp.yaml b/plugins/codescout/platoon/codescout/temp.yaml new file mode 100644 index 0000000..1a8d326 --- /dev/null +++ b/plugins/codescout/platoon/codescout/temp.yaml @@ -0,0 +1,180 @@ +experiment_name: gsm8k-grpo +trial_name: trial0 + +seed: 1 +enable_offload: false +total_train_epochs: 10 +tokenizer_path: ${actor.path} + +export_style: concat +agent_run_args: + max_turns: 2 + +cluster: + n_nodes: 1 + n_gpus_per_node: 8 + fileroot: /tmp/areal/experiments + name_resolve: + type: nfs + nfs_record_root: /tmp/areal/name_resolve + +allocation_mode: sglang:d4p1t1+d4p1t1 + +scheduler: + type: null + +rollout: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + max_concurrent_rollouts: 256 + queue_size: null + consumer_batch_size: ${train_dataset.batch_size} + max_head_offpolicyness: 2 + enable_rollout_tracing: false + scheduling_spec: ${actor.scheduling_spec} + fileroot: ${cluster.fileroot} + tokenizer_path: ${tokenizer_path} + dump_to_file: true + +gconfig: + n_samples: 4 + min_new_tokens: 0 + max_new_tokens: 1024 + greedy: false + temperature: 1.0 + +actor: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: Qwen/Qwen2.5-1.5B-Instruct + init_from_scratch: false + disable_dropout: true + gradient_checkpointing: true + dtype: bfloat16 + mb_spec: + max_tokens_per_mb: 10240 + optimizer: + type: adam + lr: 1.70e-5 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + eps_clip: 0.4 + temperature: ${gconfig.temperature} + reward_scaling: 10.0 + reward_bias: -0.5 + kl_ctl: 0.0 + ppo_n_minibatches: 1 + recompute_logprob: true + use_decoupled_loss: true + behav_imp_weight_cap: 5.0 + reward_norm: + mean_level: group + std_level: group + group_size: ${gconfig.n_samples} + adv_norm: + mean_level: batch + std_level: batch + max_new_tokens: ${gconfig.max_new_tokens} + scheduling_spec: + - task_type: worker + port_count: 2 + gpu: 1 + mem: 32 + cmd: python3 -m areal.infra.rpc.rpc_server + env_vars: {} + +ref: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: ${actor.path} + init_from_scratch: false + disable_dropout: true + dtype: ${actor.dtype} + mb_spec: + max_tokens_per_mb: 10240 + optimizer: null + scheduling_strategy: + type: colocation + target: actor + scheduling_spec: ${actor.scheduling_spec} + +# SGLang +sglang: + model_path: ${actor.path} + random_seed: ${seed} + skip_tokenizer_init: true + dtype: ${actor.dtype} + max_running_requests: null + context_length: 32768 + mem_fraction_static: 0.8 + +vllm: + model: ${actor.path} + seed: ${seed} + skip_tokenizer_init: false + dtype: ${actor.dtype} + max_model_len: 32768 + gpu_memory_utilization: 0.9 + +# datasets +train_dataset: + batch_size: 256 + shuffle: true + pin_memory: true + num_workers: 4 + path: openai/gsm8k + type: rl + max_length: 1024 + +valid_dataset: + batch_size: 256 + pin_memory: true + num_workers: 4 + path: openai/gsm8k + type: rl + +# Utilities +saver: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 1 + freq_steps: null + freq_secs: null + +recover: + mode: disabled + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 1 + freq_steps: null + freq_secs: 3600 + +evaluator: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 1 + freq_steps: null + freq_secs: null + +stats_logger: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + wandb: + mode: disabled + +perf_tracer: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + enabled: false + session_tracer: + enabled: false \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/train.py b/plugins/codescout/platoon/codescout/train.py new file mode 100644 index 0000000..4676316 --- /dev/null +++ b/plugins/codescout/platoon/codescout/train.py @@ -0,0 +1,39 @@ +import sys +import logging +from datasets import Dataset +from areal.api.cli_args import load_expr_config +logging.basicConfig(level=logging.INFO) # Quiet by default +logging.getLogger("platoon.train.areal.workflows").setLevel(logging.INFO) +logging.getLogger("httpx").setLevel(logging.WARNING) # Silence httpx spam + +from platoon.codescout.tasks import get_task, load_data +from platoon.codescout.rollout import run_rollout +from platoon.train.areal import PlatoonArealRLTrainer, PlatoonArealRLTrainerConfig +from platoon.train.areal.workflows import StepWiseArealWorkflow + +def main(args): + print(args) + config, _ = load_expr_config(args, PlatoonArealRLTrainerConfig) + config: PlatoonArealRLTrainerConfig = config + + train_datamap, val_datamap = load_data() + train_dataset = Dataset.from_list([{ "task_id": x } for x in train_datamap.keys()]) + val_dataset = Dataset.from_list([{ "task_id": x } for x in val_datamap.keys()]) + + with PlatoonArealRLTrainer( + config=config, + train_dataset=train_dataset, + val_dataset=val_dataset, + ) as trainer: + proxy_server = trainer.proxy_server + # TODO: do we need custom reward processor here? + workflow = StepWiseArealWorkflow(run_rollout, get_task, config.workflow_config, proxy_server, 'train_rollout', trainer.actor.device) + eval_workflow = StepWiseArealWorkflow(run_rollout, get_task, config.workflow_config, proxy_server, 'eval_rollout', trainer.actor.device) + + trainer.train( + workflow=workflow, + eval_workflow=eval_workflow, + ) + +if __name__ == "__main__": + main(sys.argv[1:]) \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/train.yaml b/plugins/codescout/platoon/codescout/train.yaml new file mode 100644 index 0000000..129a017 --- /dev/null +++ b/plugins/codescout/platoon/codescout/train.yaml @@ -0,0 +1,166 @@ +experiment_name: codescout-grpo +trial_name: trial1 + +seed: 42 +total_train_epochs: 1 +tokenizer_path: ${actor.path} +enable_offload: false + + +cluster: + n_nodes: 1 + n_gpus_per_node: 8 + fileroot: /scratch/areal/experiments + name_resolve: + type: nfs + nfs_record_root: /scratch/areal/name_resolve + +allocation_mode: sglang:d4p1t1+d4p1t1 + +rollout: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + max_concurrent_rollouts: 32 + queue_size: null + consumer_batch_size: ${train_dataset.batch_size} + max_head_offpolicyness: 3 + enable_rollout_tracing: false + +gconfig: + n_samples: 1 + min_new_tokens: 0 + max_new_tokens: 8192 + greedy: false + temperature: 1.0 + +workflow_config: + rollout_config: + model_name: ${actor.path} + max_steps: 6 + output_dir: /scratch/areal/experiments/codescout-grpo-trial1 + verbose: True + train: true + timeout: 600 + group_size: 8 + +actor: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: adityasoni17/Qwen3-1.7B-RFT-500 + init_from_scratch: false + disable_dropout: true + gradient_checkpointing: true + dtype: bfloat16 + mb_spec: + max_tokens_per_mb: 40000 + optimizer: + type: adam + lr: 1e-6 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + group_size: ${gconfig.n_samples} + eps_clip: 0.25 + temperature: ${gconfig.temperature} + reward_scaling: 1.0 + reward_bias: 0.0 + # aent: + # entropy_coeff: 0.002 + # entropy_clamp: 0.4 + # adaptive_coeff: True + # entropy_high: 0.23 + # entropy_low: 0.05 + # coeff_lr: 0.001 + # coeff_box_high: 0.003 + # coeff_box_low: 1e-5 + # warmup_steps: 10 + kl_ctl: 0.0 + kl_estimator: k1 + ppo_n_minibatches: 1 + recompute_logprob: true + use_decoupled_loss: false + behav_imp_weight_cap: null + dynamic_sampling: false + reward_norm: + mean_level: batch + std_level: null + adv_norm: + mean_level: batch + std_level: batch + max_new_tokens: ${gconfig.max_new_tokens} + +ref: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: ${actor.path} + init_from_scratch: false + disable_dropout: true + dtype: ${actor.dtype} + mb_spec: + max_tokens_per_mb: 40000 + optimizer: null + +# SGLang +sglang: + model_path: ${actor.path} + random_seed: ${seed} + skip_tokenizer_init: false + dtype: ${actor.dtype} + max_running_requests: null + context_length: 40000 + mem_fraction_static: 0.8 + +# datasets +train_dataset: + batch_size: 32 + type: rl + path: "" + +valid_dataset: + batch_size: 32 + type: rl + path: "" + +# Utilities +saver: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 5 + freq_steps: 20 + freq_secs: null + +recover: + mode: auto + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 5 + freq_steps: null + freq_secs: 3600 + +evaluator: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: null + freq_steps: null + freq_secs: null + +stats_logger: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + wandb: + mode: online + project: textcraft-platoon + +launcher: + inference_server_cpus_per_gpu: 32 + inference_server_mem_per_gpu: 32768 + trainer_cpus_per_gpu: 4 + trainer_mem_per_gpu: 32768 diff --git a/plugins/codescout/platoon/codescout/train_correct.yaml b/plugins/codescout/platoon/codescout/train_correct.yaml new file mode 100644 index 0000000..1ae60c0 --- /dev/null +++ b/plugins/codescout/platoon/codescout/train_correct.yaml @@ -0,0 +1,152 @@ +experiment_name: codescout-grpo +trial_name: trial6 + +cluster: + n_nodes: 1 + n_gpus_per_node: 6 + fileroot: /scratch/areal/experiments + name_resolve: + type: nfs + nfs_record_root: /scratch/areal/name_resolve + +allocation_mode: sglang:d2p1t1+d4p1t1 +seed: 42 +# enable_offload: false +total_train_epochs: 1 +tokenizer_path: ${actor.path} +gconfig: + n_samples: 8 + min_new_tokens: 0 + max_new_tokens: 8192 + greedy: false + temperature: 1.0 + +workflow_config: + rollout_config: + model_name: ${actor.path} + max_steps: 6 + output_dir: /scratch/areal/experiments/codescout-grpo-trial6 + verbose: True + train: true + timeout: 600 + group_size: 8 + +rollout: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + max_concurrent_rollouts: 2 + queue_size: null + consumer_batch_size: ${train_dataset.batch_size} + max_head_offpolicyness: 3 + enable_rollout_tracing: false + +actor: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: adityasoni17/Qwen3-1.7B-RFT-500 + init_from_scratch: false + disable_dropout: true + gradient_checkpointing: true + dtype: bfloat16 + mb_spec: + max_tokens_per_mb: 32768 + optimizer: + type: adam + lr: 1e-6 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + eps_clip: 0.0003 + eps_clip_higher: 0.0004 + temperature: ${gconfig.temperature} + reward_scaling: 1.0 + reward_bias: 0.0 + kl_ctl: 0.0 + kl_estimator: k1 + importance_sampling_level: sequence + ppo_n_minibatches: 1 + recompute_logprob: true + use_decoupled_loss: false + behav_imp_weight_cap: null + dynamic_sampling: false + reward_norm: + mean_level: group + std_level: group + group_size: ${gconfig.n_samples} + adv_norm: + mean_level: batch + std_level: batch + max_new_tokens: ${gconfig.max_new_tokens} + +ref: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: ${actor.path} + init_from_scratch: false + disable_dropout: true + dtype: ${actor.dtype} + mb_spec: + max_tokens_per_mb: 32768 + optimizer: null + +train_dataset: + type: rl + batch_size: 8 + path: "" + +valid_dataset: + batch_size: 8 + type: rl + path: "" + +saver: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: null + freq_steps: 10 + freq_secs: null + +evaluator: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: null + freq_steps: null + freq_secs: null + +stats_logger: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + wandb: + mode: online + project: swe-grpo-platoon + +recover: + mode: auto + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 5 + freq_steps: null + freq_secs: 3600 + +sglang: + model_path: ${actor.path} + random_seed: ${seed} + skip_tokenizer_init: false + dtype: ${actor.dtype} + max_running_requests: null + context_length: 32768 + mem_fraction_static: 0.8 + +launcher: + inference_server_cpus_per_gpu: 1 + inference_server_mem_per_gpu: 38000 + trainer_cpus_per_gpu: 1 + trainer_mem_per_gpu: 45000 \ No newline at end of file diff --git a/plugins/codescout/platoon/codescout/train_local_working.yaml b/plugins/codescout/platoon/codescout/train_local_working.yaml new file mode 100644 index 0000000..f3273b9 --- /dev/null +++ b/plugins/codescout/platoon/codescout/train_local_working.yaml @@ -0,0 +1,152 @@ +experiment_name: codescout-grpo +trial_name: trial1 + +cluster: + n_nodes: 1 + n_gpus_per_node: 6 + fileroot: /scratch/areal/experiments + name_resolve: + type: nfs + nfs_record_root: /scratch/areal/name_resolve + +allocation_mode: sglang:d2p1t1+d4p1t1 +seed: 42 +# enable_offload: false +total_train_epochs: 1 +tokenizer_path: ${actor.path} +gconfig: + n_samples: 8 + min_new_tokens: 0 + max_new_tokens: 8192 + greedy: false + temperature: 1.0 + +workflow_config: + rollout_config: + model_name: ${actor.path} + max_steps: 6 + output_dir: /scratch/areal/experiments/codescout-grpo-trial1 + verbose: True + train: true + timeout: 600 + group_size: 8 + +rollout: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + max_concurrent_rollouts: 32 + queue_size: null + consumer_batch_size: ${train_dataset.batch_size} + max_head_offpolicyness: 3 + enable_rollout_tracing: false + +actor: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: adityasoni17/Qwen3-1.7B-RFT-500 + init_from_scratch: false + disable_dropout: true + gradient_checkpointing: true + dtype: bfloat16 + mb_spec: + max_tokens_per_mb: 32768 + optimizer: + type: adam + lr: 1e-6 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + eps_clip: 0.0003 + eps_clip_higher: 0.0004 + temperature: ${gconfig.temperature} + reward_scaling: 1.0 + reward_bias: 0.0 + kl_ctl: 0.0 + kl_estimator: k1 + importance_sampling_level: sequence + ppo_n_minibatches: 1 + recompute_logprob: true + use_decoupled_loss: false + behav_imp_weight_cap: null + dynamic_sampling: false + reward_norm: + mean_level: group + std_level: group + group_size: ${gconfig.n_samples} + adv_norm: + mean_level: batch + std_level: batch + max_new_tokens: ${gconfig.max_new_tokens} + +ref: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: ${actor.path} + init_from_scratch: false + disable_dropout: true + dtype: ${actor.dtype} + mb_spec: + max_tokens_per_mb: 40000 + optimizer: null + +train_dataset: + type: rl + batch_size: 8 + path: "" + +valid_dataset: + batch_size: 8 + type: rl + path: "" + +saver: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: null + freq_steps: 10 + freq_secs: null + +evaluator: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: null + freq_steps: null + freq_secs: null + +stats_logger: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + wandb: + mode: online + project: swe-grpo-platoon + +recover: + mode: auto + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 5 + freq_steps: null + freq_secs: 3600 + +sglang: + model_path: ${actor.path} + random_seed: ${seed} + skip_tokenizer_init: false + dtype: ${actor.dtype} + max_running_requests: null + context_length: 40000 + mem_fraction_static: 0.8 + +launcher: + inference_server_cpus_per_gpu: 1 + inference_server_mem_per_gpu: 38000 + trainer_cpus_per_gpu: 1 + trainer_mem_per_gpu: 40000 \ No newline at end of file diff --git a/plugins/codescout/pyproject.toml b/plugins/codescout/pyproject.toml new file mode 100644 index 0000000..766f250 --- /dev/null +++ b/plugins/codescout/pyproject.toml @@ -0,0 +1,70 @@ +[project] +name = "platoon-codescout" +version = "0.1.0" +description = "Platoon plugin for training OSS models with RL for repo-level code localization." +requires-python = "~=3.12.0" +authors = [ + {name = "Aditya Bharat Soni", email = "adityabs@cs.cmu.edu"} +] +dependencies = [ + "platoon >= 0.1.0", + "platoon-openhands >= 0.1.0", + "openhands-sdk", + "openhands-tools", + "openhands-workspace", + "openhands-agent-server" +] +[project.optional-dependencies] +# Training backends - install one of these for training +tinker = [ + "platoon[tinker]", +] +# NOTE: areal backend requires uv for installation (not available on PyPI) +areal = [ + "platoon[areal]", +] +# Logging integrations +wandb = [ + "platoon[wandb]", +] +# uv-specific configuration +[tool.uv] +no-build-isolation-package = ['flash-attn'] +# tinker and areal backends are mutually exclusive +conflicts = [ + [ + { extra = "tinker" }, + { extra = "areal" }, + ], +] +override-dependencies = [ + "fastapi[standard]>=0.115.0", + "openai==1.99.6", + "xgrammar==0.1.24", + "outlines-core==0.1.26", + "pyarrow==20.0.0", + "huggingface_hub==0.34", + "datasets==4.3.0", + "networkx==3.3.0" # This can be removed if ai-rubric pins 3.3.0 or areal relaxes the pin. +] +[tool.uv.sources] +platoon = { path = "../..", editable = true } +platoon-openhands = { path = "../openhands", editable = true } +openhands-sdk = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "de65ac5077753b0d5b7efda3df40accb89512948", subdirectory = "openhands-sdk" } +openhands-tools = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "de65ac5077753b0d5b7efda3df40accb89512948", subdirectory = "openhands-tools" } +openhands-workspace = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "de65ac5077753b0d5b7efda3df40accb89512948", subdirectory = "openhands-workspace" } +openhands-agent-server = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "de65ac5077753b0d5b7efda3df40accb89512948", subdirectory = "openhands-agent-server" } + +[tool.ruff] +line-length = 100 + +[tool.ruff.lint] +select = ["E", "F", "I"] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + + +[tool.hatch.build.targets.wheel] +packages = ["platoon"] diff --git a/plugins/openhands/platoon/openhands/agent.py b/plugins/openhands/platoon/openhands/agent.py index 9b07d60..0667428 100644 --- a/plugins/openhands/platoon/openhands/agent.py +++ b/plugins/openhands/platoon/openhands/agent.py @@ -3,7 +3,7 @@ import asyncio from copy import deepcopy from platoon.envs.base import Task -from platoon.openhands.types import OpenHandsObservation, OpenHandsAction +from .types import OpenHandsObservation, OpenHandsAction from platoon.utils.openhands_utils import get_actions_for_last_obs from platoon.utils.openhands_utils import is_finished @@ -23,8 +23,10 @@ async def act(self, obs: OpenHandsObservation) -> OpenHandsAction: obs, require_same_llm_call_id=True ) - + # if is_finished(obs): + # print("Agent detected finished conversation in agent.act", flush=True) action = OpenHandsAction(action_events=step_actions) + print(f"got actions: {[action_event.action for action_event in step_actions]}", flush=True) if step_actions: action.misc['completion_id'] = step_actions[-1].llm_response_id diff --git a/plugins/openhands/platoon/openhands/env.py b/plugins/openhands/platoon/openhands/env.py index fe757e9..45e27d2 100644 --- a/plugins/openhands/platoon/openhands/env.py +++ b/plugins/openhands/platoon/openhands/env.py @@ -3,19 +3,17 @@ from __future__ import annotations from platoon.envs.base import Task -from .types import OpenHandsObservation, OpenHandsTrajectoryStep, OpenHandsAction -from openhands.sdk.conversation import get_agent_final_response -from openhands.sdk.conversation.base import BaseConversation -from openhands.sdk.agent.base import AgentBase -from openhands.sdk.workspace.base import BaseWorkspace +from platoon.openhands.types import OpenHandsObservation, OpenHandsTrajectoryStep, OpenHandsAction +from openhands.sdk.conversation import get_agent_final_response, BaseConversation, Conversation, ConversationExecutionStatus, RemoteConversation +from openhands.sdk.agent import AgentBase +from openhands.sdk.workspace import BaseWorkspace from copy import deepcopy -from openhands.sdk.conversation.conversation import Conversation from platoon.episode.context import current_trajectory_collection, current_trajectory, finish_message, error_message from platoon.utils.openhands_utils import get_obs_for_last_action from platoon.utils.openhands_utils import is_finished -from openhands.sdk.conversation.state import ConversationExecutionStatus import threading import asyncio +import concurrent.futures class OpenHandsEnv: def __init__(self, task: Task, agent: AgentBase, workspace: str | BaseWorkspace): @@ -25,22 +23,62 @@ def __init__(self, task: Task, agent: AgentBase, workspace: str | BaseWorkspace) workspace = str(workspace) self._workspace = workspace self._conversation = None + self._run_thread: threading.Thread | None = None async def reset(self) -> OpenHandsObservation: - self._conversation: BaseConversation = Conversation(agent=self._agent, workspace=self._workspace, visualizer=None, max_iteration_per_run=self._task.max_steps) + # NOTE: Do NOT re-import the tool module here — it must be imported via + # "custom_tools.localization_finish" (the server-side path) at the top of + # rollout.py so that register_tool stores the correct module qualname. + self._conversation: BaseConversation = Conversation(agent=self._agent, visualizer=None, workspace=self._workspace, max_iteration_per_run=self._task.max_steps) + if isinstance(self._conversation, RemoteConversation): + self._conversation.delete_on_close = True self._state = OpenHandsObservation(task=self._task, conversation_state=self._conversation.state) + # from openhands.sdk.tool import Tool, register_tool + # from platoon.codescout.custom_tools.localization_finish import LocalizationFinishTool + # register_tool(LocalizationFinishTool.name, LocalizationFinishTool) + # import platoon.codescout.custom_tools.localization_finish self._conversation.send_message(self._task.goal) # NOTE: Run the conversation in a separate thread to avoid blocking the main thread. - threading.Thread(target=self._conversation.run, daemon=True).start() + # Set a 5-min timeout and pass it to conversation.run + self._run_thread = threading.Thread(target=self._conversation.run, kwargs={'timeout': 300}, daemon=True) + self._run_thread.start() + # try: + # self._conversation.run() + # except Exception as e: + # pass + # for event in self._conversation.state.events: + # if event.source == "agent": + # print(f"Conv. state: {self._conversation.state.execution_status} Initial conversation event: {event}", flush=True) + # obs_events = get_obs_for_last_action(self._state) + # print("Observation events:", obs_events) + # self._state.last_step_observation_id = obs_events[-1].id if obs_events else None + # from platoon.utils.openhands_utils import get_actions_for_last_obs + # while True: + # action_events = get_actions_for_last_obs(self._state) + # for action_event in action_events: + # print(f"Conv. state: {self._conversation.state.execution_status} Initial conversation action event: {action_event}", flush=True) + # self._state.last_step_action_id = action_events[-1].id if action_events else None + # obs_events = get_obs_for_last_action(self._state) + # for obs_event in obs_events: + # print(f"Conv. state: {self._conversation.state.execution_status} Initial conversation observation event: {obs_event}", flush=True) + # # print("Observation events:", obs_events) + # self._state.last_step_observation_id = obs_events[-1].id if obs_events else None + # if not obs_events: + # break + # exit() + # self._run_thread = threading.Thread(target=self._conversation.run, daemon=True) + # self._run_thread.start() traj_collection = current_trajectory_collection.get() traj = current_trajectory.get() traj_collection.set_trajectory_task(traj.id, self._state.task) traj.reward = 0.0 + # print(f"Starting env.reset, last step action id: {self._state.last_step_action_id}") obs_events = get_obs_for_last_action(self._state) while not obs_events: await asyncio.sleep(1) obs_events = get_obs_for_last_action(self._state) + # print(f"env.reset adding observation events: {[e.kind for e in obs_events]} for last step action id: {self._state.last_step_action_id}") traj_collection.add_trajectory_step(traj.id, OpenHandsTrajectoryStep( observation_events=obs_events, )) @@ -53,12 +91,18 @@ async def evaluate(self) -> tuple[float, dict]: async def step(self, action: OpenHandsAction) -> OpenHandsObservation: if action.action_events: self._state.last_step_action_id = action.action_events[-1].id + + print(f"waiting to get obs events, curr agent state: {self._conversation.state.execution_status} last step action: {action.action_events[-1]}") obs_events = get_obs_for_last_action(self._state) while not obs_events and not is_finished(self._state): await asyncio.sleep(0.2) obs_events = get_obs_for_last_action(self._state) + print(f"got obs events: {[e.kind for e in obs_events]}") + # Update last_step_observation_id to the last event we collected + # This includes all trailing system events when conversation finishes if obs_events: self._state.last_step_observation_id = obs_events[-1].id + step = OpenHandsTrajectoryStep( action_events=action, observation_events=obs_events, @@ -69,12 +113,19 @@ async def step(self, action: OpenHandsAction) -> OpenHandsObservation: self._state.reward += step.reward if is_finished(self._state): + print("Environment detected finished conversation in env.step", flush=True) self._state.finished = True - finish_message.set(get_agent_final_response(self._conversation.state.events)) + agent_final_msg: str | None = get_agent_final_response(self._conversation.state.events) + if agent_final_msg is None or agent_final_msg.strip() == "": + agent_final_msg = "No final response from agent." + finish_message.set(agent_final_msg) self._state.misc["finish_message"] = finish_message.get() - if self._state.conversation_state.agent_status == ConversationExecutionStatus.STUCK: + if self._state.conversation_state.execution_status == ConversationExecutionStatus.STUCK: error_message.set("Agent got stuck") self._state.misc["error_message"] = error_message.get() + elif self._state.conversation_state.execution_status == ConversationExecutionStatus.ERROR: #TODO: check + error_message.set("Agent encountered an error") + self._state.misc["error_message"] = error_message.get() traj_collection = current_trajectory_collection.get() traj = current_trajectory.get() @@ -86,8 +137,54 @@ async def step(self, action: OpenHandsAction) -> OpenHandsObservation: async def close(self) -> None: if self._conversation is not None: - self._conversation.close() - self._conversation = None + conversation = self._conversation + self._conversation = None + # Fire-and-forget: submit close() to a thread pool so the DELETE + # request completes even if this coroutine is cancelled by + # asyncio.wait_for() or CancelledError from the parent task. + # We use a standalone executor submit (not awaited) so cancellation + # of this coroutine cannot prevent the HTTP DELETE from being sent. + executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = executor.submit(self._close_conversation_sync, conversation, self._workspace) + try: + # Give it a reasonable amount of time, but don't block forever + loop = asyncio.get_event_loop() + await asyncio.wait_for( + asyncio.wrap_future(future), + timeout=180 + ) + except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as e: + # Even if we're cancelled or timed out, the thread-pool task + # will still finish in the background (the DELETE gets sent). + print(f"env.close() interrupted ({type(e).__name__}: {e}), " + f"cleanup thread will finish in background", flush=True) + finally: + # Don't call executor.shutdown(wait=True) which would block; + # let the daemon thread finish on its own. + executor.shutdown(wait=False) + # Wait briefly for the run-polling thread to notice the conversation + # was deleted and exit on its own. + if self._run_thread is not None: + self._run_thread.join(timeout=5) + if self._run_thread.is_alive(): + print("Warning: conversation run thread still alive after close()", flush=True) + self._run_thread = None + # TODO: check if cleaning up workspace manually is required -- causes errors -- check this later + # if isinstance(self._workspace, BaseWorkspace): + # await self._workspace.cleanup() + + @staticmethod + def _close_conversation_sync(conversation: BaseConversation, workspace=None) -> None: + """Synchronous helper that calls conversation.close() in a background thread. + This runs outside the asyncio event loop so it cannot be cancelled by + CancelledError. The DELETE request will always be sent.""" + try: + conversation.close() + if workspace is not None: + workspace.cleanup() + # workspace.__exit__(None, None, None) + except Exception as e: + print(f"Error in background conversation.close(): {e}", flush=True) # TODO: Consider adding a return_copy option here. async def observe(self) -> OpenHandsObservation: diff --git a/plugins/openhands/pyproject.toml b/plugins/openhands/pyproject.toml index 19f097b..5be7a49 100644 --- a/plugins/openhands/pyproject.toml +++ b/plugins/openhands/pyproject.toml @@ -7,11 +7,7 @@ authors = [ {name = "Apurva Gandhi", email = "apurvag@cs.cmu.edu"} ] dependencies = [ - "platoon >= 0.1.0", - "openhands-sdk", - "openhands-tools", - "openhands-workspace", - "openhands-agent-server" + "platoon >= 0.1.0" ] no-build-isolation-package = ['flash-attn'] diff --git a/plugins/openhands_rl/README.md b/plugins/openhands_rl/README.md new file mode 100644 index 0000000..9ce5034 --- /dev/null +++ b/plugins/openhands_rl/README.md @@ -0,0 +1,44 @@ +# platoon-openhands-rl + +Platoon plugin for intermediate rewards with the OpenHands software agent SDK. + +## Installation + +This plugin depends on: +- **platoon** (core library) +- **platoon-openhands** (OpenHands plugin) +- **areal** backend (for RL training) + +### Prerequisites + +- Python 3.12 +- [uv](https://docs.astral.sh/uv/) package manager + +### Step-by-step installation + +We recommend installing into a dedicated virtual environment (not in home directory for Babel space usage constraints). The instructions below use a custom location (`/data/user_data//uv_cache/platoon/`), but you can use any path. In Babel, use a compute node (not a CPU node) so that GPU is detected during torch installation + +Assuming you are in project root directory. + +> Note: this plugin pins `openhands-sdk` to a specific commit of +> `https://github.com/OpenHands/software-agent-sdk.git` via `plugins/openhands_rl/pyproject.toml`. + +```bash +# Create directory for the environment +export VIRTUAL_ENV=/data/user_data//uv_cache/platoon +export UV_CACHE_DIR=/data/user_data//uv_cache/.cache +uv sync --active --extra areal --extra wandb +mkdir -p /data/user_data/$USER/uv_cache/platoon +source /data/user_data//uv_cache/platoon/bin/activate +uv pip install -e plugins/openhands_rl +``` + +### Verify installation + +```bash +python -c " +from platoon.openhands import * +from platoon.openhands_rl import * +print('All packages imported successfully!') +" +``` diff --git a/plugins/openhands_rl/platoon/openhands_rl/__init__.py b/plugins/openhands_rl/platoon/openhands_rl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/openhands_rl/platoon/openhands_rl/env.py b/plugins/openhands_rl/platoon/openhands_rl/env.py new file mode 100644 index 0000000..a350954 --- /dev/null +++ b/plugins/openhands_rl/platoon/openhands_rl/env.py @@ -0,0 +1,34 @@ +from pathlib import Path +from platoon.utils.openhands_utils import is_finished +from platoon.episode.context import current_trajectory_collection, current_trajectory, finish_message, error_message + +from openhands.sdk import get_logger +from platoon.envs.base import Task +from openhands.sdk.agent import AgentBase +from openhands.sdk.workspace import BaseWorkspace +from openhands.sdk.conversation import Conversation, BaseConversation, get_agent_final_response +#TODO: check below imports +from platoon.openhands.env import OpenHandsEnv +from platoon.openhands.types import OpenHandsObservation, OpenHandsAction, OpenHandsTrajectoryStep +import threading +import asyncio +from platoon.utils.openhands_utils import is_finished + +logger = get_logger(__name__) + +# TODO: double-check if we really need to over-ride any other methods from OpenHandsEnv +# NOTE: The primary job of this class is to implement the step-wise reward functionality. +class OpenHandsRLEnv(OpenHandsEnv): + async def evaluate(self) -> tuple[float, dict]: + # return 0., {} + import random + # get a random reward between 0 and 0.5 and between 0.5 and 1 + + low_reward = random.uniform(0, 0.5) + high_reward = random.uniform(0.5, 1) + if not is_finished(self._state): + return low_reward, {} + agent_final_msg: str | None = get_agent_final_response(self._conversation.state.events) + if agent_final_msg is None or agent_final_msg.strip() == "" or "" in agent_final_msg: + return low_reward, {} + return high_reward, {} \ No newline at end of file diff --git a/plugins/openhands_rl/platoon/openhands_rl/prompts/default.j2 b/plugins/openhands_rl/platoon/openhands_rl/prompts/default.j2 new file mode 100644 index 0000000..87f83f6 --- /dev/null +++ b/plugins/openhands_rl/platoon/openhands_rl/prompts/default.j2 @@ -0,0 +1,63 @@ +I have access to a python code repository in the directory {{ instance.repo_path }} . You can explore and modify files using the available tools. Consider the following issue description: + + +{{ instance.problem_statement }} + + +Can you help me implement the necessary changes to the repository so that the requirements specified in the are met? +I've already taken care of all changes to any of the test files described in the . This means you DON'T have to modify the testing logic or any of the tests in any way! +Also the development Python environment is already set up for you (i.e., all dependencies already installed), so you don't need to install other packages. +Your task is to make the minimal changes to non-test files in the {{ instance.repo_path }} directory to ensure the is satisfied. + +Follow these phases to resolve the issue: + +Phase 1. READING: read the problem and reword it in clearer terms + 1.1 If there are code or config snippets. Express in words any best practices or conventions in them. + 1.2 Hightlight message errors, method names, variables, file names, stack traces, and technical details. + 1.3 Explain the problem in clear terms. + 1.4 Enumerate the steps to reproduce the problem. + 1.5 Hightlight any best practices to take into account when testing and fixing the issue + +Phase 2. RUNNING: install and run the tests on the repository + 2.1 Activate the environment by running + ./opt/miniconda3/etc/profile.d/conda.sh ; conda activate testbed + 2.2 Follow the readme + 2.3 Install the environment and anything needed + 2.4 Iterate and figure out how to run the tests + +Phase 3. EXPLORATION: find the files that are related to the problem and possible solutions + 3.1 Use `grep` to search for relevant methods, classes, keywords and error messages. + 3.2 Identify all files related to the problem statement. + 3.3 Propose the methods and files to fix the issue and explain why. + 3.4 From the possible file locations, select the most likely location to fix the issue. + +Phase 4. TEST CREATION: before implementing any fix, create a script to reproduce and verify the issue. + 4.1 Look at existing test files in the repository to understand the test format/structure. + 4.2 Create a minimal reproduction script that reproduces the located issue. + 4.3 Run the reproduction script to confirm you are reproducing the issue. + 4.4 Adjust the reproduction script as necessary. + +Phase 5. FIX ANALYSIS: state clearly the problem and how to fix it + 5.1 State clearly what the problem is. + 5.2 State clearly where the problem is located. + 5.3 State clearly how the test reproduces the issue. + 5.4 State clearly the best practices to take into account in the fix. + 5.5 State clearly how to fix the problem. + +Phase 6. FIX IMPLEMENTATION: Edit the source code to implement your chosen solution. + 6.1 Make minimal, focused changes to fix the issue. + +Phase 7. VERIFICATION: Test your implementation thoroughly. + 7.1 Run your reproduction script to verify the fix works. + 7.2 Add edge cases to your test script to ensure comprehensive coverage. + 7.3 Run existing tests related to the modified code to ensure you haven't broken anything. + +8. FINAL REVIEW: Carefully re-read the problem description and ensure your changes fully address the issue. + 8.1 Ensure you've fully addressed all requirements. + 8.2 Run any tests in the repository related to: + 8.2.1 The issue you are fixing + 8.2.2 The files you modified + 8.2.3 The functions you changed + 8.3 If any tests fail, revise your implementation until all tests pass + +Be thorough in your exploration, testing, and reasoning. It's fine if your thinking process is lengthy - quality and completeness are more important than brevity. \ No newline at end of file diff --git a/plugins/openhands_rl/platoon/openhands_rl/rollout.py b/plugins/openhands_rl/platoon/openhands_rl/rollout.py new file mode 100644 index 0000000..29486cf --- /dev/null +++ b/plugins/openhands_rl/platoon/openhands_rl/rollout.py @@ -0,0 +1,304 @@ +import os +from jinja2 import Environment, FileSystemLoader +import asyncio +from platoon.envs.base import Task +from .env import OpenHandsRLEnv +from platoon.utils.llm_client import LLMClient +import subprocess +from pathlib import Path +from openhands.sdk import LLM, get_logger, Agent, AgentBase, Tool +from openhands.workspace import DockerWorkspace, APIRemoteWorkspace, ApptainerWorkspace +from platoon.episode.trajectory import TrajectoryCollection +from platoon.config_defs import RolloutConfig +from openhands.sdk.workspace import BaseWorkspace +from openhands.tools.preset import get_default_agent +from platoon.episode.loop import run_episode +from platoon.episode.context import current_trajectory_collection +from pydantic import SecretStr +from platoon.visualization.event_sinks import JsonlFileSink +from .tasks import EVAL_AGENT_SERVER_IMAGE, SDK_SHORT_SHA, ENV_SETUP_COMMANDS, PROMPT_FILENAME, APPTAINER_CACHE_DIR +from platoon.openhands.agent import OpenHandsAgent +import platform +logger = get_logger(__name__) + +# TODO: consider pre-building all docker images, and adding their names in instance on Huggingface dataset for simpler code here +# def get_official_docker_image( +# instance_id: str, +# docker_image_prefix="docker.io/xingyaoww/", #NOTE: default changed to match SWE-Gym +# # dataset: str = "swe-gym" #TODO: add dataset parameter in future +# ) -> str: +# # Official SWE-Bench image +# # swebench/sweb.eval.x86_64.django_1776_django-11333:v1 +# # SWE-Gym image: docker.io/xingyaoww/sweb.eval.x86_64.project-monai_s_monai-6969 +# image_name = 'sweb.eval.x86_64.' + instance_id +# image_name = image_name.replace('__', '_s_') # to comply with docker image naming convention +# official_image_name = (docker_image_prefix.rstrip('/') + '/' + image_name).lower() +# logger.info(f"Official {docker_image_prefix} image: {official_image_name}") +# return official_image_name + +# # NOTE: the below function is for SWE-Bench. +# def get_official_docker_image( +# instance_id: str, +# docker_image_prefix="docker.io/swebench/", +# ) -> str: +# # Official SWE-Bench image +# # swebench/sweb.eval.x86_64.django_1776_django-11333:v1 +# repo, name = instance_id.split("__") +# official_image_name = docker_image_prefix.rstrip("/") +# official_image_name += f"/sweb.eval.x86_64.{repo}_1776_{name}:latest".lower() +# logger.debug(f"Official SWE-Bench image: {official_image_name}") +# return official_image_name + +# NOTE: the below function is for SWE-Smith. +def get_official_docker_image( + instance: dict, +) -> str: + # Official SWE-Bench image + # swebench/sweb.eval.x86_64.django_1776_django-11333:v1 + image_name: str = instance["image_name"] + official_image_name: str = image_name.lower().strip() + if not official_image_name.startswith("docker.io"): + official_image_name = f"docker.io/{official_image_name}" + logger.debug(f"Official SWE-Bench image: {official_image_name}") + return official_image_name + +def extract_custom_tag(base_image: str) -> str: + """ + Extract SWE-Bench instance ID from official SWE-Bench image name. + + Example: + docker.io/swebench/sweb.eval.x86_64.django_1776_django-12155:latest + -> sweb.eval.x86_64.django_1776_django-12155 + """ + name_tag = base_image.split("/")[-1] + name = name_tag.split(":")[0] + return name + +def detect_platform(): + """Detects the correct platform string.""" + machine = platform.machine().lower() + if "arm" in machine or "aarch64" in machine: + return "linux/arm64" + return "linux/amd64" + +def get_instruction( + instance: dict, + workspace_path: str, + prompt_path: str +) -> str: + """Generate user instruction for the agent for SWE-Bench-style tasks.""" + # Set up Jinja2 environment + # NOTE: Jinja template will not work for SWE-Smith as its base commit is None + prompts_dir = os.path.dirname(prompt_path) + template_name = os.path.basename(prompt_path) + env = Environment(loader=FileSystemLoader(prompts_dir)) + template = env.get_template(template_name) + + instance["repo_path"] = workspace_path + # Prepare context for rendering + context = { + "instance": instance, + # "actual_workspace_path": workspace_path, + } + # context["test_instructions"] = "" + + # Render the instruction + instruction = template.render(context) + return instruction + +def prepare_workspace(instance: dict) -> BaseWorkspace: + official_docker_image: str = get_official_docker_image(instance) + build_target: str = "source-minimal" #NOTE: no other targets work, so this is hard-coded for the time being + custom_tag: str = extract_custom_tag(official_docker_image) + suffix: str = f"-{build_target}" if build_target != "binary" else "" + agent_server_image: str = f"{EVAL_AGENT_SERVER_IMAGE}:{SDK_SHORT_SHA}-{custom_tag}{suffix}" + + workspace_type: str = instance.get("workspace_type", "apptainer") #TODO: make sure the instance dict has this key + env_setup_commands = instance.get("env_setup_commands", ENV_SETUP_COMMANDS) #TODO: make sure the instance dict has this key + if workspace_type == "apptainer": + workspace = ApptainerWorkspace( + server_image=agent_server_image, + working_dir="/workspace", + platform=detect_platform(), + cache_dir=os.environ.get("APPTAINER_CACHEDIR", APPTAINER_CACHE_DIR), + detach_logs=False + ) + elif workspace_type == "remote": + # TODO: check if the environment variables are passed till this point by AReaL + runtime_api_key = os.getenv("RUNTIME_API_KEY") + runtime_api_url = os.getenv("RUNTIME_API_URL", "https://runtime.eval.all-hands.dev") + workspace = APIRemoteWorkspace( + runtime_api_url=runtime_api_url, + runtime_api_key=runtime_api_key, + server_image=agent_server_image, + target_type="source" if "source" in build_target else "binary", + ) + else: #NOTE: Docker workspace not supported yet since Babel doesn't allow docker access + raise NotImplementedError(f"Workspace type {workspace_type} not implemented yet.") + for cmd in env_setup_commands: + res = workspace.execute_command(cmd) + if res.exit_code != 0: + raise RuntimeError( + f"Failed to run env setup command '{cmd}': {res.stderr}" + ) + logger.debug(f"Ran env setup command '{cmd}': {res.stdout}") + # NOTE: Setup repository in workspace (note that we assume the workspace is remote and has the repo pre-configured from SWE-{Bench, Gym, Smith}'s docker containers) + repo_path = f"/workspace/{instance['repo'].split('/')[-1]}/" + logger.info(f"Repo path in Remote workspace: {repo_path}") + instance["repo_path"] = repo_path + + cp_testbed_repo = workspace.execute_command( + (f"mkdir -p {repo_path} ; cp -r /testbed/. {repo_path}"), timeout=900 + ) + assert cp_testbed_repo.exit_code == 0, ( + f"cp_testbed_repo failed: {cp_testbed_repo.stderr}" + ) + patch_str = instance["patch"] + # apply this patch to the repository in the remote workspace. + apply_patch = workspace.execute_command(f"cd {repo_path} && git apply <<'EOF'\n{patch_str}\nEOF", timeout=900) + assert apply_patch.exit_code == 0, f"apply_patch failed: {apply_patch.stderr}" + # git_reset = workspace.execute_command(f"cd {repo_path} ; git reset --hard") + # assert git_reset.exit_code == 0, f"git reset failed: {git_reset.stderr}" + return workspace + +def prepare_llm(config: RolloutConfig) -> LLM: + is_train: bool = config.train + # TODO: make more adjustments based on training phase + if is_train: + temperature = 1.0 + else: + temperature = 0.6 + model_name = config.model_name + if not model_name.startswith("openai/") and not model_name.startswith("litellm_proxy/"): + model_name = "openai/" + model_name + return LLM( + usage_id="agent", + model=model_name, + # log_completions=True, #this may result in new events, check closely + # log_completions_folder = "/home/adityabs/platoon/logs/", + base_url=config.model_endpoint, + api_key=SecretStr(config.model_api_key) if config.model_api_key is not None else SecretStr("api_key"), + temperature=temperature, + litellm_extra_body={ + # "return_token_ids": True, # this will result in TokenEvents in event stream + "include_stop_str_in_output": False, + "add_generation_prompt": True, + "chat_template_kwargs": { + "enable_thinking": False, + } + }, + ) + +def prepare_agent(llm: LLM) -> AgentBase: + # TODO: make tools configurable via instance/env vars or config + # current behaviour: uses default tools without browser + return get_default_agent(llm=llm, cli_mode=True) # browser is added iff cli_mode is False + +async def run_rollout(task: Task, config: RolloutConfig) -> dict | TrajectoryCollection: + agent = env = agent_wrapper_platoon = None + WORKSPACE_SETUP_TIMEOUT = 1200 # 20 minutes max for workspace setup + try: + """ + Steps: + 1. Create a new workspace (apptainer/remote/docker), openhands agent, and initialize env + 2. Create trajectory collection and register event handlers + """ + curr = "initial" + if config.verbose: + print(f"[run_rollout] Process {os.getpid()}: Starting rollout for task {task.id}", flush=True) + + instance: dict = task.misc # SWE-Bench styled instance, with optional extra keys: "workspace_type", "docker_image_prefix", "dataset_type", etc. + + loop = asyncio.get_event_loop() + try: + workspace: BaseWorkspace = await asyncio.wait_for( + loop.run_in_executor(None, prepare_workspace, instance), + timeout=WORKSPACE_SETUP_TIMEOUT + ) + except asyncio.TimeoutError: + raise RuntimeError( + f"Workspace setup timed out after {WORKSPACE_SETUP_TIMEOUT}s for task {task.id}" + ) + if config.verbose: + print(f"[run_rollout] Process {os.getpid()}: Workspace ready for task {task.id}", flush=True) + curr = "workspace ready" + # Get task-specific instruction and configure task parameters + prompt_filename = instance.get("prompt_filename", PROMPT_FILENAME) #NOTE: make sure the instance dict has this key if customized prompt is desired + prompt_dir = (Path(__file__).parent / "prompts").resolve() + prompt_path = prompt_dir / prompt_filename + assert prompt_path.exists(), f"Prompt path {prompt_path} not found" + prompt_path = str(prompt_path) + repo_path = f"/workspace/{instance['repo'].split('/')[-1]}/" + instruction = get_instruction(instance, repo_path, prompt_path) + task.goal = instruction + task.max_steps = config.max_steps if config.max_steps is not None else 100 + + if config.verbose: + print(f"[run_rollout] Process {os.getpid()}: Preparing LLM and agent for task {task.id}", flush=True) + llm: LLM = prepare_llm(config) + agent: AgentBase = prepare_agent(llm) + agent_wrapper_platoon: OpenHandsAgent = OpenHandsAgent() + env: OpenHandsRLEnv = OpenHandsRLEnv(task=task, agent=agent, workspace=workspace) + if config.verbose: + print(f"[run_rollout] Process {os.getpid()}: LLM and agent ready for task {task.id}", flush=True) + curr = "agent and env ready" + + traj_collection = TrajectoryCollection() + current_trajectory_collection.set(traj_collection) + + events_path = os.path.join( + config.output_dir, + "events", + f"events_{task.id}_{traj_collection.id}.jsonl" + ) + + traj_collection.register_event_handlers( + JsonlFileSink( + events_path, + collection_id=traj_collection.id, + process_id=os.getpid() + ) + ) + + if config.verbose: + print(f"[run_rollout] Process {os.getpid()}: Starting episode execution for task {task.id}", flush=True) + curr = "episode execution started" + + # Create the rollout task with a hard deadline + # Using a shield to ensure we can still clean up even if cancelled + rollout_task = asyncio.create_task(run_episode(agent_wrapper_platoon, env, timeout=300)) + try: + # Apply a hard timeout to the entire rollout, not just individual steps + _ = await asyncio.wait_for(rollout_task, timeout=330) + except asyncio.TimeoutError: + if config.verbose: + print(f"Process {os.getpid()}: Rollout timed out for task {task.id}. State: {curr}", flush=True) + # The task should already be cancelled by wait_for, but let's be explicit + raise + except Exception as e: + if config.verbose: + print(f"Process {os.getpid()}: Rollout failed for task {task.id}: {e}. State: {curr}", flush=True) + raise + if config.return_dict: + return current_trajectory_collection.get().to_dict() + else: + return current_trajectory_collection.get() + except Exception as e: + if config.verbose: + print(f"Error running rollout for task {task.id}: {e}", flush=True) + raise + finally: + # Safety-net cleanup: ensure env is closed even if run_episode never ran + # (e.g. error during workspace setup after env was created). + # env.close() is idempotent — if run_episode already called it, + # self._conversation will be None and this is a no-op. + if env is not None: + try: + await asyncio.wait_for(env.close(), timeout=30) + except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as e: + print(f"Warning: safety-net env.close() in run_rollout: {type(e).__name__}: {e}", flush=True) + if agent_wrapper_platoon is not None: + try: + await asyncio.wait_for(agent_wrapper_platoon.close(), timeout=10) + except (asyncio.TimeoutError, asyncio.CancelledError, Exception) as e: + print(f"Warning: safety-net agent.close() in run_rollout: {type(e).__name__}: {e}", flush=True) \ No newline at end of file diff --git a/plugins/openhands_rl/platoon/openhands_rl/tasks.py b/plugins/openhands_rl/platoon/openhands_rl/tasks.py new file mode 100644 index 0000000..e9fcdb0 --- /dev/null +++ b/plugins/openhands_rl/platoon/openhands_rl/tasks.py @@ -0,0 +1,78 @@ +from platoon.envs.base import Task +import pandas as pd +import os +from typing import Dict, Literal, Optional, List +import numpy as np +from datasets import load_dataset + +EVAL_AGENT_SERVER_IMAGE = "docker.io/adityasoni8/eval-agent-server" +SDK_SHORT_SHA = "b498a69" +ENV_SETUP_COMMANDS = ["export PIP_CACHE_DIR=~/.cache/pip"] +PROMPT_FILENAME = "default.j2" +APPTAINER_CACHE_DIR = "/scratch/apptainer_cache" + +data_loaded: bool = False +train_data_map: Optional[Dict[str, Task]] = {} +val_data_map: Optional[Dict[str, Task]] = {} + +def create_task_from_instance(x: dict) -> Task: + task = Task( + id=x['instance_id'], + misc=x, + # NOTE: optionally add new parameters to instance dicts here if needed + # misc={ + # "instance_id": x['instance_id'], + # "repo": x['repo'], + # "base_commit": x['base_commit'], + # "problem_statement": x['problem_statement'], + # "target": x['target'], + # "workspace_type": x.get("workspace_type", "docker"), # default to docker + # "docker_image_prefix": x.get("docker_image_prefix", "docker.io/xingyaoww/"), + # "dataset_type": x.get("dataset_type", "swe-bench"), + # "prompt_filename": x.get("prompt_filename", PROMPT_FILENAME), + # } + ) + return task + +def load_data(): + global data_loaded, train_data_map, val_data_map + if data_loaded: + return train_data_map, val_data_map + # data_path = os.path.join(os.path.dirname(__file__), "train.parquet") #NOTE: make it huggingface dataset if possible + # dataset = pd.read_parquet(data_path) + dataset = load_dataset("SWE-bench/SWE-smith-py", split='train') + repo_list = [] + for instance in dataset: + repo_list.append(instance['repo']) + import random + random.seed(42) + random.shuffle(repo_list) + repo_list = repo_list[:8] + dataset = dataset.to_pandas() + np.random.seed(42) + split_indices = np.random.rand(len(dataset)) < 0.8 + train_df = dataset.iloc[split_indices] + val_df = dataset.iloc[~split_indices] + for _, row in train_df.iterrows(): + if len(row["problem_statement"]) > 0 and row['repo'] in repo_list: #NOTE: optionally filter training instances by repo or other criteria here if needed + train_data_map[row['instance_id']] = create_task_from_instance(row.to_dict()) + # if row['instance_id'] in instance_list: + # train_data_map[row['instance_id']] = create_task_from_instance(row.to_dict()) + for _, row in val_df.iterrows(): + if len(row["problem_statement"]) > 0 and row['repo'] in repo_list: #NOTE: optionally filter validation instances by repo or other criteria here if needed + val_data_map[row['instance_id']] = create_task_from_instance(row.to_dict()) + data_loaded = True + print(f"Loaded {len(train_data_map)} training instances and {len(val_data_map)} validation instances.", flush=True) + return train_data_map, val_data_map + + +# NOTE: we should have enough RAM to hold all the training instances in RAM since they are only <200MB in size, so no need to lazy load from disk for now +def get_task(task_id: str) -> Task: + load_data() + global train_data_map, val_data_map + if task_id in train_data_map: + return train_data_map[task_id] + elif task_id in val_data_map: + return val_data_map[task_id] + else: + raise ValueError(f"Task ID {task_id} not found in training or validation data.") \ No newline at end of file diff --git a/plugins/openhands_rl/platoon/openhands_rl/train.py b/plugins/openhands_rl/platoon/openhands_rl/train.py new file mode 100644 index 0000000..6f3a2d6 --- /dev/null +++ b/plugins/openhands_rl/platoon/openhands_rl/train.py @@ -0,0 +1,41 @@ +import sys +import logging +from datasets import Dataset +from areal.api.cli_args import load_expr_config +# Enable debug logging for platoon workflows +logging.basicConfig(level=logging.INFO) # Quiet by default +logging.getLogger("platoon.train.areal.workflows").setLevel(logging.INFO) +logging.getLogger("httpx").setLevel(logging.WARNING) # Silence httpx spam + +from platoon.openhands_rl.tasks import get_task, load_data +from platoon.openhands_rl.rollout import run_rollout +from platoon.train.areal import PlatoonArealRLTrainer, PlatoonArealRLTrainerConfig +from platoon.train.areal.workflows import StepWiseArealWorkflow + +def main(args): + print(args) + config, _ = load_expr_config(args, PlatoonArealRLTrainerConfig) + config: PlatoonArealRLTrainerConfig = config + + train_datamap, val_datamap = load_data() + train_dataset = Dataset.from_list([{ "task_id": x } for x in train_datamap.keys()][:1000]) + val_dataset = Dataset.from_list([{ "task_id": x } for x in val_datamap.keys()][:100]) + + with PlatoonArealRLTrainer( + config=config, + train_dataset=train_dataset, + val_dataset=val_dataset, + ) as trainer: + + proxy_server = trainer.proxy_server + # TODO: do we need custom reward processor here? + workflow = StepWiseArealWorkflow(run_rollout, get_task, config.workflow_config, proxy_server, 'train_rollout', trainer.actor.device, filter_errors=True) + eval_workflow = StepWiseArealWorkflow(run_rollout, get_task, config.workflow_config, proxy_server, 'eval_rollout', trainer.actor.device) + + trainer.train( + workflow=workflow, + eval_workflow=eval_workflow, + ) + +if __name__ == "__main__": + main(sys.argv[1:]) \ No newline at end of file diff --git a/plugins/openhands_rl/pyproject.toml b/plugins/openhands_rl/pyproject.toml new file mode 100644 index 0000000..5ac8005 --- /dev/null +++ b/plugins/openhands_rl/pyproject.toml @@ -0,0 +1,70 @@ +[project] +name = "platoon-openhands-rl" +version = "0.1.0" +description = "Platoon plugin for intermediate rewards with the openhands software agent sdk." +requires-python = "~=3.12.0" +authors = [ + {name = "Aditya Soni", email = "adityabs@cs.cmu.edu"} +] +dependencies = [ + "platoon >= 0.1.0", + "platoon-openhands >= 0.1.0", + "openhands-sdk", + "openhands-tools", + "openhands-workspace", + "openhands-agent-server" +] +[project.optional-dependencies] +# Training backends - install one of these for training +tinker = [ + "platoon[tinker]", +] +# NOTE: areal backend requires uv for installation (not available on PyPI) +areal = [ + "platoon[areal]", +] +# Logging integrations +wandb = [ + "platoon[wandb]", +] +# uv-specific configuration +[tool.uv] +no-build-isolation-package = ['flash-attn'] +# tinker and areal backends are mutually exclusive +conflicts = [ + [ + { extra = "tinker" }, + { extra = "areal" }, + ], +] +override-dependencies = [ + "fastapi[standard]>=0.115.0", + "openai==1.99.6", + "xgrammar==0.1.24", + "outlines-core==0.1.26", + "pyarrow==20.0.0", + "huggingface_hub==0.34", + "datasets==4.3.0", + "networkx==3.3.0" # This can be removed if ai-rubric pins 3.3.0 or areal relaxes the pin. +] +[tool.uv.sources] +platoon = { path = "../..", editable = true } +platoon-openhands = { path = "../openhands", editable = true } +openhands-sdk = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "b498a69908f7d06feb3921ffe05ff7e781a6f108", subdirectory = "openhands-sdk" } +openhands-tools = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "b498a69908f7d06feb3921ffe05ff7e781a6f108", subdirectory = "openhands-tools" } +openhands-workspace = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "b498a69908f7d06feb3921ffe05ff7e781a6f108", subdirectory = "openhands-workspace" } +openhands-agent-server = { git = "https://github.com/OpenHands/software-agent-sdk.git", rev = "b498a69908f7d06feb3921ffe05ff7e781a6f108", subdirectory = "openhands-agent-server" } + +[tool.ruff] +line-length = 100 + +[tool.ruff.lint] +select = ["E", "F", "I"] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + + +[tool.hatch.build.targets.wheel] +packages = ["platoon"]