5 changes: 5 additions & 0 deletions pkg/chat/chat.go
@@ -89,6 +89,11 @@ type Message struct {
 	// Cost is the cost of this message in dollars (only set for assistant messages)
 	Cost float64 `json:"cost,omitempty"`
 
+	// FinishReason indicates why the model stopped generating for this message.
+	// "stop" = natural end, "tool_calls" = tool invocation, "length" = token limit.
+	// Only set for assistant messages.
+	FinishReason FinishReason `json:"finish_reason,omitempty"`
+
 	// CacheControl indicates whether this message is a cached message (only used by anthropic)
 	CacheControl bool `json:"cache_control,omitempty"`
 }
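The FinishReason type referenced here is not part of this diff. A minimal sketch of what pkg/chat presumably declares, inferred from the constants used later in the PR (chat.FinishReasonStop, chat.FinishReasonToolCalls, chat.FinishReasonNull) and from the values documented in the field comment; the actual definition may differ:

// Assumed shape of the FinishReason type in pkg/chat; not shown in this diff.
type FinishReason string

const (
	FinishReasonStop      FinishReason = "stop"       // natural end of generation
	FinishReasonToolCalls FinishReason = "tool_calls" // the model requested tool invocations
	FinishReasonLength    FinishReason = "length"     // output truncated at the token limit
	// streaming.go below distinguishes the zero value "" (no reason received)
	// from FinishReasonNull, so Null is presumably a non-empty sentinel.
	FinishReasonNull FinishReason = "null" // unknown / no explicit reason
)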
7 changes: 4 additions & 3 deletions pkg/runtime/event.go
@@ -283,13 +283,14 @@ type Usage struct {
 }
 
 // MessageUsage contains per-message usage data to include in TokenUsageEvent.
-// It embeds chat.Usage and adds Cost and Model fields.
+// It embeds chat.Usage and adds Cost, Model, and FinishReason fields.
 type MessageUsage struct {
 	chat.Usage
 	chat.RateLimit
 
-	Cost  float64
-	Model string
+	Cost         float64
+	Model        string
+	FinishReason chat.FinishReason `json:"finish_reason,omitempty"`
 }
 
 // NewTokenUsageEvent creates a TokenUsageEvent with the given usage data.
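Because chat.Usage is embedded, its fields are promoted onto MessageUsage, which is why the tests below read tokenEvent.Usage.LastMessage.InputTokens directly. A minimal sketch, assuming the import paths visible in this diff (field values are illustrative):

package main

import (
	"fmt"

	"github.com/docker/docker-agent/pkg/chat"
	"github.com/docker/docker-agent/pkg/runtime"
)

func main() {
	mu := &runtime.MessageUsage{
		Usage:        chat.Usage{InputTokens: 500, OutputTokens: 200},
		Cost:         0.02,
		Model:        "test/mock-model",
		FinishReason: chat.FinishReasonStop,
	}
	// Embedding promotes chat.Usage's fields, so token counts read directly
	// off MessageUsage without going through the embedded struct's name.
	fmt.Println(mu.InputTokens, mu.OutputTokens, mu.FinishReason)
}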
8 changes: 5 additions & 3 deletions pkg/runtime/loop.go
@@ -439,6 +439,7 @@ func (r *LocalRuntime) recordAssistantMessage(
 		Usage: res.Usage,
 		Model: messageModel,
 		Cost:  messageCost,
+		FinishReason: res.FinishReason,
 	}
 
 	addAgentMessage(sess, a, &assistantMessage, events)
@@ -449,9 +450,10 @@
 		return nil
 	}
 	msgUsage := &MessageUsage{
-		Usage: *res.Usage,
-		Cost:  messageCost,
-		Model: messageModel,
+		Usage:        *res.Usage,
+		Cost:         messageCost,
+		Model:        messageModel,
+		FinishReason: res.FinishReason,
 	}
 	if res.RateLimit != nil {
 		msgUsage.RateLimit = *res.RateLimit
27 changes: 27 additions & 0 deletions pkg/runtime/runtime.go
@@ -14,6 +14,7 @@ import (
 	"go.opentelemetry.io/otel/trace"
 
 	"github.com/docker/docker-agent/pkg/agent"
+	"github.com/docker/docker-agent/pkg/chat"
 	"github.com/docker/docker-agent/pkg/config/types"
 	"github.com/docker/docker-agent/pkg/hooks"
 	"github.com/docker/docker-agent/pkg/modelsdev"
@@ -861,6 +862,32 @@ func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, sess *session.Session
 	}
 	usage := SessionUsage(sess, contextLimit)
 	usage.Cost = sess.TotalCost()
+
+	// Reconstruct LastMessage from the parent session's last assistant
+	// message so that FinishReason (and other per-message fields) are
+	// available on session restore. We intentionally iterate
+	// sess.Messages (not GetAllMessages) so the result reflects the
+	// parent agent's state: this event carries the parent session_id,
+	// and sub-agents emit their own token_usage events with their own
+	// session_id during live streaming.
+	for i := len(sess.Messages) - 1; i >= 0; i-- {
+		item := &sess.Messages[i]
+		if !item.IsMessage() || item.Message.Message.Role != chat.MessageRoleAssistant {
+			continue
+		}
+		msg := &item.Message.Message
+		lm := &MessageUsage{
+			Model:        msg.Model,
+			Cost:         msg.Cost,
+			FinishReason: msg.FinishReason,
+		}
+		if msg.Usage != nil {
+			lm.Usage = *msg.Usage
+		}
+		usage.LastMessage = lm
+		break
+	}
+
 	send(NewTokenUsageEvent(sess.ID, r.CurrentAgentName(), usage))
 }
73 changes: 65 additions & 8 deletions pkg/runtime/runtime_test.go
@@ -280,8 +280,9 @@ func TestSimple(t *testing.T) {
 		AgentChoice("root", sess.ID, "Hello"),
 		MessageAdded(sess.ID, msgAdded.Message, "root"),
 		NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 3, OutputTokens: 2, ContextLength: 5, LastMessage: &MessageUsage{
-			Usage: chat.Usage{InputTokens: 3, OutputTokens: 2},
-			Model: "test/mock-model",
+			Usage:        chat.Usage{InputTokens: 3, OutputTokens: 2},
+			Model:        "test/mock-model",
+			FinishReason: chat.FinishReasonStop,
 		}}),
 		StreamStopped(sess.ID, "root"),
 	}
@@ -323,8 +324,9 @@ func TestMultipleContentChunks(t *testing.T) {
 		AgentChoice("root", sess.ID, "you?"),
 		MessageAdded(sess.ID, msgAdded.Message, "root"),
 		NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 8, OutputTokens: 12, ContextLength: 20, LastMessage: &MessageUsage{
-			Usage: chat.Usage{InputTokens: 8, OutputTokens: 12},
-			Model: "test/mock-model",
+			Usage:        chat.Usage{InputTokens: 8, OutputTokens: 12},
+			Model:        "test/mock-model",
+			FinishReason: chat.FinishReasonStop,
 		}}),
 		StreamStopped(sess.ID, "root"),
 	}
@@ -362,8 +364,9 @@ func TestWithReasoning(t *testing.T) {
 		AgentChoice("root", sess.ID, "Hello, how can I help you?"),
 		MessageAdded(sess.ID, msgAdded.Message, "root"),
 		NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 10, OutputTokens: 15, ContextLength: 25, LastMessage: &MessageUsage{
-			Usage: chat.Usage{InputTokens: 10, OutputTokens: 15},
-			Model: "test/mock-model",
+			Usage:        chat.Usage{InputTokens: 10, OutputTokens: 15},
+			Model:        "test/mock-model",
+			FinishReason: chat.FinishReasonStop,
 		}}),
 		StreamStopped(sess.ID, "root"),
 	}
@@ -403,8 +406,9 @@ func TestMixedContentAndReasoning(t *testing.T) {
 		AgentChoice("root", sess.ID, " How can I help you today?"),
 		MessageAdded(sess.ID, msgAdded.Message, "root"),
 		NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 15, OutputTokens: 20, ContextLength: 35, LastMessage: &MessageUsage{
-			Usage: chat.Usage{InputTokens: 15, OutputTokens: 20},
-			Model: "test/mock-model",
+			Usage:        chat.Usage{InputTokens: 15, OutputTokens: 20},
+			Model:        "test/mock-model",
+			FinishReason: chat.FinishReasonStop,
 		}}),
 		StreamStopped(sess.ID, "root"),
 	}
@@ -963,6 +967,59 @@ func TestEmitStartupInfo_CostIncludesSubSessions(t *testing.T) {
 		"cost should include sub-session costs (TotalCost, not OwnCost)")
 }
 
+func TestEmitStartupInfo_LastMessageFinishReason(t *testing.T) {
+	// When restoring a session whose last assistant message has a
+	// FinishReason, the emitted TokenUsageEvent.LastMessage must carry
+	// that FinishReason so the UI can identify the final response.
+	prov := &mockProvider{id: "test/startup-model", stream: &mockStream{}}
+	root := agent.New("root", "agent",
+		agent.WithModel(prov),
+		agent.WithDescription("Root"),
+	)
+	tm := team.New(team.WithAgents(root))
+
+	rt, err := NewLocalRuntime(tm, WithCurrentAgent("root"),
+		WithModelStore(mockModelStoreWithLimit{limit: 128_000}))
+	require.NoError(t, err)
+
+	sess := session.New()
+	sess.InputTokens = 500
+	sess.OutputTokens = 200
+
+	sess.Messages = append(sess.Messages, session.Item{
+		Message: &session.Message{
+			AgentName: "root",
+			Message: chat.Message{
+				Role:         chat.MessageRoleAssistant,
+				Content:      "final answer",
+				Cost:         0.02,
+				Model:        "test/startup-model",
+				FinishReason: chat.FinishReasonStop,
+				Usage:        &chat.Usage{InputTokens: 500, OutputTokens: 200},
+			},
+		},
+	})
+
+	events := make(chan Event, 20)
+	rt.EmitStartupInfo(t.Context(), sess, events)
+	close(events)
+
+	var tokenEvent *TokenUsageEvent
+	for event := range events {
+		if te, ok := event.(*TokenUsageEvent); ok {
+			tokenEvent = te
+		}
+	}
+
+	require.NotNil(t, tokenEvent, "should emit TokenUsageEvent")
+	require.NotNil(t, tokenEvent.Usage.LastMessage, "LastMessage should be populated on session restore")
+	assert.Equal(t, chat.FinishReasonStop, tokenEvent.Usage.LastMessage.FinishReason)
+	assert.Equal(t, "test/startup-model", tokenEvent.Usage.LastMessage.Model)
+	assert.InDelta(t, 0.02, tokenEvent.Usage.LastMessage.Cost, 0.0001)
+	assert.Equal(t, int64(500), tokenEvent.Usage.LastMessage.InputTokens)
+	assert.Equal(t, int64(200), tokenEvent.Usage.LastMessage.OutputTokens)
+}
+
 func TestEmitStartupInfo_NilSessionNoTokenEvent(t *testing.T) {
 	// When sess is nil, no TokenUsageEvent should be emitted.
 	prov := &mockProvider{id: "test/startup-model", stream: &mockStream{}}
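The test above pins down the contract a consumer relies on: TokenUsageEvent.Usage.LastMessage carries the final assistant message's FinishReason after a restore. A hypothetical consumer-side check built only on the fields the test exercises (lastResponseComplete is an illustrative name, not part of this PR; it assumes Usage is the pointer passed to NewTokenUsageEvent):

// lastResponseComplete reports whether the restored session's final assistant
// message ended naturally. Illustrative only; based on the fields asserted in
// TestEmitStartupInfo_LastMessageFinishReason above.
func lastResponseComplete(ev *runtime.TokenUsageEvent) bool {
	if ev.Usage == nil || ev.Usage.LastMessage == nil {
		return false // no usage restored yet
	}
	// "stop" = natural end; "tool_calls" or "length" would mean the last
	// message was a tool request or a truncated response.
	return ev.Usage.LastMessage.FinishReason == chat.FinishReasonStop
}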
37 changes: 37 additions & 0 deletions pkg/runtime/streaming.go
@@ -26,6 +26,7 @@ type streamResult struct {
 	ThinkingSignature string
 	ThoughtSignature  []byte
 	Stopped           bool
+	FinishReason      chat.FinishReason
 	Usage             *chat.Usage
 	RateLimit         *chat.RateLimit
 }
@@ -44,6 +45,7 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStream
 	var toolCalls []tools.ToolCall
 	var messageUsage *chat.Usage
 	var messageRateLimit *chat.RateLimit
+	var providerFinishReason chat.FinishReason
 
 	toolCallIndex := make(map[string]int)   // toolCallID -> index in toolCalls slice
 	emittedPartial := make(map[string]bool) // toolCallID -> whether we've emitted a partial event
@@ -109,11 +111,19 @@
 				ThinkingSignature: thinkingSignature,
 				ThoughtSignature:  thoughtSignature,
 				Stopped:           true,
+				FinishReason:      choice.FinishReason,
 				Usage:             messageUsage,
 				RateLimit:         messageRateLimit,
 			}, nil
 		}
 
+		// Track the provider's explicit finish reason (e.g. tool_calls) so we
+		// can prefer it over inference after the loop. stop/length are already
+		// handled by the early return above.
+		if choice.FinishReason != "" {
+			providerFinishReason = choice.FinishReason
+		}
+
 		// Handle tool calls
 		if len(choice.Delta.ToolCalls) > 0 {
 			// Process each tool call delta
@@ -191,13 +201,40 @@
 	// If the stream completed without producing any content or tool calls, likely because of a token limit, stop to avoid breaking the request loop
 	// NOTE(krissetto): this can likely be removed once compaction works properly with all providers (aka dmr)
 	stoppedDueToNoOutput := fullContent.Len() == 0 && len(toolCalls) == 0
+
+	// Prefer the provider's explicit finish reason when available (e.g.
+	// tool_calls). Only fall back to inference when no explicit reason was
+	// received (stream ended with bare EOF):
+	//   - tool calls present → tool_calls (model was requesting tools)
+	//   - content but no tool calls → stop (natural completion)
+	//   - no output at all → null (unknown; likely token limit)
+	finishReason := providerFinishReason
+	if finishReason == "" {
+		switch {
+		case len(toolCalls) > 0:
+			finishReason = chat.FinishReasonToolCalls
+		case fullContent.Len() > 0:
+			finishReason = chat.FinishReasonStop
+		default:
+			finishReason = chat.FinishReasonNull
+		}
+	}
+	// Ensure finish reason agrees with the actual stream output.
+	switch {
+	case finishReason == chat.FinishReasonToolCalls && len(toolCalls) == 0:
+		finishReason = chat.FinishReasonNull
+	case finishReason == chat.FinishReasonStop && len(toolCalls) > 0:
+		finishReason = chat.FinishReasonToolCalls
+	}
+
 	return streamResult{
 		Calls:             toolCalls,
 		Content:           fullContent.String(),
 		ReasoningContent:  fullReasoningContent.String(),
 		ThinkingSignature: thinkingSignature,
 		ThoughtSignature:  thoughtSignature,
 		Stopped:           stoppedDueToNoOutput,
+		FinishReason:      finishReason,
 		Usage:             messageUsage,
 		RateLimit:         messageRateLimit,
 	}, nil
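The inference and reconciliation above read naturally as a pure function. A hypothetical extraction of the same rules for clarity (inferFinishReason does not exist in the PR; handleStream keeps this logic inline):

// inferFinishReason mirrors the inline logic in handleStream: prefer the
// provider's explicit reason, fall back to inference on a bare EOF, then
// reconcile the result with what the stream actually produced.
func inferFinishReason(provider chat.FinishReason, numToolCalls, contentLen int) chat.FinishReason {
	fr := provider
	if fr == "" {
		switch {
		case numToolCalls > 0:
			fr = chat.FinishReasonToolCalls
		case contentLen > 0:
			fr = chat.FinishReasonStop
		default:
			fr = chat.FinishReasonNull
		}
	}
	switch {
	case fr == chat.FinishReasonToolCalls && numToolCalls == 0:
		fr = chat.FinishReasonNull // provider claimed tools, none materialized
	case fr == chat.FinishReasonStop && numToolCalls > 0:
		fr = chat.FinishReasonToolCalls // tool calls present override a bare "stop"
	}
	return fr
}

// For example, a stream that hits EOF after emitting two tool-call deltas and
// no explicit finish reason: inferFinishReason("", 2, 0) == chat.FinishReasonToolCalls.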