diff --git a/docs/spec/system_elf_memory_service_v2.md b/docs/spec/system_elf_memory_service_v2.md index fcfcf2a..4cb7032 100644 --- a/docs/spec/system_elf_memory_service_v2.md +++ b/docs/spec/system_elf_memory_service_v2.md @@ -926,15 +926,55 @@ Body: "importance": 0.0, "confidence": 0.0, "ttl_days": 180, + "structured": { + "summary": "string|null", + "facts": "string[]|null", + "concepts": "string[]|null", + "entities": [ + { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + } + ]|null, + "relations": [ + { + "subject": { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + }, + "predicate": "string", + "object": { + "entity": { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + }|null, + "value": "string|null" + }, + "valid_from": "ISO8601 datetime|null", + "valid_to": "ISO8601 datetime|null" + } + ]|null + }|null, "source_ref": { ... } } ] } +Notes: +- Exactly one of object.entity and object.value must be non-null. + Response: { "results": [ - { "note_id": "uuid|null", "op": "ADD|UPDATE|NONE|DELETE|REJECTED", "reason_code": "optional" } + { + "note_id": "uuid|null", + "op": "ADD|UPDATE|NONE|DELETE|REJECTED", + "reason_code": "optional", + "field_path": "optional" + } ] } @@ -959,7 +999,13 @@ Response: { "extracted": { ...extractor output... }, "results": [ - { "note_id": "uuid|null", "op": "ADD|UPDATE|NONE|DELETE|REJECTED", "reason_code": "optional", "reason": "optional" } + { + "note_id": "uuid|null", + "op": "ADD|UPDATE|NONE|DELETE|REJECTED", + "reason_code": "optional", + "reason": "optional", + "field_path": "optional" + } ] } @@ -1187,6 +1233,38 @@ Schema: "importance": 0.0, "confidence": 0.0, "ttl_days": number|null, + "structured": { + "summary": "string|null", + "facts": "string[]|null", + "concepts": "string[]|null", + "entities": [ + { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + } + ]|null, + "relations": [ + { + "subject": { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + }, + "predicate": "string", + "object": { + "entity": { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + }|null, + "value": "string|null" + }, + "valid_from": "ISO8601 datetime|null", + "valid_to": "ISO8601 datetime|null" + } + ]|null + }|null, "scope_suggestion": "agent_private|project_shared|org_shared|null", "evidence": [ { "message_index": number, "quote": "string" } @@ -1196,6 +1274,9 @@ Schema: ] } +Notes: +- Exactly one of object.entity and object.value must be non-null. + Hard rules: - notes.length <= MAX_NOTES - text must contain no CJK diff --git a/docs/spec/system_graph_memory_postgres_v1.md b/docs/spec/system_graph_memory_postgres_v1.md index 69c73ea..a3dcc5f 100644 --- a/docs/spec/system_graph_memory_postgres_v1.md +++ b/docs/spec/system_graph_memory_postgres_v1.md @@ -103,6 +103,7 @@ Indexes: - lowercase - An active fact is defined by: `valid_from <= now AND (valid_to IS NULL OR valid_to > now)`. - Active duplicate prevention is enforced by partial unique indexes. +- When ingestion reintroduces a note equivalent to an existing active fact, the system reuses the existing fact row and appends additional evidence rows for the new note instead of creating another active duplicate fact row. ============================================================ 5. CALL EXAMPLES diff --git a/packages/elf-service/src/add_event.rs b/packages/elf-service/src/add_event.rs index d9bc088..17fad24 100644 --- a/packages/elf-service/src/add_event.rs +++ b/packages/elf-service/src/add_event.rs @@ -38,6 +38,7 @@ pub struct AddEventResult { pub op: NoteOp, pub reason_code: Option, pub reason: Option, + pub field_path: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -206,7 +207,16 @@ impl ElfService { let (note_id, op) = match decision { UpdateDecision::Add { note_id } => (Some(note_id), NoteOp::Add), UpdateDecision::Update { note_id } => (Some(note_id), NoteOp::Update), - UpdateDecision::None { note_id } => (Some(note_id), NoteOp::None), + UpdateDecision::None { note_id } => { + let op = if structured.as_ref().is_some_and(StructuredFields::has_graph_fields) + { + NoteOp::Update + } else { + NoteOp::None + }; + + (Some(note_id), op) + }, }; return Ok(AddEventResult { @@ -214,6 +224,7 @@ impl ElfService { op, reason_code: None, reason: note.reason.clone(), + field_path: None, }); } @@ -317,11 +328,28 @@ impl ElfService { upsert_structured_fields_tx(tx, args.structured, memory_note.note_id, args.now).await?; + if let Some(structured) = args.structured + && structured.has_graph_fields() + { + crate::graph_ingestion::persist_graph_fields_tx( + tx, + args.req.tenant_id.as_str(), + args.req.project_id.as_str(), + args.req.agent_id.as_str(), + args.scope, + memory_note.note_id, + structured, + args.now, + ) + .await?; + } + Ok(AddEventResult { note_id: Some(note_id), op: NoteOp::Add, reason_code: None, reason: args.reason.cloned(), + field_path: None, }) } @@ -373,11 +401,28 @@ impl ElfService { upsert_structured_fields_tx(tx, args.structured, existing.note_id, args.now).await?; + if let Some(structured) = args.structured + && structured.has_graph_fields() + { + crate::graph_ingestion::persist_graph_fields_tx( + tx, + args.req.tenant_id.as_str(), + args.req.project_id.as_str(), + args.req.agent_id.as_str(), + args.scope, + existing.note_id, + structured, + args.now, + ) + .await?; + } + Ok(AddEventResult { note_id: Some(note_id), op: NoteOp::Update, reason_code: None, reason: args.reason.cloned(), + field_path: None, }) } @@ -387,6 +432,8 @@ impl ElfService { args: PersistExtractedNoteArgs<'_>, note_id: Uuid, ) -> Result { + let mut did_update = false; + if let Some(structured) = args.structured && !structured.is_effectively_empty() { @@ -397,11 +444,33 @@ impl ElfService { crate::enqueue_outbox_tx(&mut **tx, note_id, "UPSERT", args.embed_version, args.now) .await?; + did_update = true; + } + if let Some(structured) = args.structured + && structured.has_graph_fields() + { + crate::graph_ingestion::persist_graph_fields_tx( + tx, + args.req.tenant_id.as_str(), + args.req.project_id.as_str(), + args.req.agent_id.as_str(), + args.scope, + note_id, + structured, + args.now, + ) + .await?; + + did_update = true; + } + + if did_update { return Ok(AddEventResult { note_id: Some(note_id), op: NoteOp::Update, reason_code: None, reason: args.reason.cloned(), + field_path: None, }); } @@ -410,6 +479,7 @@ impl ElfService { op: NoteOp::None, reason_code: None, reason: args.reason.cloned(), + field_path: None, }) } } @@ -459,6 +529,7 @@ fn reject_extracted_note_if_evidence_invalid( op: NoteOp::Rejected, reason_code: Some(REJECT_EVIDENCE_MISMATCH.to_string()), reason: reason.cloned(), + field_path: None, }); } @@ -469,6 +540,7 @@ fn reject_extracted_note_if_evidence_invalid( op: NoteOp::Rejected, reason_code: Some(REJECT_EVIDENCE_MISMATCH.to_string()), reason: reason.cloned(), + field_path: None, }); } if !evidence::evidence_matches(message_texts, quote.message_index, quote.quote.as_str()) { @@ -477,6 +549,7 @@ fn reject_extracted_note_if_evidence_invalid( op: NoteOp::Rejected, reason_code: Some(REJECT_EVIDENCE_MISMATCH.to_string()), reason: reason.cloned(), + field_path: None, }); } } @@ -507,11 +580,14 @@ fn reject_extracted_note_if_structured_invalid( ) { tracing::info!(error = %err, "Rejecting extracted note due to invalid structured fields."); + let field_path = extract_structured_rejection_field_path(&err); + return Some(AddEventResult { note_id: None, op: NoteOp::Rejected, reason_code: Some(REJECT_STRUCTURED_INVALID.to_string()), reason: reason.cloned(), + field_path, }); } @@ -537,12 +613,22 @@ fn reject_extracted_note_if_writegate_rejects( op: NoteOp::Rejected, reason_code: Some(crate::writegate_reason_code(code).to_string()), reason: reason.cloned(), + field_path: None, }); } None } +fn extract_structured_rejection_field_path(err: &Error) -> Option { + match err { + Error::NonEnglishInput { field } => Some(field.clone()), + Error::InvalidRequest { message } if message.starts_with("structured.") => + message.split_whitespace().next().map(ToString::to_string), + _ => None, + } +} + fn build_extractor_messages( messages: &[EventMessage], max_notes: u32, @@ -557,7 +643,34 @@ fn build_extractor_messages( "structured": { "summary": "string|null", "facts": "string[]|null", - "concepts": "string[]|null" + "concepts": "string[]|null", + "entities": [ + { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + } + ], + "relations": [ + { + "subject": { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + }, + "predicate": "string", + "object": { + "entity": { + "canonical": "string|null", + "kind": "string|null", + "aliases": "string[]|null" + }, + "value": "string|null" + }, + "valid_from": "string|null", + "valid_to": "string|null" + } + ] }, "importance": 0.0, "confidence": 0.0, @@ -575,6 +688,7 @@ Output must be valid JSON only and must match the provided schema exactly. \ Extract at most MAX_NOTES high-signal, cross-session reusable memory notes from the given messages. \ Each note must be one English sentence and must not contain any CJK characters. \ The structured field is optional. If present, summary must be short, facts must be short sentences supported by the evidence quotes, and concepts must be short phrases. \ +structured.entities and structured.relations should mirror the structured schema with optional entity and relation metadata and relation timestamps. \ Preserve numbers, dates, percentages, currency amounts, tickers, URLs, and code snippets exactly. \ Never store secrets or PII: API keys, tokens, private keys, seed phrases, passwords, bank IDs, personal addresses. \ For every note, provide 1 to 2 evidence quotes copied verbatim from the input messages and include the message_index. \ diff --git a/packages/elf-service/src/add_note.rs b/packages/elf-service/src/add_note.rs index d069a2d..bea6b8b 100644 --- a/packages/elf-service/src/add_note.rs +++ b/packages/elf-service/src/add_note.rs @@ -40,6 +40,7 @@ pub struct AddNoteResult { pub note_id: Option, pub op: NoteOp, pub reason_code: Option, + pub field_path: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -115,7 +116,12 @@ impl ElfService { self.handle_add_note_add(&mut tx, ctx, ¬e, note_id).await?; tx.commit().await?; - Ok(AddNoteResult { note_id: Some(note_id), op: NoteOp::Add, reason_code: None }) + Ok(AddNoteResult { + note_id: Some(note_id), + op: NoteOp::Add, + reason_code: None, + field_path: None, + }) }, UpdateDecision::Update { note_id } => { let result = self @@ -128,7 +134,7 @@ impl ElfService { }, UpdateDecision::None { note_id } => { let result = self - .handle_add_note_none(&mut tx, ¬e, note_id, ctx.now, ctx.embed_version) + .handle_add_note_none(&mut tx, ctx, ¬e, note_id, ctx.now, ctx.embed_version) .await?; tx.commit().await?; @@ -192,6 +198,17 @@ impl ElfService { ctx.now, ) .await?; + self.persist_graph_fields_if_present( + tx, + ctx.tenant_id, + ctx.project_id, + ctx.agent_id, + ctx.scope, + memory_note.note_id, + ctx.now, + note.structured.as_ref(), + ) + .await?; Ok(()) } @@ -239,6 +256,7 @@ impl ElfService { note_id: Some(note_id), op: NoteOp::None, reason_code: None, + field_path: None, }); } @@ -265,6 +283,17 @@ impl ElfService { ) .await?; + self.persist_graph_fields_if_present( + tx, + existing.tenant_id.as_str(), + existing.project_id.as_str(), + existing.agent_id.as_str(), + existing.scope.as_str(), + existing.note_id, + now, + note.structured.as_ref(), + ) + .await?; self.upsert_structured_and_enqueue_outbox( tx, note, @@ -274,32 +303,93 @@ impl ElfService { ) .await?; - Ok(AddNoteResult { note_id: Some(note_id), op: NoteOp::Update, reason_code: None }) + Ok(AddNoteResult { + note_id: Some(note_id), + op: NoteOp::Update, + reason_code: None, + field_path: None, + }) } async fn handle_add_note_none( &self, tx: &mut Transaction<'_, Postgres>, + ctx: &AddNoteContext<'_>, note: &AddNoteInput, note_id: Uuid, now: OffsetDateTime, embed_version: &str, ) -> Result { - if let Some(structured) = note.structured.as_ref() - && !structured.is_effectively_empty() - { - crate::structured_fields::upsert_structured_fields_tx(tx, note_id, structured, now) + let mut should_update = false; + + if let Some(structured) = note.structured.as_ref() { + if !structured.is_effectively_empty() { + crate::structured_fields::upsert_structured_fields_tx(tx, note_id, structured, now) + .await?; + crate::enqueue_outbox_tx(&mut **tx, note_id, "UPSERT", embed_version, now).await?; + + should_update = true; + } + if structured.has_graph_fields() { + self.persist_graph_fields_if_present( + tx, + ctx.tenant_id, + ctx.project_id, + ctx.agent_id, + ctx.scope, + note_id, + now, + Some(structured), + ) .await?; - crate::enqueue_outbox_tx(&mut **tx, note_id, "UPSERT", embed_version, now).await?; + should_update = true; + } + } + + if should_update { return Ok(AddNoteResult { note_id: Some(note_id), op: NoteOp::Update, reason_code: None, + field_path: None, }); } - Ok(AddNoteResult { note_id: Some(note_id), op: NoteOp::None, reason_code: None }) + Ok(AddNoteResult { + note_id: Some(note_id), + op: NoteOp::None, + reason_code: None, + field_path: None, + }) + } + + #[allow(clippy::too_many_arguments)] + async fn persist_graph_fields_if_present( + &self, + tx: &mut Transaction<'_, Postgres>, + tenant_id: &str, + project_id: &str, + agent_id: &str, + scope: &str, + note_id: Uuid, + now: OffsetDateTime, + structured: Option<&StructuredFields>, + ) -> Result<()> { + let Some(structured) = structured else { + return Ok(()); + }; + + if !structured.has_graph_fields() { + return Ok(()); + } + + crate::graph_ingestion::persist_graph_fields_tx( + tx, tenant_id, project_id, agent_id, scope, note_id, structured, now, + ) + .await?; + + Ok(()) } async fn upsert_structured_and_enqueue_outbox( @@ -371,10 +461,13 @@ fn reject_note_if_structured_invalid(note: &AddNoteInput) -> Option String { key.replace('\\', "\\\\").replace('"', "\\\"") } +fn extract_structured_rejection_field_path(err: &Error) -> Option { + match err { + Error::NonEnglishInput { field } => Some(field.clone()), + Error::InvalidRequest { message } if message.starts_with("structured.") => + message.split_whitespace().next().map(ToString::to_string), + _ => None, + } +} + async fn insert_memory_note_tx( tx: &mut Transaction<'_, Postgres>, memory_note: &MemoryNote, diff --git a/packages/elf-service/src/graph.rs b/packages/elf-service/src/graph.rs index 43274c2..489f57c 100644 --- a/packages/elf-service/src/graph.rs +++ b/packages/elf-service/src/graph.rs @@ -1,7 +1,7 @@ use time::OffsetDateTime; use uuid::Uuid; -use crate::Result; +use crate::{ElfService, Result}; use elf_storage::graph; #[allow(dead_code)] @@ -19,7 +19,7 @@ pub(crate) struct GraphUpsertFactArgs<'a> { pub evidence_note_ids: &'a [Uuid], } -impl crate::ElfService { +impl ElfService { #[allow(dead_code)] pub(crate) async fn graph_upsert_fact(&self, args: GraphUpsertFactArgs<'_>) -> Result { let mut tx = self.db.pool.begin().await?; diff --git a/packages/elf-service/src/graph_ingestion.rs b/packages/elf-service/src/graph_ingestion.rs new file mode 100644 index 0000000..0e070c4 --- /dev/null +++ b/packages/elf-service/src/graph_ingestion.rs @@ -0,0 +1,140 @@ +use sqlx::{Postgres, Transaction}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{Error, StructuredFields, structured_fields::StructuredEntity}; +use elf_storage::graph; + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn persist_graph_fields_tx( + tx: &mut Transaction<'_, Postgres>, + tenant_id: &str, + project_id: &str, + agent_id: &str, + scope: &str, + note_id: Uuid, + structured: &StructuredFields, + now: OffsetDateTime, +) -> crate::Result<()> { + if !structured.has_graph_fields() { + return Ok(()); + } + + if let Some(entities) = structured.entities.as_ref() { + for (entity_idx, entity) in entities.iter().enumerate() { + let base_path = format!("structured.entities[{entity_idx}]"); + upsert_graph_entity_and_aliases(tx, tenant_id, project_id, entity, base_path.as_str()) + .await?; + } + } + + let relations = structured.relations.as_deref().unwrap_or_default(); + for (relation_idx, relation) in relations.iter().enumerate() { + let relation_path = format!("structured.relations[{relation_idx}]"); + let subject = relation.subject.as_ref().ok_or_else(|| Error::InvalidRequest { + message: format!("{relation_path}.subject is required."), + })?; + let predicate = relation.predicate.as_deref().ok_or_else(|| Error::InvalidRequest { + message: format!("{relation_path}.predicate is required."), + })?; + + let subject_entity_id = upsert_graph_entity_and_aliases( + tx, + tenant_id, + project_id, + subject, + &format!("{relation_path}.subject"), + ) + .await?; + + let valid_from = relation.valid_from.unwrap_or(now); + let valid_to = relation.valid_to; + if let Some(valid_to) = valid_to + && valid_to <= valid_from + { + return Err(Error::InvalidRequest { + message: format!("{relation_path}.valid_to must be greater than valid_from."), + }); + } + + let object = relation.object.as_ref().ok_or_else(|| Error::InvalidRequest { + message: format!("{relation_path}.object is required."), + })?; + + let (object_entity_id, object_value) = match (&object.entity, &object.value) { + (Some(entity), None) => { + let entity_id = upsert_graph_entity_and_aliases( + tx, + tenant_id, + project_id, + entity, + &format!("{relation_path}.object.entity"), + ) + .await?; + (Some(entity_id), None) + }, + (None, Some(value)) => (None, Some(value.as_str())), + _ => { + return Err(Error::InvalidRequest { + message: format!( + "{relation_path}.object must provide exactly one of entity or value.", + ), + }); + }, + }; + + graph::upsert_fact_with_evidence( + tx, + tenant_id, + project_id, + agent_id, + scope, + subject_entity_id, + predicate, + object_entity_id, + object_value, + valid_from, + valid_to, + &[note_id], + ) + .await + .map_err(|err| Error::Storage { message: err.to_string() })?; + } + + Ok(()) +} + +async fn upsert_graph_entity_and_aliases( + tx: &mut Transaction<'_, Postgres>, + tenant_id: &str, + project_id: &str, + entity: &StructuredEntity, + context_path: &str, +) -> crate::Result { + let canonical = entity.canonical.as_deref().ok_or_else(|| Error::InvalidRequest { + message: format!("{context_path}.canonical is required."), + })?; + + let canonical = canonical.trim(); + let entity_id = + graph::upsert_entity(tx, tenant_id, project_id, canonical, entity.kind.as_deref()) + .await + .map_err(|err| Error::Storage { message: err.to_string() })?; + + if let Some(aliases) = entity.aliases.as_ref() { + for (alias_idx, alias) in aliases.iter().enumerate() { + let alias = alias.trim(); + if alias.is_empty() { + return Err(Error::InvalidRequest { + message: format!("{context_path}.aliases[{alias_idx}] must not be empty."), + }); + } + + graph::upsert_entity_alias(tx, entity_id, alias) + .await + .map_err(|err| Error::Storage { message: err.to_string() })?; + } + } + + Ok(entity_id) +} diff --git a/packages/elf-service/src/lib.rs b/packages/elf-service/src/lib.rs index 19e6b5c..a4e79ec 100644 --- a/packages/elf-service/src/lib.rs +++ b/packages/elf-service/src/lib.rs @@ -12,6 +12,7 @@ pub mod time_serde; pub mod update; mod error; +mod graph_ingestion; mod ranking_explain_v2; pub use self::{ diff --git a/packages/elf-service/src/structured_fields.rs b/packages/elf-service/src/structured_fields.rs index 4408805..da27ff0 100644 --- a/packages/elf-service/src/structured_fields.rs +++ b/packages/elf-service/src/structured_fields.rs @@ -10,6 +10,9 @@ use crate::{Error, Result}; use elf_domain::{cjk, evidence}; const MAX_LIST_ITEMS: usize = 64; +const MAX_ENTITIES: usize = 32; +const MAX_RELATIONS: usize = 64; +const MAX_ALIASES: usize = 16; const MAX_ITEM_CHARS: usize = 1_000; #[derive(Clone, Debug, Default, Serialize, Deserialize)] @@ -17,6 +20,8 @@ pub struct StructuredFields { pub summary: Option, pub facts: Option>, pub concepts: Option>, + pub entities: Option>, + pub relations: Option>, } impl StructuredFields { pub fn is_effectively_empty(&self) -> bool { @@ -34,6 +39,36 @@ impl StructuredFields { summary_empty && facts_empty && concepts_empty } + + pub fn has_graph_fields(&self) -> bool { + self.entities.as_ref().is_some_and(|entities| !entities.is_empty()) + || self.relations.as_ref().is_some_and(|relations| !relations.is_empty()) + } +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct StructuredEntity { + pub canonical: Option, + pub kind: Option, + pub aliases: Option>, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct StructuredRelation { + pub subject: Option, + pub predicate: Option, + pub object: Option, + #[serde(with = "crate::time_serde::option")] + pub valid_from: Option, + #[serde(with = "crate::time_serde::option")] + pub valid_to: Option, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct StructuredRelationObject { + pub entity: Option, + pub value: Option, } #[derive(Clone, Debug, Deserialize)] @@ -47,18 +82,39 @@ pub fn validate_structured_fields( source_ref: &Value, add_event_evidence: Option<&[(usize, String)]>, ) -> Result<()> { + let evidence_quotes: Vec = if let Some(event_evidence) = add_event_evidence { + event_evidence.iter().map(|(_, quote)| quote.clone()).collect() + } else { + extract_source_ref_quotes(source_ref) + }; + if let Some(summary) = structured.summary.as_ref() { validate_text_field(summary, "structured.summary")?; } + if let Some(entities) = structured.entities.as_ref() { + validate_list_field_count(entities.len(), MAX_ENTITIES, "structured.entities")?; + + for (idx, entity) in entities.iter().enumerate() { + let base = format!("structured.entities[{idx}]"); + + validate_structured_entity(entity, &base, true)?; + } + } + if let Some(relations) = structured.relations.as_ref() { + validate_list_field_count(relations.len(), MAX_RELATIONS, "structured.relations")?; + + for (idx, relation) in relations.iter().enumerate() { + validate_structured_relation( + relation, + note_text, + &evidence_quotes, + &format!("structured.relations[{idx}]"), + )?; + } + } if let Some(facts) = structured.facts.as_ref() { validate_list_field(facts, "structured.facts")?; - let evidence_quotes: Vec = if let Some(event_evidence) = add_event_evidence { - event_evidence.iter().map(|(_, quote)| quote.clone()).collect() - } else { - extract_source_ref_quotes(source_ref) - }; - for (idx, fact) in facts.iter().enumerate() { validate_text_field(fact, &format!("structured.facts[{idx}]"))?; @@ -165,6 +221,119 @@ ORDER BY note_id ASC, field_kind ASC, item_index ASC", Ok(out) } +fn validate_structured_entity( + entity: &StructuredEntity, + base: &str, + require_canonical: bool, +) -> Result<()> { + if require_canonical { + validate_required_text_field(entity.canonical.as_ref(), &format!("{base}.canonical"))?; + } + + if let Some(kind) = entity.kind.as_ref() { + validate_text_field(kind, &format!("{base}.kind"))?; + } + if let Some(aliases) = entity.aliases.as_ref() { + validate_list_field_count(aliases.len(), MAX_ALIASES, &format!("{base}.aliases"))?; + + for (alias_idx, alias) in aliases.iter().enumerate() { + validate_text_field(alias, &format!("{base}.aliases[{alias_idx}]"))?; + } + } + + Ok(()) +} + +fn validate_structured_relation( + relation: &StructuredRelation, + note_text: &str, + evidence_quotes: &[String], + base: &str, +) -> Result<()> { + if relation.predicate.is_none() { + return Err(Error::InvalidRequest { message: format!("{base}.predicate is required.") }); + } + + let subject = relation + .subject + .as_ref() + .ok_or_else(|| Error::InvalidRequest { message: format!("{base}.subject is required.") })?; + + validate_structured_entity(subject, &format!("{base}.subject"), true)?; + + let predicate = relation.predicate.as_ref().ok_or_else(|| Error::InvalidRequest { + message: format!("{base}.predicate is required."), + })?; + + validate_text_field(predicate, &format!("{base}.predicate"))?; + + let object = relation + .object + .as_ref() + .ok_or_else(|| Error::InvalidRequest { message: format!("{base}.object is required.") })?; + + match (&object.entity, object.value.as_ref()) { + (Some(entity), None) => { + validate_structured_entity(entity, &format!("{base}.object.entity"), true)?; + + let canonical = entity.canonical.as_deref().ok_or_else(|| Error::InvalidRequest { + message: format!("{base}.object.entity.canonical is required."), + })?; + + if !fact_is_evidence_bound(canonical, note_text, evidence_quotes) { + return Err(Error::InvalidRequest { + message: format!( + "{base}.object.entity.canonical is not supported by note text or evidence quotes." + ), + }); + } + }, + (None, Some(value)) => { + validate_text_field(value, &format!("{base}.object.value"))?; + + if !fact_is_evidence_bound(value, note_text, evidence_quotes) { + return Err(Error::InvalidRequest { + message: format!( + "{base}.object.value is not supported by note text or evidence quotes." + ), + }); + } + }, + (_, _) => { + return Err(Error::InvalidRequest { + message: format!("{base}.object must provide exactly one of entity or value."), + }); + }, + } + + if !fact_is_evidence_bound( + subject.canonical.as_deref().unwrap_or_default(), + note_text, + evidence_quotes, + ) { + return Err(Error::InvalidRequest { + message: format!( + "{base}.subject.canonical is not supported by note text or evidence quotes." + ), + }); + } + if !fact_is_evidence_bound(predicate, note_text, evidence_quotes) { + return Err(Error::InvalidRequest { + message: format!("{base}.predicate is not supported by note text or evidence quotes."), + }); + } + + if let (Some(valid_from), Some(valid_to)) = (relation.valid_from, relation.valid_to) + && valid_to <= valid_from + { + return Err(Error::InvalidRequest { + message: format!("{base}.valid_to must be greater than valid_from."), + }); + } + + Ok(()) +} + fn validate_list_field(items: &[String], label: &str) -> Result<()> { if items.len() > MAX_LIST_ITEMS { return Err(Error::InvalidRequest { @@ -193,6 +362,24 @@ fn validate_text_field(value: &str, label: &str) -> Result<()> { Ok(()) } +fn validate_required_text_field(value: Option<&String>, label: &str) -> Result<()> { + let Some(value) = value else { + return Err(Error::InvalidRequest { message: format!("{label} is required.") }); + }; + + validate_text_field(value, label) +} + +fn validate_list_field_count(len: usize, max: usize, label: &str) -> Result<()> { + if len > max { + return Err(Error::InvalidRequest { + message: format!("{label} must have at most {max} items."), + }); + } + + Ok(()) +} + fn extract_source_ref_quotes(source_ref: &Value) -> Vec { let Some(evidence) = source_ref.get("evidence") else { return Vec::new() }; let Ok(quotes) = serde_json::from_value::>(evidence.clone()) else { @@ -284,6 +471,8 @@ mod tests { summary: None, facts: Some(vec!["Deploy uses reranking".to_string()]), concepts: None, + entities: None, + relations: None, }; let res = validate_structured_fields( &structured, @@ -301,6 +490,8 @@ mod tests { summary: None, facts: Some(vec!["Nonexistent claim.".to_string()]), concepts: None, + entities: None, + relations: None, }; let res = validate_structured_fields(&structured, "Some note.", &serde_json::json!({}), None); diff --git a/packages/elf-service/tests/acceptance/graph_ingestion.rs b/packages/elf-service/tests/acceptance/graph_ingestion.rs new file mode 100644 index 0000000..414af84 --- /dev/null +++ b/packages/elf-service/tests/acceptance/graph_ingestion.rs @@ -0,0 +1,544 @@ +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + sync::{Arc, atomic::AtomicUsize}, +}; + +use uuid::Uuid; + +use elf_service::{ + AddEventRequest, AddNoteInput, AddNoteRequest, EmbeddingProvider, EventMessage, NoteOp, + Providers, +}; + +struct HashEmbedding { + vector_dim: u32, +} + +impl EmbeddingProvider for HashEmbedding { + fn embed<'a>( + &'a self, + _: &'a elf_config::EmbeddingProviderConfig, + texts: &'a [String], + ) -> elf_service::BoxFuture<'a, elf_service::Result>>> { + let vector_dim = self.vector_dim as usize; + let vectors = texts + .iter() + .map(|text| { + let mut values = Vec::with_capacity(vector_dim); + + for idx in 0..vector_dim { + let mut hasher = DefaultHasher::new(); + text.hash(&mut hasher); + idx.hash(&mut hasher); + let raw = hasher.finish(); + let normalized = ((raw % 2_000_000) as f32 / 1_000_000.0) - 1.0; + values.push(normalized); + } + + values + }) + .collect(); + + Box::pin(async move { Ok(vectors) }) + } +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn add_note_duplicate_fact_attaches_multiple_evidence() { + let Some(test_db) = crate::acceptance::test_db().await else { + eprintln!( + "Skipping add_note_duplicate_fact_attaches_multiple_evidence; set ELF_PG_DSN to run.", + ); + + return; + }; + let Some(qdrant_url) = crate::acceptance::test_qdrant_url() else { + eprintln!( + "Skipping add_note_duplicate_fact_attaches_multiple_evidence; set ELF_QDRANT_URL to run.", + ); + + return; + }; + + let providers = Providers::new( + Arc::new(HashEmbedding { vector_dim: 4_096 }), + Arc::new(crate::acceptance::StubRerank), + Arc::new(crate::acceptance::SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }), + ); + let collection = test_db.collection_name("elf_acceptance"); + let cfg = + crate::acceptance::test_config(test_db.dsn().to_string(), qdrant_url, 4_096, collection); + let service = + crate::acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + crate::acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + let response = service + .add_note(AddNoteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: "agent_private".to_string(), + notes: vec![ + AddNoteInput { + r#type: "fact".to_string(), + key: Some("mentorship-a".to_string()), + text: "Alice mentors Bob in 2026.".to_string(), + structured: Some( + serde_json::from_value::( + serde_json::json!({ + "relations": [{ + "subject": { "canonical": "Alice" }, + "predicate": "mentors", + "object": { "value": "Bob" } + }] + }), + ) + .expect("Failed to build structured fields."), + ), + importance: 0.8, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({}), + }, + AddNoteInput { + r#type: "fact".to_string(), + key: Some("mentorship-b".to_string()), + text: "Alice also mentors Bob often.".to_string(), + structured: Some( + serde_json::from_value::( + serde_json::json!({ + "relations": [{ + "subject": { "canonical": "Alice" }, + "predicate": "mentors", + "object": { "value": "Bob" } + }] + }), + ) + .expect("Failed to build structured fields."), + ), + importance: 0.7, + confidence: 0.8, + ttl_days: None, + source_ref: serde_json::json!({}), + }, + ], + }) + .await + .expect("add_note failed."); + + assert_eq!(response.results.len(), 2); + assert_eq!(response.results[0].op, NoteOp::Add); + assert_eq!(response.results[1].op, NoteOp::Add); + let first_note_id = response.results[0].note_id.expect("Expected note_id."); + let second_note_id = response.results[1].note_id.expect("Expected note_id."); + assert_ne!(first_note_id, second_note_id); + + let fact_id: Uuid = sqlx::query_scalar( + "\ +SELECT gf.fact_id +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind("alice") + .bind("mentors") + .bind("Bob") + .bind("t") + .bind("p") + .bind("agent_private") + .fetch_one(&service.db.pool) + .await + .expect("Failed to load fact."); + + let fact_count: i64 = sqlx::query_scalar( + "\ +SELECT COUNT(*) +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind("alice") + .bind("mentors") + .bind("Bob") + .bind("t") + .bind("p") + .bind("agent_private") + .fetch_one(&service.db.pool) + .await + .expect("Failed to count fact rows."); + + let evidence_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1") + .bind(fact_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to load fact evidence."); + + assert_eq!(fact_count, 1); + assert_eq!(evidence_count, 2); + + let first_evidence_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1 AND note_id = $2", + ) + .bind(fact_id) + .bind(first_note_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to load first note evidence."); + let second_evidence_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1 AND note_id = $2", + ) + .bind(fact_id) + .bind(second_note_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to load second note evidence."); + + assert_eq!(first_evidence_count, 1); + assert_eq!(second_evidence_count, 1); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn add_note_invalid_relation_rejected_has_field_path() { + let Some(test_db) = crate::acceptance::test_db().await else { + eprintln!( + "Skipping add_note_invalid_relation_rejected_has_field_path; set ELF_PG_DSN to run." + ); + + return; + }; + let Some(qdrant_url) = crate::acceptance::test_qdrant_url() else { + eprintln!( + "Skipping add_note_invalid_relation_rejected_has_field_path; set ELF_QDRANT_URL to run.", + ); + + return; + }; + + let providers = Providers::new( + Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), + Arc::new(crate::acceptance::StubRerank), + Arc::new(crate::acceptance::SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }), + ); + let collection = test_db.collection_name("elf_acceptance"); + let cfg = + crate::acceptance::test_config(test_db.dsn().to_string(), qdrant_url, 4_096, collection); + let service = + crate::acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + let response = service + .add_note(AddNoteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some("mentorship".to_string()), + text: "Alice mentors Bob.".to_string(), + structured: Some( + serde_json::from_value::( + serde_json::json!({ + "relations": [{ + "subject": { "canonical": "Alice" }, + "object": { "value": "Bob" } + }] + }), + ) + .expect("Failed to build structured fields."), + ), + importance: 0.8, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({}), + }], + }) + .await + .expect("add_note failed."); + + assert_eq!(response.results.len(), 1); + assert_eq!(response.results[0].op, NoteOp::Rejected); + assert_eq!(response.results[0].reason_code.as_deref(), Some("REJECT_STRUCTURED_INVALID")); + assert_eq!( + response.results[0].field_path, + Some("structured.relations[0].predicate".to_string()), + ); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn add_note_persists_graph_relations() { + let Some(test_db) = crate::acceptance::test_db().await else { + eprintln!("Skipping add_note_persists_graph_relations; set ELF_PG_DSN to run."); + + return; + }; + let Some(qdrant_url) = crate::acceptance::test_qdrant_url() else { + eprintln!("Skipping add_note_persists_graph_relations; set ELF_QDRANT_URL to run."); + + return; + }; + + let providers = Providers::new( + Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), + Arc::new(crate::acceptance::StubRerank), + Arc::new(crate::acceptance::SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: serde_json::json!({ "notes": [] }), + }), + ); + let collection = test_db.collection_name("elf_acceptance"); + let cfg = + crate::acceptance::test_config(test_db.dsn().to_string(), qdrant_url, 4_096, collection); + let service = + crate::acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + crate::acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + let response = service + .add_note(AddNoteRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: "agent_private".to_string(), + notes: vec![AddNoteInput { + r#type: "fact".to_string(), + key: Some("mentorship".to_string()), + text: "Alice mentors Bob.".to_string(), + structured: Some( + serde_json::from_value::( + serde_json::json!({ + "relations": [{ + "subject": { "canonical": "Alice" }, + "predicate": "mentors", + "object": { "value": "Bob" } + }] + }), + ) + .expect("Failed to build structured fields."), + ), + importance: 0.8, + confidence: 0.9, + ttl_days: None, + source_ref: serde_json::json!({}), + }], + }) + .await + .expect("add_note failed."); + + assert_eq!(response.results.len(), 1); + assert_eq!(response.results[0].op, NoteOp::Add); + let note_id = response.results[0].note_id.expect("Expected note_id."); + + let fact_id: Uuid = sqlx::query_scalar( + "\ +SELECT gf.fact_id +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind("alice") + .bind("mentors") + .bind("Bob") + .bind("t") + .bind("p") + .bind("agent_private") + .fetch_one(&service.db.pool) + .await + .expect("Failed to load fact."); + + let fact_count: i64 = sqlx::query_scalar( + "\ +SELECT COUNT(*) +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind("alice") + .bind("mentors") + .bind("Bob") + .bind("t") + .bind("p") + .bind("agent_private") + .fetch_one(&service.db.pool) + .await + .expect("Failed to count fact rows."); + + let evidence_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1 AND note_id = $2", + ) + .bind(fact_id) + .bind(note_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to load fact evidence."); + + assert_eq!(fact_count, 1); + assert_eq!(evidence_count, 1); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_URL to run."] +async fn add_event_persists_graph_relations() { + let Some(test_db) = crate::acceptance::test_db().await else { + eprintln!("Skipping add_event_persists_graph_relations; set ELF_PG_DSN to run."); + + return; + }; + let Some(qdrant_url) = crate::acceptance::test_qdrant_url() else { + eprintln!("Skipping add_event_persists_graph_relations; set ELF_QDRANT_URL to run."); + + return; + }; + + let extractor_payload = serde_json::json!({ + "notes": [{ + "type": "fact", + "key": "mentorship", + "text": "Alice mentors Bob.", + "structured": { + "relations": [{ + "subject": { "canonical": "Alice" }, + "predicate": "mentors", + "object": { "value": "Bob" } + }] + }, + "importance": 0.8, + "confidence": 0.9, + "ttl_days": null, + "scope_suggestion": "agent_private", + "evidence": [{ "message_index": 0, "quote": "Alice mentors Bob." }], + "reason": "test" + }] + }); + let providers = Providers::new( + Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), + Arc::new(crate::acceptance::StubRerank), + Arc::new(crate::acceptance::SpyExtractor { + calls: Arc::new(AtomicUsize::new(0)), + payload: extractor_payload, + }), + ); + let collection = test_db.collection_name("elf_acceptance"); + let cfg = + crate::acceptance::test_config(test_db.dsn().to_string(), qdrant_url, 4_096, collection); + let service = + crate::acceptance::build_service(cfg, providers).await.expect("Failed to build service."); + + crate::acceptance::reset_db(&service.db.pool).await.expect("Failed to reset test database."); + + let response = service + .add_event(AddEventRequest { + tenant_id: "t".to_string(), + project_id: "p".to_string(), + agent_id: "a".to_string(), + scope: Some("agent_private".to_string()), + dry_run: Some(false), + messages: vec![EventMessage { + role: "user".to_string(), + content: "Alice mentors Bob.".to_string(), + ts: None, + msg_id: None, + }], + }) + .await + .expect("add_event failed."); + + assert_eq!(response.results.len(), 1); + assert_eq!(response.results[0].op, NoteOp::Add); + let note_id = response.results[0].note_id.expect("Expected note_id."); + + let fact_id: Uuid = sqlx::query_scalar( + "\ +SELECT gf.fact_id +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind("alice") + .bind("mentors") + .bind("Bob") + .bind("t") + .bind("p") + .bind("agent_private") + .fetch_one(&service.db.pool) + .await + .expect("Failed to load fact."); + + let fact_count: i64 = sqlx::query_scalar( + "\ +SELECT COUNT(*) +FROM graph_facts gf +JOIN graph_entities ge ON ge.entity_id = gf.subject_entity_id +WHERE ge.canonical_norm = $1 + AND gf.predicate = $2 + AND gf.object_value = $3 + AND gf.tenant_id = $4 + AND gf.project_id = $5 + AND gf.scope = $6", + ) + .bind("alice") + .bind("mentors") + .bind("Bob") + .bind("t") + .bind("p") + .bind("agent_private") + .fetch_one(&service.db.pool) + .await + .expect("Failed to count fact rows."); + + let evidence_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM graph_fact_evidence WHERE fact_id = $1 AND note_id = $2", + ) + .bind(fact_id) + .bind(note_id) + .fetch_one(&service.db.pool) + .await + .expect("Failed to load fact evidence."); + + assert_eq!(fact_count, 1); + assert_eq!(evidence_count, 1); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} diff --git a/packages/elf-service/tests/acceptance/suite.rs b/packages/elf-service/tests/acceptance/suite.rs index 75590a7..9ea74e5 100644 --- a/packages/elf-service/tests/acceptance/suite.rs +++ b/packages/elf-service/tests/acceptance/suite.rs @@ -3,6 +3,7 @@ mod chunk_search; mod chunking; mod english_only_boundary; mod evidence_binding; +mod graph_ingestion; mod idempotency; mod outbox_eventual_consistency; mod rebuild_qdrant; diff --git a/packages/elf-storage/src/graph.rs b/packages/elf-storage/src/graph.rs index e84ae6e..233c836 100644 --- a/packages/elf-storage/src/graph.rs +++ b/packages/elf-storage/src/graph.rs @@ -72,7 +72,6 @@ RETURNING fact_id", .bind(valid_to) .fetch_one(&mut *executor) .await?; - let fact_id = row.0; for note_id in evidence_note_ids { @@ -92,6 +91,135 @@ ON CONFLICT (fact_id, note_id) DO NOTHING", Ok(fact_id) } +#[allow(clippy::too_many_arguments)] +pub async fn upsert_fact_with_evidence( + executor: &mut PgConnection, + tenant_id: &str, + project_id: &str, + agent_id: &str, + scope: &str, + subject_entity_id: Uuid, + predicate: &str, + object_entity_id: Option, + object_value: Option<&str>, + valid_from: OffsetDateTime, + valid_to: Option, + evidence_note_ids: &[Uuid], +) -> Result { + if evidence_note_ids.is_empty() { + return Err(Error::InvalidArgument( + "graph fact evidence is required; evidence_note_ids must not be empty".to_string(), + )); + } + + let fact_id = match (object_entity_id, object_value) { + (Some(object_entity_id), None) => { + let row: (Uuid,) = sqlx::query_as::<_, (Uuid,)>( + "\ +INSERT INTO graph_facts ( +\tfact_id, +\ttenant_id, +\tproject_id, +\tagent_id, +\tscope, +\tsubject_entity_id, +\tpredicate, +\tobject_entity_id, +\tobject_value, +\tvalid_from, +\tvalid_to, +\tcreated_at, +\tupdated_at +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, now(), now()) +ON CONFLICT (tenant_id, project_id, scope, subject_entity_id, predicate, object_entity_id) +WHERE valid_to IS NULL AND object_entity_id IS NOT NULL +DO UPDATE +SET updated_at = graph_facts.updated_at +RETURNING fact_id", + ) + .bind(Uuid::new_v4()) + .bind(tenant_id) + .bind(project_id) + .bind(agent_id) + .bind(scope) + .bind(subject_entity_id) + .bind(predicate) + .bind(Some(object_entity_id)) + .bind(None::) + .bind(valid_from) + .bind(valid_to) + .fetch_one(&mut *executor) + .await?; + + row.0 + }, + (None, Some(object_value)) => { + let row: (Uuid,) = sqlx::query_as::<_, (Uuid,)>( + "\ +INSERT INTO graph_facts ( +\tfact_id, +\ttenant_id, +\tproject_id, +\tagent_id, +\tscope, +\tsubject_entity_id, +\tpredicate, +\tobject_entity_id, +\tobject_value, +\tvalid_from, +\tvalid_to, +\tcreated_at, +\tupdated_at +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, now(), now()) +ON CONFLICT (tenant_id, project_id, scope, subject_entity_id, predicate, object_value) +WHERE valid_to IS NULL AND object_value IS NOT NULL +DO UPDATE +SET updated_at = graph_facts.updated_at +RETURNING fact_id", + ) + .bind(Uuid::new_v4()) + .bind(tenant_id) + .bind(project_id) + .bind(agent_id) + .bind(scope) + .bind(subject_entity_id) + .bind(predicate) + .bind(None::) + .bind(Some(object_value)) + .bind(valid_from) + .bind(valid_to) + .fetch_one(&mut *executor) + .await?; + + row.0 + }, + _ => { + return Err(Error::InvalidArgument( + "graph fact must provide exactly one of object_entity_id and object_value" + .to_string(), + )); + }, + }; + + for note_id in evidence_note_ids { + sqlx::query( + "\ +INSERT INTO graph_fact_evidence (evidence_id, fact_id, note_id, created_at) +VALUES ($1, $2, $3, now()) +ON CONFLICT (fact_id, note_id) DO NOTHING", + ) + .bind(Uuid::new_v4()) + .bind(fact_id) + .bind(*note_id) + .execute(&mut *executor) + .await?; + } + + Ok(fact_id) +} + pub async fn upsert_entity( executor: &mut PgConnection, tenant_id: &str, @@ -100,7 +228,6 @@ pub async fn upsert_entity( kind: Option<&str>, ) -> Result { let canonical_norm = normalize_entity_name(canonical); - let row: (Uuid,) = sqlx::query_as( "\ INSERT INTO graph_entities ( diff --git a/packages/elf-storage/tests/graph_memory.rs b/packages/elf-storage/tests/graph_memory.rs index a5f50c0..b72c453 100644 --- a/packages/elf-storage/tests/graph_memory.rs +++ b/packages/elf-storage/tests/graph_memory.rs @@ -1,16 +1,10 @@ -use serde_json::json; use sqlx::PgConnection; use time::{Duration, OffsetDateTime}; use uuid::Uuid; use elf_config::Postgres; use elf_storage::{ - Error as StorageError, db::Db, - graph::{ - fetch_active_facts_for_subject, insert_fact_with_evidence, normalize_entity_name, - upsert_entity, - }, models::{GraphFact, MemoryNote}, queries, }; @@ -26,7 +20,6 @@ async fn graph_entity_upsert_is_idempotent_by_normalized_canonical() { return; }; - let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); @@ -34,22 +27,35 @@ async fn graph_entity_upsert_is_idempotent_by_normalized_canonical() { db.ensure_schema(4_096).await.expect("Failed to ensure schema."); let mut tx = db.pool.begin().await.expect("Failed to open transaction."); - let tenant_id = "tenant-a"; let project_id = "project-a"; - let entity_id = upsert_entity(&mut tx, tenant_id, project_id, " Alice Doe ", Some("person")) - .await - .expect("Failed to upsert canonical entity."); - let canonical_norm = normalize_entity_name("Alice doe"); + let entity_id = elf_storage::graph::upsert_entity( + &mut tx, + tenant_id, + project_id, + " Alice Doe ", + Some("person"), + ) + .await + .expect("Failed to upsert canonical entity."); + let canonical_norm = elf_storage::graph::normalize_entity_name("Alice doe"); + assert_eq!(canonical_norm, "alice doe"); - let entity_again = upsert_entity(&mut tx, tenant_id, project_id, "Alice\tDoe", Some("person")) - .await - .expect("Failed to upsert canonical alias."); + let entity_again = elf_storage::graph::upsert_entity( + &mut tx, + tenant_id, + project_id, + "Alice\tDoe", + Some("person"), + ) + .await + .expect("Failed to upsert canonical alias."); assert_eq!(entity_id, entity_again); tx.commit().await.expect("Failed to commit transaction."); + assert!(test_db.cleanup().await.is_ok(), "Failed to cleanup test database."); } @@ -61,7 +67,6 @@ async fn graph_fact_with_empty_evidence_is_rejected() { return; }; - let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); @@ -69,11 +74,11 @@ async fn graph_fact_with_empty_evidence_is_rejected() { db.ensure_schema(4_096).await.expect("Failed to ensure schema."); let mut tx = db.pool.begin().await.expect("Failed to open transaction."); - let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity A", None) - .await - .expect("Failed to upsert subject."); - - let err = insert_fact_with_evidence( + let subject = + elf_storage::graph::upsert_entity(&mut tx, "tenant-a", "project-a", "Entity A", None) + .await + .expect("Failed to upsert subject."); + let err = elf_storage::graph::insert_fact_with_evidence( &mut tx, "tenant-a", "project-a", @@ -90,7 +95,7 @@ async fn graph_fact_with_empty_evidence_is_rejected() { .await .expect_err("Expected empty evidence to be rejected."); - assert!(matches!(err, StorageError::InvalidArgument(_))); + assert!(matches!(err, elf_storage::Error::InvalidArgument(_))); tx.rollback().await.expect("Failed to rollback transaction."); test_db.cleanup().await.expect("Failed to cleanup test database."); @@ -106,7 +111,6 @@ async fn graph_fact_duplicates_with_active_window_fail_unique_constraint() { return; }; - let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); @@ -115,17 +119,17 @@ async fn graph_fact_duplicates_with_active_window_fail_unique_constraint() { let mut tx = db.pool.begin().await.expect("Failed to open transaction."); let note_id = insert_memory_note(&mut tx, "tenant-a", "project-a").await; - - let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) - .await - .expect("Failed to upsert subject."); - let object = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Object", None) - .await - .expect("Failed to upsert object."); - + let subject = + elf_storage::graph::upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) + .await + .expect("Failed to upsert subject."); + let object = + elf_storage::graph::upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Object", None) + .await + .expect("Failed to upsert object."); let now = OffsetDateTime::now_utc(); - insert_fact_with_evidence( + elf_storage::graph::insert_fact_with_evidence( &mut tx, "tenant-a", "project-a", @@ -142,7 +146,7 @@ async fn graph_fact_duplicates_with_active_window_fail_unique_constraint() { .await .expect("Failed to insert graph fact."); - let err = insert_fact_with_evidence( + let err = elf_storage::graph::insert_fact_with_evidence( &mut tx, "tenant-a", "project-a", @@ -172,7 +176,6 @@ async fn graph_fact_rejects_invalid_valid_window() { return; }; - let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); @@ -181,13 +184,12 @@ async fn graph_fact_rejects_invalid_valid_window() { let mut tx = db.pool.begin().await.expect("Failed to open transaction."); let note_id = insert_memory_note(&mut tx, "tenant-a", "project-a").await; - - let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) - .await - .expect("Failed to upsert subject."); - + let subject = + elf_storage::graph::upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) + .await + .expect("Failed to upsert subject."); let now = OffsetDateTime::now_utc(); - let err = insert_fact_with_evidence( + let err = elf_storage::graph::insert_fact_with_evidence( &mut tx, "tenant-a", "project-a", @@ -219,7 +221,6 @@ async fn graph_fetch_active_facts_returns_active_window_only() { return; }; - let test_db = TestDatabase::new(&base_dsn).await.expect("Failed to create test database."); let cfg = Postgres { dsn: test_db.dsn().to_string(), pool_max_conns: 1 }; let db = Db::connect(&cfg).await.expect("Failed to connect to Postgres."); @@ -228,14 +229,12 @@ async fn graph_fetch_active_facts_returns_active_window_only() { let mut tx = db.pool.begin().await.expect("Failed to open transaction."); let note_id = insert_memory_note(&mut tx, "tenant-a", "project-a").await; - - let subject = upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) - .await - .expect("Failed to upsert subject."); - + let subject = + elf_storage::graph::upsert_entity(&mut tx, "tenant-a", "project-a", "Entity Subject", None) + .await + .expect("Failed to upsert subject."); let now = OffsetDateTime::now_utc(); - - let active = insert_fact_with_evidence( + let active = elf_storage::graph::insert_fact_with_evidence( &mut tx, "tenant-a", "project-a", @@ -252,7 +251,7 @@ async fn graph_fetch_active_facts_returns_active_window_only() { .await .expect("Failed to insert active graph fact."); - insert_fact_with_evidence( + elf_storage::graph::insert_fact_with_evidence( &mut tx, "tenant-a", "project-a", @@ -268,8 +267,7 @@ async fn graph_fetch_active_facts_returns_active_window_only() { ) .await .expect("Failed to insert expired graph fact."); - - insert_fact_with_evidence( + elf_storage::graph::insert_fact_with_evidence( &mut tx, "tenant-a", "project-a", @@ -286,10 +284,16 @@ async fn graph_fetch_active_facts_returns_active_window_only() { .await .expect("Failed to insert future graph fact."); - let facts: Vec = - fetch_active_facts_for_subject(&mut tx, "tenant-a", "project-a", "scope-a", subject, now) - .await - .expect("Failed to fetch active graph facts."); + let facts: Vec = elf_storage::graph::fetch_active_facts_for_subject( + &mut tx, + "tenant-a", + "project-a", + "scope-a", + subject, + now, + ) + .await + .expect("Failed to fetch active graph facts."); assert_eq!(facts.len(), 1); assert_eq!(facts[0].fact_id, active); @@ -321,7 +325,7 @@ async fn insert_memory_note( updated_at: OffsetDateTime::now_utc(), expires_at: None, embedding_version: "test:vec:1".to_string(), - source_ref: json!({}), + source_ref: serde_json::json!({}), hit_count: 0, last_hit_at: None, };