diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index f341539af..e5f2abb18 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -17,16 +17,8 @@ * */ -use std::{ - collections::{HashMap, HashSet}, - fs::{self, File, OpenOptions, remove_file, write}, - num::NonZeroU32, - path::{Path, PathBuf}, - sync::{Arc, Mutex, RwLock}, - time::{Instant, SystemTime, UNIX_EPOCH}, -}; - use arrow_array::RecordBatch; +use arrow_ipc::reader::StreamReader; use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::derive::{Deref, DerefMut}; @@ -41,6 +33,15 @@ use parquet::{ schema::types::ColumnPath, }; use relative_path::RelativePathBuf; +use std::io::BufReader; +use std::{ + collections::{HashMap, HashSet}, + fs::{self, File, OpenOptions, remove_file, write}, + num::NonZeroU32, + path::{Path, PathBuf}, + sync::{Arc, Mutex, RwLock}, + time::{Instant, SystemTime, UNIX_EPOCH}, +}; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; use ulid::Ulid; @@ -61,7 +62,7 @@ use crate::{ }; use super::{ - ARROW_FILE_EXTENSION, LogStream, + ARROW_FILE_EXTENSION, LogStream, PART_FILE_EXTENSION, staging::{ StagingError, reader::{MergedRecordReader, MergedReverseRecordReader}, @@ -994,12 +995,112 @@ impl Stream { None } + /// Recovers orphaned .part files from a previous interrupted run. + /// These are incomplete arrow files that weren't finalized before the server crashed. + /// Valid .part files are renamed to .arrows for processing, invalid ones are removed. + fn recover_orphan_part_files(&self) { + let Ok(dir) = self.data_path.read_dir() else { + return; + }; + + for entry in dir.flatten() { + let path = entry.path(); + if path + .extension() + .is_some_and(|ext| ext == PART_FILE_EXTENSION) + { + info!( + "Found orphaned .part file: {:?} for stream {}", + path, self.stream_name + ); + + // Check if file is non-empty and potentially valid + match path.metadata() { + Ok(meta) if meta.len() == 0 => { + warn!( + "Removing empty orphaned .part file: {:?} for stream {}", + path, self.stream_name + ); + if let Err(e) = remove_file(&path) { + error!("Failed to remove empty .part file {:?}: {e}", path); + } + continue; + } + Ok(_) => { + // Try to validate the arrow file by reading its schema + match File::open(&path) { + Ok(file) => { + match StreamReader::try_new(BufReader::new(file), None) { + Ok(_reader) => { + // File has valid schema, rename to .arrows + let mut arrow_path = path.clone(); + arrow_path.set_extension(ARROW_FILE_EXTENSION); + + // If arrow file with same name exists, generate a unique name + if arrow_path.exists() { + let file_name = + arrow_path.file_name().unwrap().to_string_lossy(); + if let Some(date_pos) = file_name.find(".date") { + let random_suffix = ulid::Ulid::new().to_string(); + let new_name = format!( + "{}{}", + random_suffix, + &file_name[date_pos..] + ); + arrow_path.set_file_name(new_name); + } + } + + info!( + "Recovering orphaned .part file: {:?} -> {:?} for stream {}", + path, arrow_path, self.stream_name + ); + if let Err(e) = std::fs::rename(&path, &arrow_path) { + error!( + "Failed to rename .part file {:?} to {:?}: {e}", + path, arrow_path + ); + } + } + Err(e) => { + // File is invalid/corrupted, remove it + warn!( + "Removing invalid/corrupted .part file: {:?} for stream {}: {e}", + path, self.stream_name + ); + if let Err(e) = remove_file(&path) { + error!( + "Failed to remove invalid .part file {:?}: {e}", + path + ); + } + } + } + } + Err(e) => { + error!("Failed to open .part file {:?} for validation: {e}", path); + } + } + } + Err(e) => { + warn!("Could not get metadata for .part file {:?}: {e}", path); + } + } + } + } + } + /// First flushes arrows onto disk and then converts the arrow into parquet pub fn flush_and_convert( &self, init_signal: bool, shutdown_signal: bool, ) -> Result<(), StagingError> { + // On init, recover any orphaned .part files from previous interrupted runs + if init_signal { + self.recover_orphan_part_files(); + } + let start_flush = Instant::now(); // Force flush for init or shutdown signals to convert all .part files to .arrows // For regular cycles, use false to only flush non-current writers