diff --git a/Cargo.toml b/Cargo.toml index 795f3a5..8963c54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "arrayify" -version = "0.1.2" +version = "0.2.0" edition = "2024" [dependencies] diff --git a/src/args.rs b/src/args.rs index a2fb443..255ca3b 100644 --- a/src/args.rs +++ b/src/args.rs @@ -2,20 +2,28 @@ use clap::{Arg, ArgMatches, Command as ClapCommand}; pub fn parse_args() -> ArgMatches { ClapCommand::new("arrayify") - .version("0.1.2") + .version("0.2.0") .author("Sam Dougan") - .about("Submits and checks bsub job arrays from a CSV file") + .about("Submits and checks bsub job arrays from a CSV file or directory") .subcommand_required(true) .arg_required_else_help(true) .subcommand( ClapCommand::new("sub") - .about("Submit job array from CSV") + .about("Submit job array from CSV or directory") .arg(Arg::new("csv") .short('s') .long("csv") .value_name("CSV_FILE") .help("Path to the CSV file") - .required(true)) + .conflicts_with("dir") + .required_unless_present("dir")) + .arg(Arg::new("dir") + .short('d') + .long("dir") + .value_name("DIRECTORY") + .help("Path to the input directory") + .conflicts_with("csv") + .required_unless_present("csv")) .arg(Arg::new("command") .short('c') .long("command") @@ -44,7 +52,7 @@ pub fn parse_args() -> ArgMatches { .short('b') .long("batch") .value_name("BATCH_SIZE") - .help("Batch size of actively running jobs normally signified by %N default is 20% of the array") + .help("Batch size of actively running jobs normally signified by %N (default: 20% of the array)") .default_value("auto"))) .subcommand( ClapCommand::new("check") diff --git a/src/jobs.rs b/src/jobs.rs new file mode 100644 index 0000000..3ba8dd8 --- /dev/null +++ b/src/jobs.rs @@ -0,0 +1,100 @@ +use csv::ReaderBuilder; +use std::collections::HashMap; +use std::fs::{self}; +use std::io::{self}; +use std::path::{Path, PathBuf}; + +pub fn read_jobs_from_csv(csv_file: &str, command_template: &str) -> io::Result> { + let mut rdr = ReaderBuilder::new() + .has_headers(true) + .from_path(csv_file) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + let headers = rdr + .headers() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? + .clone(); + let mut jobs = Vec::new(); + + for result in rdr.records() { + let record = result.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut job_command = command_template.to_string(); + + for (i, header) in headers.iter().enumerate() { + let placeholder = format!("{{{}}}", header); + if let Some(value) = record.get(i) { + job_command = job_command.replace(&placeholder, value); + } + } + jobs.push(job_command); + } + + Ok(jobs) +} + +pub fn read_jobs_from_dir( + dir_path: &str, + command_template: &str, +) -> io::Result> { + let dir = Path::new(dir_path); + if !dir.is_dir() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Provided path is not a directory", + )); + } + + // Collect all files in the directory + let mut file_map: HashMap, Option)> = HashMap::new(); + + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + if let Some(file_name) = path.file_name().and_then(|f| f.to_str()) { + if file_name.contains("_1") { + let id = file_name + .split("_1") + .next() + .unwrap_or(file_name) + .to_string(); + file_map.entry(id).or_insert((None, None)).0 = Some(path.clone()); + } else if file_name.contains("_2") { + let id = file_name + .split("_2") + .next() + .unwrap_or(file_name) + .to_string(); + file_map.entry(id).or_insert((None, None)).1 = Some(path.clone()); + } + } + } + } + + // Validate and collect paired files + let mut jobs = Vec::new(); + for (id, (r1, r2)) in file_map { + if let (Some(r1_path), Some(r2_path)) = (r1, r2) { + // Replace placeholders in the command template + let job_command = command_template + .replace("{ID}", &id) + .replace("{R1}", r1_path.to_str().unwrap_or_default()) + .replace("{R2}", r2_path.to_str().unwrap_or_default()); + jobs.push(job_command); + } else { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Missing R1 or R2 for ID: {}", id), + )); + } + } + + if jobs.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "No valid file pairs found in the directory", + )); + } + + Ok(jobs) +} diff --git a/src/main.rs b/src/main.rs index f121c11..71f7c55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,10 @@ mod args; +mod jobs; +mod submission; -use chrono::Local; use clap::Subcommand; -use csv::ReaderBuilder; -use regex::Regex; -use std::fs::{self, File}; -use std::io::{self, Write}; use std::process::Command; +use submission::InputFormat; #[derive(Subcommand)] enum SubCommands { @@ -17,145 +15,6 @@ enum SubCommands { }, } -fn read_jobs_from_csv(csv_file: &str, command_template: &str) -> io::Result> { - let mut rdr = ReaderBuilder::new() - .has_headers(true) - .from_path(csv_file) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - let headers = rdr - .headers() - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? - .clone(); - let mut jobs = Vec::new(); - - for result in rdr.records() { - let record = result.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let mut job_command = command_template.to_string(); - - for (i, header) in headers.iter().enumerate() { - let placeholder = format!("{{{}}}", header); - if let Some(value) = record.get(i) { - job_command = job_command.replace(&placeholder, value); - } - } - jobs.push(job_command); - } - - Ok(jobs) -} - -fn count_lines_in_file(file_path: &str) -> io::Result { - let content = std::fs::read_to_string(file_path)?; - Ok(content.lines().count()) -} - -fn calculate_batch_size(num_jobs: usize, batch_size: Option) -> usize { - batch_size.unwrap_or_else(|| { - let calculated = ((num_jobs as f64) * 0.2).ceil() as usize; - calculated.min(num_jobs) - }) -} - -fn submit_jobs_to_scheduler( - job_file_path: &str, - log_dir: &str, - memory_mb: u32, - threads: u32, - batch_size: usize, -) -> io::Result { - // Count the number of lines in the file to determine the job array size - let num_jobs = count_lines_in_file(job_file_path)?; - let job_array = format!("arrayify_job_array[1-{}]%{}", num_jobs, batch_size); - let output_log = format!("{}/job_%J_%I.out", log_dir); - let error_log = format!("{}/job_%J_%I.err", log_dir); - - // Generate the bsub command - let bsub_cmd = format!( - "bsub -J {} -n {} -M {} -R \"select[mem>{}] rusage[mem={}]\" -o {} -e {}", - job_array, threads, memory_mb, memory_mb, memory_mb, output_log, error_log - ); - - // Generate the script that uses `sed` to extract the job command from the file - let script = format!( - r#"#!/bin/bash - -INDEX=$((LSB_JOBINDEX - 1)) -COMMAND=$(sed -n "$((INDEX + 1))p" {}) -$COMMAND -"#, - job_file_path - ); - - // Submit the job using the bsub command - let child = Command::new("bash") - .arg("-c") - .arg(format!("echo '{}' | {}", script, bsub_cmd)) - .output()?; - - // Extract the job ID from the bsub output - let bsub_output = String::from_utf8_lossy(&child.stdout); - let re = Regex::new(r"Job <(\d+)>").unwrap(); - let job_id = re - .captures(&bsub_output) - .and_then(|cap| cap.get(1)) - .map(|m| m.as_str()) - .unwrap_or("unknown"); - - Ok(job_id.to_string()) -} - -fn write_job_log(log_file_path: &str, jobs: &[String]) -> io::Result<()> { - let mut log_file = File::create(log_file_path)?; - for job_command in jobs.iter() { - writeln!(log_file, "{}", job_command)?; - } - Ok(()) -} - -pub fn submit_jobs( - csv_file: &str, - command_template: &str, - log_dir: &str, - memory_gb: u32, - threads: u32, - batch_size: Option, -) -> io::Result<()> { - let memory_mb = memory_gb * 1000; - fs::create_dir_all(log_dir)?; - - let jobs = read_jobs_from_csv(csv_file, command_template)?; - if jobs.is_empty() { - eprintln!("No jobs found in CSV."); - return Ok(()); - } - - let timestamp = Local::now().format("%Y-%m-%d-%H-%M").to_string(); - let log_file_path = format!("{}/arrayify-{}.log", log_dir, timestamp); - write_job_log(&log_file_path, &jobs)?; - - let batch_size = calculate_batch_size(jobs.len(), batch_size); - let job_id = submit_jobs_to_scheduler(&log_file_path, log_dir, memory_mb, threads, batch_size)?; - - print_run_stats(jobs.len(), log_dir, log_file_path, &job_id); - Ok(()) -} - -fn print_run_stats(num_jobs: usize, log_dir: &str, log_file_path: &str, job_id: &str) { - let message = format!( - r#"🚀 Job submission complete! ✅ -🔖 Job ID is: {} -📌 {} jobs submitted. -📝 Job commands logged in: {} -📂 Logs can be found in: {} -📡 Track with - - arrayify check {}"#, - job_id, num_jobs, log_file_path, log_dir, job_id - ); - - println!("{}", message); -} - fn check_jobs(job_id: &str) { let output = Command::new("bjobs") .arg("-noheader") @@ -239,7 +98,15 @@ fn main() { match matches.subcommand() { Some(("sub", sub_matches)) => { - let csv_file = sub_matches.get_one::("csv").unwrap(); + let csv_file = sub_matches.get_one::("csv"); + let dir_path = sub_matches.get_one::("dir"); + + // Ensure only one of csv_file or dir_path is provided + if csv_file.is_some() && dir_path.is_some() { + eprintln!("Error: Cannot provide both --csv and --dir at the same time"); + std::process::exit(1); + } + let command_template = sub_matches.get_one::("command").unwrap(); let log_dir = sub_matches.get_one::("log").unwrap(); let memory_gb: u32 = sub_matches @@ -263,13 +130,24 @@ fn main() { }) .unwrap_or(None); - submit_jobs( - csv_file, + // Determine the input format and set input_path + let (format, input_path) = if let Some(csv) = csv_file { + (InputFormat::Csv, csv) + } else if let Some(dir) = dir_path { + (InputFormat::Directory, dir) + } else { + eprintln!("Error: Either --csv or --dir must be provided"); + std::process::exit(1); + }; + + submission::submit_jobs( + input_path, command_template, log_dir, memory_gb, threads, batch_size, + format, ) .expect("Job submission failed"); } @@ -280,59 +158,3 @@ fn main() { _ => {} } } - -#[cfg(test)] -mod tests { - use super::*; - use std::io::Write; - use tempfile::NamedTempFile; - - #[test] - fn test_read_jobs_from_csv() { - let mut csv_file = NamedTempFile::new().unwrap(); - writeln!(csv_file, "header1,header2\nvalue1,value2").unwrap(); - - let jobs = read_jobs_from_csv( - csv_file.path().to_str().unwrap(), - "echo {header1} {header2}", - ) - .unwrap(); - assert_eq!(jobs, vec!["echo value1 value2"]); - } - - #[test] - fn test_calculate_batch_size() { - assert_eq!(calculate_batch_size(10, None), 2); // 20% of 10, rounded up - assert_eq!(calculate_batch_size(10, Some(5)), 5); // Custom batch size - assert_eq!(calculate_batch_size(1, None), 1); // Minimum batch size - } - - #[test] - fn test_write_job_log() { - let log_file = NamedTempFile::new().unwrap(); - let jobs = vec!["job1".to_string(), "job2".to_string()]; - - write_job_log(log_file.path().to_str().unwrap(), &jobs).unwrap(); - - let contents = fs::read_to_string(log_file.path()).unwrap(); - assert!(contents.contains("job1")); - assert!(contents.contains("job2")); - } - - #[test] - fn test_submit_jobs_empty_csv() { - let mut csv_file = NamedTempFile::new().unwrap(); - writeln!(csv_file, "header1,header2").unwrap(); // Empty CSV - - let result = submit_jobs( - csv_file.path().to_str().unwrap(), - "echo {header1}", - "logs", - 1, - 1, - None, - ); - - assert!(result.is_ok()); - } -} diff --git a/src/submission.rs b/src/submission.rs new file mode 100644 index 0000000..7be365d --- /dev/null +++ b/src/submission.rs @@ -0,0 +1,191 @@ +use crate::jobs; +use chrono::Local; +use regex::Regex; +use std::fs::{self, File}; +use std::io::{self, Write}; +use std::process::Command; + +#[derive(Debug, Clone, Copy)] +pub enum InputFormat { + Csv, + Directory, + // Add new formats here in the future +} + +pub fn write_job_log(log_file_path: &str, jobs: &[String]) -> io::Result<()> { + let mut log_file = File::create(log_file_path)?; + for job_command in jobs.iter() { + writeln!(log_file, "{}", job_command)?; + } + Ok(()) +} + +pub fn calculate_batch_size(num_jobs: usize, batch_size: Option) -> usize { + batch_size.unwrap_or_else(|| { + let calculated = ((num_jobs as f64) * 0.2).ceil() as usize; + calculated.min(num_jobs) + }) +} + +fn count_lines_in_file(file_path: &str) -> io::Result { + let content = std::fs::read_to_string(file_path)?; + Ok(content.lines().count()) +} + +fn print_run_stats(num_jobs: usize, log_dir: &str, log_file_path: &str, job_id: &str) { + let message = format!( + r#"🚀 Job submission complete! ✅ +🔖 Job ID is: {} +📌 {} jobs submitted. +📝 Job commands logged in: {} +📂 Logs can be found in: {} +📡 Track with - + arrayify check {}"#, + job_id, num_jobs, log_file_path, log_dir, job_id + ); + + println!("{}", message); +} + +fn submit_jobs_to_scheduler( + job_file_path: &str, + log_dir: &str, + memory_mb: u32, + threads: u32, + batch_size: usize, +) -> io::Result { + // Count the number of lines in the file to determine the job array size + let num_jobs = count_lines_in_file(job_file_path)?; + let job_array = format!("arrayify_job_array[1-{}]%{}", num_jobs, batch_size); + let output_log = format!("{}/job_%J_%I.out", log_dir); + let error_log = format!("{}/job_%J_%I.err", log_dir); + + // Generate the bsub command + let bsub_cmd = format!( + "bsub -J {} -n {} -M {} -R \"select[mem>{}] rusage[mem={}]\" -o {} -e {}", + job_array, threads, memory_mb, memory_mb, memory_mb, output_log, error_log + ); + + // Generate the script that uses `sed` to extract the job command from the file + let script = format!( + r#"#!/bin/bash + +INDEX=$((LSB_JOBINDEX - 1)) +COMMAND=$(sed -n "$((INDEX + 1))p" {}) +$COMMAND +"#, + job_file_path + ); + + // Submit the job using the bsub command + let child = Command::new("bash") + .arg("-c") + .arg(format!("echo '{}' | {}", script, bsub_cmd)) + .output()?; + + // Extract the job ID from the bsub output + let bsub_output = String::from_utf8_lossy(&child.stdout); + let re = Regex::new(r"Job <(\d+)>").unwrap(); + let job_id = re + .captures(&bsub_output) + .and_then(|cap| cap.get(1)) + .map(|m| m.as_str()) + .unwrap_or("unknown"); + + Ok(job_id.to_string()) +} + +pub fn submit_jobs( + input_path: &str, + command_template: &str, + log_dir: &str, + memory_gb: u32, + threads: u32, + batch_size: Option, + format: InputFormat, +) -> io::Result<()> { + let memory_mb = memory_gb * 1000; + fs::create_dir_all(log_dir)?; + + // Read jobs based on the input format + let jobs = match format { + InputFormat::Csv => jobs::read_jobs_from_csv(input_path, command_template)?, + InputFormat::Directory => jobs::read_jobs_from_dir(input_path, command_template)?, + // Add new formats here in the future + }; + + if jobs.is_empty() { + eprintln!("No jobs found."); + return Ok(()); + } + + // Log the jobs + let timestamp = Local::now().format("%Y-%m-%d-%H-%M").to_string(); + let log_file_path = format!("{}/arrayify-{}.log", log_dir, timestamp); + write_job_log(&log_file_path, &jobs)?; + + // Submit jobs to the scheduler + let batch_size = calculate_batch_size(jobs.len(), batch_size); + let job_id = submit_jobs_to_scheduler(&log_file_path, log_dir, memory_mb, threads, batch_size)?; + + // Print run statistics + print_run_stats(jobs.len(), log_dir, &log_file_path, &job_id); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use tempfile::NamedTempFile; + + #[test] + fn test_read_jobs_from_csv() { + let mut csv_file = NamedTempFile::new().unwrap(); + writeln!(csv_file, "header1,header2\nvalue1,value2").unwrap(); + + let jobs = jobs::read_jobs_from_csv( + csv_file.path().to_str().unwrap(), + "echo {header1} {header2}", + ) + .unwrap(); + assert_eq!(jobs, vec!["echo value1 value2"]); + } + + #[test] + fn test_calculate_batch_size() { + assert_eq!(calculate_batch_size(10, None), 2); // 20% of 10, rounded up + assert_eq!(calculate_batch_size(10, Some(5)), 5); // Custom batch size + assert_eq!(calculate_batch_size(1, None), 1); // Minimum batch size + } + + #[test] + fn test_write_job_log() { + let log_file = NamedTempFile::new().unwrap(); + let jobs = vec!["job1".to_string(), "job2".to_string()]; + + write_job_log(log_file.path().to_str().unwrap(), &jobs).unwrap(); + + let contents = fs::read_to_string(log_file.path()).unwrap(); + assert!(contents.contains("job1")); + assert!(contents.contains("job2")); + } + + #[test] + fn test_submit_jobs_empty_csv() { + let mut csv_file = NamedTempFile::new().unwrap(); + writeln!(csv_file, "header1,header2").unwrap(); // Empty CSV + + let result = submit_jobs( + csv_file.path().to_str().unwrap(), + "echo {header1}", + "logs", + 1, + 1, + None, + InputFormat::Csv, + ); + + assert!(result.is_ok()); + } +}