
Langchain adapter updates #181

Open
dixitaniket wants to merge 4 commits into main from ani/langchain-adapater

Conversation

@dixitaniket
Collaborator

No description provided.

Contributor

Copilot AI left a comment

Pull request overview

This PR updates the LangChain adapter and LLM client behavior to better support async/streaming usage and to automatically recover from a known x402 “Invalid payment required response” failure mode, along with dependency lockfile updates.

Changes:

  • Add “retry once after x402 stack reset” behavior for non-streaming and streaming LLM requests when the known invalid-payment error occurs.
  • Expand the LangChain adapter to support client injection, async methods, streaming chunk conversion, and improved tool-call parsing/serialization.
  • Update pinned dependency versions (including og-test-v2-x402==0.0.12.dev3) and refresh uv.lock.
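The "retry once after x402 stack reset" behavior described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: `FlakyX402Client`, `reset_x402_stack`, and `INVALID_PAYMENT_MSG` are hypothetical stand-ins for the SDK internals in src/opengradient/client/llm.py.

```python
import asyncio

# Hypothetical stand-in for the known x402 error text.
INVALID_PAYMENT_MSG = "Invalid payment required response"


class FlakyX402Client:
    """Simulates a client whose x402 stack fails until it is reset."""

    def __init__(self) -> None:
        self.healthy = False
        self.resets = 0

    def reset_x402_stack(self) -> None:
        self.resets += 1
        self.healthy = True

    async def chat(self, prompt: str) -> str:
        if not self.healthy:
            raise RuntimeError(INVALID_PAYMENT_MSG)
        return f"ok: {prompt}"


async def chat_with_retry(client: FlakyX402Client, prompt: str) -> str:
    """Retry exactly once after resetting the x402 stack on the known error."""
    try:
        return await client.chat(prompt)
    except RuntimeError as exc:
        if INVALID_PAYMENT_MSG not in str(exc):
            raise  # unrelated errors propagate immediately
        client.reset_x402_stack()
        # A second failure propagates: only one retry is attempted.
        return await client.chat(prompt)
```

The key design point the tests assert is that the reset-and-retry happens at most once per request, so an error that survives the reset still surfaces to the caller.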

Reviewed changes

Copilot reviewed 6 out of 8 changed files in this pull request and generated 3 comments.

File Description
src/opengradient/client/llm.py Adds x402 stack reset + retry-once logic for completion/chat, including streaming retry.
src/opengradient/agents/og_langchain.py Refactors LangChain adapter for async paths, streaming support, tool handling, and client lifecycle.
src/opengradient/agents/__init__.py Extends langchain_adapter factory to accept client + connection overrides and new params.
tests/llm_test.py Adds tests asserting a single retry + reset on the invalid-payment error (streaming and non-streaming).
tests/langchain_adapter_test.py Adds coverage for injected client init, missing-key validation, identifying params, and async/stream paths.
pyproject.toml Bumps pinned og-test-v2-x402 dependency.
requirements.txt Bumps pinned og-test-v2-x402 dependency.
uv.lock Updates locked versions (including opengradient metadata and og-test-v2-x402).


Comment on lines +368 to +393
        return ChatResult(generations=[ChatGeneration(message=ai_message, generation_info={"finish_reason": finish_reason})])

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        sdk_messages = self._convert_messages_to_sdk(messages)
        chat_kwargs = self._build_chat_kwargs(sdk_messages, stop, stream=True, **kwargs)
        queue: Queue[Any] = Queue()

        def _runner() -> None:
            async def _run() -> None:
                stream = await self._llm.chat(**chat_kwargs)
                async for chunk in cast(AsyncIterator[StreamChunk], stream):
                    queue.put(self._stream_chunk_to_generation(chunk))

            try:
                asyncio.run(_run())
            except BaseException as exc:  # noqa: BLE001
                queue.put(exc)
            finally:
                queue.put(_STREAM_END)

        thread = Thread(target=_runner, daemon=True)
        thread.start()

Copilot AI Mar 12, 2026


_stream always spawns a new thread and runs self._llm.chat() inside asyncio.run(). Since the SDK client’s underlying async HTTP client is created on the main thread, using it from another thread/event loop is not safe. Consider implementing sync streaming by driving an event loop in the current thread (e.g., create a dedicated loop and iterate the async generator via run_until_complete on __anext__()), or otherwise ensure the HTTP client is created/used within the same thread+loop.

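The reviewer's alternative, driving the async generator from the current thread with a dedicated loop and `run_until_complete` on `__anext__()`, can be sketched as below. `iterate_sync` and the `numbers` demo generator are hypothetical helpers for illustration, not code from this PR; whether this is safe for the SDK still depends on the HTTP client being created and used on this same thread.

```python
import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


def iterate_sync(agen: AsyncIterator[T]) -> Iterator[T]:
    """Drive an async iterator synchronously on the current thread.

    A private event loop pulls one item at a time via __anext__(), so no
    background thread or second concurrent loop is involved.
    """
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()


async def numbers() -> AsyncIterator[int]:
    """Demo async generator standing in for a streaming chat response."""
    for i in range(3):
        yield i
```

For example, `list(iterate_sync(numbers()))` yields all chunks in order without ever leaving the calling thread.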
    ):
        resolved_model_cid = model_cid or model
        if resolved_model_cid is None:
            raise ValueError("model_cid (or model) is required.")

Copilot AI Mar 12, 2026


OpenGradientChatModel now accepts model_cid / model as str, but the underlying SDK client expects models in the provider/model form (LLM.chat does model.split("/")[1]). Passing a plain model name like "gpt-5" would raise IndexError at runtime. Consider validating the string format here (or normalizing to the required format) and raising a clear ValueError when it’s not supported.

Suggested change:

            raise ValueError("model_cid (or model) is required.")
        # When a plain string is provided, ensure it matches the expected "provider/model" format
        if isinstance(resolved_model_cid, str) and "/" not in resolved_model_cid:
            raise ValueError(
                f"Invalid model identifier '{resolved_model_cid}'. "
                "Expected format 'provider/model', e.g. 'openai/gpt-4o'."
            )

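To see the failure mode concretely: `"gpt-5".split("/")` returns `["gpt-5"]`, so indexing `[1]` raises IndexError. A safe split-and-validate helper along the lines of the suggestion could look like this; `split_provider_model` is a hypothetical name for illustration, not part of the PR.

```python
def split_provider_model(model: str) -> tuple[str, str]:
    """Split a 'provider/model' identifier, rejecting malformed inputs.

    A plain name like 'gpt-5' would make model.split('/')[1] raise
    IndexError; partition() lets us detect that case and raise a clear
    ValueError instead.
    """
    provider, sep, name = model.partition("/")
    if not sep or not provider or not name:
        raise ValueError(
            f"Invalid model identifier {model!r}: expected 'provider/model', "
            "e.g. 'openai/gpt-4o'."
        )
    return provider, name
```

Validating once at construction time moves the error from deep inside `LLM.chat` to the point where the bad value was supplied.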
Comment on lines +126 to +148
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)

    queue: Queue[Any] = Queue(maxsize=1)

    def _runner() -> None:
        try:
            queue.put(asyncio.run(coro))
        except BaseException as exc:  # noqa: BLE001
            queue.put(exc)

    thread = Thread(target=_runner, daemon=True)
    thread.start()
    outcome = queue.get()
    thread.join()

    if isinstance(outcome, BaseException):
        raise outcome
    return outcome



Copilot AI Mar 12, 2026


_run_coro_sync runs the coroutine in a new thread when a loop is already running. Because LLM holds an httpx.AsyncClient (via x402HttpxClient), running its async methods in a different thread/event loop is not thread-safe and can fail with “attached to a different loop” style errors. A safer approach is to either (a) raise a clear error instructing callers to use the async methods when a loop is running, or (b) schedule work onto the existing loop (without creating a second event loop) rather than using asyncio.run in another thread.

Suggested change (replacing the threaded fallback quoted above with a docstring and an explicit error):

    """Run a coroutine synchronously when no event loop is running.

    If an event loop is already running in this thread, synchronous execution
    is not supported because it may conflict with resources (such as HTTP
    clients) that are bound to the existing loop. In that case, callers must
    use the async APIs directly.
    """
    try:
        # Raises RuntimeError if no loop is running in this thread.
        asyncio.get_running_loop()
    except RuntimeError:
        # Safe to create and run a new event loop.
        return asyncio.run(coro)

    # An event loop is already running; do not create a second loop in
    # another thread, as this is not safe for loop-bound resources.
    raise RuntimeError(
        "Cannot run coroutine synchronously while an event loop is running. "
        "Use the async methods of OpenGradientChatModel instead."
    )

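The spirit of option (b), keeping all coroutines (and any loop-bound resources they create) on one persistent loop rather than spinning up a throwaway loop per call, can be sketched with `asyncio.run_coroutine_threadsafe`. This `LoopThread` class is a hypothetical illustration of the pattern, not a drop-in replacement for `_run_coro_sync`.

```python
import asyncio
import threading
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


class LoopThread:
    """Owns a single event loop running in a background thread.

    Every coroutine submitted via run() executes on that one loop, so
    loop-bound resources (e.g. an httpx.AsyncClient created inside a
    submitted coroutine) never see a second event loop.
    """

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, T]) -> T:
        # Blocks the calling thread until the coroutine finishes on the
        # background loop; exceptions are re-raised here.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
```

Note the remaining caveat: `run()` still must not be called from a thread whose own event loop is currently running, since blocking on `.result()` there would stall that loop.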