From dc11f2b06295e3a04fda15becde17f3bb8d992f3 Mon Sep 17 00:00:00 2001 From: OmarElhagagy Date: Sat, 22 Feb 2025 22:02:43 +0200 Subject: [PATCH 1/2] Add DELETE /api/v0/pipeline-nodes/:id with transactions, error handling, and tests #20 --- api/src/main.rs | 6 +- api/src/routes/api/v0/pipeline_nodes.rs | 137 ++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 1 deletion(-) diff --git a/api/src/main.rs b/api/src/main.rs index fc40f64..a350142 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -4,7 +4,7 @@ use async_nats::{ jetstream::{self}, }; use axum::{ - routing::{get, post}, + routing::{get, post, delete}, Router, }; use db::seed::seed_database; @@ -167,6 +167,10 @@ async fn main() -> io::Result<()> { "/pipeline-nodes/:id", post(routes::api::v0::pipeline_nodes::update), ) + .route( + "/pipeline-nodes/:id", + delete(routes::api::v0::pipeline_nodes::delete) + ) .route( "/pipeline-node-connections", post(routes::api::v0::pipeline_node_connections::create), diff --git a/api/src/routes/api/v0/pipeline_nodes.rs b/api/src/routes/api/v0/pipeline_nodes.rs index bbcce35..9a70271 100644 --- a/api/src/routes/api/v0/pipeline_nodes.rs +++ b/api/src/routes/api/v0/pipeline_nodes.rs @@ -1,6 +1,8 @@ use axum::{extract::Path, Json}; use hyper::StatusCode; use serde::{Deserialize, Serialize}; +use serde_json::json; +use tracing::info; use uuid::Uuid; use crate::{ @@ -49,6 +51,83 @@ pub struct PipelineNodeCreationParams { coords: Coords, } +#[derive(Debug, Serialize)] +struct ApiError { + error: String, +} + +impl From for ApiError { + fn from(err: sqlx::Error) -> Self { + ApiError { + error: err.to_string(), + } + } +} + +pub async fn delete( + DatabaseConnection(mut conn): DatabaseConnection, + Path(id): Path, +) -> Result)> { + info!("Attempting to delete pipeline node: {}", id); + + let mut tx = conn + .begin() + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + + // Check if node exists + let node_exists = sqlx::query!("SELECT id FROM pipeline_nodes WHERE id = $1", id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))? + .is_some(); + + if !node_exists { + return Err(( + StatusCode::NOT_FOUND, + Json(ApiError { + error: "Pipeline node not found".to_string(), + }), + )); + } + + // Delete connections + sqlx::query!( + r#" + DELETE FROM pipeline_node_connections + WHERE source_node_id = $1 OR target_node_id = $1 + "#, + id + ) + .execute(&mut *tx) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + + // Delete inputs and outputs + sqlx::query!("DELETE FROM pipeline_node_inputs WHERE pipeline_node_id = $1", id) + .execute(&mut *tx) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + + sqlx::query!("DELETE FROM pipeline_node_outputs WHERE pipeline_node_id = $1", id) + .execute(&mut *tx) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + + // Delete the node + sqlx::query!("DELETE FROM pipeline_nodes WHERE id = $1", id) + .execute(&mut *tx) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + + tx.commit() + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + + info!("Successfully deleted pipeline node: {}", id); + Ok(StatusCode::NO_CONTENT) +} + pub async fn create( DatabaseConnection(mut conn): DatabaseConnection, Json(payload): Json, @@ -163,3 +242,61 @@ pub async fn update( outputs: vec![], })) } + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::PgPool; + + #[tokio::test] + async fn test_delete_pipeline_node_success() { + // Replace with your test database URL + let pool = PgPool::connect("postgres://user:pass@localhost/test_db") + .await + .unwrap(); + let mut conn = DatabaseConnection(pool.acquire().await.unwrap()); + + // Insert test data + let pipeline_id = Uuid::new_v4(); + let node_id = Uuid::new_v4(); + let id = Uuid::new_v4(); + sqlx::query!( + "INSERT INTO pipeline_nodes (id, pipeline_id, node_id, node_version, coords) VALUES ($1, $2, $3, $4, $5)", + id, + pipeline_id, + node_id, + "1.0", + serde_json::json!({"x": 0, "y": 0}) + ) + .execute(&mut *conn.0) + .await + .unwrap(); + + // Delete the node + let result = delete(conn, Path(id)).await; + assert!(matches!(result, Ok(StatusCode::NO_CONTENT))); + + // Verify deletion + let exists = sqlx::query!("SELECT id FROM pipeline_nodes WHERE id = $1", id) + .fetch_optional(&mut *pool.acquire().await.unwrap()) + .await + .unwrap() + .is_none(); + assert!(exists); + } + + #[tokio::test] + async fn test_delete_pipeline_node_not_found() { + let pool = PgPool::connect("postgres://user:pass@localhost/test_db") + .await + .unwrap(); + let conn = DatabaseConnection(pool.acquire().await.unwrap()); + + let id = Uuid::new_v4(); + let result = delete(conn, Path(id)).await; + assert!(matches!( + result, + Err((StatusCode::NOT_FOUND, _)) + )); + } +} From 567489c0e275dd976e0ce036d0f7573cdc7b3d86 Mon Sep 17 00:00:00 2001 From: OmarElhagagy Date: Sun, 23 Feb 2025 23:35:32 +0200 Subject: [PATCH 2/2] Update DELETE /api/v0/pipeline-nodes/:id with Acquire, internal_error, CASCADE, and fixes #20 --- api/src/routes/api/v0/pipeline_nodes.rs | 44 ++++++------------------- 1 file changed, 10 insertions(+), 34 deletions(-) diff --git a/api/src/routes/api/v0/pipeline_nodes.rs b/api/src/routes/api/v0/pipeline_nodes.rs index 9a70271..600e2a0 100644 --- a/api/src/routes/api/v0/pipeline_nodes.rs +++ b/api/src/routes/api/v0/pipeline_nodes.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tracing::info; use uuid::Uuid; +use sqlx::Acquire; use crate::{ app_state::{Coords, DatabaseConnection}, @@ -68,18 +69,18 @@ pub async fn delete( DatabaseConnection(mut conn): DatabaseConnection, Path(id): Path, ) -> Result)> { - info!("Attempting to delete pipeline node: {}", id); + info!("Attempting to delete pipeline node: {id}"); let mut tx = conn .begin() .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + .map_err(internal_error)?; // Check if node exists let node_exists = sqlx::query!("SELECT id FROM pipeline_nodes WHERE id = $1", id) .fetch_optional(&mut *tx) .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))? + .map_err(internal_error)? .is_some(); if !node_exists { @@ -91,40 +92,17 @@ pub async fn delete( )); } - // Delete connections - sqlx::query!( - r#" - DELETE FROM pipeline_node_connections - WHERE source_node_id = $1 OR target_node_id = $1 - "#, - id - ) - .execute(&mut *tx) - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; - - // Delete inputs and outputs - sqlx::query!("DELETE FROM pipeline_node_inputs WHERE pipeline_node_id = $1", id) - .execute(&mut *tx) - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; - - sqlx::query!("DELETE FROM pipeline_node_outputs WHERE pipeline_node_id = $1", id) - .execute(&mut *tx) - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; - - // Delete the node + // Delete the node (dependencies handled by ON DELETE CASCADE) sqlx::query!("DELETE FROM pipeline_nodes WHERE id = $1", id) .execute(&mut *tx) .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + .map_err(internal_error)?; tx.commit() .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::from(e))))?; + .map_err(internal_error)?; - info!("Successfully deleted pipeline node: {}", id); + info!("Successfully deleted pipeline node: {id}"); Ok(StatusCode::NO_CONTENT) } @@ -257,14 +235,12 @@ mod tests { let mut conn = DatabaseConnection(pool.acquire().await.unwrap()); // Insert test data - let pipeline_id = Uuid::new_v4(); - let node_id = Uuid::new_v4(); let id = Uuid::new_v4(); sqlx::query!( "INSERT INTO pipeline_nodes (id, pipeline_id, node_id, node_version, coords) VALUES ($1, $2, $3, $4, $5)", id, - pipeline_id, - node_id, + Uuid::new_v4, + Uuid::new_v4, "1.0", serde_json::json!({"x": 0, "y": 0}) )