diff --git a/docs/spec/index.md b/docs/spec/index.md index f3ccf05..9b4c913 100644 --- a/docs/spec/index.md +++ b/docs/spec/index.md @@ -13,6 +13,7 @@ Audience: This documentation is written for LLM consumption and should remain ex ## Specs - `docs/spec/system_elf_memory_service_v2.md` - ELF Memory Service v2.0 specification. +- `docs/spec/system_graph_memory_postgres_v1.md` - Graph memory schema and invariants for Postgres. - `docs/spec/system_version_registry.md` - Registry of versioned identifiers and schema versions. ## Authoring guidance (LLM-first) diff --git a/docs/spec/system_elf_memory_service_v2.md b/docs/spec/system_elf_memory_service_v2.md index 8c3d2c5..fcfcf2a 100644 --- a/docs/spec/system_elf_memory_service_v2.md +++ b/docs/spec/system_elf_memory_service_v2.md @@ -19,7 +19,7 @@ Multi-tenant namespace: - tenant_id, project_id, agent_id, scope, read_profile. Optional future work: -- Graph memory backend (Neo4j) is reserved and out of scope for v2.0. +- Graph memory backend is defined in Postgres in `system_graph_memory_postgres_v1.md` and kept aligned with this specification. ============================================================ 0. INVARIANTS (MUST HOLD) diff --git a/docs/spec/system_graph_memory_postgres_v1.md b/docs/spec/system_graph_memory_postgres_v1.md new file mode 100644 index 0000000..69c73ea --- /dev/null +++ b/docs/spec/system_graph_memory_postgres_v1.md @@ -0,0 +1,139 @@ +# Graph Memory Postgres v1.0 Specification + +Description: Canonical entity/fact temporal memory schema and invariants for PostgreSQL-backed graph memory. +Language: English only. + +Purpose: +- Persist entities, aliases, temporal facts, and evidence links for ELF graph memory. +- Keep one active fact per `(tenant, project, scope, subject, predicate, value-or-entity)` combination. + +Core tables: +- `graph_entities` +- `graph_entity_aliases` +- `graph_facts` +- `graph_fact_evidence` + +============================================================ +1. 
ENTITIES +============================================================ + +`graph_entities` columns: +- `entity_id uuid PRIMARY KEY` +- `tenant_id text NOT NULL` +- `project_id text NOT NULL` +- `canonical text NOT NULL` +- `canonical_norm text NOT NULL` +- `kind text NULL` +- `created_at timestamptz NOT NULL DEFAULT now()` +- `updated_at timestamptz NOT NULL DEFAULT now()` + +Indexes: +- `UNIQUE (tenant_id, project_id, canonical_norm)` + +Constraint and behavior: +- Canonical values are normalized by application helper before insert/upsert. +- Normalized canonical names allow idempotent upsert behavior across whitespace/case differences. + +`graph_entity_aliases` columns: +- `alias_id uuid PRIMARY KEY` +- `entity_id uuid NOT NULL REFERENCES graph_entities(entity_id) ON DELETE CASCADE` +- `alias text NOT NULL` +- `alias_norm text NOT NULL` +- `created_at timestamptz NOT NULL DEFAULT now()` + +Indexes: +- `UNIQUE (entity_id, alias_norm)` +- `INDEX (alias_norm)` + +============================================================ +2. 
FACTS +============================================================ + +`graph_facts` columns: +- `fact_id uuid PRIMARY KEY` +- `tenant_id text NOT NULL` +- `project_id text NOT NULL` +- `agent_id text NOT NULL` +- `scope text NOT NULL` +- `subject_entity_id uuid NOT NULL REFERENCES graph_entities(entity_id)` +- `predicate text NOT NULL` +- `object_entity_id uuid NULL REFERENCES graph_entities(entity_id)` +- `object_value text NULL` +- `valid_from timestamptz NOT NULL` +- `valid_to timestamptz NULL` +- `created_at timestamptz NOT NULL DEFAULT now()` +- `updated_at timestamptz NOT NULL DEFAULT now()` + +Checks: +- Exactly one object reference per fact: + - `(object_entity_id IS NULL AND object_value IS NOT NULL)` OR + `(object_entity_id IS NOT NULL AND object_value IS NULL)` +- `valid_to IS NULL OR valid_to > valid_from` + +Indexes: +- `(tenant_id, project_id, subject_entity_id, predicate)` +- `(tenant_id, project_id, valid_to)` +- `(tenant_id, project_id, object_entity_id) WHERE object_entity_id IS NOT NULL` +- `UNIQUE (tenant_id, project_id, scope, subject_entity_id, predicate, object_entity_id) + WHERE valid_to IS NULL AND object_entity_id IS NOT NULL` +- `UNIQUE (tenant_id, project_id, scope, subject_entity_id, predicate, object_value) + WHERE valid_to IS NULL AND object_value IS NOT NULL` + +============================================================ +3. EVIDENCE +============================================================ + +`graph_fact_evidence` columns: +- `evidence_id uuid PRIMARY KEY` +- `fact_id uuid NOT NULL REFERENCES graph_facts(fact_id) ON DELETE CASCADE` +- `note_id uuid NOT NULL REFERENCES memory_notes(note_id) ON DELETE CASCADE` +- `created_at timestamptz NOT NULL DEFAULT now()` + +Indexes: +- `UNIQUE (fact_id, note_id)` +- `(note_id)` +- `(fact_id)` + +============================================================ +4. 
INVARIANTS +============================================================ +- `graph_entities.canonical_norm` must be deterministic using: + - trim + - whitespace collapse to one space + - lowercase +- An active fact is defined by: `valid_from <= now AND (valid_to IS NULL OR valid_to > now)`. +- Duplicate prevention is enforced by partial unique indexes that cover only open-ended facts (`valid_to IS NULL`); a fact with a non-null future `valid_to` is still active but is not deduplicated by these indexes. + +============================================================ +5. CALL EXAMPLES +============================================================ + +``` +canonical = normalize_entity_name(" Alice Example ") +=> "alice example" + +upsert_entity("tenant-a", "project-b", canonical, Some("person")) -> entity_id +upsert_entity_alias(entity_id, "A. Example") + +insert_fact_with_evidence( + "tenant-a", + "project-b", + "agent-c", + "project_shared", + subject_entity_id, + "connected_to", + Some(object_entity_id), + None, + now, + None, + &[note_id_1, note_id_2], +) + +fetch_active_facts_for_subject( + "tenant-a", + "project-b", + "project_shared", + subject_entity_id, + now, +) +``` diff --git a/packages/elf-service/src/graph.rs b/packages/elf-service/src/graph.rs new file mode 100644 index 0000000..43274c2 --- /dev/null +++ b/packages/elf-service/src/graph.rs @@ -0,0 +1,47 @@ +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::Result; +use elf_storage::graph; + +#[allow(dead_code)] +pub(crate) struct GraphUpsertFactArgs<'a> { + pub tenant_id: &'a str, + pub project_id: &'a str, + pub agent_id: &'a str, + pub scope: &'a str, + pub subject_entity_id: Uuid, + pub predicate: &'a str, + pub object_entity_id: Option<Uuid>, + pub object_value: Option<&'a str>, + pub valid_from: OffsetDateTime, + pub valid_to: Option<OffsetDateTime>, + pub evidence_note_ids: &'a [Uuid], +} + +impl crate::ElfService { + #[allow(dead_code)] + pub(crate) async fn graph_upsert_fact(&self, args: GraphUpsertFactArgs<'_>) -> Result<Uuid> { + let mut tx = self.db.pool.begin().await?; + let fact_id = graph::insert_fact_with_evidence( + &mut tx, + args.tenant_id, + 
args.project_id, + args.agent_id, + args.scope, + args.subject_entity_id, + args.predicate, + args.object_entity_id, + args.object_value, + args.valid_from, + args.valid_to, + args.evidence_note_ids, + ) + .await + .map_err(|err| crate::Error::Storage { message: err.to_string() })?; + + tx.commit().await?; + + Ok(fact_id) + } +} diff --git a/packages/elf-service/src/lib.rs b/packages/elf-service/src/lib.rs index 8123599..19e6b5c 100644 --- a/packages/elf-service/src/lib.rs +++ b/packages/elf-service/src/lib.rs @@ -2,6 +2,7 @@ pub mod add_event; pub mod add_note; pub mod admin; pub mod delete; +pub mod graph; pub mod list; pub mod notes; pub mod progressive_search; diff --git a/packages/elf-service/src/search.rs b/packages/elf-service/src/search.rs index 010257b..bb83aac 100644 --- a/packages/elf-service/src/search.rs +++ b/packages/elf-service/src/search.rs @@ -1543,6 +1543,7 @@ impl ElfService { Ok(result) } + #[allow(clippy::too_many_arguments)] async fn collect_recursive_candidates( &self, args: &RecursiveRetrievalArgs<'_>, diff --git a/packages/elf-service/tests/acceptance/suite.rs b/packages/elf-service/tests/acceptance/suite.rs index 1a5f773..75590a7 100644 --- a/packages/elf-service/tests/acceptance/suite.rs +++ b/packages/elf-service/tests/acceptance/suite.rs @@ -402,6 +402,10 @@ where sqlx::query( "\ TRUNCATE + graph_entities, + graph_entity_aliases, + graph_facts, + graph_fact_evidence, memory_hits, memory_note_versions, note_field_embeddings, @@ -410,6 +414,8 @@ TRUNCATE memory_note_chunks, note_embeddings, search_trace_items, + search_trace_stage_items, + search_trace_stages, search_traces, search_trace_outbox, search_sessions, diff --git a/packages/elf-storage/src/error.rs b/packages/elf-storage/src/error.rs index d394262..c0e34f1 100644 --- a/packages/elf-storage/src/error.rs +++ b/packages/elf-storage/src/error.rs @@ -2,6 +2,8 @@ pub enum Error { #[error(transparent)] Sqlx(#[from] sqlx::Error), + #[error("Invalid argument: {0}")] + 
InvalidArgument(String), #[error(transparent)] Qdrant(#[from] Box), } diff --git a/packages/elf-storage/src/graph.rs b/packages/elf-storage/src/graph.rs new file mode 100644 index 0000000..e84ae6e --- /dev/null +++ b/packages/elf-storage/src/graph.rs @@ -0,0 +1,210 @@ +use sqlx::PgConnection; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{Error, Result, models::GraphFact}; + +pub fn normalize_entity_name(input: &str) -> String { + input.split_whitespace().collect::<Vec<_>>().join(" ").to_lowercase() +} + +#[allow(clippy::too_many_arguments)] +pub async fn insert_fact_with_evidence( + executor: &mut PgConnection, + tenant_id: &str, + project_id: &str, + agent_id: &str, + scope: &str, + subject_entity_id: Uuid, + predicate: &str, + object_entity_id: Option<Uuid>, + object_value: Option<&str>, + valid_from: OffsetDateTime, + valid_to: Option<OffsetDateTime>, + evidence_note_ids: &[Uuid], +) -> Result<Uuid> { + if evidence_note_ids.is_empty() { + return Err(Error::InvalidArgument( + "graph fact evidence is required; evidence_note_ids must not be empty".to_string(), + )); + } + + match (object_entity_id, object_value) { + (Some(_), None) | (None, Some(_)) => (), + _ => { + return Err(Error::InvalidArgument( + "graph fact must provide exactly one of object_entity_id and object_value" + .to_string(), + )); + }, + } + + let row: (Uuid,) = sqlx::query_as( + "\ +INSERT INTO graph_facts ( + fact_id, + tenant_id, + project_id, + agent_id, + scope, + subject_entity_id, + predicate, + object_entity_id, + object_value, + valid_from, + valid_to, + created_at, + updated_at +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, now(), now()) +RETURNING fact_id", + ) + .bind(Uuid::new_v4()) + .bind(tenant_id) + .bind(project_id) + .bind(agent_id) + .bind(scope) + .bind(subject_entity_id) + .bind(predicate) + .bind(object_entity_id) + .bind(object_value) + .bind(valid_from) + .bind(valid_to) + .fetch_one(&mut *executor) + .await?; + + let fact_id = row.0; + + for note_id in evidence_note_ids { + 
sqlx::query( + "\ +INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) +VALUES ($1, $2, $3, now()) +ON CONFLICT (fact_id, note_id) DO NOTHING", + ) + .bind(Uuid::new_v4()) + .bind(fact_id) + .bind(*note_id) + .execute(&mut *executor) + .await?; + } + + Ok(fact_id) +} + +pub async fn upsert_entity( + executor: &mut PgConnection, + tenant_id: &str, + project_id: &str, + canonical: &str, + kind: Option<&str>, +) -> Result<Uuid> { + let canonical_norm = normalize_entity_name(canonical); + + let row: (Uuid,) = sqlx::query_as( + "\ +INSERT INTO graph_entities ( + entity_id, + tenant_id, + project_id, + canonical, + canonical_norm, + kind, + created_at, + updated_at +) +VALUES ( + $1, $2, $3, $4, $5, $6, now(), now() +) +ON CONFLICT (tenant_id, project_id, canonical_norm) +DO UPDATE +SET + canonical = EXCLUDED.canonical, + kind = COALESCE(EXCLUDED.kind, graph_entities.kind), + updated_at = now() +RETURNING entity_id", + ) + .bind(Uuid::new_v4()) + .bind(tenant_id) + .bind(project_id) + .bind(canonical) + .bind(&canonical_norm) + .bind(kind) + .fetch_one(executor) + .await?; + + Ok(row.0) +} + +pub async fn upsert_entity_alias( + executor: &mut PgConnection, + entity_id: Uuid, + alias: &str, +) -> Result<()> { + let alias_norm = normalize_entity_name(alias); + + sqlx::query( + "\ +INSERT INTO graph_entity_aliases ( + alias_id, + entity_id, + alias, + alias_norm, + created_at +) +VALUES ($1, $2, $3, $4, now()) +ON CONFLICT (entity_id, alias_norm) +DO UPDATE SET alias = EXCLUDED.alias", + ) + .bind(Uuid::new_v4()) + .bind(entity_id) + .bind(alias) + .bind(&alias_norm) + .execute(executor) + .await?; + + Ok(()) +} + +pub async fn fetch_active_facts_for_subject( + executor: &mut PgConnection, + tenant_id: &str, + project_id: &str, + scope: &str, + subject_entity_id: Uuid, + now: OffsetDateTime, +) -> Result<Vec<GraphFact>> { + let rows = sqlx::query_as::<_, GraphFact>( + "\ +SELECT + fact_id, + tenant_id, + project_id, + agent_id, + scope, + subject_entity_id, + predicate, 
+ object_entity_id, + object_value, + valid_from, + valid_to, + created_at, + updated_at +FROM graph_facts +WHERE tenant_id = $1 + AND project_id = $2 + AND scope = $3 + AND subject_entity_id = $4 + AND valid_from <= $5 + AND (valid_to IS NULL OR valid_to > $5)", + ) + .bind(tenant_id) + .bind(project_id) + .bind(scope) + .bind(subject_entity_id) + .bind(now) + .fetch_all(executor) + .await?; + + Ok(rows) +} diff --git a/packages/elf-storage/src/lib.rs b/packages/elf-storage/src/lib.rs index 7fc8889..0d09c44 100644 --- a/packages/elf-storage/src/lib.rs +++ b/packages/elf-storage/src/lib.rs @@ -1,4 +1,5 @@ pub mod db; +pub mod graph; pub mod models; pub mod outbox; pub mod qdrant; diff --git a/packages/elf-storage/src/models.rs b/packages/elf-storage/src/models.rs index 8a1b9d2..2e1fd6a 100644 --- a/packages/elf-storage/src/models.rs +++ b/packages/elf-storage/src/models.rs @@ -75,3 +75,49 @@ pub struct TraceOutboxJob { pub payload: Value, pub attempts: i32, } + +#[derive(Debug, sqlx::FromRow)] +pub struct GraphEntity { + pub entity_id: Uuid, + pub tenant_id: String, + pub project_id: String, + pub canonical: String, + pub canonical_norm: String, + pub kind: Option<String>, + pub created_at: OffsetDateTime, + pub updated_at: OffsetDateTime, +} + +#[derive(Debug, sqlx::FromRow)] +pub struct GraphEntityAlias { + pub alias_id: Uuid, + pub entity_id: Uuid, + pub alias: String, + pub alias_norm: String, + pub created_at: OffsetDateTime, +} + +#[derive(Debug, sqlx::FromRow)] +pub struct GraphFact { + pub fact_id: Uuid, + pub tenant_id: String, + pub project_id: String, + pub agent_id: String, + pub scope: String, + pub subject_entity_id: Uuid, + pub predicate: String, + pub object_entity_id: Option<Uuid>, + pub object_value: Option<String>, + pub valid_from: OffsetDateTime, + pub valid_to: Option<OffsetDateTime>, + pub created_at: OffsetDateTime, + pub updated_at: OffsetDateTime, +} + +#[derive(Debug, sqlx::FromRow)] +pub struct GraphFactEvidence { + pub evidence_id: Uuid, + pub fact_id: Uuid, + pub note_id: 
Uuid, + pub created_at: OffsetDateTime, +} diff --git a/packages/elf-storage/src/schema.rs b/packages/elf-storage/src/schema.rs index 0cb697c..6ab1dae 100644 --- a/packages/elf-storage/src/schema.rs +++ b/packages/elf-storage/src/schema.rs @@ -16,6 +16,14 @@ fn expand_includes(sql: &str) -> String { "00_extensions.sql" => out.push_str(include_str!("../../../sql/00_extensions.sql")), "tables/001_memory_notes.sql" => out.push_str(include_str!("../../../sql/tables/001_memory_notes.sql")), + "tables/016_graph_entities.sql" => + out.push_str(include_str!("../../../sql/tables/016_graph_entities.sql")), + "tables/017_graph_entity_aliases.sql" => + out.push_str(include_str!("../../../sql/tables/017_graph_entity_aliases.sql")), + "tables/018_graph_facts.sql" => + out.push_str(include_str!("../../../sql/tables/018_graph_facts.sql")), + "tables/019_graph_fact_evidence.sql" => + out.push_str(include_str!("../../../sql/tables/019_graph_fact_evidence.sql")), "tables/013_memory_note_fields.sql" => out.push_str(include_str!("../../../sql/tables/013_memory_note_fields.sql")), "tables/009_memory_note_chunks.sql" => diff --git a/packages/elf-storage/tests/graph_memory.rs b/packages/elf-storage/tests/graph_memory.rs new file mode 100644 index 0000000..a5f50c0 --- /dev/null +++ b/packages/elf-storage/tests/graph_memory.rs @@ -0,0 +1,332 @@ +use serde_json::json; +use sqlx::PgConnection; +use time::{Duration, OffsetDateTime}; +use uuid::Uuid; + +use elf_config::Postgres; +use elf_storage::{ + Error as StorageError, + db::Db, + graph::{ + fetch_active_facts_for_subject, insert_fact_with_evidence, normalize_entity_name, + upsert_entity, + }, + models::{GraphFact, MemoryNote}, + queries, +}; +use elf_testkit::TestDatabase; + +#[tokio::test] +#[ignore = "Requires external Postgres. 
Set ELF_PG_DSN to run."] +async fn graph_entity_upsert_is_idempotent_by_normalized_canonical() { + let Some(base_dsn) = elf_testkit::env_dsn() else { + eprintln!( + "Skipping graph_entity_upsert_is_idempotent_by_normalized_canonical; set ELF_PG_DSN to run." + ); + + return; + }; + + let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); + let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; + let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); + + db.ensure_schema(4_096).await.expect("Failed to ensure schema."); + + let mut tx = db.pool.begin().await.expect("Failed to open transaction."); + + let tenant_id = "tenant-a"; + let project_id = "project-a"; + let entity_id = upsert_entity(&mut tx, tenant_id, project_id, " Alice Doe ", Some("person")) + .await + .expect("Failed to upsert canonical entity."); + let canonical_norm = normalize_entity_name("Alice doe"); + assert_eq!(canonical_norm, "alice doe"); + + let entity_again = upsert_entity(&mut tx, tenant_id, project_id, "Alice\tDoe", Some("person")) + .await + .expect("Failed to upsert canonical alias."); + + assert_eq!(entity_id, entity_again); + + tx.commit().await.expect("Failed to commit transaction."); + assert!(test_db.cleanup().await.is_ok(), "Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres. 
Set ELF_PG_DSN to run."] +async fn graph_fact_with_empty_evidence_is_rejected() { + let Some(base_dsn) = elf_testkit::env_dsn() else { + eprintln!("Skipping graph_fact_with_empty_evidence_is_rejected; set ELF_PG_DSN to run."); + + return; + }; + + let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); + let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; + let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); + + db.ensure_schema(4_096).await.expect("Failed to ensure schema."); + + let mut tx = db.pool.begin().await.expect("Failed to open transaction."); + let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity A", None) + .await + .expect("Failed to upsert subject."); + + let err = insert_fact_with_evidence( + &mut tx, + "tenant-a", + "project-a", + "agent-a", + "scope-a", + subject, + "related_to", + None, + Some("value"), + OffsetDateTime::now_utc(), + None, + &[], + ) + .await + .expect_err("Expected empty evidence to be rejected."); + + assert!(matches!(err, StorageError::InvalidArgument(_))); + + tx.rollback().await.expect("Failed to rollback transaction."); + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres. Set ELF_PG_DSN to run."] +async fn graph_fact_duplicates_with_active_window_fail_unique_constraint() { + let Some(base_dsn) = elf_testkit::env_dsn() else { + eprintln!( + "Skipping graph_fact_duplicates_with_active_window_fail_unique_constraint; set ELF_PG_DSN to run." 
+ ); + + return; + }; + + let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); + let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; + let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); + + db.ensure_schema(4_096).await.expect("Failed to ensure schema."); + + let mut tx = db.pool.begin().await.expect("Failed to open transaction."); + let note_id = insert_memory_note(&mut tx, "tenant-a", "project-a").await; + + let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) + .await + .expect("Failed to upsert subject."); + let object = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Object", None) + .await + .expect("Failed to upsert object."); + + let now = OffsetDateTime::now_utc(); + + insert_fact_with_evidence( + &mut tx, + "tenant-a", + "project-a", + "agent-a", + "scope-a", + subject, + "related_to", + Some(object), + None, + now, + None, + &[note_id], + ) + .await + .expect("Failed to insert graph fact."); + + let err = insert_fact_with_evidence( + &mut tx, + "tenant-a", + "project-a", + "agent-a", + "scope-a", + subject, + "related_to", + Some(object), + None, + now, + None, + &[note_id], + ) + .await; + + assert!(err.is_err()); + + tx.rollback().await.expect("Failed to rollback transaction."); + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres. 
Set ELF_PG_DSN to run."] +async fn graph_fact_rejects_invalid_valid_window() { + let Some(base_dsn) = elf_testkit::env_dsn() else { + eprintln!("Skipping graph_fact_rejects_invalid_valid_window; set ELF_PG_DSN to run."); + + return; + }; + + let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); + let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; + let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); + + db.ensure_schema(4_096).await.expect("Failed to ensure schema."); + + let mut tx = db.pool.begin().await.expect("Failed to open transaction."); + let note_id = insert_memory_note(&mut tx, "tenant-a", "project-a").await; + + let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) + .await + .expect("Failed to upsert subject."); + + let now = OffsetDateTime::now_utc(); + let err = insert_fact_with_evidence( + &mut tx, + "tenant-a", + "project-a", + "agent-a", + "scope-a", + subject, + "expires", + None, + Some("value"), + now, + Some(now), + &[note_id], + ) + .await; + + assert!(err.is_err()); + + tx.rollback().await.expect("Failed to rollback transaction."); + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres. Set ELF_PG_DSN to run."] +async fn graph_fetch_active_facts_returns_active_window_only() { + let Some(base_dsn) = elf_testkit::env_dsn() else { + eprintln!( + "Skipping graph_fetch_active_facts_returns_active_window_only; set ELF_PG_DSN to run." 
+ ); + + return; + }; + + let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); + let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; + let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); + + db.ensure_schema(4_096).await.expect("Failed to ensure schema."); + + let mut tx = db.pool.begin().await.expect("Failed to open transaction."); + let note_id = insert_memory_note(&mut tx, "tenant-a", "project-a").await; + + let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) + .await + .expect("Failed to upsert subject."); + + let now = OffsetDateTime::now_utc(); + + let active = insert_fact_with_evidence( + &mut tx, + "tenant-a", + "project-a", + "agent-a", + "scope-a", + subject, + "active_fact", + None, + Some("alpha"), + now - Duration::hours(1), + None, + &[note_id], + ) + .await + .expect("Failed to insert active graph fact."); + + insert_fact_with_evidence( + &mut tx, + "tenant-a", + "project-a", + "agent-a", + "scope-a", + subject, + "expired_fact", + None, + Some("beta"), + now - Duration::hours(2), + Some(now - Duration::minutes(1)), + &[note_id], + ) + .await + .expect("Failed to insert expired graph fact."); + + insert_fact_with_evidence( + &mut tx, + "tenant-a", + "project-a", + "agent-a", + "scope-a", + subject, + "future_fact", + None, + Some("gamma"), + now + Duration::hours(1), + None, + &[note_id], + ) + .await + .expect("Failed to insert future graph fact."); + + let facts: Vec<GraphFact> = + fetch_active_facts_for_subject(&mut tx, "tenant-a", "project-a", "scope-a", subject, now) + .await + .expect("Failed to fetch active graph facts."); + + assert_eq!(facts.len(), 1); + assert_eq!(facts[0].fact_id, active); + assert_eq!(facts[0].predicate, "active_fact"); + + tx.rollback().await.expect("Failed to rollback transaction."); + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +async fn insert_memory_note( + executor: &mut 
PgConnection, + tenant_id: &str, + project_id: &str, +) -> Uuid { + let note_id = Uuid::new_v4(); + let note = MemoryNote { + note_id, + tenant_id: tenant_id.to_string(), + project_id: project_id.to_string(), + agent_id: "agent-a".to_string(), + scope: "scope-a".to_string(), + r#type: "fact".to_string(), + key: None, + text: "graph note evidence".to_string(), + importance: 1.0, + confidence: 1.0, + status: "active".to_string(), + created_at: OffsetDateTime::now_utc(), + updated_at: OffsetDateTime::now_utc(), + expires_at: None, + embedding_version: "test:vec:1".to_string(), + source_ref: json!({}), + hit_count: 0, + last_hit_at: None, + }; + + queries::insert_note(executor, &note).await.expect("Failed to insert evidence note."); + + note_id +} diff --git a/sql/init.sql b/sql/init.sql index b36efa1..cc0607d 100644 --- a/sql/init.sql +++ b/sql/init.sql @@ -1,5 +1,9 @@ \ir 00_extensions.sql \ir tables/001_memory_notes.sql +\ir tables/016_graph_entities.sql +\ir tables/017_graph_entity_aliases.sql +\ir tables/018_graph_facts.sql +\ir tables/019_graph_fact_evidence.sql \ir tables/013_memory_note_fields.sql \ir tables/009_memory_note_chunks.sql \ir tables/010_note_chunk_embeddings.sql diff --git a/sql/tables/016_graph_entities.sql b/sql/tables/016_graph_entities.sql new file mode 100644 index 0000000..4785fec --- /dev/null +++ b/sql/tables/016_graph_entities.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS graph_entities ( + entity_id uuid PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + canonical text NOT NULL, + canonical_norm text NOT NULL, + kind text NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_entities_tenant_project_canonical_norm + ON graph_entities (tenant_id, project_id, canonical_norm); + diff --git a/sql/tables/017_graph_entity_aliases.sql b/sql/tables/017_graph_entity_aliases.sql new file mode 100644 index 0000000..cc38b81 --- 
/dev/null +++ b/sql/tables/017_graph_entity_aliases.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS graph_entity_aliases ( + alias_id uuid PRIMARY KEY, + entity_id uuid NOT NULL REFERENCES graph_entities(entity_id) ON DELETE CASCADE, + alias text NOT NULL, + alias_norm text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_entity_aliases_entity_alias_norm + ON graph_entity_aliases (entity_id, alias_norm); +CREATE INDEX IF NOT EXISTS idx_graph_entity_aliases_alias_norm + ON graph_entity_aliases (alias_norm); + diff --git a/sql/tables/018_graph_facts.sql b/sql/tables/018_graph_facts.sql new file mode 100644 index 0000000..7edf427 --- /dev/null +++ b/sql/tables/018_graph_facts.sql @@ -0,0 +1,35 @@ +CREATE TABLE IF NOT EXISTS graph_facts ( + fact_id uuid PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + agent_id text NOT NULL, + scope text NOT NULL, + subject_entity_id uuid NOT NULL REFERENCES graph_entities(entity_id), + predicate text NOT NULL, + object_entity_id uuid NULL REFERENCES graph_entities(entity_id), + object_value text NULL, + valid_from timestamptz NOT NULL, + valid_to timestamptz NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT graph_facts_object_exactly_one_source + CHECK ((object_entity_id IS NULL AND object_value IS NOT NULL) + OR (object_entity_id IS NOT NULL AND object_value IS NULL)), + CONSTRAINT graph_facts_valid_window + CHECK (valid_to IS NULL OR valid_to > valid_from) +); + +CREATE INDEX IF NOT EXISTS idx_graph_facts_tenant_project_subject_predicate + ON graph_facts (tenant_id, project_id, subject_entity_id, predicate); +CREATE INDEX IF NOT EXISTS idx_graph_facts_tenant_project_valid_to + ON graph_facts (tenant_id, project_id, valid_to); +CREATE INDEX IF NOT EXISTS idx_graph_facts_tenant_project_object_entity + ON graph_facts (tenant_id, project_id, object_entity_id) + WHERE object_entity_id IS NOT 
NULL; + +CREATE UNIQUE INDEX IF NOT EXISTS uq_graph_facts_active_entity_object + ON graph_facts (tenant_id, project_id, scope, subject_entity_id, predicate, object_entity_id) + WHERE valid_to IS NULL AND object_entity_id IS NOT NULL; +CREATE UNIQUE INDEX IF NOT EXISTS uq_graph_facts_active_entity_value + ON graph_facts (tenant_id, project_id, scope, subject_entity_id, predicate, object_value) + WHERE valid_to IS NULL AND object_value IS NOT NULL; diff --git a/sql/tables/019_graph_fact_evidence.sql b/sql/tables/019_graph_fact_evidence.sql new file mode 100644 index 0000000..0eee36d --- /dev/null +++ b/sql/tables/019_graph_fact_evidence.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS graph_fact_evidence ( + evidence_id uuid PRIMARY KEY, + fact_id uuid NOT NULL REFERENCES graph_facts(fact_id) ON DELETE CASCADE, + note_id uuid NOT NULL REFERENCES memory_notes(note_id) ON DELETE CASCADE, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_graph_fact_evidence_fact_note + ON graph_fact_evidence (fact_id, note_id); +CREATE INDEX IF NOT EXISTS idx_graph_fact_evidence_note + ON graph_fact_evidence (note_id); +CREATE INDEX IF NOT EXISTS idx_graph_fact_evidence_fact + ON graph_fact_evidence (fact_id); +