
Feature: Basic Agentic Pick and Place - reimplemented manipulation skills#1237

Open
mustafab0 wants to merge 16 commits into dev from feature/mustafa-reimplemnt-manipulation-skills

Conversation

@mustafab0 mustafab0 (Contributor) commented Feb 11, 2026

Problem

ManipulationModule had no skill layer — only low-level RPCs requiring a custom IPython client.
No way for an LLM agent to invoke pick/place/move skills through the framework.
RRT planner rejected valid joint states due to floating-point drift past URDF limits.


Solution

Added 11 @skill() methods to ManipulationModule (pick, place, move_to_pose, scan_objects, etc.).
Changed base class to SkillModule so skills auto-register with agents via autoconnect().
Created xarm-perception-agent blueprint composing xarm_perception + llm_agent + human_input.
Added detection snapshot so pick uses stable labels instead of volatile live cache.
Added limit_eps=1e-3 tolerance to RRT planner joint limit validation.
Removed ManipulationClient and run_* RPC wrappers — agent CLI replaces them.
CoordinatorClient updated to route execution through task_invoke instead of removed RPCs.
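
The limit_eps change can be pictured as a small tolerance band around each URDF joint limit. A minimal sketch (function and parameter names here are illustrative, not the actual RRT planner API):

```python
def within_limits(q, lower, upper, limit_eps=1e-3):
    """Accept joint values that exceed the URDF limits by at most
    limit_eps, absorbing floating-point drift from kinematics."""
    return all(lo - limit_eps <= qi <= hi + limit_eps
               for qi, lo, hi in zip(q, lower, upper))
```

A start state at 1.0005 rad against a 1.0 rad limit now validates instead of being rejected, while genuinely out-of-range states still fail.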


Breaking Changes

ManipulationClient deleted — use dimos run xarm-perception-agent or direct RPC.
run_pick/run_place/run_place_back/run_go_init RPCs removed from ManipulationModule.
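
For callers migrating off the removed RPCs, the shape of the change looks roughly like this. The task_invoke call and its "execute" action follow the PR description; the exact signature in dimos may differ:

```python
# Illustrative migration sketch: the removed run_* RPC wrappers are
# replaced by routing through the coordinator's generic task interface.
def execute_trajectory(client, task_name, trajectory):
    # Before: client.run_pick(...) etc. on ManipulationClient (deleted)
    # After: one generic entry point for all trajectory tasks
    return client.task_invoke(task_name, "execute", {"trajectory": trajectory})
```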


How to Test

  1. Run `dimos run coordinator-mock` then `dimos run xarm-perception-agent`
  2. Type "scan the scene" — verify objects listed with 3D positions
  3. Type "pick up the <object>" — verify approach, grasp, retract sequence
  4. Type "place it back" — verify placement at original pick position
  5. Verify skills appear in `dimos agentspy`

closes DIM-351
closes DIM-419


greptile-apps bot commented Feb 12, 2026

Greptile Overview

Greptile Summary

This PR adds an agent-invokable skill layer to manipulation by converting ManipulationModule to a SkillModule and introducing a set of @skill() actions (scan, pick/place, move_to_pose/joints, gripper control, go_home/init). It also adds a new xarm-perception-agent blueprint composing perception + an LLM agent + human CLI input, updates coordinator client calls to route through ControlCoordinator.task_invoke, removes the legacy ManipulationClient example, and loosens RRT joint-limit validation with a small epsilon to avoid floating-point drift rejecting valid states.

The changes fit into the existing architecture by leaning on SkillModule’s auto-registration of skills into agents via autoconnect(), while continuing to use the existing planning stack (WorldMonitor + PlannerSpec) and execution through the ControlCoordinator trajectory task API (now accessed via task_invoke).

Confidence Score: 2/5

  • This PR is not safe to merge until coordinator status polling and grasp RPC wiring issues are fixed.
  • Core new functionality (agentic pick/place) depends on coordinator task polling and grasp generation wiring. The PR currently calls a non-existent trajectory-task method (get_state) and drops TrajectoryStatus results expecting dicts, which will cause skills/CLI to report success early or never show progress. The grasp path also calls an RPC method that is neither declared nor implemented (GraspingModule.get_latest_grasps), causing runtime failures on the intended GraspGen-success path.
  • dimos/manipulation/manipulation_module.py, dimos/manipulation/control/coordinator_client.py

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/manipulation/control/coordinator_client.py | Switched trajectory status/execute/cancel to ControlCoordinator.task_invoke; current status handling expects a dict but trajectory tasks return TrajectoryStatus, breaking wait_for_completion/status. |
| dimos/manipulation/manipulation_blueprints.py | Adds the xarm-perception-agent blueprint composed from xarm_perception + llm_agent + human_input and extends RobotModelConfig usage (home_joints). No correctness issues spotted in this file. |
| dimos/manipulation/manipulation_module.py | Rebased ManipulationModule onto SkillModule and added many @skill() methods; introduced broken coordinator status polling (uses get_state) and broken GraspingModule RPC integration (calls non-existent get_latest_grasps). |
| dimos/manipulation/planning/examples/__init__.py | Removes the ManipulationClient export and replaces the docstring with a generic planning-examples blurb; no functional code remains. |
| dimos/manipulation/planning/examples/manipulation_client.py | Deletes the obsolete IPython ManipulationClient RPC helper (breaking change as described). No remaining code to review. |
| dimos/manipulation/planning/planners/rrt_planner.py | Adds a small epsilon tolerance when validating start/goal joint limits to reduce false rejections due to floating-point drift. |
| dimos/manipulation/planning/spec/config.py | Extends RobotModelConfig with home_joints and pre_grasp_offset fields used by the new skills; schema change is straightforward. |
| dimos/robot/all_blueprints.py | Registers the new xarm-perception-agent blueprint entry. |
| pyproject.toml | Adds manipulation_module.py to the largefiles ignore list; no runtime behavior changes. |

Sequence Diagram

```mermaid
sequenceDiagram
  autonumber
  participant User as Human user
  participant HI as human_input (CLI)
  participant Agent as llm_agent
  participant Skills as SkillCoordinator
  participant Manip as ManipulationModule (SkillModule)
  participant WM as WorldMonitor
  participant Planner as RRTConnectPlanner
  participant CC as ControlCoordinator
  participant Task as Trajectory Task (JointTrajectoryController)

  User->>HI: type "pick up the cup"
  HI->>Agent: user message
  Agent->>Skills: select skill + args
  Skills->>Manip: pick(object_name)
  Manip->>WM: refresh_obstacles(min_duration)
  WM-->>Manip: obstacles added
  Manip->>Manip: _generate_grasps_for_pick()
  alt GraspingModule RPC wired
    Manip->>Manip: get_rpc_calls("GraspingModule.generate_grasps")
    Manip-->>Manip: grasp candidates (intended)
  else fallback heuristic
    Manip->>WM: list_cached_detections()
    WM-->>Manip: detections snapshot
    Manip-->>Manip: heuristic grasp pose
  end

  Manip->>Planner: plan_joint_path(start, goal)
  Planner-->>Manip: JointPath
  Manip->>Manip: generate JointTrajectory
  Manip->>CC: task_invoke(task, "execute", {trajectory})
  CC->>Task: execute(trajectory)
  Task-->>CC: accepted
  CC-->>Manip: result
  loop wait for completion
    Manip->>CC: task_invoke(task, "get_status" or "get_state")
    CC->>Task: get_status()/get_state()
    Task-->>CC: TrajectoryStatus / None
    CC-->>Manip: status
  end
  Manip-->>Skills: streamed progress strings
  Skills-->>Agent: tool results
  Agent-->>HI: assistant response
```

@greptile-apps greptile-apps bot left a comment

9 files reviewed, 3 comments


greptile-apps bot commented Feb 12, 2026

Additional Comments (1)

dimos/manipulation/manipulation_module.py
Polling wrong task method

This switched to client.task_invoke(..., "get_state", {}), but the trajectory task implementation exposes get_status() (returning a TrajectoryStatus) and does not implement get_state (see dimos/manipulation/control/trajectory_controller/joint_trajectory_controller.py:162-273). As written, task_invoke will return None and this method returns None, which also breaks _wait_for_trajectory_completion() (it treats None as “task not found” and returns success early at manipulation_module.py:1001-1003).
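
A sketch of the fix this comment implies: poll the method the trajectory task actually implements (get_status) and treat a None reply as an error instead of early success. Names mirror the diff context but this is not the actual module code:

```python
def poll_trajectory_status(client, task_name):
    """Poll a trajectory task via the coordinator's task_invoke."""
    status = client.task_invoke(task_name, "get_status", {})
    if status is None:
        # Task not found: surface it rather than reporting completion.
        raise RuntimeError(f"trajectory task {task_name!r} not found")
    return status
```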

@mustafab0 mustafab0 changed the title Feature: Basic Agentic Pick and Place - reimplimented manipulation skills Feature: Basic Agentic Pick and Place - reimplemented manipulation skills Feb 12, 2026
@spomichter
Copy link
Contributor

i assume i need hardware to test this

# Snapshotted detections from the last scan_objects/refresh call.
# The live detection cache is volatile (labels change every frame),
# so pick/place use this stable snapshot instead.
self._detection_snapshot: list[dict[str, object]] = []
Contributor:

here also idk what object means

Contributor (Author):

This should have been an Object. A confusion with object vs Object. will fix.


Contributor (Author):

replaced object data type with Any to avoid confusion in the interim.

Comment on lines 947 to 955
def _make_pose(
    self, x: float, y: float, z: float, roll: float, pitch: float, yaw: float
) -> Pose:
    """Construct a Pose from position and Euler angles (radians)."""
    from dimos.msgs.geometry_msgs import Pose, Vector3
    from dimos.utils.transform_utils import euler_to_quaternion

    orientation = euler_to_quaternion(Vector3(roll, pitch, yaw))
    return Pose(position=Vector3(x, y, z), orientation=orientation)
Contributor:

Why are we doing this

Contributor (Author):

just a helper. removed as an unnecessary part of legacy code before type checking.

Comment on lines +957 to +985
def _wait_for_trajectory_completion(
    self, robot_name: RobotName | None = None, timeout: float = 60.0, poll_interval: float = 0.2
) -> bool:
    """Wait for trajectory execution to complete.

    Polls the coordinator task state via task_invoke. Falls back to waiting
    for the trajectory duration if the coordinator is unavailable.

    Args:
        robot_name: Robot to monitor
        timeout: Maximum wait time in seconds
        poll_interval: Time between status checks

    Returns:
        True if trajectory completed successfully
    """
    robot = self._get_robot(robot_name)
    if robot is None:
        return True
    rname, _, config, _ = robot
    client = self._get_coordinator_client()

    if client is None or not config.coordinator_task_name:
        # No coordinator — wait for trajectory duration as fallback
        traj = self._planned_trajectories.get(rname)
        if traj is not None:
            logger.info(f"No coordinator status — waiting {traj.duration:.1f}s for trajectory")
            time.sleep(traj.duration + 0.5)
        return True
Contributor:

This should be handled by @paul-nechifor skill coordinator stuff. We should have a universal and Generic set of "Skills" or "Processes" within dimOS and all governed by the same coordinator and manager. Fine for now since not done yet.

  1. dimOS skills are functions that agents can call
  2. dimOS processes are short-mid-long running tasks typically triggered by skill calls (or directly)

Contributor:

The plan is (discussed with @leshy) to expose everything as an MCP server. So the tool coordination will happen through the MCP client. The MCP client can be claude code, but we'll also write our own client.

The tools/resources (replacing the skill terminology) will be defined the way MCP expects them. But we can have helper classes for running long running tools in our code.

Multiple async tool calls can happen through MCP and it supports updates from the server to the client.

Long-running tool calls will push a stream of updates, like generator skills. This is a little more complex in MCP, but we'll have helper code to do this.
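
The "stream of updates" shape described above can be sketched as a plain generator; no real MCP SDK calls are shown here, this only illustrates the pattern generator skills already use:

```python
def long_running_tool(steps):
    """Yield one progress message per completed step."""
    for i, step in enumerate(steps, 1):
        # ... perform the step's work here ...
        yield f"[{i}/{len(steps)}] {step} done"
```

An MCP helper would forward each yielded message to the client as a server-to-client update instead of printing it.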

Comment on lines +1366 to +1379
def pick(
    self,
    object_name: str,
    object_id: str | None = None,
    robot_name: str | None = None,
) -> Generator[str, None, None]:
    """Pick up an object by name using grasp planning and motion execution.

    Scans the scene, generates grasp poses (via GraspGen or heuristic fallback),
    plans collision-free approach/grasp/retract motions, and executes them.

    Args:
        object_name: Name of the object to pick (e.g. "cup", "bottle", "can").
        object_id: Optional unique object ID from perception for precise identification.
Contributor:

@paul-nechifor Flagging for you since skills like this are helpful to see what we're solving for on agents

@mustafab0 (Contributor, Author):

i assume i need hardware to test this

You only need a realsense camera to test this PR. All the robot hardware can be simulated using the coordinator-mock blueprint

@spomichter

@mustafab0 mustafab0 force-pushed the feature/mustafa-reimplemnt-manipulation-skills branch from 2a69411 to e869963 on February 13, 2026 23:08
@mustafab0 mustafab0 requested a review from spomichter February 13, 2026 23:11
# Hardware SDKs
"piper-sdk",
"xarm-python-sdk>=1.17.0",


Should you add something like a pyrealsense2>=2.50.0 dependency here, or in the perception part?

"init_joints": self._init_joints,
}

@rpc


For these three RPC methods, I guess it's speculative API design; it's not a bug, but I wanted clarity on the follow-up:

The pipeline that actually runs is:
Robot boots -> _on_joint_state fires -> self._init_joints auto-captured
here this now proceeds further and,
LLM agent calls go_init skill -> reads self._init_joints directly -> plans & moves

No RPC call is involved: go_init reads the field directly on self, not through get_init_joints().

The only scenario would be an external script using RPCClient, something like this:
client = RPCClient("manipulation")
client.set_init_joints([0.0, -0.5, 0.0, 0.8, 0.0, 1.0, 0.0]) # override
client.go_init() # now goes to custom pose instead of startup pose

But that pattern doesn't exist anywhere in the codebase. No script, no blueprint, no test, no agent does this.

Args:
robot_name: Robot to control (only needed for multi-arm setups).
"""
if self._set_gripper_position(0.85, robot_name):


What I feel should exist for modularity (but does not yet) is this: RobotModelConfig has gripper_hardware_id but no gripper_open_position or gripper_close_position fields. The adapter knows its own range (Piper has GRIPPER_MAX_OPENING_M), but that information never flows up to ManipulationModule.

So I feel this can be solved by adding two config fields to RobotModelConfig, with the blueprint setting them per robot. Or you can normalize the range.

Thank you.
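
The two proposed config fields might look like the sketch below; the field names are the reviewer's suggestion, not existing RobotModelConfig attributes:

```python
from dataclasses import dataclass

@dataclass
class GripperConfigSketch:
    # Per-robot gripper range set by the blueprint; units depend on
    # the adapter (e.g. meters for Piper's GRIPPER_MAX_OPENING_M).
    gripper_open_position: float = 0.85
    gripper_close_position: float = 0.0

def open_fraction(cfg, position):
    """Normalize a raw gripper position into the [0, 1] open range."""
    span = cfg.gripper_open_position - cfg.gripper_close_position
    return (position - cfg.gripper_close_position) / span
```

With either approach, ManipulationModule could stop hard-coding values like 0.85.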

else:
# task_invoke returned None — task not found, assume done
return True
except Exception:


Is there a reason we are not handling the exception here? If task_invoke throws (e.g. coordinator crash), the handler swallows it and then returns True, so the skill thinks the trajectory succeeded and continues to the next step even though the arm may not have moved.
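
The behavior this comment asks for could be sketched as below: a coordinator failure reads as failure, not success. Names here are hypothetical:

```python
def trajectory_succeeded(poll):
    """Return True only when the coordinator confirms completion."""
    try:
        status = poll()
    except Exception:
        # task_invoke raised (e.g. coordinator crash): do not assume
        # the arm moved; report failure so the skill stops.
        return False
    # None still means "task not found" per the surrounding code.
    return True if status is None else bool(status)
```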

self, x: float, y: float, z: float, roll: float, pitch: float, yaw: float
) -> Pose:
"""Construct a Pose from position and Euler angles (radians)."""
from dimos.msgs.geometry_msgs import Pose, Vector3


Just a suggestion: is it possible to move these imports up alongside JointState? geometry_msgs is a lightweight msg module already imported under TYPE_CHECKING, so there are no circular imports or heavy import costs.
Thank you.

for msg in self.pick(object_name, object_id, robot_name):
    yield msg
    # Check if pick failed
    if msg.startswith("Error:"):


Just my personal opinion: I do not like string-matched failure handling. String-based error detection is fragile; if someone adds a new failure path like "Failure: ..." without the "Error:" prefix, it will be missed. As the code stands right now, it correctly matches the "Error:" prefix the skills return, so it is functionally correct. Small concern.
Thank you.
Thank you.
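
A less fragile alternative to prefix matching could stream (level, message) pairs so failure detection keys on a level tag rather than message wording. Purely illustrative; the PR's skills stream plain strings:

```python
def place_back_sketch(pick_steps):
    """Re-yield pick progress, aborting on any error-level message."""
    for level, msg in pick_steps:
        yield level, msg
        if level == "error":
            return  # abort regardless of how the message is worded
    yield "info", "placing object back"
```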

gripper_hardware_id=gripper_hardware_id,
tf_extra_links=tf_extra_links or [],
# Home configuration: arm extended forward, elbow up (safe observe pose)
home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],


Hey, I think we need to add home_joints to the other configs too, for xarm6 and piper. If this addition is intentional only for xarm7, then it's fine.
Thank you.

return "\n".join(lines)

@skill()
def scan_objects(self, min_duration: float = 1.0, robot_name: str | None = None) -> str:


The robot_name does not seem to be used in the method body. Is it there for API consistency, or does it serve a different purpose? Just need some clarification.

min_duration: Minimum time in seconds to wait for stable detections.
robot_name: Robot context (only needed for multi-arm setups).
"""
obstacles = self.refresh_obstacles(min_duration)


Here I just wanted to understand: refresh_obstacles snapshots detections into self._detection_snapshot (stable, frozen at that moment), but scan_objects doesn't use self._detection_snapshot; it calls self.list_cached_detections(), which reads the live volatile cache from WorldMonitor. I am aware that 99% of the time these return the same data, since they are called milliseconds apart. But the whole point of _detection_snapshot (as the comment says, "the live cache is volatile") was to avoid reading from the live cache; scan_objects ignores that and reads the live cache anyway. So again, just a concern about the approach.
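
The consistency fix implied here could have scan_objects report from the frozen snapshot instead of re-reading the live cache. Names follow the diff context, but this is not the actual module code:

```python
class SnapshotSketch:
    def __init__(self):
        # Frozen copy of the last scan; the live cache stays volatile.
        self._detection_snapshot = []

    def refresh_obstacles(self, live_detections):
        # Freeze the volatile live cache once per scan.
        self._detection_snapshot = list(live_detections)

    def scan_objects(self):
        # Read the stable snapshot so labels cannot change mid-pick.
        return [d["label"] for d in self._detection_snapshot]
```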

print("=" * 70)


def wait_for_completion(client: CoordinatorClient, task_name: str, timeout: float = 60.0) -> bool:


Here I presume that wait_for_completion() is broken after the task_invoke migration. get_trajectory_status now returns {"state": int, "task": str}, but this function still reads active and progress, and it expects state as a string, so it exits on the first poll reporting failure. I feel this can be resolved by either updating it to match the new return format or removing it; ManipulationModule._wait_for_trajectory_completion() already handles this correctly.

Thank you.
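
Updating the polling loop to the new dict format could look like the sketch below. The terminal-state integers are placeholders; the real enum values live in the trajectory task code:

```python
TERMINAL_STATES = {3, 4}  # e.g. SUCCEEDED, FAILED (hypothetical values)

def is_terminal(status):
    """True once a {"state": int, "task": str} status is terminal."""
    return status is not None and status.get("state") in TERMINAL_STATES
```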



Development

Successfully merging this pull request may close these issues.

  • Integrate agent with Manipulation module
  • Implement Manipulation Skills
  • Reimplemnt manipulation skills

4 participants