diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index c8eae3be0c3..97a7f33498c 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1448,6 +1448,7 @@ dependencies = [ "codex-otel", "codex-protocol", "codex-rmcp-client", + "codex-rollout", "codex-shell-command", "codex-state", "codex-utils-absolute-path", @@ -1862,7 +1863,6 @@ dependencies = [ "codex-exec-server", "codex-execpolicy", "codex-features", - "codex-file-search", "codex-git", "codex-hooks", "codex-login", @@ -1870,6 +1870,7 @@ dependencies = [ "codex-otel", "codex-protocol", "codex-rmcp-client", + "codex-rollout", "codex-secrets", "codex-shell-command", "codex-shell-escalation", @@ -1929,7 +1930,6 @@ dependencies = [ "test-case", "test-log", "thiserror 2.0.18", - "time", "tokio", "tokio-tungstenite", "tokio-util", @@ -1978,6 +1978,7 @@ dependencies = [ "codex-feedback", "codex-otel", "codex-protocol", + "codex-rollout", "codex-utils-absolute-path", "codex-utils-cargo-bin", "codex-utils-cli", @@ -2447,6 +2448,29 @@ dependencies = [ "which 8.0.0", ] +[[package]] +name = "codex-rollout" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "codex-file-search", + "codex-login", + "codex-otel", + "codex-protocol", + "codex-state", + "dunce", + "pretty_assertions", + "serde", + "serde_json", + "tempfile", + "time", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "codex-secrets" version = "0.0.0" @@ -2593,6 +2617,7 @@ dependencies = [ "codex-login", "codex-otel", "codex-protocol", + "codex-rollout", "codex-shell-command", "codex-state", "codex-terminal-detection", @@ -2686,6 +2711,7 @@ dependencies = [ "codex-login", "codex-otel", "codex-protocol", + "codex-rollout", "codex-shell-command", "codex-state", "codex-terminal-detection", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 6d768d69634..1221965dc5d 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -39,6 +39,7 @@ members = [ "ollama", "process-hardening", "protocol", + "rollout", "rmcp-client", "responses-api-proxy", "stdio-to-uds", @@ -126,6 +127,7 @@ codex-ollama = { path = "ollama" } codex-otel = { path = "otel" } codex-process-hardening = { path = "process-hardening" } codex-protocol = { path = "protocol" } +codex-rollout = { path = "rollout" } codex-responses-api-proxy = { path = "responses-api-proxy" } codex-rmcp-client = { path = "rmcp-client" } codex-secrets = { path = "secrets" } diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index c6be85984d1..b708c2e9fdd 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -43,6 +43,7 @@ codex-file-search = { workspace = true } codex-chatgpt = { workspace = true } codex-login = { workspace = true } codex-protocol = { workspace = true } +codex-rollout = { workspace = true } codex-app-server-protocol = { workspace = true } codex-feedback = { workspace = true } codex-rmcp-client = { workspace = true } @@ -66,7 +67,6 @@ tokio = { workspace = true, features = [ "signal", ] } tokio-util = { workspace = true } -tokio-tungstenite = { workspace = true } tracing = { workspace = true, features = ["log"] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json"] } uuid = { workspace = true, features = ["serde", "v7"] } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 6b693934745..52beac2441c 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -106,7 +106,6 @@ use codex_app_server_protocol::build_turns_from_rollout_items; use codex_app_server_protocol::convert_patch_changes; use codex_core::CodexThread; use codex_core::ThreadManager; -use codex_core::find_thread_name_by_id; use codex_core::review_format::format_review_findings_block; use codex_core::review_prompts; use codex_core::sandboxing::intersect_permission_profiles; @@ -135,6 +134,7 @@ use codex_protocol::request_permissions::RequestPermissionProfile as CoreRequest use codex_protocol::request_permissions::RequestPermissionsResponse as CoreRequestPermissionsResponse; use codex_protocol::request_user_input::RequestUserInputAnswer as CoreRequestUserInputAnswer; use codex_protocol::request_user_input::RequestUserInputResponse as CoreRequestUserInputResponse; +use codex_rollout::find_thread_name_by_id; use codex_shell_command::parse_command::shlex_join; use std::collections::HashMap; use std::convert::TryFrom; diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 1b02e4bb6cf..fd11be258b7 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -180,14 +180,10 @@ use codex_cloud_requirements::cloud_requirements_loader; use codex_core::AuthManager; use codex_core::CodexAuth; use codex_core::CodexThread; -use codex_core::Cursor as RolloutCursor; use codex_core::NewThread; -use codex_core::RolloutRecorder; -use codex_core::SessionMeta; use codex_core::SteerInputError; use codex_core::ThreadConfigSnapshot; use codex_core::ThreadManager; -use codex_core::ThreadSortKey as CoreThreadSortKey; use codex_core::auth::AuthMode as CoreAuthMode; use codex_core::auth::CLIENT_ID; use codex_core::auth::login_with_api_key; @@ -207,17 +203,12 @@ use codex_core::exec::ExecCapturePolicy; use codex_core::exec::ExecExpiration; use codex_core::exec::ExecParams; use codex_core::exec_env::create_env; -use codex_core::find_archived_thread_path_by_id_str; -use codex_core::find_thread_name_by_id; -use codex_core::find_thread_names_by_ids; -use codex_core::find_thread_path_by_id_str; use codex_core::git_info::git_diff_to_remote; use codex_core::mcp::auth::discover_supported_scopes; use codex_core::mcp::auth::resolve_oauth_scopes; use codex_core::mcp::collect_mcp_snapshot; use codex_core::mcp::group_tools_by_server; use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig; -use codex_core::parse_cursor; use codex_core::plugins::MarketplaceError; use codex_core::plugins::MarketplacePluginSource; use codex_core::plugins::OPENAI_CURATED_MARKETPLACE_NAME; @@ -227,13 +218,10 @@ use codex_core::plugins::PluginReadRequest; use codex_core::plugins::PluginUninstallError as CorePluginUninstallError; use codex_core::plugins::load_plugin_apps; use codex_core::plugins::load_plugin_mcp_servers; -use codex_core::read_head_for_summary; -use codex_core::read_session_meta_line; -use codex_core::rollout_date_parts; +use codex_core::rollout_config; use codex_core::sandboxing::SandboxPermissions; -use codex_core::state_db::StateDbHandle; -use codex_core::state_db::get_state_db; -use codex_core::state_db::reconcile_rollout; +use codex_core::state_runtime::StateDbHandle; +use codex_core::state_runtime::get_state_db; use codex_core::windows_sandbox::WindowsSandboxLevelExt; use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode; use codex_core::windows_sandbox::WindowsSandboxSetupRequest; @@ -269,12 +257,29 @@ use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::ReviewTarget as CoreReviewTarget; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionConfiguredEvent; +use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS; use codex_protocol::user_input::UserInput as CoreInputItem; use codex_rmcp_client::perform_oauth_login_return_url; +use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; +use codex_rollout::Cursor as RolloutCursor; +use codex_rollout::RolloutRecorder; +use codex_rollout::SESSIONS_SUBDIR; +use codex_rollout::ThreadItem as RolloutThreadItem; +use codex_rollout::ThreadSortKey as CoreThreadSortKey; +use codex_rollout::append_thread_name; +use codex_rollout::find_archived_thread_path_by_id_str; +use codex_rollout::find_thread_name_by_id; +use codex_rollout::find_thread_names_by_ids; +use codex_rollout::find_thread_path_by_id_str; +use codex_rollout::parse_cursor; +use codex_rollout::read_head_for_summary; +use codex_rollout::read_session_meta_line; +use codex_rollout::reconcile_rollout; +use codex_rollout::rollout_date_parts; use codex_state::StateRuntime; use codex_state::ThreadMetadata; use codex_state::ThreadMetadataBuilder; @@ -2352,9 +2357,7 @@ impl CodexMessageProcessor { return; } - if let Err(err) = - codex_core::append_thread_name(&self.config.codex_home, thread_id, &name).await - { + if let Err(err) = append_thread_name(&self.config.codex_home, thread_id, &name).await { self.send_internal_error(request_id, format!("failed to set thread name: {err}")) .await; return; @@ -2719,10 +2722,7 @@ impl CodexMessageProcessor { let rollout_path_display = archived_path.display().to_string(); let fallback_provider = self.config.model_provider_id.clone(); let state_db_ctx = get_state_db(&self.config).await; - let archived_folder = self - .config - .codex_home - .join(codex_core::ARCHIVED_SESSIONS_SUBDIR); + let archived_folder = self.config.codex_home.join(ARCHIVED_SESSIONS_SUBDIR); let result: Result = async { let canonical_archived_dir = tokio::fs::canonicalize(&archived_folder).await.map_err( @@ -2780,7 +2780,7 @@ impl CodexMessageProcessor { }); }; - let sessions_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR); + let sessions_folder = self.config.codex_home.join(SESSIONS_SUBDIR); let dest_dir = sessions_folder.join(year).join(month).join(day); let restored_path = dest_dir.join(&file_name); tokio::fs::create_dir_all(&dest_dir) @@ -4206,7 +4206,7 @@ impl CodexMessageProcessor { } } GetConversationSummaryParams::ThreadId { conversation_id } => { - match codex_core::find_thread_path_by_id_str( + match find_thread_path_by_id_str( &self.config.codex_home, &conversation_id.to_string(), ) @@ -4291,13 +4291,15 @@ impl CodexMessageProcessor { let fallback_provider = self.config.model_provider_id.clone(); let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds); let allowed_sources = allowed_sources_vec.as_slice(); + let rollout_config = rollout_config(&self.config); let state_db_ctx = get_state_db(&self.config).await; while remaining > 0 { let page_size = remaining.min(THREAD_LIST_MAX_LIMIT); let page = if archived { RolloutRecorder::list_archived_threads( - &self.config, + &rollout_config, + state_db_ctx.as_deref(), page_size, cursor_obj.as_ref(), sort_key, @@ -4314,7 +4316,8 @@ impl CodexMessageProcessor { })? } else { RolloutRecorder::list_threads( - &self.config, + &rollout_config, + state_db_ctx.as_deref(), page_size, cursor_obj.as_ref(), sort_key, @@ -5064,7 +5067,7 @@ impl CodexMessageProcessor { rollout_path: &Path, ) -> Result<(), JSONRPCErrorError> { // Verify rollout_path is under sessions dir. - let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR); + let rollout_folder = self.config.codex_home.join(SESSIONS_SUBDIR); let canonical_sessions_dir = match tokio::fs::canonicalize(&rollout_folder).await { Ok(path) => path, @@ -5149,10 +5152,7 @@ impl CodexMessageProcessor { // Move the rollout file to archived. let result: std::io::Result<()> = async move { - let archive_folder = self - .config - .codex_home - .join(codex_core::ARCHIVED_SESSIONS_SUBDIR); + let archive_folder = self.config.codex_home.join(ARCHIVED_SESSIONS_SUBDIR); tokio::fs::create_dir_all(&archive_folder).await?; let archived_path = archive_folder.join(&file_name); tokio::fs::rename(&canonical_rollout_path, &archived_path).await?; @@ -7906,7 +7906,7 @@ async fn read_summary_from_state_db_context_by_thread_id( } async fn summary_from_thread_list_item( - it: codex_core::ThreadItem, + it: RolloutThreadItem, fallback_provider: &str, state_db_ctx: Option<&StateDbHandle>, ) -> Option { diff --git a/codex-rs/app-server/src/filters.rs b/codex-rs/app-server/src/filters.rs index 6d2b90dbaea..f35248b3ca1 100644 --- a/codex-rs/app-server/src/filters.rs +++ b/codex-rs/app-server/src/filters.rs @@ -1,7 +1,7 @@ use codex_app_server_protocol::ThreadSourceKind; -use codex_core::INTERACTIVE_SESSION_SOURCES; use codex_protocol::protocol::SessionSource as CoreSessionSource; use codex_protocol::protocol::SubAgentSource as CoreSubAgentSource; +use codex_rollout::INTERACTIVE_SESSION_SOURCES; pub(crate) fn compute_source_filters( source_kinds: Option>, diff --git a/codex-rs/app-server/tests/suite/v2/thread_archive.rs b/codex-rs/app-server/tests/suite/v2/thread_archive.rs index 20fd6fd884f..0e335b71210 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_archive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_archive.rs @@ -18,8 +18,8 @@ use codex_app_server_protocol::ThreadUnarchiveResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::UserInput; -use codex_core::ARCHIVED_SESSIONS_SUBDIR; -use codex_core::find_thread_path_by_id_str; +use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; +use codex_rollout::find_thread_path_by_id_str; use pretty_assertions::assert_eq; use std::path::Path; use tempfile::TempDir; diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 75bffe622cd..2725bf57575 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -22,13 +22,13 @@ use codex_app_server_protocol::ThreadStatus; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::UserInput; -use codex_core::ARCHIVED_SESSIONS_SUBDIR; use codex_protocol::ThreadId; use codex_protocol::protocol::GitInfo as CoreGitInfo; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionSource as CoreSessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; use core_test_support::responses; use pretty_assertions::assert_eq; use std::cmp::Reverse; diff --git a/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs b/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs index 6024fe47f5d..a24af54b724 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs @@ -18,10 +18,10 @@ use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; -use codex_core::ARCHIVED_SESSIONS_SUBDIR; -use codex_core::state_db::reconcile_rollout; use codex_protocol::ThreadId; use codex_protocol::protocol::GitInfo as RolloutGitInfo; +use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; +use codex_rollout::reconcile_rollout; use codex_state::StateRuntime; use pretty_assertions::assert_eq; use serde_json::Value; diff --git a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs index b2ae60ae35f..01bd1738ff2 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs @@ -15,8 +15,8 @@ use codex_app_server_protocol::ThreadUnarchivedNotification; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::UserInput; -use codex_core::find_archived_thread_path_by_id_str; -use codex_core::find_thread_path_by_id_str; +use codex_rollout::find_archived_thread_path_by_id_str; +use codex_rollout::find_thread_path_by_id_str; use pretty_assertions::assert_eq; use serde_json::Value; use std::fs::FileTimes; diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index d648655b242..c0555b72401 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -39,13 +39,13 @@ codex-login = { workspace = true } codex-shell-command = { workspace = true } codex-skills = { workspace = true } codex-execpolicy = { workspace = true } -codex-file-search = { workspace = true } codex-git = { workspace = true } codex-hooks = { workspace = true } codex-network-proxy = { workspace = true } codex-otel = { workspace = true } codex-artifacts = { workspace = true } codex-protocol = { workspace = true } +codex-rollout = { workspace = true } codex-rmcp-client = { workspace = true } codex-state = { workspace = true } codex-terminal-detection = { workspace = true } @@ -92,12 +92,6 @@ similar = { workspace = true } tempfile = { workspace = true } test-log = { workspace = true } thiserror = { workspace = true } -time = { workspace = true, features = [ - "formatting", - "parsing", - "local-offset", - "macros", -] } tokio = { workspace = true, features = [ "io-std", "macros", diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 10cbd441b48..d0fc0949c5e 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -7,13 +7,10 @@ use crate::agent::status::is_final; use crate::codex_thread::ThreadConfigSnapshot; use crate::error::CodexErr; use crate::error::Result as CodexResult; -use crate::find_archived_thread_path_by_id_str; -use crate::find_thread_path_by_id_str; -use crate::rollout::RolloutRecorder; use crate::session_prefix::format_subagent_context_line; use crate::session_prefix::format_subagent_notification_message; use crate::shell_snapshot::ShellSnapshot; -use crate::state_db; +use crate::state_runtime; use crate::thread_manager::ThreadManagerState; use codex_features::Feature; use codex_protocol::AgentPath; @@ -27,6 +24,9 @@ use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::TokenUsage; use codex_protocol::user_input::UserInput; +use codex_rollout::RolloutRecorder; +use codex_rollout::find_archived_thread_path_by_id_str; +use codex_rollout::find_thread_path_by_id_str; use codex_state::DirectionalThreadSpawnEdgeStatus; use std::collections::HashMap; use std::collections::VecDeque; @@ -371,7 +371,7 @@ impl AgentControl { agent_nickname: _, }) => { let (resumed_agent_nickname, resumed_agent_role) = - if let Some(state_db_ctx) = state_db::get_state_db(&config).await { + if let Some(state_db_ctx) = state_runtime::get_state_db(&config).await { match state_db_ctx.get_thread(thread_id).await { Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role), Ok(None) | Err(_) => (None, None), diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 20c051f853e..86dbb40af4d 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -22,6 +22,7 @@ use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; +use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::time::Duration; @@ -1178,10 +1179,7 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() { .await .expect("child shutdown should succeed"); - let archived_root = harness - .config - .codex_home - .join(crate::ARCHIVED_SESSIONS_SUBDIR); + let archived_root = harness.config.codex_home.join(ARCHIVED_SESSIONS_SUBDIR); tokio::fs::create_dir_all(&archived_root) .await .expect("archived root should exist"); diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 12270bb6ddf..53a4bf0b112 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -38,7 +38,6 @@ use crate::realtime_conversation::handle_audio as handle_realtime_conversation_a use crate::realtime_conversation::handle_close as handle_realtime_conversation_close; use crate::realtime_conversation::handle_start as handle_realtime_conversation_start; use crate::realtime_conversation::handle_text as handle_realtime_conversation_text; -use crate::rollout::session_index; use crate::skills::render_skills_section; use crate::stream_events_utils::HandleOutputCtx; use crate::stream_events_utils::handle_non_tool_response_item; @@ -270,11 +269,7 @@ use crate::protocol::TokenUsage; use crate::protocol::TokenUsageInfo; use crate::protocol::TurnDiffEvent; use crate::protocol::WarningEvent; -use crate::rollout::RolloutRecorder; -use crate::rollout::RolloutRecorderParams; -use crate::rollout::map_session_init_error; -use crate::rollout::metadata; -use crate::rollout::policy::EventPersistenceMode; +use crate::rollout_config_with_cwd; use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use crate::shell; use crate::shell_snapshot::ShellSnapshot; @@ -293,7 +288,7 @@ use crate::skills::resolve_skill_dependencies_for_turn; use crate::state::ActiveTurn; use crate::state::SessionServices; use crate::state::SessionState; -use crate::state_db; +use crate::state_runtime; use crate::tasks::GhostSnapshotTask; use crate::tasks::ReviewTask; use crate::tasks::SessionTask; @@ -334,6 +329,13 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::InitialHistory; use codex_protocol::user_input::UserInput; +use codex_rollout::EventPersistenceMode; +use codex_rollout::RolloutRecorder; +use codex_rollout::RolloutRecorderParams; +use codex_rollout::build_thread_metadata_builder; +use codex_rollout::find_thread_name_by_id; +use codex_rollout::session_init_error_message; +use codex_rollout::spawn_backfill_if_needed; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_readiness::Readiness; use codex_utils_readiness::ReadinessFlag; @@ -538,9 +540,13 @@ impl Codex { }; match thread_id { Some(thread_id) => { - let state_db_ctx = state_db::get_state_db(&config).await; - state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn") - .await + let state_db_ctx = state_runtime::get_state_db(&config).await; + state_runtime::get_dynamic_tools( + state_db_ctx.as_deref(), + thread_id, + "codex_spawn", + ) + .await } None => None, } @@ -617,7 +623,7 @@ impl Codex { .await .map_err(|e| { error!("Failed to create session: {e:#}"); - map_session_init_error(&e, &config.codex_home) + CodexErr::Fatal(session_init_error_message(&e, &config.codex_home)) })?; let thread_id = session.conversation_id; @@ -726,7 +732,7 @@ impl Codex { state.session_configuration.thread_config_snapshot() } - pub(crate) fn state_db(&self) -> Option { + pub(crate) fn state_db(&self) -> Option { self.session.state_db() } @@ -1455,12 +1461,13 @@ impl Session { ), }; let state_builder = match &initial_history { - InitialHistory::Resumed(resumed) => metadata::builder_from_items( + InitialHistory::Resumed(resumed) => build_thread_metadata_builder( resumed.history.as_slice(), resumed.rollout_path.as_path(), ), InitialHistory::New | InitialHistory::Forked(_) => None, }; + let rollout_config = rollout_config_with_cwd(&config, session_configuration.cwd.clone()); // Kick off independent async setup tasks in parallel to reduce startup latency. // @@ -1471,9 +1478,10 @@ impl Session { if config.ephemeral { Ok::<_, anyhow::Error>((None, None)) } else { - let state_db_ctx = state_db::init(&config).await; + let state_db_ctx = state_runtime::init(&config).await; + spawn_backfill_if_needed(state_db_ctx.clone(), &rollout_config).await; let rollout_recorder = RolloutRecorder::new( - &config, + &rollout_config, rollout_params, state_db_ctx.clone(), state_builder.clone(), @@ -1535,7 +1543,7 @@ impl Session { })?; let rollout_path = rollout_recorder .as_ref() - .map(|rec| rec.rollout_path.clone()); + .map(|rec| rec.rollout_path().to_path_buf()); let mut post_session_configured_events = Vec::::new(); @@ -1698,20 +1706,19 @@ impl Session { default_shell.shell_snapshot = rx; tx }; - let thread_name = - match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id) - .instrument(info_span!( - "session_init.thread_name_lookup", - otel.name = "session_init.thread_name_lookup", - )) - .await - { - Ok(name) => name, - Err(err) => { - warn!("Failed to read session index for thread name: {err}"); - None - } - }; + let thread_name = match find_thread_name_by_id(&config.codex_home, &conversation_id) + .instrument(info_span!( + "session_init.thread_name_lookup", + otel.name = "session_init.thread_name_lookup", + )) + .await + { + Ok(name) => name, + Err(err) => { + warn!("Failed to read session index for thread name: {err}"); + None + } + }; session_configuration.thread_name = thread_name.clone(); let state = SessionState::new(session_configuration.clone()); let managed_network_requirements_enabled = config.managed_network_requirements_enabled(); @@ -2006,7 +2013,7 @@ impl Session { self.tx_event.clone() } - pub(crate) fn state_db(&self) -> Option { + pub(crate) fn state_db(&self) -> Option { self.services.state_db.clone() } @@ -4403,8 +4410,6 @@ mod handlers { use crate::mcp::auth::compute_auth_statuses; use crate::mcp::collect_mcp_snapshot_from_manager; use crate::review_prompts::resolve_review_request; - use crate::rollout::RolloutRecorder; - use crate::rollout::session_index; use crate::tasks::CompactTask; use crate::tasks::UndoTask; use crate::tasks::UserShellCommandMode; @@ -4429,6 +4434,8 @@ mod handlers { use codex_protocol::protocol::WarningEvent; use codex_protocol::request_permissions::RequestPermissionsResponse; use codex_protocol::request_user_input::RequestUserInputResponse; + use codex_rollout::RolloutRecorder; + use codex_rollout::append_thread_name; use crate::context_manager::is_user_turn_boundary; use codex_protocol::config_types::CollaborationMode; @@ -5044,9 +5051,7 @@ mod handlers { }; let codex_home = sess.codex_home().await; - if let Err(e) = - session_index::append_thread_name(&codex_home, sess.conversation_id, &name).await - { + if let Err(e) = append_thread_name(&codex_home, sess.conversation_id, &name).await { let event = Event { id: sub_id, msg: EventMsg::Error(ErrorEvent { diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index a5412eff29f..26d53a39720 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -43,9 +43,6 @@ use crate::protocol::TokenUsageInfo; use crate::protocol::TurnCompleteEvent; use crate::protocol::TurnStartedEvent; use crate::protocol::UserMessageEvent; -use crate::rollout::policy::EventPersistenceMode; -use crate::rollout::recorder::RolloutRecorder; -use crate::rollout::recorder::RolloutRecorderParams; use crate::state::TaskKind; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; @@ -74,6 +71,9 @@ use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::Submission; use codex_protocol::protocol::W3cTraceContext; +use codex_rollout::EventPersistenceMode; +use codex_rollout::RolloutRecorder; +use codex_rollout::RolloutRecorderParams; use core_test_support::tracing::install_test_tracing; use opentelemetry::trace::TraceContextExt; use opentelemetry::trace::TraceId; @@ -82,6 +82,7 @@ use std::time::Duration; use tokio::time::sleep; use tracing_opentelemetry::OpenTelemetrySpanExt; +use crate::rollout_config; use codex_protocol::mcp::CallToolResult as McpCallToolResult; use pretty_assertions::assert_eq; use rmcp::model::JsonObject; @@ -1998,8 +1999,9 @@ async fn wait_for_thread_rollback_failed(rx: &async_channel::Receiver) -> async fn attach_rollout_recorder(session: &Arc) -> PathBuf { let config = session.get_config().await; + let rollout_config = rollout_config(config.as_ref()); let recorder = RolloutRecorder::new( - config.as_ref(), + &rollout_config, RolloutRecorderParams::new( ThreadId::default(), None, @@ -3915,8 +3917,9 @@ async fn record_context_updates_and_set_reference_context_item_persists_baseline state.set_reference_context_item(Some(previous_context_item.clone())); } let config = session.get_config().await; + let rollout_config = rollout_config(config.as_ref()); let recorder = RolloutRecorder::new( - config.as_ref(), + &rollout_config, RolloutRecorderParams::new( ThreadId::default(), None, @@ -4012,8 +4015,9 @@ async fn record_context_updates_and_set_reference_context_item_persists_full_rei .with_model(next_model.to_string(), &session.services.models_manager) .await; let config = session.get_config().await; + let rollout_config = rollout_config(config.as_ref()); let recorder = RolloutRecorder::new( - config.as_ref(), + &rollout_config, RolloutRecorderParams::new( ThreadId::default(), None, diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index e016fec977c..bb6d081c898 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -26,7 +26,7 @@ use std::path::PathBuf; use tokio::sync::Mutex; use tokio::sync::watch; -use crate::state_db::StateDbHandle; +use crate::state_runtime::StateDbHandle; #[derive(Clone, Debug)] pub struct ThreadConfigSnapshot { diff --git a/codex-rs/core/src/guardian/review_session.rs b/codex-rs/core/src/guardian/review_session.rs index 34f0b6298ec..9536979f3e1 100644 --- a/codex-rs/core/src/guardian/review_session.rs +++ b/codex-rs/core/src/guardian/review_session.rs @@ -36,8 +36,8 @@ use crate::config::Permissions; use crate::config::types::McpServerConfig; use crate::model_provider_info::ModelProviderInfo; use crate::protocol::SandboxPolicy; -use crate::rollout::recorder::RolloutRecorder; use codex_features::Feature; +use codex_rollout::RolloutRecorder; use super::GUARDIAN_REVIEW_TIMEOUT; use super::GUARDIAN_REVIEWER_NAME; diff --git a/codex-rs/core/src/rollout/truncation.rs b/codex-rs/core/src/history_truncation.rs similarity index 98% rename from codex-rs/core/src/rollout/truncation.rs rename to codex-rs/core/src/history_truncation.rs index 490bf42b97f..261a5f0e098 100644 --- a/codex-rs/core/src/rollout/truncation.rs +++ b/codex-rs/core/src/history_truncation.rs @@ -69,5 +69,5 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start( } #[cfg(test)] -#[path = "truncation_tests.rs"] +#[path = "history_truncation_tests.rs"] mod tests; diff --git a/codex-rs/core/src/rollout/truncation_tests.rs b/codex-rs/core/src/history_truncation_tests.rs similarity index 100% rename from codex-rs/core/src/rollout/truncation_tests.rs rename to codex-rs/core/src/history_truncation_tests.rs diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 29436a0d7f9..c82ab8632ac 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -43,6 +43,7 @@ mod file_watcher; mod flags; pub mod git_info; mod guardian; +mod history_truncation; mod hook_runtime; pub mod instructions; pub mod landlock; @@ -67,6 +68,7 @@ mod model_provider_info; pub mod path_utils; pub mod personality_migration; pub mod plugins; +mod rollout_config_builder; mod sandbox_tags; pub mod sandboxing; mod session_prefix; @@ -118,45 +120,21 @@ mod default_client_forwarding; pub mod default_client { pub use super::default_client_forwarding::*; } +mod function_tool; pub mod project_doc; -mod rollout; pub(crate) mod safety; pub mod seatbelt; pub mod shell; pub mod shell_snapshot; pub mod skills; pub mod spawn; -pub mod state_db; +mod state; +pub mod state_runtime; +mod tasks; mod tools; pub mod turn_diff_tracker; mod turn_metadata; mod turn_timing; -pub use rollout::ARCHIVED_SESSIONS_SUBDIR; -pub use rollout::INTERACTIVE_SESSION_SOURCES; -pub use rollout::RolloutRecorder; -pub use rollout::RolloutRecorderParams; -pub use rollout::SESSIONS_SUBDIR; -pub use rollout::SessionMeta; -pub use rollout::append_thread_name; -pub use rollout::find_archived_thread_path_by_id_str; -#[deprecated(note = "use find_thread_path_by_id_str")] -pub use rollout::find_conversation_path_by_id_str; -pub use rollout::find_thread_name_by_id; -pub use rollout::find_thread_path_by_id_str; -pub use rollout::find_thread_path_by_name_str; -pub use rollout::list::Cursor; -pub use rollout::list::ThreadItem; -pub use rollout::list::ThreadSortKey; -pub use rollout::list::ThreadsPage; -pub use rollout::list::parse_cursor; -pub use rollout::list::read_head_for_summary; -pub use rollout::list::read_session_meta_line; -pub use rollout::policy::EventPersistenceMode; -pub use rollout::rollout_date_parts; -pub use rollout::session_index::find_thread_names_by_ids; -mod function_tool; -mod state; -mod tasks; mod user_shell_command; pub mod util; pub(crate) use codex_protocol::protocol; @@ -180,6 +158,8 @@ pub use exec_policy::check_execpolicy_for_warnings; pub use exec_policy::format_exec_policy_error_with_source; pub use exec_policy::load_exec_policy; pub use file_watcher::FileWatcherEvent; +pub use rollout_config_builder::rollout_config; +pub use rollout_config_builder::rollout_config_with_cwd; pub use safety::get_platform_sandbox; pub use tools::spec::parse_tool_input_schema; pub use turn_metadata::build_turn_metadata_header; diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index 3e7f0cb84f8..507612803eb 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -31,7 +31,7 @@ use crate::protocol::EventMsg; use crate::protocol::McpInvocation; use crate::protocol::McpToolCallBeginEvent; use crate::protocol::McpToolCallEndEvent; -use crate::state_db; +use crate::state_runtime; use codex_features::Feature; use codex_protocol::mcp::CallToolResult; use codex_protocol::openai_models::InputModality; @@ -281,7 +281,7 @@ async fn maybe_mark_thread_memory_mode_polluted(sess: &Session, turn_context: &T { return; } - state_db::mark_thread_memory_mode_polluted( + state_runtime::mark_thread_memory_mode_polluted( sess.services.state_db.as_deref(), sess.conversation_id, "mcp_tool_call", diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index 921bc9953ca..c827ab01a3f 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -1,5 +1,4 @@ use crate::Prompt; -use crate::RolloutRecorder; use crate::codex::Session; use crate::codex::TurnContext; use crate::config::Config; @@ -10,8 +9,6 @@ use crate::memories::metrics; use crate::memories::phase_one; use crate::memories::phase_one::PRUNE_BATCH_SIZE; use crate::memories::prompts::build_stage_one_input_message; -use crate::rollout::INTERACTIVE_SESSION_SOURCES; -use crate::rollout::policy::should_persist_response_item_for_memories; use codex_api::ResponseEvent; use codex_otel::SessionTelemetry; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; @@ -23,6 +20,9 @@ use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::TokenUsage; +use codex_rollout::INTERACTIVE_SESSION_SOURCES; +use codex_rollout::RolloutRecorder; +use codex_rollout::should_persist_response_item_for_memories; use codex_secrets::redact_secrets; use futures::StreamExt; use serde::Deserialize; diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index e04a018a849..f3a6d44a786 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -689,7 +689,7 @@ mod phase2 { let rollout_path = subagent .rollout_path() .expect("consolidation thread should have a rollout path"); - crate::state_db::read_repair_rollout_path( + codex_rollout::read_repair_rollout_path( Some(harness.state_db.as_ref()), Some(thread_id), Some(/*archived_only*/ false), diff --git a/codex-rs/core/src/personality_migration.rs b/codex-rs/core/src/personality_migration.rs index f535465209c..30b6d239930 100644 --- a/codex-rs/core/src/personality_migration.rs +++ b/codex-rs/core/src/personality_migration.rs @@ -1,14 +1,7 @@ use crate::config::ConfigToml; use crate::config::edit::ConfigEditsBuilder; -use crate::rollout::ARCHIVED_SESSIONS_SUBDIR; -use crate::rollout::SESSIONS_SUBDIR; -use crate::rollout::list::ThreadListConfig; -use crate::rollout::list::ThreadListLayout; -use crate::rollout::list::ThreadSortKey; -use crate::rollout::list::get_threads_in_root; -use crate::state_db; use codex_protocol::config_types::Personality; -use codex_protocol::protocol::SessionSource; +use codex_rollout::has_recorded_sessions as rollout_has_recorded_sessions; use std::io; use std::path::Path; use tokio::fs::OpenOptions; @@ -64,57 +57,7 @@ pub async fn maybe_migrate_personality( } async fn has_recorded_sessions(codex_home: &Path, default_provider: &str) -> io::Result { - let allowed_sources: &[SessionSource] = &[]; - - if let Some(state_db_ctx) = state_db::open_if_present(codex_home, default_provider).await - && let Some(ids) = state_db::list_thread_ids_db( - Some(state_db_ctx.as_ref()), - codex_home, - /*page_size*/ 1, - /*cursor*/ None, - ThreadSortKey::CreatedAt, - allowed_sources, - /*model_providers*/ None, - /*archived_only*/ false, - "personality_migration", - ) - .await - && !ids.is_empty() - { - return Ok(true); - } - - let sessions = get_threads_in_root( - codex_home.join(SESSIONS_SUBDIR), - /*page_size*/ 1, - /*cursor*/ None, - ThreadSortKey::CreatedAt, - ThreadListConfig { - allowed_sources, - model_providers: None, - default_provider, - layout: ThreadListLayout::NestedByDate, - }, - ) - .await?; - if !sessions.items.is_empty() { - return Ok(true); - } - - let archived_sessions = get_threads_in_root( - codex_home.join(ARCHIVED_SESSIONS_SUBDIR), - /*page_size*/ 1, - /*cursor*/ None, - ThreadSortKey::CreatedAt, - ThreadListConfig { - allowed_sources, - model_providers: None, - default_provider, - layout: ThreadListLayout::Flat, - }, - ) - .await?; - Ok(!archived_sessions.items.is_empty()) + rollout_has_recorded_sessions(codex_home, default_provider).await } async fn create_marker(marker_path: &Path) -> io::Result<()> { diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index de1070ad346..da7a20da0c0 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -7,6 +7,7 @@ use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::UserMessageEvent; +use codex_rollout::SESSIONS_SUBDIR; use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::io::AsyncWriteExt; diff --git a/codex-rs/core/src/rollout/mod.rs b/codex-rs/core/src/rollout/mod.rs deleted file mode 100644 index 3b8ad9b4128..00000000000 --- a/codex-rs/core/src/rollout/mod.rs +++ /dev/null @@ -1,40 +0,0 @@ -//! Rollout module: persistence and discovery of session rollout files. - -use std::sync::LazyLock; - -use codex_protocol::protocol::SessionSource; - -pub const SESSIONS_SUBDIR: &str = "sessions"; -pub const ARCHIVED_SESSIONS_SUBDIR: &str = "archived_sessions"; -pub static INTERACTIVE_SESSION_SOURCES: LazyLock> = LazyLock::new(|| { - vec![ - SessionSource::Cli, - SessionSource::VSCode, - SessionSource::Custom("atlas".to_string()), - SessionSource::Custom("chatgpt".to_string()), - ] -}); - -pub(crate) mod error; -pub mod list; -pub(crate) mod metadata; -pub(crate) mod policy; -pub mod recorder; -pub(crate) mod session_index; -pub(crate) mod truncation; - -pub use codex_protocol::protocol::SessionMeta; -pub(crate) use error::map_session_init_error; -pub use list::find_archived_thread_path_by_id_str; -pub use list::find_thread_path_by_id_str; -#[deprecated(note = "use find_thread_path_by_id_str")] -pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str; -pub use list::rollout_date_parts; -pub use recorder::RolloutRecorder; -pub use recorder::RolloutRecorderParams; -pub use session_index::append_thread_name; -pub use session_index::find_thread_name_by_id; -pub use session_index::find_thread_path_by_name_str; - -#[cfg(test)] -pub mod tests; diff --git a/codex-rs/core/src/rollout_config_builder.rs b/codex-rs/core/src/rollout_config_builder.rs new file mode 100644 index 00000000000..d16341222e9 --- /dev/null +++ b/codex-rs/core/src/rollout_config_builder.rs @@ -0,0 +1,18 @@ +use std::path::PathBuf; + +use crate::config::Config; +use codex_rollout::RolloutConfig; + +pub fn rollout_config(config: &Config) -> RolloutConfig { + rollout_config_with_cwd(config, config.cwd.clone()) +} + +pub fn rollout_config_with_cwd(config: &Config, cwd: PathBuf) -> RolloutConfig { + RolloutConfig::new( + config.codex_home.clone(), + config.sqlite_home.clone(), + cwd, + config.model_provider_id.clone(), + config.memories.generate_memories, + ) +} diff --git a/codex-rs/core/src/shell_snapshot.rs b/codex-rs/core/src/shell_snapshot.rs index 29b50cb9e80..62fd7eeea9e 100644 --- a/codex-rs/core/src/shell_snapshot.rs +++ b/codex-rs/core/src/shell_snapshot.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; -use crate::rollout::list::find_thread_path_by_id_str; use crate::shell::Shell; use crate::shell::ShellType; use crate::shell::get_shell; @@ -16,6 +15,7 @@ use anyhow::anyhow; use anyhow::bail; use codex_otel::SessionTelemetry; use codex_protocol::ThreadId; +use codex_rollout::find_thread_path_by_id_str; use tokio::fs; use tokio::process::Command; use tokio::sync::watch; diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index ceab67f1c76..4203cb09b52 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use crate::AuthManager; -use crate::RolloutRecorder; use crate::agent::AgentControl; use crate::analytics_client::AnalyticsEventsClient; use crate::client::ModelClient; @@ -14,7 +13,7 @@ use crate::mcp_connection_manager::McpConnectionManager; use crate::models_manager::manager::ModelsManager; use crate::plugins::PluginsManager; use crate::skills::SkillsManager; -use crate::state_db::StateDbHandle; +use crate::state_runtime::StateDbHandle; use crate::tools::code_mode::CodeModeService; use crate::tools::network_approval::NetworkApprovalService; use crate::tools::runtimes::ExecveSessionApproval; @@ -23,6 +22,7 @@ use crate::unified_exec::UnifiedExecProcessManager; use codex_exec_server::Environment; use codex_hooks::Hooks; use codex_otel::SessionTelemetry; +use codex_rollout::RolloutRecorder; use codex_utils_absolute_path::AbsolutePathBuf; use std::path::PathBuf; use tokio::sync::Mutex; diff --git a/codex-rs/core/src/state_runtime.rs b/codex-rs/core/src/state_runtime.rs new file mode 100644 index 00000000000..ba9de9a6e4d --- /dev/null +++ b/codex-rs/core/src/state_runtime.rs @@ -0,0 +1,145 @@ +use crate::config::Config; +use codex_protocol::ThreadId; +use codex_protocol::dynamic_tools::DynamicToolSpec; +pub use codex_state::LogEntry; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use tracing::warn; + +/// Core-facing handle to the SQLite-backed state runtime. +pub type StateDbHandle = Arc; + +/// Initialize the state runtime for thread state persistence. To only be used +/// inside `core`. The initialization should not be done anywhere else. +pub(crate) async fn init(config: &Config) -> Option { + match codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + { + Ok(runtime) => Some(runtime), + Err(err) => { + warn!( + "failed to initialize state runtime at {}: {err}", + config.sqlite_home.display() + ); + None + } + } +} + +/// Get the DB if the feature is enabled and the DB exists. +pub async fn get_state_db(config: &Config) -> Option { + let state_path = codex_state::state_db_path(config.sqlite_home.as_path()); + if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) { + return None; + } + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + .ok()?; + require_backfill_complete(runtime, config.sqlite_home.as_path()).await +} + +/// Open the state runtime when the SQLite file exists, without feature gating. +/// +/// This is used for parity checks during the SQLite migration phase. +pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option { + let db_path = codex_state::state_db_path(codex_home); + if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) { + return None; + } + let runtime = + codex_state::StateRuntime::init(codex_home.to_path_buf(), default_provider.to_string()) + .await + .ok()?; + require_backfill_complete(runtime, codex_home).await +} + +async fn require_backfill_complete( + runtime: StateDbHandle, + codex_home: &Path, +) -> Option { + match runtime.get_backfill_state().await { + Ok(state) if state.status == codex_state::BackfillStatus::Complete => Some(runtime), + Ok(state) => { + warn!( + "state db backfill not complete at {} (status: {})", + codex_home.display(), + state.status.as_str() + ); + None + } + Err(err) => { + warn!( + "failed to read backfill state at {}: {err}", + codex_home.display() + ); + None + } + } +} + +/// Look up the rollout path for a thread id using SQLite. +pub async fn find_rollout_path_by_id( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + archived_only: Option, + stage: &str, +) -> Option { + let ctx = context?; + ctx.find_rollout_path_by_id(thread_id, archived_only) + .await + .unwrap_or_else(|err| { + warn!("state db find_rollout_path_by_id failed during {stage}: {err}"); + None + }) +} + +/// Get dynamic tools for a thread id using SQLite. +pub async fn get_dynamic_tools( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + stage: &str, +) -> Option> { + let ctx = context?; + match ctx.get_dynamic_tools(thread_id).await { + Ok(tools) => tools, + Err(err) => { + warn!("state db get_dynamic_tools failed during {stage}: {err}"); + None + } + } +} + +/// Persist dynamic tools for a thread id using SQLite, if none exist yet. +pub async fn persist_dynamic_tools( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + tools: Option<&[DynamicToolSpec]>, + stage: &str, +) { + let Some(ctx) = context else { + return; + }; + if let Err(err) = ctx.persist_dynamic_tools(thread_id, tools).await { + warn!("state db persist_dynamic_tools failed during {stage}: {err}"); + } +} + +pub async fn mark_thread_memory_mode_polluted( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + stage: &str, +) { + let Some(ctx) = context else { + return; + }; + if let Err(err) = ctx.mark_thread_memory_mode_polluted(thread_id).await { + warn!("state db mark_thread_memory_mode_polluted failed during {stage}: {err}"); + } +} diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index cd77f1d5a38..efdb9b04284 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -18,7 +18,7 @@ use crate::function_tool::FunctionCallError; use crate::memories::citations::get_thread_id_from_citations; use crate::memories::citations::parse_memory_citation; use crate::parse_turn_item; -use crate::state_db; +use crate::state_runtime; use crate::tools::parallel::ToolCallRuntime; use crate::tools::router::ToolRouter; use codex_protocol::models::DeveloperInstructions; @@ -146,7 +146,7 @@ async fn maybe_mark_thread_memory_mode_polluted_from_web_search( { return; } - state_db::mark_thread_memory_mode_polluted( + state_runtime::mark_thread_memory_mode_polluted( sess.services.state_db.as_deref(), sess.conversation_id, "record_completed_response_item", @@ -168,7 +168,7 @@ async fn record_stage1_output_usage_for_completed_item( return; } - if let Some(db) = state_db::get_state_db(turn_context.config.as_ref()).await { + if let Some(db) = state_runtime::get_state_db(turn_context.config.as_ref()).await { let _ = db.record_stage1_output_usage(&thread_ids).await; } } diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index a63cf2cb947..8a4aadac2a7 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -13,6 +13,7 @@ use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::file_watcher::FileWatcher; use crate::file_watcher::FileWatcherEvent; +use crate::history_truncation as truncation; use crate::mcp::McpManager; use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig; use crate::models_manager::manager::ModelsManager; @@ -20,8 +21,6 @@ use crate::plugins::PluginsManager; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::SessionConfiguredEvent; -use crate::rollout::RolloutRecorder; -use crate::rollout::truncation; use crate::shell_snapshot::ShellSnapshot; use crate::skills::SkillsManager; use codex_protocol::ThreadId; @@ -33,6 +32,7 @@ use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::W3cTraceContext; +use codex_rollout::RolloutRecorder; use futures::StreamExt; use futures::stream::FuturesUnordered; use std::collections::HashMap; diff --git a/codex-rs/core/tests/suite/personality_migration.rs b/codex-rs/core/tests/suite/personality_migration.rs index 0a8dd61d9cb..83ade80cbd6 100644 --- a/codex-rs/core/tests/suite/personality_migration.rs +++ b/codex-rs/core/tests/suite/personality_migration.rs @@ -1,5 +1,3 @@ -use codex_core::ARCHIVED_SESSIONS_SUBDIR; -use codex_core::SESSIONS_SUBDIR; use codex_core::config::ConfigToml; use codex_core::personality_migration::PERSONALITY_MIGRATION_FILENAME; use codex_core::personality_migration::PersonalityMigrationStatus; @@ -13,6 +11,8 @@ use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::UserMessageEvent; +use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; +use codex_rollout::SESSIONS_SUBDIR; use pretty_assertions::assert_eq; use std::io; use std::path::Path; diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index a9ea2b9c8a2..7bced713fbb 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -4,16 +4,17 @@ use std::path::Path; use std::path::PathBuf; use chrono::Utc; -use codex_core::EventPersistenceMode; -use codex_core::RolloutRecorder; -use codex_core::RolloutRecorderParams; use codex_core::config::ConfigBuilder; -use codex_core::find_archived_thread_path_by_id_str; -use codex_core::find_thread_path_by_id_str; -use codex_core::find_thread_path_by_name_str; +use codex_core::rollout_config; use codex_protocol::ThreadId; use codex_protocol::models::BaseInstructions; use codex_protocol::protocol::SessionSource; +use codex_rollout::EventPersistenceMode; +use codex_rollout::RolloutRecorder; +use codex_rollout::RolloutRecorderParams; +use codex_rollout::find_archived_thread_path_by_id_str; +use codex_rollout::find_thread_path_by_id_str; +use codex_rollout::find_thread_path_by_name_str; use codex_state::StateRuntime; use codex_state::ThreadMetadataBuilder; use pretty_assertions::assert_eq; @@ -162,10 +163,11 @@ async fn find_locates_rollout_file_written_by_recorder() -> std::io::Result<()> .codex_home(home.path().to_path_buf()) .build() .await?; + let rollout_config = rollout_config(&config); let thread_id = ThreadId::new(); let thread_name = "named thread"; let recorder = RolloutRecorder::new( - &config, + &rollout_config, RolloutRecorderParams::new( thread_id, None, diff --git a/codex-rs/exec/Cargo.toml b/codex-rs/exec/Cargo.toml index 37852356065..8b78d86e0d0 100644 --- a/codex-rs/exec/Cargo.toml +++ b/codex-rs/exec/Cargo.toml @@ -26,6 +26,7 @@ codex-core = { workspace = true } codex-feedback = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } +codex-rollout = { workspace = true } codex-utils-absolute-path = { workspace = true } codex-utils-cli = { workspace = true } codex-utils-elapsed = { workspace = true } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index f648a63952d..57c0f523bd2 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -59,6 +59,8 @@ use codex_core::config_loader::LoaderOverrides; use codex_core::config_loader::format_config_error_with_source; use codex_core::format_exec_policy_error_with_source; use codex_core::git_info::get_git_repo_root; +use codex_core::rollout_config; +use codex_core::state_runtime::get_state_db; use codex_feedback::CodexFeedback; use codex_otel::set_parent_from_context; use codex_otel::traceparent_context_from_env; @@ -72,6 +74,10 @@ use codex_protocol::protocol::ReviewTarget; use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::user_input::UserInput; +use codex_rollout::RolloutRecorder; +use codex_rollout::ThreadSortKey; +use codex_rollout::find_thread_path_by_id_str; +use codex_rollout::find_thread_path_by_name_str; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_oss::ensure_oss_provider_ready; use codex_utils_oss::get_default_model_for_oss_provider; @@ -101,8 +107,6 @@ use crate::event_processor::CodexStatus; use crate::event_processor::EventProcessor; use codex_core::default_client::set_default_client_residency_requirement; use codex_core::default_client::set_default_originator; -use codex_core::find_thread_path_by_id_str; -use codex_core::find_thread_path_by_name_str; const DEFAULT_ANALYTICS_ENABLED: bool = true; @@ -1418,11 +1422,14 @@ async fn resolve_resume_path( } else { Some(config.cwd.as_path()) }; - match codex_core::RolloutRecorder::find_latest_thread_path( - config, + let rollout_config = rollout_config(config); + let state_db_ctx = get_state_db(config).await; + match RolloutRecorder::find_latest_thread_path( + &rollout_config, + state_db_ctx.as_deref(), /*page_size*/ 1, /*cursor*/ None, - codex_core::ThreadSortKey::UpdatedAt, + ThreadSortKey::UpdatedAt, &[], Some(default_provider_filter.as_slice()), &config.model_provider_id, diff --git a/codex-rs/rollout/BUILD.bazel b/codex-rs/rollout/BUILD.bazel new file mode 100644 index 00000000000..a91a3dd5073 --- /dev/null +++ b/codex-rs/rollout/BUILD.bazel @@ -0,0 +1,6 @@ +load("//:defs.bzl", "codex_rust_crate") + +codex_rust_crate( + name = "rollout", + crate_name = "codex_rollout", +) diff --git a/codex-rs/rollout/Cargo.toml b/codex-rs/rollout/Cargo.toml new file mode 100644 index 00000000000..a95aa2e81c3 --- /dev/null +++ b/codex-rs/rollout/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "codex-rollout" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lib] +name = "codex_rollout" +path = "src/lib.rs" + +[lints] +workspace = true + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +chrono = { workspace = true } +codex-file-search = { workspace = true } +codex-login = { workspace = true } +codex-otel = { workspace = true } +codex-protocol = { workspace = true } +codex-state = { workspace = true } +dunce = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +time = { workspace = true, features = [ + "formatting", + "parsing", + "local-offset", + "macros", +] } +tokio = { workspace = true, features = [ + "fs", + "io-util", + "macros", + "process", + "rt-multi-thread", + "sync", + "time", +] } +tracing = { workspace = true } +uuid = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } +tempfile = { workspace = true } diff --git a/codex-rs/core/src/rollout/error.rs b/codex-rs/rollout/src/error.rs similarity index 82% rename from codex-rs/core/src/rollout/error.rs rename to codex-rs/rollout/src/error.rs index ee48bb20295..75f66bb9ad9 100644 --- a/codex-rs/core/src/rollout/error.rs +++ b/codex-rs/rollout/src/error.rs @@ -1,10 +1,9 @@ use std::io::ErrorKind; use std::path::Path; -use crate::error::CodexErr; -use crate::rollout::SESSIONS_SUBDIR; +use crate::SESSIONS_SUBDIR; -pub(crate) fn map_session_init_error(err: &anyhow::Error, codex_home: &Path) -> CodexErr { +pub(crate) fn session_init_error_message(err: &anyhow::Error, codex_home: &Path) -> String { if let Some(mapped) = err .chain() .filter_map(|cause| cause.downcast_ref::()) @@ -13,10 +12,10 @@ pub(crate) fn map_session_init_error(err: &anyhow::Error, codex_home: &Path) -> return mapped; } - CodexErr::Fatal(format!("Failed to initialize session: {err:#}")) + format!("Failed to initialize session: {err:#}") } -fn map_rollout_io_error(io_err: &std::io::Error, codex_home: &Path) -> Option { +fn map_rollout_io_error(io_err: &std::io::Error, codex_home: &Path) -> Option { let sessions_dir = codex_home.join(SESSIONS_SUBDIR); let hint = match io_err.kind() { ErrorKind::PermissionDenied => format!( @@ -43,7 +42,5 @@ fn map_rollout_io_error(io_err: &std::io::Error, codex_home: &Path) -> Option return None, }; - Some(CodexErr::Fatal(format!( - "{hint} (underlying error: {io_err})" - ))) + Some(format!("{hint} (underlying error: {io_err})")) } diff --git a/codex-rs/rollout/src/git_info.rs b/codex-rs/rollout/src/git_info.rs new file mode 100644 index 00000000000..84d0bc18aa2 --- /dev/null +++ b/codex-rs/rollout/src/git_info.rs @@ -0,0 +1,67 @@ +use std::path::Path; + +use codex_protocol::protocol::GitInfo; +use tokio::process::Command; +use tokio::time::Duration; +use tokio::time::timeout; + +const GIT_COMMAND_TIMEOUT: Duration = Duration::from_secs(5); + +pub(crate) async fn collect_git_info(cwd: &Path) -> Option { + let is_git_repo = run_git_command_with_timeout(&["rev-parse", "--git-dir"], cwd) + .await? + .status + .success(); + if !is_git_repo { + return None; + } + + let (commit_result, branch_result, url_result) = tokio::join!( + run_git_command_with_timeout(&["rev-parse", "HEAD"], cwd), + run_git_command_with_timeout(&["rev-parse", "--abbrev-ref", "HEAD"], cwd), + run_git_command_with_timeout(&["remote", "get-url", "origin"], cwd) + ); + + let mut git_info = GitInfo { + commit_hash: None, + branch: None, + repository_url: None, + }; + if let Some(output) = commit_result + && output.status.success() + && let Ok(hash) = String::from_utf8(output.stdout) + { + git_info.commit_hash = Some(hash.trim().to_string()); + } + if let Some(output) = branch_result + && output.status.success() + && let Ok(branch) = String::from_utf8(output.stdout) + { + let branch = branch.trim(); + if branch != "HEAD" { + git_info.branch = Some(branch.to_string()); + } + } + if let Some(output) = url_result + && output.status.success() + && let Ok(url) = String::from_utf8(output.stdout) + { + git_info.repository_url = Some(url.trim().to_string()); + } + + Some(git_info) +} + +async fn run_git_command_with_timeout(args: &[&str], cwd: &Path) -> Option { + let mut command = Command::new("git"); + command + .args(args) + .current_dir(cwd) + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); + timeout(GIT_COMMAND_TIMEOUT, command.output()) + .await + .ok()? + .ok() +} diff --git a/codex-rs/rollout/src/lib.rs b/codex-rs/rollout/src/lib.rs new file mode 100644 index 00000000000..54afff0451b --- /dev/null +++ b/codex-rs/rollout/src/lib.rs @@ -0,0 +1,127 @@ +//! Rollout persistence and discovery for recorded Codex sessions. + +use std::io; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::LazyLock; + +use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::SessionSource; +use codex_state::ThreadMetadataBuilder; + +mod error; +mod git_info; +pub mod list; +mod metadata; +mod path_utils; +pub mod policy; +pub mod recorder; +pub mod session_index; +mod state_db; +#[cfg(test)] +mod test_support; +#[cfg(test)] +mod tests; + +pub use list::Cursor; +pub use list::ThreadItem; +pub use list::ThreadSortKey; +pub use list::ThreadsPage; +pub use list::find_archived_thread_path_by_id_str; +pub use list::find_thread_path_by_id_str; +pub use list::parse_cursor; +pub use list::read_head_for_summary; +pub use list::read_session_meta_line; +pub use list::rollout_date_parts; +pub use policy::EventPersistenceMode; +pub use policy::should_persist_response_item_for_memories; +pub use recorder::RolloutRecorder; +pub use recorder::RolloutRecorderParams; +pub use session_index::append_thread_name; +pub use session_index::find_thread_name_by_id; +pub use session_index::find_thread_names_by_ids; +pub use session_index::find_thread_path_by_name_str; +pub use state_db::StateDbHandle; +pub use state_db::read_repair_rollout_path; +pub use state_db::reconcile_rollout; + +pub const SESSIONS_SUBDIR: &str = "sessions"; +pub const ARCHIVED_SESSIONS_SUBDIR: &str = "archived_sessions"; +pub static INTERACTIVE_SESSION_SOURCES: LazyLock> = LazyLock::new(|| { + vec![ + SessionSource::Cli, + SessionSource::VSCode, + SessionSource::Custom("atlas".to_string()), + SessionSource::Custom("chatgpt".to_string()), + ] +}); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RolloutConfig { + pub codex_home: PathBuf, + pub sqlite_home: PathBuf, + pub cwd: PathBuf, + pub model_provider_id: String, + pub generate_memories: bool, +} + +impl RolloutConfig { + pub fn new( + codex_home: PathBuf, + sqlite_home: PathBuf, + cwd: PathBuf, + model_provider_id: String, + generate_memories: bool, + ) -> Self { + Self { + codex_home, + sqlite_home, + cwd, + model_provider_id, + generate_memories, + } + } +} + +pub fn build_thread_metadata_builder( + items: &[RolloutItem], + rollout_path: &Path, +) -> Option { + metadata::builder_from_items(items, rollout_path) +} + +pub async fn spawn_backfill_if_needed( + runtime: Option>, + config: &RolloutConfig, +) { + let Some(runtime) = runtime else { + return; + }; + let backfill_state = match runtime.get_backfill_state().await { + Ok(state) => state, + Err(err) => { + tracing::warn!( + "failed to read backfill state at {}: {err}", + config.codex_home.display() + ); + return; + } + }; + if backfill_state.status == codex_state::BackfillStatus::Complete { + return; + } + let runtime_for_backfill = Arc::clone(&runtime); + let config = config.clone(); + tokio::spawn(async move { + metadata::backfill_sessions(runtime_for_backfill.as_ref(), &config).await; + }); +} + +pub async fn has_recorded_sessions(codex_home: &Path, default_provider: &str) -> io::Result { + list::has_recorded_sessions(codex_home, default_provider).await +} + +pub fn session_init_error_message(err: &anyhow::Error, codex_home: &Path) -> String { + error::session_init_error_message(err, codex_home) +} diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/rollout/src/list.rs similarity index 96% rename from codex-rs/core/src/rollout/list.rs rename to codex-rs/rollout/src/list.rs index 8a3e41006c6..02bdfdb99a5 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -15,10 +15,10 @@ use uuid::Uuid; use super::ARCHIVED_SESSIONS_SUBDIR; use super::SESSIONS_SUBDIR; -use crate::protocol::EventMsg; use crate::state_db; use codex_file_search as file_search; use codex_protocol::ThreadId; +use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMetaLine; @@ -674,6 +674,62 @@ pub fn parse_cursor(token: &str) -> Option { Some(Cursor::new(ts, uuid)) } +pub(crate) async fn has_recorded_sessions( + codex_home: &Path, + default_provider: &str, +) -> io::Result { + let allowed_sources: &[SessionSource] = &[]; + if let Some(state_db_ctx) = state_db::open_if_present(codex_home, default_provider).await + && let Some(ids) = state_db::list_thread_ids_db( + Some(state_db_ctx.as_ref()), + codex_home, + /*page_size*/ 1, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + allowed_sources, + /*model_providers*/ None, + /*archived_only*/ false, + "has_recorded_sessions", + ) + .await + && !ids.is_empty() + { + return Ok(true); + } + + let sessions = get_threads_in_root( + codex_home.join(SESSIONS_SUBDIR), + /*page_size*/ 1, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + ThreadListConfig { + allowed_sources, + model_providers: None, + default_provider, + layout: ThreadListLayout::NestedByDate, + }, + ) + .await?; + if !sessions.items.is_empty() { + return Ok(true); + } + + let archived = get_threads_in_root( + codex_home.join(ARCHIVED_SESSIONS_SUBDIR), + /*page_size*/ 1, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + ThreadListConfig { + allowed_sources, + model_providers: None, + default_provider, + layout: ThreadListLayout::Flat, + }, + ) + .await?; + Ok(!archived.items.is_empty()) +} + fn build_next_cursor(items: &[ThreadItem], sort_key: ThreadSortKey) -> Option { let last = items.last()?; let file_name = last.path.file_name()?.to_string_lossy(); diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/rollout/src/metadata.rs similarity index 96% rename from codex-rs/core/src/rollout/metadata.rs rename to codex-rs/rollout/src/metadata.rs index 5b032d217d8..d8eb73f3bc2 100644 --- a/codex-rs/core/src/rollout/metadata.rs +++ b/codex-rs/rollout/src/metadata.rs @@ -1,7 +1,9 @@ -use crate::config::Config; -use crate::rollout; -use crate::rollout::list::parse_timestamp_uuid_from_filename; -use crate::rollout::recorder::RolloutRecorder; +use crate::ARCHIVED_SESSIONS_SUBDIR; +use crate::RolloutConfig; +use crate::SESSIONS_SUBDIR; +use crate::list::parse_timestamp_uuid_from_filename; +use crate::list::read_session_meta_line; +use crate::recorder::RolloutRecorder; use crate::state_db::normalize_cwd_for_state_db; use chrono::DateTime; use chrono::NaiveDateTime; @@ -131,7 +133,7 @@ pub(crate) async fn extract_metadata_from_rollout( }) } -pub(crate) async fn backfill_sessions(runtime: &codex_state::StateRuntime, config: &Config) { +pub(crate) async fn backfill_sessions(runtime: &codex_state::StateRuntime, config: &RolloutConfig) { let metric_client = codex_otel::metrics::global(); let timer = metric_client .as_ref() @@ -190,8 +192,8 @@ pub(crate) async fn backfill_sessions(runtime: &codex_state::StateRuntime, confi } } - let sessions_root = config.codex_home.join(rollout::SESSIONS_SUBDIR); - let archived_root = config.codex_home.join(rollout::ARCHIVED_SESSIONS_SUBDIR); + let sessions_root = config.codex_home.join(SESSIONS_SUBDIR); + let archived_root = config.codex_home.join(ARCHIVED_SESSIONS_SUBDIR); let mut rollout_paths: Vec = Vec::new(); for (root, archived) in [(sessions_root, false), (archived_root, true)] { if !tokio::fs::try_exists(&root).await.unwrap_or(false) { @@ -268,9 +270,7 @@ pub(crate) async fn backfill_sessions(runtime: &codex_state::StateRuntime, confi continue; } stats.upserted = stats.upserted.saturating_add(1); - if let Ok(meta_line) = - rollout::list::read_session_meta_line(&rollout.path).await - { + if let Ok(meta_line) = read_session_meta_line(&rollout.path).await { if let Err(err) = runtime .persist_dynamic_tools( meta_line.meta.id, diff --git a/codex-rs/core/src/rollout/metadata_tests.rs b/codex-rs/rollout/src/metadata_tests.rs similarity index 96% rename from codex-rs/core/src/rollout/metadata_tests.rs rename to codex-rs/rollout/src/metadata_tests.rs index dacd9e67b81..dbd835b6298 100644 --- a/codex-rs/core/src/rollout/metadata_tests.rs +++ b/codex-rs/rollout/src/metadata_tests.rs @@ -1,4 +1,5 @@ use super::*; +use crate::test_support::test_rollout_config; use chrono::DateTime; use chrono::NaiveDateTime; use chrono::Timelike; @@ -16,7 +17,6 @@ use codex_state::ThreadMetadataBuilder; use pretty_assertions::assert_eq; use std::fs::File; use std::io::Write; -use std::path::Path; use std::path::PathBuf; use tempfile::tempdir; use uuid::Uuid; @@ -197,9 +197,7 @@ async fn backfill_sessions_resumes_from_watermark_and_marks_complete() { )) .await; - let mut config = crate::config::test_config(); - config.codex_home = codex_home.clone(); - config.model_provider_id = "test-provider".to_string(); + let config = test_rollout_config(codex_home.as_path()); backfill_sessions(runtime.as_ref(), &config).await; let first_id = ThreadId::from_string(&first_uuid.to_string()).expect("first thread id"); @@ -267,9 +265,7 @@ async fn backfill_sessions_preserves_existing_git_branch_and_fills_missing_git_f .await .expect("existing metadata upsert"); - let mut config = crate::config::test_config(); - config.codex_home = codex_home.clone(); - config.model_provider_id = "test-provider".to_string(); + let config = test_rollout_config(codex_home.as_path()); backfill_sessions(runtime.as_ref(), &config).await; let persisted = runtime @@ -304,9 +300,7 @@ async fn backfill_sessions_normalizes_cwd_before_upsert() { .await .expect("initialize runtime"); - let mut config = crate::config::test_config(); - config.codex_home = codex_home.clone(); - config.model_provider_id = "test-provider".to_string(); + let config = test_rollout_config(codex_home.as_path()); backfill_sessions(runtime.as_ref(), &config).await; let thread_id = ThreadId::from_string(&thread_uuid.to_string()).expect("thread id"); diff --git a/codex-rs/rollout/src/path_utils.rs b/codex-rs/rollout/src/path_utils.rs new file mode 100644 index 00000000000..cb7b0d39567 --- /dev/null +++ b/codex-rs/rollout/src/path_utils.rs @@ -0,0 +1,86 @@ +use std::path::Path; +use std::path::PathBuf; + +pub(crate) fn normalize_for_path_comparison(path: impl AsRef) -> std::io::Result { + let canonical = path.as_ref().canonicalize()?; + Ok(normalize_for_wsl(canonical)) +} + +fn normalize_for_wsl(path: PathBuf) -> PathBuf { + normalize_for_wsl_with_flag(path, is_wsl()) +} + +fn normalize_for_wsl_with_flag(path: PathBuf, is_wsl: bool) -> PathBuf { + if !is_wsl { + return path; + } + if !is_wsl_case_insensitive_path(&path) { + return path; + } + lower_ascii_path(path) +} + +fn is_wsl() -> bool { + cfg!(target_os = "linux") + && matches!( + std::env::var("WSL_DISTRO_NAME"), + Ok(value) if !value.is_empty() + ) +} + +fn is_wsl_case_insensitive_path(path: &Path) -> bool { + #[cfg(target_os = "linux")] + { + use std::os::unix::ffi::OsStrExt; + use std::path::Component; + + let mut components = path.components(); + let Some(Component::RootDir) = components.next() else { + return false; + }; + let Some(Component::Normal(mnt)) = components.next() else { + return false; + }; + if !ascii_eq_ignore_case(mnt.as_bytes(), b"mnt") { + return false; + } + let Some(Component::Normal(drive)) = components.next() else { + return false; + }; + let drive_bytes = drive.as_bytes(); + drive_bytes.len() == 1 && drive_bytes[0].is_ascii_alphabetic() + } + #[cfg(not(target_os = "linux"))] + { + let _ = path; + false + } +} + +#[cfg(target_os = "linux")] +fn ascii_eq_ignore_case(left: &[u8], right: &[u8]) -> bool { + left.len() == right.len() + && left + .iter() + .zip(right) + .all(|(lhs, rhs)| lhs.to_ascii_lowercase() == *rhs) +} + +#[cfg(target_os = "linux")] +fn lower_ascii_path(path: PathBuf) -> PathBuf { + use std::ffi::OsString; + use std::os::unix::ffi::OsStrExt; + use std::os::unix::ffi::OsStringExt; + + let bytes = path.as_os_str().as_bytes(); + let mut lowered = Vec::with_capacity(bytes.len()); + for byte in bytes { + lowered.push(byte.to_ascii_lowercase()); + } + PathBuf::from(OsString::from_vec(lowered)) +} + +#[cfg(not(target_os = "linux"))] +fn lower_ascii_path(path: PathBuf) -> PathBuf { + dunce::simplified(&path).to_path_buf() +} diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/rollout/src/policy.rs similarity index 97% rename from codex-rs/core/src/rollout/policy.rs rename to codex-rs/rollout/src/policy.rs index 8b1f94dbd56..c866a7e8d32 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -1,6 +1,6 @@ -use crate::protocol::EventMsg; -use crate::protocol::RolloutItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::RolloutItem; #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] pub enum EventPersistenceMode { @@ -46,7 +46,7 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool { /// Whether a `ResponseItem` should be persisted for the memories. #[inline] -pub(crate) fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool { +pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool { match item { ResponseItem::Message { role, .. } => role != "developer", ResponseItem::LocalShellCall { .. } diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/rollout/src/recorder.rs similarity index 93% rename from codex-rs/core/src/rollout/recorder.rs rename to codex-rs/rollout/src/recorder.rs index 72a3e3c637a..02c4390396a 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -39,14 +39,12 @@ use super::list::parse_timestamp_uuid_from_filename; use super::metadata; use super::policy::EventPersistenceMode; use super::policy::is_persisted_response_item; -use crate::config::Config; -use crate::default_client::originator; +use crate::RolloutConfig; use crate::git_info::collect_git_info; use crate::path_utils; use crate::state_db; use crate::state_db::StateDbHandle; -use crate::truncate::TruncationPolicy; -use crate::truncate::truncate_text; +use codex_login::default_client::originator; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::ResumedHistory; @@ -145,9 +143,9 @@ fn sanitize_rollout_item_for_persistence( match item { RolloutItem::EventMsg(EventMsg::ExecCommandEnd(mut event)) => { // Persist only a bounded aggregated summary of command output. - event.aggregated_output = truncate_text( + event.aggregated_output = truncate_bytes( &event.aggregated_output, - TruncationPolicy::Bytes(PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES), + PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES, ); // Drop unnecessary fields from rollout storage since aggregated_output is all we need. event.stdout.clear(); @@ -163,7 +161,8 @@ impl RolloutRecorder { /// List threads (rollout files) under the provided Codex home directory. #[allow(clippy::too_many_arguments)] pub async fn list_threads( - config: &Config, + config: &RolloutConfig, + state_db_ctx: Option<&StateRuntime>, page_size: usize, cursor: Option<&Cursor>, sort_key: ThreadSortKey, @@ -174,6 +173,7 @@ impl RolloutRecorder { ) -> std::io::Result { Self::list_threads_with_db_fallback( config, + state_db_ctx, page_size, cursor, sort_key, @@ -189,7 +189,8 @@ impl RolloutRecorder { /// List archived threads (rollout files) under the archived sessions directory. #[allow(clippy::too_many_arguments)] pub async fn list_archived_threads( - config: &Config, + config: &RolloutConfig, + state_db_ctx: Option<&StateRuntime>, page_size: usize, cursor: Option<&Cursor>, sort_key: ThreadSortKey, @@ -200,6 +201,7 @@ impl RolloutRecorder { ) -> std::io::Result { Self::list_threads_with_db_fallback( config, + state_db_ctx, page_size, cursor, sort_key, @@ -214,7 +216,8 @@ impl RolloutRecorder { #[allow(clippy::too_many_arguments)] async fn list_threads_with_db_fallback( - config: &Config, + config: &RolloutConfig, + state_db_ctx: Option<&StateRuntime>, page_size: usize, cursor: Option<&Cursor>, sort_key: ThreadSortKey, @@ -256,7 +259,6 @@ impl RolloutRecorder { .await? }; - let state_db_ctx = state_db::get_state_db(config).await; if state_db_ctx.is_none() { // Keep legacy behavior when SQLite is unavailable: return filesystem results // at the requested page size. @@ -266,7 +268,7 @@ impl RolloutRecorder { // Warm the DB by repairing every filesystem hit before querying SQLite. for item in &fs_page.items { state_db::read_repair_rollout_path( - state_db_ctx.as_deref(), + state_db_ctx, item.thread_id, Some(archived), item.path.as_path(), @@ -275,7 +277,7 @@ impl RolloutRecorder { } if let Some(db_page) = state_db::list_threads_db( - state_db_ctx.as_deref(), + state_db_ctx, codex_home, page_size, cursor, @@ -298,7 +300,8 @@ impl RolloutRecorder { /// Find the newest recorded thread path, optionally filtering to a matching cwd. #[allow(clippy::too_many_arguments)] pub async fn find_latest_thread_path( - config: &Config, + config: &RolloutConfig, + state_db_ctx: Option<&StateRuntime>, page_size: usize, cursor: Option<&Cursor>, sort_key: ThreadSortKey, @@ -308,12 +311,11 @@ impl RolloutRecorder { filter_cwd: Option<&Path>, ) -> std::io::Result> { let codex_home = config.codex_home.as_path(); - let state_db_ctx = state_db::get_state_db(config).await; if state_db_ctx.is_some() { let mut db_cursor = cursor.cloned(); loop { let Some(db_page) = state_db::list_threads_db( - state_db_ctx.as_deref(), + state_db_ctx, codex_home, page_size, db_cursor.as_ref(), @@ -368,7 +370,7 @@ impl RolloutRecorder { /// /// For resumed sessions, this immediately opens the existing rollout file. pub async fn new( - config: &Config, + config: &RolloutConfig, params: RolloutRecorderParams, state_db_ctx: Option, state_builder: Option, @@ -414,8 +416,7 @@ impl RolloutRecorder { } else { Some(dynamic_tools) }, - memory_mode: (!config.memories.generate_memories) - .then_some("disabled".to_string()), + memory_mode: (!config.generate_memories).then_some("disabled".to_string()), }; ( @@ -463,7 +464,7 @@ impl RolloutRecorder { state_db_ctx.clone(), state_builder, config.model_provider_id.clone(), - config.memories.generate_memories, + config.generate_memories, )); Ok(Self { @@ -482,7 +483,7 @@ impl RolloutRecorder { self.state_db.clone() } - pub(crate) async fn record_items(&self, items: &[RolloutItem]) -> std::io::Result<()> { + pub async fn record_items(&self, items: &[RolloutItem]) -> std::io::Result<()> { let mut filtered = Vec::new(); for item in items { // Note that function calls may look a bit strange if they are @@ -528,7 +529,7 @@ impl RolloutRecorder { .map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}"))) } - pub(crate) async fn load_rollout_items( + pub async fn load_rollout_items( path: &Path, ) -> std::io::Result<(Vec, Option, usize)> { trace!("Resuming rollout from {path:?}"); @@ -660,7 +661,7 @@ struct LogFileInfo { } fn precompute_log_file_info( - config: &Config, + config: &RolloutConfig, conversation_id: ThreadId, ) -> std::io::Result { // Resolve ~/.codex/sessions/YYYY/MM/DD path. @@ -705,6 +706,53 @@ fn open_log_file(path: &Path) -> std::io::Result { .open(path) } +fn truncate_bytes(text: &str, max_bytes: usize) -> String { + if text.is_empty() { + return String::new(); + } + if text.len() <= max_bytes { + return text.to_string(); + } + if max_bytes == 0 { + return format!("…{} chars truncated…", text.chars().count()); + } + + let left_budget = max_bytes / 2; + let right_budget = max_bytes - left_budget; + let tail_start_target = text.len().saturating_sub(right_budget); + let mut prefix_end = 0usize; + let mut suffix_start = text.len(); + let mut removed_chars = 0usize; + let mut suffix_started = false; + + for (idx, ch) in text.char_indices() { + let char_end = idx + ch.len_utf8(); + if char_end <= left_budget { + prefix_end = char_end; + continue; + } + if idx >= tail_start_target { + if !suffix_started { + suffix_start = idx; + suffix_started = true; + } + continue; + } + removed_chars = removed_chars.saturating_add(1); + } + + if suffix_start < prefix_end { + suffix_start = prefix_end; + } + + let marker = format!("…{removed_chars} chars truncated…"); + let mut out = String::with_capacity(text.len().min(max_bytes) + marker.len()); + out.push_str(&text[..prefix_end]); + out.push_str(&marker); + out.push_str(&text[suffix_start..]); + out +} + #[allow(clippy::too_many_arguments)] async fn rollout_writer( file: Option, diff --git a/codex-rs/core/src/rollout/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs similarity index 91% rename from codex-rs/core/src/rollout/recorder_tests.rs rename to codex-rs/rollout/src/recorder_tests.rs index 8ca7b58a6b5..293ae331d5f 100644 --- a/codex-rs/core/src/rollout/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -1,7 +1,6 @@ use super::*; -use crate::config::ConfigBuilder; +use crate::test_support::test_rollout_config; use chrono::TimeZone; -use codex_features::Feature; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::AskForApproval; @@ -54,10 +53,7 @@ fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result std::io::Result<()> { let home = TempDir::new().expect("temp dir"); - let config = ConfigBuilder::default() - .codex_home(home.path().to_path_buf()) - .build() - .await?; + let config = test_rollout_config(home.path()); let thread_id = ThreadId::new(); let recorder = RolloutRecorder::new( &config, @@ -141,14 +137,7 @@ async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result< #[tokio::test] async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); - let mut config = ConfigBuilder::default() - .codex_home(home.path().to_path_buf()) - .build() - .await?; - config - .features - .enable(Feature::Sqlite) - .expect("test config should allow sqlite"); + let config = test_rollout_config(home.path()); let state_db = StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone()) .await @@ -229,14 +218,7 @@ async fn metadata_irrelevant_events_touch_state_db_updated_at() -> std::io::Resu async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); - let mut config = ConfigBuilder::default() - .codex_home(home.path().to_path_buf()) - .build() - .await?; - config - .features - .enable(Feature::Sqlite) - .expect("test config should allow sqlite"); + let config = test_rollout_config(home.path()); let state_db = StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone()) .await @@ -280,14 +262,7 @@ async fn metadata_irrelevant_events_fall_back_to_upsert_when_thread_missing() -> #[tokio::test] async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); - let mut config = ConfigBuilder::default() - .codex_home(home.path().to_path_buf()) - .build() - .await?; - config - .features - .disable(Feature::Sqlite) - .expect("test config should allow sqlite to be disabled"); + let config = test_rollout_config(home.path()); let newest = write_session_file(home.path(), "2025-01-03T12-00-00", Uuid::from_u128(9001))?; let middle = write_session_file(home.path(), "2025-01-02T12-00-00", Uuid::from_u128(9002))?; @@ -296,6 +271,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re let default_provider = config.model_provider_id.clone(); let page1 = RolloutRecorder::list_threads( &config, + None, 1, None, ThreadSortKey::CreatedAt, @@ -311,6 +287,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re let page2 = RolloutRecorder::list_threads( &config, + None, 1, Some(&cursor), ThreadSortKey::CreatedAt, @@ -328,14 +305,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re #[tokio::test] async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); - let mut config = ConfigBuilder::default() - .codex_home(home.path().to_path_buf()) - .build() - .await?; - config - .features - .enable(Feature::Sqlite) - .expect("test config should allow sqlite"); + let config = test_rollout_config(home.path()); let uuid = Uuid::from_u128(9010); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); @@ -375,6 +345,7 @@ async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Resul let default_provider = config.model_provider_id.clone(); let page = RolloutRecorder::list_threads( &config, + Some(runtime.as_ref()), 10, None, ThreadSortKey::CreatedAt, @@ -396,14 +367,7 @@ async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Resul #[tokio::test] async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); - let mut config = ConfigBuilder::default() - .codex_home(home.path().to_path_buf()) - .build() - .await?; - config - .features - .enable(Feature::Sqlite) - .expect("test config should allow sqlite"); + let config = test_rollout_config(home.path()); let uuid = Uuid::from_u128(9011); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); @@ -444,6 +408,7 @@ async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Resul let default_provider = config.model_provider_id.clone(); let page = RolloutRecorder::list_threads( &config, + Some(runtime.as_ref()), 1, None, ThreadSortKey::CreatedAt, diff --git a/codex-rs/core/src/rollout/session_index.rs b/codex-rs/rollout/src/session_index.rs similarity index 100% rename from codex-rs/core/src/rollout/session_index.rs rename to codex-rs/rollout/src/session_index.rs diff --git a/codex-rs/core/src/rollout/session_index_tests.rs b/codex-rs/rollout/src/session_index_tests.rs similarity index 100% rename from codex-rs/core/src/rollout/session_index_tests.rs rename to codex-rs/rollout/src/session_index_tests.rs diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/rollout/src/state_db.rs similarity index 75% rename from codex-rs/core/src/state_db.rs rename to codex-rs/rollout/src/state_db.rs index 72301862046..0ba3fc74603 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -1,8 +1,7 @@ -use crate::config::Config; -use crate::path_utils::normalize_for_path_comparison; -use crate::rollout::list::Cursor; -use crate::rollout::list::ThreadSortKey; -use crate::rollout::metadata; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + use chrono::DateTime; use chrono::NaiveDateTime; use chrono::Timelike; @@ -11,75 +10,23 @@ use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; -pub use codex_state::LogEntry; use codex_state::ThreadMetadataBuilder; use serde_json::Value; -use std::path::Path; -use std::path::PathBuf; -use std::sync::Arc; use tracing::warn; use uuid::Uuid; -/// Core-facing handle to the SQLite-backed state runtime. -pub type StateDbHandle = Arc; - -/// Initialize the state runtime for thread state persistence and backfill checks. To only be used -/// inside `core`. The initialization should not be done anywhere else. -pub(crate) async fn init(config: &Config) -> Option { - let runtime = match codex_state::StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - ) - .await - { - Ok(runtime) => runtime, - Err(err) => { - warn!( - "failed to initialize state runtime at {}: {err}", - config.sqlite_home.display() - ); - return None; - } - }; - let backfill_state = match runtime.get_backfill_state().await { - Ok(state) => state, - Err(err) => { - warn!( - "failed to read backfill state at {}: {err}", - config.codex_home.display() - ); - return None; - } - }; - if backfill_state.status != codex_state::BackfillStatus::Complete { - let runtime_for_backfill = runtime.clone(); - let config = config.clone(); - tokio::spawn(async move { - metadata::backfill_sessions(runtime_for_backfill.as_ref(), &config).await; - }); - } - Some(runtime) -} +use crate::list::Cursor; +use crate::list::ThreadSortKey; +use crate::list::read_session_meta_line; +use crate::metadata; +use crate::path_utils::normalize_for_path_comparison; -/// Get the DB if the feature is enabled and the DB exists. -pub async fn get_state_db(config: &Config) -> Option { - let state_path = codex_state::state_db_path(config.sqlite_home.as_path()); - if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) { - return None; - } - let runtime = codex_state::StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - ) - .await - .ok()?; - require_backfill_complete(runtime, config.sqlite_home.as_path()).await -} +pub type StateDbHandle = Arc; -/// Open the state runtime when the SQLite file exists, without feature gating. -/// -/// This is used for parity checks during the SQLite migration phase. -pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option { +pub(crate) async fn open_if_present( + codex_home: &Path, + default_provider: &str, +) -> Option { let db_path = codex_state::state_db_path(codex_home); if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) { return None; @@ -139,9 +86,8 @@ pub(crate) fn normalize_cwd_for_state_db(cwd: &Path) -> PathBuf { normalize_for_path_comparison(cwd).unwrap_or_else(|_| cwd.to_path_buf()) } -/// List thread ids from SQLite for parity checks without rollout scanning. #[allow(clippy::too_many_arguments)] -pub async fn list_thread_ids_db( +pub(crate) async fn list_thread_ids_db( context: Option<&codex_state::StateRuntime>, codex_home: &Path, page_size: usize, @@ -160,12 +106,11 @@ pub async fn list_thread_ids_db( codex_home.display() ); } - let anchor = cursor_to_anchor(cursor); let allowed_sources: Vec = allowed_sources .iter() .map(|value| match serde_json::to_value(value) { - Ok(Value::String(s)) => s, + Ok(Value::String(value)) => value, Ok(other) => other.to_string(), Err(_) => String::new(), }) @@ -193,9 +138,8 @@ pub async fn list_thread_ids_db( } } -/// List thread metadata from SQLite without rollout directory traversal. #[allow(clippy::too_many_arguments)] -pub async fn list_threads_db( +pub(crate) async fn list_threads_db( context: Option<&codex_state::StateRuntime>, codex_home: &Path, page_size: usize, @@ -214,12 +158,11 @@ pub async fn list_threads_db( codex_home.display() ); } - let anchor = cursor_to_anchor(cursor); let allowed_sources: Vec = allowed_sources .iter() .map(|value| match serde_json::to_value(value) { - Ok(Value::String(s)) => s, + Ok(Value::String(value)) => value, Ok(other) => other.to_string(), Err(_) => String::new(), }) @@ -268,8 +211,7 @@ pub async fn list_threads_db( } } -/// Look up the rollout path for a thread id using SQLite. -pub async fn find_rollout_path_by_id( +pub(crate) async fn find_rollout_path_by_id( context: Option<&codex_state::StateRuntime>, thread_id: ThreadId, archived_only: Option, @@ -284,24 +226,7 @@ pub async fn find_rollout_path_by_id( }) } -/// Get dynamic tools for a thread id using SQLite. -pub async fn get_dynamic_tools( - context: Option<&codex_state::StateRuntime>, - thread_id: ThreadId, - stage: &str, -) -> Option> { - let ctx = context?; - match ctx.get_dynamic_tools(thread_id).await { - Ok(tools) => tools, - Err(err) => { - warn!("state db get_dynamic_tools failed during {stage}: {err}"); - None - } - } -} - -/// Persist dynamic tools for a thread id using SQLite, if none exist yet. -pub async fn persist_dynamic_tools( +pub(crate) async fn persist_dynamic_tools( context: Option<&codex_state::StateRuntime>, thread_id: ThreadId, tools: Option<&[DynamicToolSpec]>, @@ -315,20 +240,7 @@ pub async fn persist_dynamic_tools( } } -pub async fn mark_thread_memory_mode_polluted( - context: Option<&codex_state::StateRuntime>, - thread_id: ThreadId, - stage: &str, -) { - let Some(ctx) = context else { - return; - }; - if let Err(err) = ctx.mark_thread_memory_mode_polluted(thread_id).await { - warn!("state db mark_thread_memory_mode_polluted failed during {stage}: {err}"); - } -} - -/// Reconcile rollout items into SQLite, falling back to scanning the rollout file. +#[allow(clippy::too_many_arguments)] pub async fn reconcile_rollout( context: Option<&codex_state::StateRuntime>, rollout_path: &Path, @@ -355,6 +267,7 @@ pub async fn reconcile_rollout( .await; return; } + let outcome = match metadata::extract_metadata_from_rollout(rollout_path, default_provider).await { Ok(outcome) => outcome, @@ -398,7 +311,7 @@ pub async fn reconcile_rollout( ); return; } - if let Ok(meta_line) = crate::rollout::list::read_session_meta_line(rollout_path).await { + if let Ok(meta_line) = read_session_meta_line(rollout_path).await { persist_dynamic_tools( Some(ctx), meta_line.meta.id, @@ -414,7 +327,6 @@ pub async fn reconcile_rollout( } } -/// Repair a thread's rollout path after filesystem fallback succeeds. pub async fn read_repair_rollout_path( context: Option<&codex_state::StateRuntime>, thread_id: Option, @@ -424,9 +336,6 @@ pub async fn read_repair_rollout_path( let Some(ctx) = context else { return; }; - - // Fast path: update an existing metadata row in place, but avoid writes when - // read-repair computes no effective change. let mut saw_existing_metadata = false; if let Some(thread_id) = thread_id && let Ok(Some(metadata)) = ctx.get_thread(thread_id).await @@ -457,13 +366,10 @@ pub async fn read_repair_rollout_path( return; } } - - // Slow path: when the row is missing/unreadable (or direct upsert failed), - // rebuild metadata from rollout contents and reconcile it into SQLite. if !saw_existing_metadata { warn!("state db discrepancy during read_repair_rollout_path: upsert_needed (slow path)"); } - let default_provider = crate::rollout::list::read_session_meta_line(rollout_path) + let default_provider = read_session_meta_line(rollout_path) .await .ok() .and_then(|meta| meta.meta.model_provider) @@ -480,9 +386,8 @@ pub async fn read_repair_rollout_path( .await; } -/// Apply rollout items incrementally to SQLite. #[allow(clippy::too_many_arguments)] -pub async fn apply_rollout_items( +pub(crate) async fn apply_rollout_items( context: Option<&codex_state::StateRuntime>, rollout_path: &Path, _default_provider: &str, @@ -522,7 +427,7 @@ pub async fn apply_rollout_items( } } -pub async fn touch_thread_updated_at( +pub(crate) async fn touch_thread_updated_at( context: Option<&codex_state::StateRuntime>, thread_id: Option, updated_at: DateTime, diff --git a/codex-rs/core/src/state_db_tests.rs b/codex-rs/rollout/src/state_db_tests.rs similarity index 84% rename from codex-rs/core/src/state_db_tests.rs rename to codex-rs/rollout/src/state_db_tests.rs index adf08197d64..aad40dcadb6 100644 --- a/codex-rs/core/src/state_db_tests.rs +++ b/codex-rs/rollout/src/state_db_tests.rs @@ -1,6 +1,11 @@ use super::*; -use crate::rollout::list::parse_cursor; +use crate::parse_cursor; +use chrono::DateTime; +use chrono::NaiveDateTime; +use chrono::Timelike; +use chrono::Utc; use pretty_assertions::assert_eq; +use uuid::Uuid; #[test] fn cursor_to_anchor_normalizes_timestamp_format() { diff --git a/codex-rs/rollout/src/test_support.rs b/codex-rs/rollout/src/test_support.rs new file mode 100644 index 00000000000..235dcf762b5 --- /dev/null +++ b/codex-rs/rollout/src/test_support.rs @@ -0,0 +1,13 @@ +use std::path::Path; + +use crate::RolloutConfig; + +pub(crate) fn test_rollout_config(codex_home: &Path) -> RolloutConfig { + RolloutConfig::new( + codex_home.to_path_buf(), + codex_home.to_path_buf(), + codex_home.to_path_buf(), + "test-provider".to_string(), + false, + ) +} diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/rollout/src/tests.rs similarity index 98% rename from codex-rs/core/src/rollout/tests.rs rename to codex-rs/rollout/src/tests.rs index 44e536e50ef..da30ddba55a 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -17,14 +17,14 @@ use time::format_description::FormatItem; use time::macros::format_description; use uuid::Uuid; -use crate::rollout::INTERACTIVE_SESSION_SOURCES; -use crate::rollout::list::Cursor; -use crate::rollout::list::ThreadItem; -use crate::rollout::list::ThreadSortKey; -use crate::rollout::list::ThreadsPage; -use crate::rollout::list::get_threads; -use crate::rollout::list::read_head_for_summary; -use crate::rollout::rollout_date_parts; +use crate::INTERACTIVE_SESSION_SOURCES; +use crate::list::Cursor; +use crate::list::ThreadItem; +use crate::list::ThreadSortKey; +use crate::list::ThreadsPage; +use crate::list::get_threads; +use crate::list::read_head_for_summary; +use crate::rollout_date_parts; use anyhow::Result; use codex_protocol::ThreadId; use codex_protocol::models::ContentItem; @@ -229,7 +229,7 @@ async fn find_thread_path_falls_back_when_db_path_is_stale() { )); insert_state_db_thread(home, thread_id, stale_db_path.as_path(), false).await; - let found = crate::rollout::find_thread_path_by_id_str(home, &uuid.to_string()) + let found = crate::find_thread_path_by_id_str(home, &uuid.to_string()) .await .expect("lookup should succeed"); assert_eq!(found, Some(fs_rollout_path.clone())); @@ -255,7 +255,7 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() { .await .expect("backfill should be complete"); - let found = crate::rollout::find_thread_path_by_id_str(home, &uuid.to_string()) + let found = crate::find_thread_path_by_id_str(home, &uuid.to_string()) .await .expect("lookup should succeed"); assert_eq!(found, Some(fs_rollout_path.clone())); diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index 8013b1325ed..52495409e25 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -43,6 +43,7 @@ codex-file-search = { workspace = true } codex-login = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } +codex-rollout = { workspace = true } codex-shell-command = { workspace = true } codex-state = { workspace = true } codex-terminal-detection = { workspace = true } diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index e2480c4ec0b..e8ca904b9cf 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -67,7 +67,6 @@ use codex_core::config::types::ApprovalsReviewer; use codex_core::config::types::Notifications; use codex_core::config::types::WindowsSandboxModeToml; use codex_core::config_loader::ConfigLayerStackOrdering; -use codex_core::find_thread_name_by_id; use codex_core::git_info::current_branch_name; use codex_core::git_info::get_git_repo_root; use codex_core::git_info::local_git_branches; @@ -155,6 +154,7 @@ use codex_protocol::request_permissions::RequestPermissionsEvent; use codex_protocol::request_user_input::RequestUserInputEvent; use codex_protocol::user_input::TextElement; use codex_protocol::user_input::UserInput; +use codex_rollout::find_thread_name_by_id; use codex_terminal_detection::TerminalName; use codex_terminal_detection::terminal_info; use codex_utils_sleep_inhibitor::SleepInhibitor; diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 54162006911..37d53b7d5e5 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -10,9 +10,6 @@ pub use app::ExitReason; use codex_cloud_requirements::cloud_requirements_loader; use codex_core::AuthManager; use codex_core::CodexAuth; -use codex_core::INTERACTIVE_SESSION_SOURCES; -use codex_core::RolloutRecorder; -use codex_core::ThreadSortKey; use codex_core::auth::AuthConfig; use codex_core::auth::AuthMode; use codex_core::auth::enforce_login_restrictions; @@ -28,12 +25,10 @@ use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::LoaderOverrides; use codex_core::config_loader::format_config_error_with_source; use codex_core::default_client::set_default_client_residency_requirement; -use codex_core::find_thread_path_by_id_str; -use codex_core::find_thread_path_by_name_str; use codex_core::format_exec_policy_error_with_source; use codex_core::path_utils; -use codex_core::read_session_meta_line; -use codex_core::state_db::get_state_db; +use codex_core::rollout_config; +use codex_core::state_runtime::get_state_db; use codex_core::windows_sandbox::WindowsSandboxLevelExt; use codex_protocol::ThreadId; use codex_protocol::config_types::AltScreenMode; @@ -42,6 +37,12 @@ use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; +use codex_rollout::INTERACTIVE_SESSION_SOURCES; +use codex_rollout::RolloutRecorder; +use codex_rollout::ThreadSortKey; +use codex_rollout::find_thread_path_by_id_str; +use codex_rollout::find_thread_path_by_name_str; +use codex_rollout::read_session_meta_line; use codex_state::log_db; use codex_terminal_detection::Multiplexer; use codex_terminal_detection::terminal_info; @@ -554,7 +555,7 @@ pub async fn run_main( let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); - let log_db_layer = codex_core::state_db::get_state_db(&config) + let log_db_layer = get_state_db(&config) .await .map(|db| log_db::start(db).with_filter(env_filter())); @@ -742,8 +743,11 @@ async fn run_ratatui_app( } } else if cli.fork_last { let provider_filter = vec![config.model_provider_id.clone()]; + let rollout_config = rollout_config(&config); + let state_db_ctx = get_state_db(&config).await; match RolloutRecorder::list_threads( - &config, + &rollout_config, + state_db_ctx.as_deref(), /*page_size*/ 1, /*cursor*/ None, ThreadSortKey::UpdatedAt, @@ -842,8 +846,11 @@ async fn run_ratatui_app( } else { Some(config.cwd.as_path()) }; + let rollout_config = rollout_config(&config); + let state_db_ctx = get_state_db(&config).await; match RolloutRecorder::find_latest_thread_path( - &config, + &rollout_config, + state_db_ctx.as_deref(), /*page_size*/ 1, /*cursor*/ None, ThreadSortKey::UpdatedAt, diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 1a74fcd83a4..23bd577a19a 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -12,16 +12,18 @@ use crate::tui::Tui; use crate::tui::TuiEvent; use chrono::DateTime; use chrono::Utc; -use codex_core::Cursor; -use codex_core::INTERACTIVE_SESSION_SOURCES; -use codex_core::RolloutRecorder; -use codex_core::ThreadItem; -use codex_core::ThreadSortKey; -use codex_core::ThreadsPage; use codex_core::config::Config; -use codex_core::find_thread_names_by_ids; use codex_core::path_utils; +use codex_core::rollout_config; +use codex_core::state_runtime::get_state_db; use codex_protocol::ThreadId; +use codex_rollout::Cursor; +use codex_rollout::INTERACTIVE_SESSION_SOURCES; +use codex_rollout::RolloutRecorder; +use codex_rollout::ThreadItem; +use codex_rollout::ThreadSortKey; +use codex_rollout::ThreadsPage; +use codex_rollout::find_thread_names_by_ids; use color_eyre::eyre::Result; use crossterm::event::KeyCode; use crossterm::event::KeyEvent; @@ -159,8 +161,11 @@ async fn run_session_picker( let config = config.clone(); tokio::spawn(async move { let provider_filter = vec![request.default_provider.clone()]; + let rollout_config = rollout_config(&config); + let state_db_ctx = get_state_db(&config).await; let page = RolloutRecorder::list_threads( - &config, + &rollout_config, + state_db_ctx.as_deref(), PAGE_SIZE, request.cursor.as_ref(), request.sort_key, diff --git a/codex-rs/tui_app_server/Cargo.toml b/codex-rs/tui_app_server/Cargo.toml index 88660420517..cd7fdb5fad5 100644 --- a/codex-rs/tui_app_server/Cargo.toml +++ b/codex-rs/tui_app_server/Cargo.toml @@ -47,6 +47,7 @@ codex-file-search = { workspace = true } codex-login = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } +codex-rollout = { workspace = true } codex-shell-command = { workspace = true } codex-state = { workspace = true } codex-terminal-detection = { workspace = true } diff --git a/codex-rs/tui_app_server/src/chatwidget.rs b/codex-rs/tui_app_server/src/chatwidget.rs index 5e0cff03c75..6faba9f19e2 100644 --- a/codex-rs/tui_app_server/src/chatwidget.rs +++ b/codex-rs/tui_app_server/src/chatwidget.rs @@ -87,7 +87,6 @@ use codex_core::config::types::ApprovalsReviewer; use codex_core::config::types::Notifications; use codex_core::config::types::WindowsSandboxModeToml; use codex_core::config_loader::ConfigLayerStackOrdering; -use codex_core::find_thread_name_by_id; use codex_core::git_info::current_branch_name; use codex_core::git_info::get_git_repo_root; use codex_core::git_info::local_git_branches; @@ -199,6 +198,7 @@ use codex_protocol::request_user_input::RequestUserInputEvent; use codex_protocol::request_user_input::RequestUserInputQuestionOption; use codex_protocol::user_input::TextElement; use codex_protocol::user_input::UserInput; +use codex_rollout::find_thread_name_by_id; use codex_terminal_detection::TerminalName; use codex_terminal_detection::terminal_info; use codex_utils_sleep_inhibitor::SleepInhibitor; diff --git a/codex-rs/tui_app_server/src/lib.rs b/codex-rs/tui_app_server/src/lib.rs index 17e309d5fbd..0ee014389ad 100644 --- a/codex-rs/tui_app_server/src/lib.rs +++ b/codex-rs/tui_app_server/src/lib.rs @@ -37,8 +37,7 @@ use codex_core::config_loader::format_config_error_with_source; use codex_core::default_client::set_default_client_residency_requirement; use codex_core::format_exec_policy_error_with_source; use codex_core::path_utils; -use codex_core::read_session_meta_line; -use codex_core::state_db::get_state_db; +use codex_core::state_runtime::get_state_db; use codex_core::windows_sandbox::WindowsSandboxLevelExt; use codex_protocol::ThreadId; use codex_protocol::config_types::AltScreenMode; @@ -48,6 +47,7 @@ use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::TurnContextItem; +use codex_rollout::read_session_meta_line; use codex_state::log_db; use codex_terminal_detection::Multiplexer; use codex_terminal_detection::terminal_info; @@ -878,7 +878,7 @@ pub async fn run_main( let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); - let log_db_layer = codex_core::state_db::get_state_db(&config) + let log_db_layer = get_state_db(&config) .await .map(|db| log_db::start(db).with_filter(env_filter())); diff --git a/codex-rs/tui_app_server/src/resume_picker.rs b/codex-rs/tui_app_server/src/resume_picker.rs index debb887aaf1..08b801321ee 100644 --- a/codex-rs/tui_app_server/src/resume_picker.rs +++ b/codex-rs/tui_app_server/src/resume_picker.rs @@ -17,16 +17,18 @@ use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadSortKey as AppServerThreadSortKey; use codex_app_server_protocol::ThreadSourceKind; -use codex_core::Cursor; -use codex_core::INTERACTIVE_SESSION_SOURCES; -use codex_core::RolloutRecorder; -use codex_core::ThreadItem; -use codex_core::ThreadSortKey; -use codex_core::ThreadsPage; use codex_core::config::Config; -use codex_core::find_thread_names_by_ids; use codex_core::path_utils; +use codex_core::rollout_config; +use codex_core::state_runtime::get_state_db; use codex_protocol::ThreadId; +use codex_rollout::Cursor; +use codex_rollout::INTERACTIVE_SESSION_SOURCES; +use codex_rollout::RolloutRecorder; +use codex_rollout::ThreadItem; +use codex_rollout::ThreadSortKey; +use codex_rollout::ThreadsPage; +use codex_rollout::find_thread_names_by_ids; use color_eyre::eyre::Result; use crossterm::event::KeyCode; use crossterm::event::KeyEvent; @@ -317,8 +319,11 @@ fn spawn_rollout_page_loader( Some(PageCursor::AppServer(_)) => None, None => None, }; + let rollout_config = rollout_config(&config); + let state_db_ctx = get_state_db(&config).await; let page = RolloutRecorder::list_threads( - &config, + &rollout_config, + state_db_ctx.as_deref(), PAGE_SIZE, cursor, request.sort_key,