Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
[package]
name = "arrayify"
version = "0.1.1"
version = "0.1.2"
edition = "2024"

[dependencies]
csv = "1.1"
clap = { version = "4.0", features = ["derive"] }
regex = "1"
tempfile = "3.17.1"
chrono = "0.4.40"

[[bin]]
name = "arrayify"
Expand Down
2 changes: 1 addition & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::{Arg, ArgMatches, Command as ClapCommand};

pub fn parse_args() -> ArgMatches {
ClapCommand::new("arrayify")
.version("0.1.1")
.version("0.1.2")
.author("Sam Dougan")
.about("Submits and checks bsub job arrays from a CSV file")
.subcommand_required(true)
Expand Down
49 changes: 30 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use regex::Regex;
use std::fs::{self, File};
use std::io::{self, Write};
use std::process::Command;
use chrono::Local;

#[derive(Subcommand)]
enum SubCommands {
Expand Down Expand Up @@ -41,40 +42,49 @@ fn read_jobs_from_csv(csv_file: &str, command_template: &str) -> io::Result<Vec<
Ok(jobs)
}

/// Returns the number of lines in the file at `file_path`.
///
/// The whole file is read into memory as UTF-8 text and counted with
/// `str::lines`, so a trailing newline does not add an extra empty line and
/// an empty file yields `0`.
///
/// # Errors
///
/// Propagates the I/O error from reading the file (for example when the file
/// does not exist or its contents are not valid UTF-8).
fn count_lines_in_file(file_path: &str) -> io::Result<usize> {
    std::fs::read_to_string(file_path).map(|text| text.lines().count())
}

/// Resolves the batch size to use for a submission of `num_jobs` jobs.
///
/// An explicitly supplied `batch_size` is returned unchanged. When none is
/// given, the default is 20% of `num_jobs`, rounded up, and never larger than
/// `num_jobs` itself.
fn calculate_batch_size(num_jobs: usize, batch_size: Option<usize>) -> usize {
    match batch_size {
        Some(requested) => requested,
        None => {
            // Default: one fifth of the jobs, rounded up to a whole job.
            let one_fifth = (num_jobs as f64 * 0.2).ceil() as usize;
            std::cmp::min(one_fifth, num_jobs)
        }
    }
}

fn submit_jobs_to_scheduler(jobs: &[String], log_dir: &str, memory_mb: u32, threads: u32, batch_size: usize) -> io::Result<String> {
let num_jobs = jobs.len();
fn submit_jobs_to_scheduler(job_file_path: &str, log_dir: &str, memory_mb: u32, threads: u32, batch_size: usize) -> io::Result<String> {
// Count the number of lines in the file to determine the job array size
let num_jobs = count_lines_in_file(job_file_path)?;
let job_array = format!("arrayify_job_array[1-{}]%{}", num_jobs, batch_size);
let output_log = format!("{}/job_%J_%I.out", log_dir);
let error_log = format!("{}/job_%J_%I.err", log_dir);

// Generate the bsub command
let bsub_cmd = format!(
"bsub -J {} -n {} -M {} -R \"select[mem>{}] rusage[mem={}]\" -o {} -e {}",
job_array, threads, memory_mb, memory_mb, memory_mb, output_log, error_log
);

let mut script = String::new();
script.push_str("#!/bin/bash\n\nINDEX=$((LSB_JOBINDEX - 1))\n\n");
script.push_str("JOBS=(");
for job in jobs {
script.push_str(&format!("\"{}\" ", job));
}
script.push_str(")\n\n");
// Generate the script that uses `sed` to extract the job command from the file
let script = format!(
r#"#!/bin/bash

script.push_str("COMMAND=${JOBS[$INDEX]}\n");
script.push_str("$COMMAND\n");
INDEX=$((LSB_JOBINDEX - 1))
COMMAND=$(sed -n "$((INDEX + 1))p" {})
$COMMAND
"#,
job_file_path
);

// Submit the job using the bsub command
let child = Command::new("bash")
.arg("-c")
.arg(format!("echo '{}' | {}", script, bsub_cmd))
.output()?;

// Extract the job ID from the bsub output
let bsub_output = String::from_utf8_lossy(&child.stdout);
let re = Regex::new(r"Job <(\d+)>").unwrap();
let job_id = re
Expand All @@ -88,8 +98,8 @@ fn submit_jobs_to_scheduler(jobs: &[String], log_dir: &str, memory_mb: u32, thre

fn write_job_log(log_file_path: &str, jobs: &[String]) -> io::Result<()> {
let mut log_file = File::create(log_file_path)?;
for (index, job_command) in jobs.iter().enumerate() {
writeln!(log_file, "[{}] {}", index + 1, job_command)?;
for job_command in jobs.iter() {
writeln!(log_file, "{}", job_command)?;
}
Ok(())
}
Expand All @@ -111,12 +121,13 @@ pub fn submit_jobs(
return Ok(());
}

let batch_size = calculate_batch_size(jobs.len(), batch_size);
let job_id = submit_jobs_to_scheduler(&jobs, log_dir, memory_mb, threads, batch_size)?;

let log_file_path = format!("{}/arrayify-{}.log", log_dir, job_id);
let timestamp = Local::now().format("%Y-%m-%d-%H-%M").to_string();
let log_file_path = format!("{}/arrayify-{}.log", log_dir, timestamp);
write_job_log(&log_file_path, &jobs)?;

let batch_size = calculate_batch_size(jobs.len(), batch_size);
let job_id = submit_jobs_to_scheduler(&log_file_path, log_dir, memory_mb, threads, batch_size)?;

print_run_stats(jobs.len(), log_dir, log_file_path, &job_id);
Ok(())
}
Expand Down Expand Up @@ -284,8 +295,8 @@ mod tests {
write_job_log(log_file.path().to_str().unwrap(), &jobs).unwrap();

let contents = fs::read_to_string(log_file.path()).unwrap();
assert!(contents.contains("[1] job1"));
assert!(contents.contains("[2] job2"));
assert!(contents.contains("job1"));
assert!(contents.contains("job2"));
}

#[test]
Expand Down