Feature: Basic Agentic Pick and Place - reimplemented manipulation skills#1237
Conversation
Greptile Overview
Greptile Summary
This PR adds an agent-invokable skill layer to manipulation by converting … The changes fit into the existing architecture by leaning on …
Confidence Score: 2/5
Important Files Changed
Sequence Diagram
sequenceDiagram
autonumber
participant User as Human user
participant HI as human_input (CLI)
participant Agent as llm_agent
participant Skills as SkillCoordinator
participant Manip as ManipulationModule (SkillModule)
participant WM as WorldMonitor
participant Planner as RRTConnectPlanner
participant CC as ControlCoordinator
participant Task as Trajectory Task (JointTrajectoryController)
User->>HI: type "pick up the cup"
HI->>Agent: user message
Agent->>Skills: select skill + args
Skills->>Manip: pick(object_name)
Manip->>WM: refresh_obstacles(min_duration)
WM-->>Manip: obstacles added
Manip->>Manip: _generate_grasps_for_pick()
alt GraspingModule RPC wired
Manip->>Manip: get_rpc_calls("GraspingModule.generate_grasps")
Manip-->>Manip: grasp candidates (intended)
else fallback heuristic
Manip->>WM: list_cached_detections()
WM-->>Manip: detections snapshot
Manip-->>Manip: heuristic grasp pose
end
Manip->>Planner: plan_joint_path(start, goal)
Planner-->>Manip: JointPath
Manip->>Manip: generate JointTrajectory
Manip->>CC: task_invoke(task, "execute", {trajectory})
CC->>Task: execute(trajectory)
Task-->>CC: accepted
CC-->>Manip: result
loop wait for completion
Manip->>CC: task_invoke(task, "get_status" or "get_state")
CC->>Task: get_status()/get_state()
Task-->>CC: TrajectoryStatus / None
CC-->>Manip: status
end
Manip-->>Skills: streamed progress strings
Skills-->>Agent: tool results
Agent-->>HI: assistant response
Additional Comments (1)
This switched to …
I assume I need hardware to test this.
# Snapshotted detections from the last scan_objects/refresh call.
# The live detection cache is volatile (labels change every frame),
# so pick/place use this stable snapshot instead.
self._detection_snapshot: list[dict[str, object]] = []
here also idk what object means
This should have been Object; it's a confusion between object and Object. Will fix.
There's an issue open to rename all of them https://linear.app/dimensional/issue/DIM-452/rename-object-to-something-else-entity-body-etc
replaced object data type with Any to avoid confusion in the interim.
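For reference, the interim annotation looks roughly like this (trimmed to the one attribute under discussion; Any replaces the builtin object until the Object rename in DIM-452 lands):

```python
from typing import Any

class ManipulationModule:
    def __init__(self) -> None:
        # Snapshotted detections from the last scan_objects/refresh call.
        # Any avoids confusion with the dimos Object type for now.
        self._detection_snapshot: list[dict[str, Any]] = []
```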
def _make_pose(
    self, x: float, y: float, z: float, roll: float, pitch: float, yaw: float
) -> Pose:
    """Construct a Pose from position and Euler angles (radians)."""
    from dimos.msgs.geometry_msgs import Pose, Vector3
    from dimos.utils.transform_utils import euler_to_quaternion

    orientation = euler_to_quaternion(Vector3(roll, pitch, yaw))
    return Pose(position=Vector3(x, y, z), orientation=orientation)
Just a helper. Removed as an unnecessary part of legacy code from before type checking.
def _wait_for_trajectory_completion(
    self, robot_name: RobotName | None = None, timeout: float = 60.0, poll_interval: float = 0.2
) -> bool:
    """Wait for trajectory execution to complete.

    Polls the coordinator task state via task_invoke. Falls back to waiting
    for the trajectory duration if the coordinator is unavailable.

    Args:
        robot_name: Robot to monitor
        timeout: Maximum wait time in seconds
        poll_interval: Time between status checks

    Returns:
        True if trajectory completed successfully
    """
    robot = self._get_robot(robot_name)
    if robot is None:
        return True
    rname, _, config, _ = robot
    client = self._get_coordinator_client()

    if client is None or not config.coordinator_task_name:
        # No coordinator — wait for trajectory duration as fallback
        traj = self._planned_trajectories.get(rname)
        if traj is not None:
            logger.info(f"No coordinator status — waiting {traj.duration:.1f}s for trajectory")
            time.sleep(traj.duration + 0.5)
        return True
This should be handled by @paul-nechifor's skill coordinator stuff. We should have a universal and generic set of "Skills" or "Processes" within dimOS, all governed by the same coordinator and manager. Fine for now since that's not done yet.
- dimOS skills are functions that agents can call
- dimOS processes are short-mid-long running tasks typically triggered by skill calls (or directly)
The plan is (discussed with @leshy ) to expose everything as an MCP server. So the tool coordination will happen through the MCP client. The MCP client can be claude code, but we'll also write our own client.
The tools/resources (replacing the skill terminology) will be defined the way MCP expects them. But we can have helper classes for running long running tools in our code.
Multiple async tool calls can happen through MCP and it supports updates from the server to the client.
Long-running tool calls will push a stream of updates, like generator skills. This is a little more complex in MCP, but we'll have helper code to do it.
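For a rough idea of what that could look like, here is a minimal sketch using the MCP Python SDK's FastMCP helper. The server name, tool body, and step list are made up for illustration; this is not the planned dimOS implementation.

```python
import asyncio

from mcp.server.fastmcp import Context, FastMCP

mcp = FastMCP("dimos-manipulation")  # illustrative server name

@mcp.tool()
async def pick(object_name: str, ctx: Context) -> str:
    """Long-running tool that streams progress notifications back to the MCP client."""
    steps = ["scanning scene", "planning grasp", "executing approach", "closing gripper"]
    for i, _step in enumerate(steps, start=1):
        await ctx.report_progress(i, len(steps))  # server -> client progress update
        await asyncio.sleep(0.1)  # stand-in for the real work
    return f"picked {object_name}"
```

A generator skill like pick could be adapted to this shape by reporting progress once per yielded message.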
def pick(
    self,
    object_name: str,
    object_id: str | None = None,
    robot_name: str | None = None,
) -> Generator[str, None, None]:
    """Pick up an object by name using grasp planning and motion execution.

    Scans the scene, generates grasp poses (via GraspGen or heuristic fallback),
    plans collision-free approach/grasp/retract motions, and executes them.

    Args:
        object_name: Name of the object to pick (e.g. "cup", "bottle", "can").
        object_id: Optional unique object ID from perception for precise identification.
@paul-nechifor Flagging for you since skills like this are helpful to see what we're solving for on agents
You only need a RealSense camera to test this PR. All the robot hardware can be simulated using the coordinator-mock blueprint.
# Hardware SDKs
"piper-sdk",
"xarm-python-sdk>=1.17.0",
Should you add something like the pyrealsense2>=2.50.0 dependency here, or in the perception part?
| "init_joints": self._init_joints, | ||
| } | ||
|
|
||
| @rpc |
For these three RPC methods, I guess it's speculative API design rather than a bug, but I wanted clarity on the follow-up.
The pipeline that actually runs is:
Robot boots -> _on_joint_state fires -> self._init_joints auto-captured
and from there it proceeds:
LLM agent calls go_init skill -> reads self._init_joints directly -> plans & moves
No RPC call is involved; go_init reads the field directly on self, not through get_init_joints().
The only scenario would be an external script using RPCClient, something like this:
client = RPCClient("manipulation")
client.set_init_joints([0.0, -0.5, 0.0, 0.8, 0.0, 1.0, 0.0]) # override
client.go_init() # now goes to custom pose instead of startup pose
But that pattern doesn't exist anywhere in the codebase. No script, no blueprint, no test, no agent does this.
    Args:
        robot_name: Robot to control (only needed for multi-arm setups).
    """
    if self._set_gripper_position(0.85, robot_name):
What I feel should exist for modularity, but doesn't yet: RobotModelConfig has gripper_hardware_id but no gripper_open_position or gripper_close_position fields. The adapter knows its own range (Piper has GRIPPER_MAX_OPENING_M), but that information never flows up to ManipulationModule.
So I feel this can be solved by adding two config fields to RobotModelConfig, with the blueprint setting them per robot (sketch below).
Or you can normalize it.
Thank you.
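A rough sketch of the first option, assuming RobotModelConfig is a dataclass; the two new field names are my suggestion, not existing API:

```python
from dataclasses import dataclass, field

@dataclass
class RobotModelConfig:
    # ...existing fields elided...
    gripper_hardware_id: str | None = None
    tf_extra_links: list[str] = field(default_factory=list)
    # Proposed additions, set per robot by the blueprint:
    gripper_open_position: float = 0.85
    gripper_close_position: float = 0.0
```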
    else:
        # task_invoke returned None — task not found, assume done
        return True
except Exception:
Is there a reason we swallow the exception here? If task_invoke throws (e.g. coordinator crash), this catches it and returns True, so the skill thinks the trajectory succeeded and continues to the next step even though the arm may not have moved.
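For what it's worth, a self-contained sketch of the alternative; the task_invoke call shape is assumed from the surrounding code, and the point is just to surface the failure instead of assuming success:

```python
import logging

logger = logging.getLogger(__name__)

def poll_task_status(client, task_name: str):
    """Poll the coordinator; return None on failure so the caller can treat the step as failed."""
    try:
        return client.task_invoke(task_name, "get_status", {})  # call shape assumed
    except Exception:
        logger.exception("task_invoke failed while polling trajectory status")
        return None  # caller returns False / yields an Error message instead of continuing
```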
    self, x: float, y: float, z: float, roll: float, pitch: float, yaw: float
) -> Pose:
    """Construct a Pose from position and Euler angles (radians)."""
    from dimos.msgs.geometry_msgs import Pose, Vector3
Just a suggestion: is it possible to move these imports up next to JointState? geometry_msgs is a lightweight msg module already imported under TYPE_CHECKING, so there are no circular imports or heavy import costs.
Thank you.
for msg in self.pick(object_name, object_id, robot_name):
    yield msg
    # Check if pick failed
    if msg.startswith("Error:"):
It's just my personal opinion, but I don't like string-matching as a failure catch; string-based error detection is fragile. If someone adds a new failure path like "Failure: ..." without the "Error:" prefix, it slips through. As the code stands right now it correctly matches the returned "Error:" prefix, so it's functionally correct. Small concern (sketch of an alternative below).
Thank you.
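To illustrate the concern, a self-contained sketch of a more structured alternative; StepResult is a hypothetical helper, not existing dimOS API:

```python
from dataclasses import dataclass
from typing import Iterator

@dataclass
class StepResult:
    """Hypothetical structured progress message for generator skills."""
    message: str
    ok: bool = True

def run_until_failure(steps: Iterator[StepResult]) -> bool:
    """Consume a skill's progress stream and stop on the first failed step."""
    for step in steps:
        print(step.message)
        if not step.ok:
            return False
    return True
```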
gripper_hardware_id=gripper_hardware_id,
tf_extra_links=tf_extra_links or [],
# Home configuration: arm extended forward, elbow up (safe observe pose)
home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
Hey, I think we need to add home_joints to the other configs too, for xarm6 and piper (sketch below). If this addition is intentional for xarm7 only, then it's fine.
Thank you.
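If it is meant to go everywhere, a sketch of what the mirrored change could look like for xarm6; the joint values are placeholders rather than verified safe poses, and the other constructor fields just follow the quoted diff:

```python
xarm6_model = RobotModelConfig(
    gripper_hardware_id=gripper_hardware_id,
    tf_extra_links=tf_extra_links or [],
    # Placeholder home configuration for a 6-DOF arm; real values need tuning.
    home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
)
```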
| return "\n".join(lines) | ||
|
|
||
| @skill() | ||
| def scan_objects(self, min_duration: float = 1.0, robot_name: str | None = None) -> str: |
The robot_name does not seem to be used in the method body. Is it there for API consistency, or does it serve a different purpose? Just need some clarification.
        min_duration: Minimum time in seconds to wait for stable detections.
        robot_name: Robot context (only needed for multi-arm setups).
    """
    obstacles = self.refresh_obstacles(min_duration)
Here I just wanted to understand: refresh_obstacles snapshots detections into self._detection_snapshot (stable, frozen at that moment), but scan_objects then doesn't use self._detection_snapshot; it calls self.list_cached_detections(), which reads the live volatile cache from WorldMonitor. I'm aware that 99% of the time these return the same data, since the calls are milliseconds apart, but the whole point of _detection_snapshot (as the comment says, "the live cache is volatile") was to avoid reading from the live cache. scan_objects ignores that and reads the live cache anyway, so again just a concern about the approach (sketch of the alternative below).
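A sketch of the suggested direction, assuming refresh_obstacles already populates self._detection_snapshot and that each detection dict has label/position keys (those field names are assumptions):

```python
@skill()
def scan_objects(self, min_duration: float = 1.0, robot_name: str | None = None) -> str:
    obstacles = self.refresh_obstacles(min_duration)  # also freezes self._detection_snapshot
    detections = self._detection_snapshot             # instead of self.list_cached_detections()
    lines = [f"{d.get('label')} at {d.get('position')}" for d in detections]
    return "\n".join(lines)
```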
| print("=" * 70) | ||
|
|
||
|
|
||
| def wait_for_completion(client: CoordinatorClient, task_name: str, timeout: float = 60.0) -> bool: |
Here I presume that wait_for_completion() is broken after the task_invoke migration. get_trajectory_status now returns {"state": int, "task": str}, but this function still reads active and progress, expects state to be a string, and so exits on the first poll reporting failure. I feel it can be resolved by either updating it to match the new return format (sketch below) or removing it, since ManipulationModule._wait_for_trajectory_completion() already handles this correctly.
Thank you.
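A sketch of the first option, updating the helper to the new {"state": int, "task": str} shape; the terminal-state integer values below are placeholders for the real TrajectoryStatus values:

```python
import time

# Placeholder terminal states; the real values should come from the TrajectoryStatus enum.
STATE_SUCCEEDED = 3
STATE_FAILED = 4

def wait_for_completion(client, task_name: str, timeout: float = 60.0) -> bool:
    """Poll get_trajectory_status until the task reaches a terminal state or times out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get_trajectory_status(task_name)  # {"state": int, "task": str} or None
        if status is None:
            return True  # task no longer tracked; assume finished
        state = status.get("state")
        if state == STATE_SUCCEEDED:
            return True
        if state == STATE_FAILED:
            return False
        time.sleep(0.2)
    return False
```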
Problem
ManipulationModule had no skill layer — only low-level RPCs requiring a custom IPython client.
No way for an LLM agent to invoke pick/place/move skills through the framework.
RRT planner rejected valid joint states due to floating-point drift past URDF limits.
Solution
- Added 11 @skill() methods to ManipulationModule (pick, place, move_to_pose, scan_objects, etc.).
- Changed base class to SkillModule so skills auto-register with agents via autoconnect().
- Created xarm-perception-agent blueprint composing xarm_perception + llm_agent + human_input.
- Added detection snapshot so pick uses stable labels instead of volatile live cache.
- Added limit_eps=1e-3 tolerance to RRT planner joint limit validation.
- Removed ManipulationClient and run_* RPC wrappers — agent CLI replaces them.
- CoordinatorClient updated to route execution through task_invoke instead of removed RPCs.
Breaking Changes
- ManipulationClient deleted — use dimos run xarm-perception-agent or direct RPC.
- run_pick/run_place/run_place_back/run_go_init RPCs removed from ManipulationModule.
How to Test
- dimos run coordinator-mock then dimos run xarm-perception-agent
- "scan the scene" — verify objects listed with 3D positions
- "pick up the <object>" — verify approach, grasp, retract sequence
- "place it back" — verify placement at original pick position
- dimos agentspy
closes DIM-351
closes DIM-419