Note: This page's content only applies to SHARCNET's legacy systems; it does not apply to Graham.

Introduction

Throughput computing / serial and parallel farming

Definitions

Throughput computing is a method for solving a computational problem in a short amount of time. The method involves decomposing the problem into independent work units, which are then submitted as a large number of independent jobs to a computer system and run concurrently in parallel. Key to this approach is the requirement that the jobs contribute to the overall solution in parallel, without having to communicate any values amongst themselves during the calculation or return their results in a particular order. This is often referred to as decomposing the workload into independent chunks, and such workloads are called embarrassingly parallel or perfectly parallel.

Serial farming is a concept closely related (essentially synonymous) to throughput computing. One can define it as running multiple instances of a serial code (serial jobs on a cluster) when the order of execution is not important (no data dependencies between the jobs). It can be trivially extended to cover batches of independent parallel jobs, which one might call “MPI farming”, “OpenMP farming”, “GPU farming” etc.

Why NOT to use MPI or a threading model

It's very important to note that while one could use an MPI wrapper or implement threads to partition and allocate the chunks, this is not necessary for throughput computing / serial farming, and is actually less efficient than using a set of serial jobs. Such a wrapper is not robust (if one process or thread fails, they all fail), and it will typically take far longer for the system to start the job, since it needs to free up many processors at once rather than letting the jobs "fill the gaps" as other serial jobs vacate slots on shared nodes. As such, using many serial jobs on systems that give them priority will lead to the fastest turnaround (time to solution), and should always be preferred, when possible, over writing a threaded or MPI master/slave wrapper around the non-communicating program. In other words, it is only necessary to move to MPI or threads when there is a significant amount of communication amongst the parallel threads or processes, occurring during a significant portion of the program's runtime.

That being said, one can use a throughput job submission model with a threaded or MPI program ("parallel farming") if the underlying program requires them or their use is otherwise justified (see the #Fully automated farming section below for how to set up both serial and parallel farming).

When to use

One common situation when serial/parallel farming is needed is when the code output depends on a few poorly constrained parameters, and the task is to either find a global solution(s), or the global maximum or minimum.

When the number of unknown parameters is small (say <5) one can attempt a brute force approach: running separate jobs corresponding to different points on a grid in the multi-dimensional parameter space.

How many grid points to use in each dimension depends on the expected properties of the solution: for a uni-modal situation one can use as few as three grid points per dimension. In the more general (multi-modal) case, the number of grid points will be determined by practical considerations (the maximum number of cpu hours one can spare for the project).

In a larger dimensionality case (say >4) one can resort to a Monte Carlo (random guessing) approach to sampling the parameter space.

In either the regular grid or the Monte Carlo approach, one can run just one batch of jobs, or run one batch, analyze the results, and then run the next batch, zooming in on the region(s) of interest in the parameter space.
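For example, a brute-force grid scan can be scripted directly with nested BASH loops, using SHARCNET's sqsub command (covered in detail later on this page) to submit one independent serial job per grid point. This is only a minimal sketch; the code name "./code", the parameter names and the grid values are hypothetical:

# Sketch: one serial job per point of a 3x3 grid in two parameters (a,b)
for a in 0.1 0.5 0.9; do
  for b in 10 20 30; do
    sqsub -r 1d -o out.a${a}.b${b} ./code $a $b
  done
done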

Another typical situation when one needs serial farming is when one has to post-process multiple “snapshots” (from a time evolution) produced by a large parallel simulation code – to compute some global quantities, or perhaps to produce a movie. These tasks have no data dependencies, so they are perfect for serial/parallel farming.


An example of the porting process

The basic concept of throughput computing / serial farming is to submit multiple independent instances of a program, each computing a portion of the answer. This can be facilitated in a batch queuing system by writing an auxiliary shell script or program that is used to submit the jobs and collect the results. It may require pre- or post-processing steps so that the jobs can run concurrently. The following walk-through example illustrates the porting process with a C program, using BASH shell scripting, and goes through the following steps:

  • minor modifications to the original C program to compute a fraction of the total computation
  • writing a BASH shell script to submit the jobs
  • writing a second BASH shell script to accumulate the results after the jobs have completed

Not all of these steps are necessary and there are other ways of managing a large number of jobs (covered here in detail in the following sections), but the following example is a relatively straight-forward way to implement independent task parallelism in a portable way. There is no dependency on the underlying job management system or any other supporting libraries beyond access to the BASH shell. One could conceive of a more elaborate way to incorporate all of the required logic in the C program itself but that is beyond the scope of this tutorial.

Example C Code: Integrating PI

We begin with this simple C code that integrates 4/(1+x^2) from 0 to 1 to calculate PI:

#include <stdio.h>
#include <math.h>
int
main(int argc, char *argv[])
{
 
  double mypi,h,sum,x;
  int n,i;
 
  n=1000000;
  h=1.0/n;
  sum=0.0;
 
  printf("Calculating PI:\n");
 
  for (i = 1; i <= n; i+=1 ) {
    x = h * ( i - 0.5 ) ;    //calculate at center of interval
    sum += 4.0 / ( 1.0 + pow(x,2) ) ;
  }
 
  mypi = h * sum;
 
  printf("%f\n",mypi) ;
 
  return 0;
}

Running it gives:

[sn_user@wha780 integration]$ cc pi_orig.c 
[sn_user@wha780 integration]$ ./a.out 
Calculating PI:
3.141593

Modifying the code to run in parallel

In order to solve this problem in parallel, we will partition the for loop into discrete chunks that each serial job will calculate, and then after all of the jobs are finished we can simply sum the individual answers to get the final answer. In particular, if we used 10 chunks, then each chunk would compute 0.1 of the integration path, ie, chunk one computes (0:0.1], chunk two computes (0.1:0.2], etc.

The modified C code, which reads the particular chunk to calculate and the total number of chunks as command line arguments, follows:

#include <stdlib.h>
#include <stdio.h>
#include <math.h>
 
int
main(int argc, char *argv[])
{
 
  double mypi,h,sum,x;
  int n,i,chunk,tot_chunks,chunk_size,n_low,n_high;
 
  n=1000000;
  h=1.0/n;
  sum=0.0;
 
// check to make sure we've been given two arguments
 
  if (argc != 3) {
    printf("Usage:  pi.x chunk tot_chunks\n");
    exit(0);
  }
 
// break up number of total intervals based on the 
// number of chunks and assign loop indexes for this
// instance of the program to n_low and n_high
 
  chunk= atoi(argv[1]);
  tot_chunks= atoi(argv[2]);
  chunk_size=n/tot_chunks;
  n_low=(chunk-1)*chunk_size+1;
  n_high=chunk*chunk_size;
  if (chunk == tot_chunks) {
    n_high=n;
  }
  printf("Calculating PI for interval %d of %d from %d to %d \n", chunk,tot_chunks,n_low,n_high);
 
// modify the loop to use n_low and n_high
 
  for (i = n_low; i <= n_high; i+=1 ) {
    x = h * ( i - 0.5 ) ;    //calculate at center of interval
    sum += 4.0 / ( 1.0 + pow(x,2) ) ;
  }
 
  mypi = h * sum;
 
  printf("%f\n",mypi) ;
 
  return 0;
}

There are smarter ways to decompose this workload (i.e. spreading out the remainder instead of allocating it all to the final chunk), but this should suffice to get started.

BASH Job Submission Script

This script submits a series of jobs to the batch queue system via sqsub. It specifies 2 additional arguments to the executable: the current iteration, and the total number of iterations. The iterations are indexed from 1 to N. (For more examples of BASH loops, see the section #Command line below.)

It is expected that the executable would use these input arguments to decide which portion of the total workload it accounts for. The logic associated with partitioning the workload should be addressed in the executable program.

The following variables should be set appropriately before calling this script:

  • DEST_DIR
    • path to the base directory for submission of the job
    • it should exist and contain the executable
  • EXENAME
    • name of the executable
  • NUM_ITERS
    • number of jobs to submit
    • eg. integration intervals
  • RUNTIME
    • how long to let the job run for, in sqsub format
  • OUTFILE
    • optionally change the format to something more meaningful

The script follows - it should be put in a file (I've used sub_script below) and made executable by issuing the command chmod +x sub_script:

 #!/bin/bash
 DEST_DIR=/work/sn_user/integration
 EXENAME=pi.x
 NUM_ITERS=20
 RUNTIME=10m
 echo "${DEST_DIR}/${EXENAME}"
 if [ -e "${DEST_DIR}/${EXENAME}" ]
 then
   cd ${DEST_DIR}
   for z_interval in `seq 1 ${NUM_ITERS}`; do
     echo "Submitting interval: ${z_interval} of ${NUM_ITERS}"
     OUTFILE="OUTPUT-${z_interval}.txt"
     sqsub -r ${RUNTIME} -q serial -o ${OUTFILE} ./${EXENAME} ${z_interval} ${NUM_ITERS} 
  done;
 else
   echo "couldn't find the above executable, exiting"
 fi

For brevity of the output, I've reduced NUM_ITERS to 4 and run the script with the modified program to illustrate. In practice, users will want to use a lot of chunks, in the tens to hundreds:

[sn_user@wha780 integration]$ pwd; ls; cc pi.c -o pi.x
/work/sn_user/integration
accumulate  pi.c  pi_orig.c  sub_script
[sn_user@wha780 integration]$ ./sub_script 
/work/sn_user/integration/pi.x
Submitting interval: 1 of 4
THANK YOU for providing a runtime estimate of 10m!
submitted as jobid 4109486
Submitting interval: 2 of 4
THANK YOU for providing a runtime estimate of 10m!
submitted as jobid 4109487
Submitting interval: 3 of 4
THANK YOU for providing a runtime estimate of 10m!
submitted as jobid 4109488
Submitting interval: 4 of 4
THANK YOU for providing a runtime estimate of 10m!
submitted as jobid 4109489
<wait 30 seconds...>
[sn_user@wha780 integration]$ sqjobs
[sn_user@wha780 integration]$ ls
accumulate  OUTPUT-1.txt  OUTPUT-2.txt  OUTPUT-3.txt  OUTPUT-4.txt  pi.c  pi_orig.c  pi.x  sub_script

Each of the OUTPUT* files will contain a portion of the answer, eg.

[sn_user@wha780 integration]$ cat OUTPUT-1.txt 
Calculating PI for interval 1 of 4 from 1 to 250000 
0.979915

------------------------------------------------------------
Sender: LSF System <lsfadmin@wha482>
<snip>

So we need to write a post-processing utility that can quickly sum up the answer if we're going to use a lot of chunks.

BASH Post-Processing Script

In this case we just need a program or script to read the second line of each of the OUTPUT files and sum the results. Since this is pretty straight-forward and doesn't take long to run, instead of writing a full program we'll just use a BASH script. The following script will calculate the answer (note the use of the bc command - BASH itself has no floating-point arithmetic):

#!/bin/bash
total=0;
value=0;
for file in OUTPUT*; do 
  value=`head -2 $file | tail -1`; 
  total=`echo $total + $value | bc`; 
done
echo $total;

Copying this into a file (I've chosen accumulate), setting it executable with chmod +x accumulate and then running it in the directory where I submitted the job returns the correct answer:

[sn_user@wha780 integration]$ ./accumulate 
3.141593

Semi-automated approaches

Overview

This section is for very casual serial/parallel farmers, and covers simple ways to organize your workflow when doing serial or parallel farming. In particular, it covers the BASH command line, BASH scripting, and Array Jobs approaches. For a fully automated approach, and for advanced serial farming features (like the ability to resubmit automatically all the jobs which failed or never ran, dynamic workload balancing, and meta-cluster - or grid - functionality), see the next section, #Fully automated farming.

Command line

sq* commands

In what follows we will make extensive use of SHARCNET's job manipulation commands (sqsub, sqjobs, sqkill) plus BASH constructs (loops etc.). For serial jobs, sqsub requires only two arguments (plus the path to the executable):

   [user@orc-login2 ~]$  sqsub  -r 1d  -o out.log  ./code

We will also need some optional arguments, like “--idfile” (for the full list of options, execute “sqsub -h”):

   [user@orc-login2 ~]$  sqsub  --idfile=idfile

Here “idfile” is the name of the file which will store the jobid.

For loop (sequentially numbered files)

To submit a batch of serial jobs from the command line, using BASH' “for” loop command, one can do something like this:

   [user@orc-login2 ~]$  for ((i=0; i<25; i++)); do sqsub -r 1d -o out.${i}  ./code  ic.${i}; done

Here it is assumed that 25 different initial condition files, with names ic.0 ... ic.24, are provided, and that the code is executed as “./code ic.xxx”. It is very useful to use the “--idfile” option here; this will enable subsequent meta-job operations:

   [user@orc-login2 ~]$  \rm -f jobid; for ((i=0; i<25; i++)); do sqsub -r 1d -o out.${i} --idfile=id  ./code  ic.${i}; cat id >> jobid; done

Now we have the file “jobid” containing the jobid's of all the jobs in the batch. This file can be used to query the status of the whole batch:

   [user@orc-login2 ~]$  sqjobs `cat jobid`

And we can kill all the jobs in the batch with a single command:

   [user@orc-login2 ~]$  sqkill `cat jobid`

For loop (arbitrarily named files)

If we want to launch one job for each initial-conditions file in the current directory (say, with names ic.*), we can use a different flavor of the “for” loop command:

   [user@orc-login2 ~]$  for name in ic.*; do sqsub -r 1d -o ${name}.log  ./code  ${name}; done

Here the standard output from each job will go to files “ic.*.log”. As before, one can add the “--idfile” option to enable batch-wide job manipulation.

Handling standard input (FORTRAN)

For FORTRAN codes (which typically use standard input instead of command line arguments), the previous example can be re-written as follows:

   [user@orc-login2 ~]$  for name in ic.*; do sqsub -r 1d -o ${name}.log  -i ${name}  ./code; done

Here we use the sqsub option “-i input_file” to provide the file for the job's standard input.

While loop (cases table)

Another common scenario is when the arguments to be used for serial farming are stored in a single text file, one line per job; in this case it is convenient to use BASH's “while” loop command. E.g. your code needs three numbers as its command line arguments. Your IC table (say “IC.dat”) might look like this:

     10 4.5 1.1e9
     20 3.7 7.5e8
     ...
     50 4.8 1.1e9

A “while” loop can be used here as follows:

   [user@orc-login2 ~]$  i=0;  while read a b c; do i=$(($i+1)); sqsub -r 1d -o out.${i}  ./code $a $b $c; done < IC.dat

Here the standard output will go to files out.1 ... out.N (N being the number of jobs). You can add “--idfile” as before. “read a b c” reads the three columns from the file IC.dat into the shell variables $a, $b, $c, one line at a time.

Using separate subdirectories

As the most complex (and perhaps the most realistic) scenario so far, let's consider the case when each job needs to run in a separate sub-directory (because it creates one or more output files with identical names). Expanding upon the previous example, we can accomplish this task as follows:

   [user@orc-login2 ~]$  i=0;  while read a b c; do i=$(($i+1)); mkdir RUN$i; cd RUN$i; sqsub -r 1d -o out ../code $a $b $c; cd ..; done < IC.dat

BASH scripting

Basics

The previous example shows that the command line approach has its limitations; a single line becomes too unwieldy when the number of commands grows. Instead, one can put all these commands in a text file – a BASH script. Instead of the command line separator “;”, in a script one places the different commands on separate lines. The previous example, written as a BASH script, becomes

#!/bin/bash
i=0  
while read a b c
  do
  i=$(($i+1))
  mkdir RUN$i 
  cd RUN$i
  sqsub -r 1d -o out ../code $a $b $c
  cd ..
  done < IC.dat

Here “#!/bin/bash” tells the OS that this is a BASH script (for Python you'd put “#!/usr/bin/python”). The script file (say, “metajob.sh”) should be made executable:

   [user@orc-login2 ~]$  chmod u+x metajob.sh

Then one can run it as follows:

   [user@orc-login2 ~]$  ./metajob.sh

Command line arguments

BASH scripts can accept command line arguments. E.g. the previous script can be modified to accept two arguments: the full path to the code, and the directory where the script should run (where the file IC.dat is located). One would run the script like this:

   [user@orc-login2 ~]$  ./metajob.sh  /home/user/bin/code  /work/user/case1

The required changes to the metajob.sh file are:

#!/bin/bash
# Reading the command line arguments:
CODE=$1
WD=$2
 
cd $WD
i=0  
while read a b c
  do
  i=$(($i+1))
  mkdir RUN$i 
  cd RUN$i
  sqsub -r 1d -o out $CODE $a $b $c
  cd ..
  done < IC.dat

Job dependencies

If you have N jobs that must be executed in a specific order, one after another (i.e. the i-th job can only start after the (i-1)-th job has completed), you can use the -w flag of sqsub in your submission script, which allows you to specify which jobs must complete before the job being submitted can start.

#!/bin/bash
# number of jobs, adjust this to be the actual number of jobs in your run
N=5
 
# first submission
i=1
sqsub -r 2h -o run$i.log --idfile=jobid.$i your_executable.x
JOBID=`cat jobid.$i`
 
# subsequent submissions
for i in `seq 2 $N`;
do
 sqsub -w $JOBID -r 2h -o run$i.log --idfile=jobid.$i your_executable.x
 JOBID=`cat jobid.$i`
done

When using a script like this, you should implement a checking mechanism which allows each job to verify that the input it requires, generated by the previous jobs, has in fact been produced correctly.
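For example, a minimal check of this kind, placed at the beginning of the job's script, might look like this (a sketch only; the file name "previous_step.dat" is hypothetical):

# Abort right away if the file produced by the previous job is missing or empty
if [ ! -s previous_step.dat ]; then
  echo "Input from the previous job is missing or empty; exiting" >&2
  exit 1
fi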


Array jobs / Job Arrays

Job arrays provide an alternative to repeatedly calling the underlying submission program in a loop in order to submit a large number of similar jobs. Unfortunately, the details of how the individual jobs are specialized differ between job submission systems, which can be a fairly significant impediment to their use.

The recommended way to use these options is to first run your standard sqsub command with the -d option. This displays how sqsub calls the underlying qsub (Torque -- most of the clusters) or bsub (LSF) command, giving you a starting qsub or bsub command to which you can add the appropriate job array options.

Torque

Torque job array reference

Under Torque, the -t range option is used with qsub to specify a job array, where range is a range of numbers (e.g., 1-4 or 2,4-5,7) for versions 2.3 or later and a single number for earlier versions. The details are

  1. a job is submitted for each number in the range (or the specified number of times for pre-2.3 versions),
  2. the specified constraints and so on are applied independently to each job,
  3. individual jobs are referenced as jobid-number, and the entire array can be referenced as jobid for easy killing etc., and
  4. each job has PBS_ARRAYID set to its number, which allows the script/program to specialize for that job (see the sketch after this list).
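For illustration only (on our clusters you would normally start from the qsub command printed by "sqsub -d" rather than writing a job script from scratch), a generic Torque job array for the PI example might look roughly like this:

#!/bin/bash
# array_pi.sh -- each array element computes one chunk of the PI example
cd $PBS_O_WORKDIR
./pi.x $PBS_ARRAYID 20 > OUTPUT-$PBS_ARRAYID.txt

submitted with something like:

   qsub -t 1-20 -l walltime=00:10:00 array_pi.sh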

LSF

LSF job array reference

Please note that currently the only cluster still using the LSF scheduler is "requin".

Under LSF, the -J "id[range]%max" option is used with bsub to specify a job array, where id is an arbitrary identifier (e.g., myarray), range is a range of numbers with an optional :step (e.g., 1-4, 2,4-5,7, or 1-6:2), and max is the maximum number of jobs that should run at once (e.g., 4, or nothing at all for no limit). The details are

  1. a job is submitted for each number in the range,
  2. the specified constraints and so on are applied independently to each job,
  3. individual jobs are referenced as jobid[number], and the entire array can be referenced as jobid for easy killing etc., and
  4. in options such as the output file name, %J expands to the jobid and %I expands to the index number, which can be used to specialize input and output (see the sketch after this list).
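Again for illustration only (on requin you would start from the bsub command printed by "sqsub -d"), the PI example as a 20-element LSF job array might be submitted roughly like this; LSB_JOBINDEX is the environment variable LSF sets to the array index of each element, and the resource options are assumptions:

   bsub -J "pi_farm[1-20]" -W 10 -o OUTPUT-%I.txt "./pi.x \$LSB_JOBINDEX 20"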

Fully automated farming

Overview

In this section we describe our suite of scripts designed to fully automate throughput computing (serial/parallel farming) in SHARCNET. The same set of scripts can be used with little or no modifications to organize almost any type of farming workflow, including

  • either "one case per job" mode or "many cases per job" mode, with dynamic workload balancing;
  • either single cluster mode or meta-cluster (grid) mode;
  • the ability to automatically resubmit all the jobs which failed or never ran;
  • the support for serial, multi-threaded (OpenMP), MPI, CUDA etc. farming.

Small number of cases

Overview

Let's call a single execution of the code in a serial/parallel farm a “case”. When the total number of cases, N_cases, is fairly small (say, <500) it is convenient to dedicate a separate job to each case – the way it was handled in the previous section (#Semi-automated approaches).

We created a set of BASH scripts implementing both the “one case – one job” approach (described in this section; suitable when the number of jobs is <500 or so) and the “many cases per job” approach (described in #Large number of cases; best for a larger number of cases). The scripts can be found on our clusters in the following directory:

  ~syam/Serial_farming/META

The two essential scripts are “submit.run” and “single_case.run”.

submit.run script

“submit.run” has two obligatory command line arguments. The first one is the path to the cases table, containing the information (command line arguments, and/or the name(s) of the input file(s)) for your code, with each line corresponding to one case to be executed. The second argument (when used in the “one case per job” mode) should be “-1”, e.g.

   [user@orc-login2 ~]$ ./submit.run  /work/user/dir/cases.dat  -1

single_case.run script

The other principal script, “single_case.run”, is the only script which might need customization. Its task is to read the corresponding line from the cases table, parse it, and use these data to launch your code for this particular case. The version of the file provided literally executes one full line from the cases table (meaning that the line should start with the path to your code, or with the code binary name if the binary is in your $PATH) in a separate subdirectory, RUNyyy (yyy being the case number).

“single_case.run”:

...
# ++++++++++++  This part can be customized:  ++++++++++++++++
#  $ID contains the case id from the original table
#  $COMM is the line corresponding to the case $ID in the original table, without the ID field
mkdir RUN$ID
cd RUN$ID
# Executing the command:
$COMM &> out.log
# Exit status of the code:
STATUS=$?
cd ..
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
...

Your “cases.dat” table can look like this:

 /home/user/bin/code1  1.0  10  2.1
 ./code2 < IC.2
 sleep 10m
 ...

In other words, any executable statement which can be written on one line can go there. Note: “submit.run” will modify your cases table once (it will add the line number at the beginning of each line, if you didn't do it yourself).
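For the table above, the modified version would look roughly like this (the numbering is added by the script):

 1 /home/user/bin/code1  1.0  10  2.1
 2 ./code2 < IC.2
 3 sleep 10m
 ...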

Handling code exit status

What is “$STATUS” for in “single_case.run”? It is a shell variable which should be set to “0” if your case was computed correctly, and to a positive value otherwise. This is very important: it is used by “resubmit.run” to figure out which cases failed, so they can be re-computed. In the provided version of “single_case.run”, $STATUS only reflects the exit code of your program. This likely won't cover all potential problems (some codes do not exit with a non-zero status even when something went wrong). You can always change or augment the derivation of $STATUS in “single_case.run”. E.g., if your code is supposed to create a new non-empty file (say, “out.dat”) at the very end of each case run, the existence of such a non-empty file can be used to judge whether the case failed:

  STATUS=$?
  if test ! -s out.dat
     then
     STATUS=1
     fi

In the above example, $STATUS will be positive if either the code exit status is positive, or the "out.dat" file doesn't exist or is empty.

Output files

The “submit.run” script will generate some files in the current directory:

  • out.cluster.xxx files (one file per job): standard output from jobs;
  • status.yyy files (one file per case): files containing the status of processed cases; yyy is the case number in the original table.

Also, every "submit.run" execution will create a unique subdirectory inside "/home/$USER/tmp" (which will be created if it doesn't exist). Inside that subdirectory, some small scratch files will be created (such as the files used by the "lockfile" command to serialize certain operations inside the jobs - see #Meta-cluster ("Grid") mode). These subdirectories have names "NODE.PID", where "NODE" is the name of the current node (typically a login node) and "PID" is the unique process ID of the script. Once the farm execution is done, one can safely erase this subdirectory.

Users normally don't need to access these files directly.

Auxiliary scripts

Other auxiliary scripts are also provided for your convenience.

  • “list.run” will list all the jobs with their current state for the serial/parallel farm (no arguments).
  • “query.run” will provide a one line summary (number of queued / running / done jobs) in the farm, which is more convenient than using “list.run” when the number of jobs is large. It will also “prune” queued jobs if warranted (see below).
  • “kill.run”: will kill all the running/queued jobs in the farm.
  • “prune.run”: will only kill queued jobs.
  • “Status.run” (capital “S”!) will list statuses of all processed cases.
  • "clean.run": will delete all the files in the current directory (including any subdirectories), except for the *.run scripts and test*.dat files. Be very careful with this script! Note: the script will not restore *.run scripts to their default state (for that, you'd need to copy the *.run scripts again from the /home/syam/Serial_farming/META directory).

Resubmitting failed/not-run jobs

Finally, script “resubmit.run” is run the same way as “submit.run”, e.g.:

   [user@orc-login2 ~]$  ./resubmit.run  /work/user/dir/cases.dat  -1

“resubmit.run”:

  • will analyze all the status.* files (see #Output files);
  • will figure out which cases failed and which never ran for whatever reason (e.g. because of the 7d runtime limit);
  • will create a new cases table (adding “_” at the end of the original table name), which lists only the cases which still need to be run;
  • will use “submit.run” internally to launch a new farm for the unfinished/failed cases.

Notes: You won't be able to run “resubmit.run” until all the jobs from the original run are done or killed. If some cases still fail or do not run, one can resubmit the farm again and again, with the same arguments as before:

   [user@orc-login2 ~]$  ./resubmit.run  /work/user/dir/cases.dat  -1

Of course, if certain cases persistently fail, then there must be a problem with either your initial conditions (parameters or files) or with your code (a code bug). It is convenient to use the script "Status.run" (capital S!) here to see a sorted list of statuses for all computed cases.

Meta-cluster ("Grid") mode

The previous material described the situation when you run your farm on a single cluster. This is the default behaviour when the current directory doesn't contain the file “clusters.lst”. If the file “clusters.lst” is present and is not empty, the scripts switch to the “meta-cluster” mode.

A proper “clusters.lst” file should contain a list of the clusters which you wish to use for your serial farm, one cluster per line. E.g.:

   kraken
   saw
   redfin

The current (local) cluster doesn't have to be in the list.

When you both use the “-1” argument (one case per job mode) and provide a “clusters.lst” file, “submit.run” and “resubmit.run” will submit 2*N_cases jobs across all the clusters in the list. Only the first N_cases jobs to start will do the computations; the rest will get pruned or die instantly.

This is how it works: the very first job to run (on any cluster) will request a case to process (case #1 initially). The second job to run will ask for the next available case (#2), and so on. To accomplish this, certain operations need to be serialized (only one job at a time can perform them):

  • Read “currently served” case number from a file.
  • Update it (add “1”), and write it back to the file.

To serialize file-related operations, you need a special binary program, “lockfile”. It is included in the META package. It should be placed in a directory listed in your $PATH environment variable. You can accomplish this by

   [user@orc-login2 ~]$ mkdir ~/bin
   [user@orc-login2 ~]$ cp lockfile ~/bin
   [user@orc-login2 ~]$ echo 'export PATH=/home/$USER/bin:$PATH' >> ~/.bashrc
   [user@orc-login2 ~]$ . ~/.bashrc
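For illustration, a serialized update of a shared counter file using "lockfile" looks roughly like this (a sketch only, not the actual code inside the META scripts; the file names are hypothetical):

# Only one job at a time can create counter.lock; the others wait, retrying every 1s
lockfile -1 counter.lock
# Read the currently served case number, increment it, and write it back
N=`cat counter.dat`
echo $((N+1)) > counter.dat
# Release the lock
rm -f counter.lock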

Some clusters listed in your “clusters.lst” file might not be usable - either not accessible (ssh timeout), or having an unresponsive scheduler (sqsub never returns). Our scripts deal with such problems intelligently:

  • The script will first try to submit one job to each cluster listed in “clusters.lst”.
  • Any cluster where it takes more than 10s either to ssh to the cluster or to submit a job there with sqsub will be automatically deleted from “clusters.lst”, and will not be used in the current or subsequent farms launched from this directory (until you add the cluster name to "clusters.lst" again).
  • All the relevant numbers (number of jobs per cluster etc.) will be adjusted on the spot, so you'll get the requested number of jobs even if one or more clusters time out (you'll just end up submitting more jobs per cluster).
  • If all but one cluster in the list time out, the scripts will automatically switch from the "meta-cluster" mode to the regular (single cluster, with no overcommitted jobs) mode.
  • If all the clusters in "clusters.lst" time out, the script will exit with a warning and non-zero exit code.

What is the advantage of using the "grid" mode, as opposed to running your farm on a single cluster? SHARCNET has quite a few clusters of different sizes. Many of these clusters are largely overlooked by users, either because they are smaller, or because they are contributed systems, where regular users' jobs have a lower priority than the jobs from contributors. This results in our largest cluster, orca, being much busier than most other systems (also because orca is heavily subscribed by NRAC projects, which have the highest priority), so your serial farming (and any other) jobs will spend much longer waiting in orca's queue than they would on some other clusters. It is not easy to know a priori which of the clusters would be able to run your serial farming jobs soonest. The "meta-cluster" mode of our farming scripts solves this problem empirically: it submits more jobs than you plan to run, to multiple clusters, and when the least busy clusters have launched enough of your jobs, the rest get "pruned" (removed from the scheduler). You get your results sooner, both because you are empirically detecting the least busy clusters, and because you are running jobs on multiple clusters, many of which are rather small and would not be useful if you tried to run the whole farm on just one of them.

Large number of cases

Overview

The “one case per job” approach works fine when the number of cases is fairly small (<500). When N_cases >> 500, the following problems arise:

  • There is a limit on the number of jobs submitted (for orca, it is 5000).
  • Job submission becomes very slow. (With 5000 jobs and ~2s per job submission, the submission alone will take ~3 hours!)
  • With a very large number of cases, each individual case is typically short. If one case runs for <30 min, you start wasting cpu cycles due to scheduling overheads (~30s per job).

The solution: instead of submitting a separate job for each case, submit a smaller number of "meta-jobs", each of which processes multiple cases. As cases can take different amounts of time to process, it is highly desirable to use a dynamic workload balancing scheme here.

This is how it is implemented:

[Diagram: Meta1.png – schematic of the "many cases per job" workflow]

As the above diagram shows, the "submit.run" script in the "many cases per job" mode will submit N jobs, with N being a fairly small number (much smaller than the number of cases to process). Each job executes the same script - "task.run". Inside that script there is a "while" loop over cases. Each iteration of the loop has to go through a serialized portion of the code (only one job at a time can execute it), where it figures out which case (if any) to process next. Then the already familiar script "single_case.run" (see #single_case.run script) is executed, once per case, which in turn calls the user code.
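Schematically, the loop inside "task.run" amounts to something like the following (a simplified sketch, not the actual script; the serialized bookkeeping is only indicated by comments):

# Simplified sketch of the "task.run" loop
while true
  do
  # Serialized section (one meta-job at a time, protected by "lockfile"):
  # read the number of the next unprocessed case into ID and increment the counter.
  # An empty ID means there are no cases left:
  if test -z "$ID"; then break; fi
  # Process case $ID by running "single_case.run", which in turn launches
  # the user code and records its exit status in status.$ID
  done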

This approach results in dynamic workload balancing achieved across all the running "meta-jobs" belonging to the same farm. This can be seen more clearly in the diagram below:

[Diagram: Meta2.png – dynamic workload balancing across meta-jobs]

The dynamic workload balancing results in all meta-jobs finishing around the same time, regardless of how different the runtimes of individual cases are, regardless of how fast the CPUs are on different nodes, and regardless of whether all the meta-jobs start at the same time (as in the above diagram) or at different times (which would normally be the case). In addition, this approach is very robust: not all meta-jobs need to start running for all the cases to be processed, and if a meta-job dies (e.g. due to a node crash), at most one case will be lost. (The latter can easily be rectified by running the "resubmit.run" script; see #Resubmitting failed/not-run jobs.)

To enable the “multiple cases per job” mode (with dynamic workload balancing), the second argument to “submit.run” script should be the desired number of “task.run” jobs, e.g.:

   [user@orc-login2 ~]$  ./submit.run  /work/user/dir/cases.dat  32

Not all of the requested meta-jobs will necessarily run (this depends on how busy the cluster(s) are). But as described above, in the "many cases per job" mode you will eventually get all your results regardless of how many meta-jobs will run. (You might need to run "resubmit.run", sometimes more than once, to complete particularly large serial farms).

If file “clusters.lst” is present, listing multiple clusters, one per line (which will enable the "meta-cluster" mode - see #Meta-cluster ("Grid") mode), the number of submitted jobs will be the above number times two, but is limited to 256*2=512 jobs.

Runtime problem

Here is one potential problem when running multiple cases per job with dynamic workload balancing: what if the number of running meta-jobs times the maximum allowed runtime per job in SHARCNET (7 days) is not enough to process all your cases? E.g., suppose you managed to start the maximum allowed number of running jobs in SHARCNET, 256, each of which has the 7 day runtime limit. That means your serial farm can only process all the cases in a single run if average_case_runtime x N_cases < 256 x 7d = 1792 cpu days. (In less perfect cases you will be able to run fewer than 256 meta-jobs, resulting in an even smaller number of cpu days your farm can process.) Once your meta-jobs start hitting the 7d runtime limit, they will start dying in the middle of processing one of your cases. This will result in up to 256 interrupted case calculations. This is not a big deal in terms of accounting (the "resubmit.run" script will find all the cases which failed or never ran, and will resubmit them automatically), but it can become a waste of cpu cycles, because many of your cases die half-way through. On average, you will be wasting 0.5 x N_jobs x average_case_runtime cpu-days. E.g. if your cases have an average runtime of 2 days and you have 256 meta-jobs running, you will waste ~256 cpu days, which would not be acceptable.

Fortunately, the scripts we are providing have some built-in intelligence to mitigate this problem. This is implemented in the "task.run" script as follows:

  • The script measures the runtime of each case, and appends the value as one line in a scratch file "times" created inside the /home/$USER/tmp/NODE.PID directory (see #Output files). This is done by all running meta-jobs, on all clusters involved.
  • Once the first 8 cases have been computed, one of the meta-jobs will read the contents of the file "times" and compute the upper 12.5% quantile of the current distribution of case runtimes. This serves as a conservative estimate of the runtime for your cases, t_runtime.
  • From now on, each meta-job will estimate whether it has the time to finish the case it is about to start, by checking that t_finish - t_now > t_runtime. (Here t_finish is the time when the job will die because of the 7d runtime limit; t_now is the current time.) If it doesn't have enough time, it will exit early, which minimizes the chance of a case computation being aborted half-way due to the 7d runtime limit (see the sketch after this list).
  • At every subsequent power of two number of computed cases (8, then 16, then 32 and so on), t_runtime is recomputed using the above algorithm. This makes the t_runtime estimate more and more accurate. Powers of two are used to minimize the overhead of computing t_runtime; the algorithm is equally efficient for both very small (tens) and very large (many thousands of) numbers of cases.
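For illustration, the early-exit check amounts to something like this (a sketch with hypothetical variable names, not the actual "task.run" code):

# t_finish : time (in seconds since the epoch) when this meta-job hits the 7d limit
# t_runtime: conservative per-case runtime estimate derived from the "times" file
t_now=`date +%s`
if [ $((t_finish - t_now)) -le $t_runtime ]; then
  # Not enough time left to safely process another case -- exit early
  exit 0
fi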

Simple usage scenario

Additional information

Passing additional sqsub arguments

What if you need to use additional sqsub arguments (like --mpp, -q mpi, -q threaded, -n etc.)? Simple: just add all those arguments at the end of “submit.run” and “resubmit.run” command line, and they will be passed to sqsub, e.g.:

   [user@orc-login2 ~]$  ./submit.run  test.dat  -1  --mpp 4G

You can also override the default job runtime value (encoded to be 7 days in “submit.run”), by adding a “-r” argument.

Multi-threaded farming

How to do “multi-threaded farming” (OpenMP etc.)? Add these sqsub arguments to “(re)submit.run”:

   -q threaded -n N

Here “N” is the number of cpu cores/threads to use. Nothing special needs to be done inside “single_case.run”: run the code as in the serial case.
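For example, a farm of 4-thread OpenMP cases could be submitted with something like the following (the table name is hypothetical):

   [user@orc-login2 ~]$  ./submit.run  cases.dat  -1  -q threaded -n 4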

MPI farming

What about “MPI farming”? Use these sqsub arguments with “(re)submit.run”:

   -q mpi  -n N  --nompirun

and add “mpirun” before the path to your code inside “single_case.run”, e.g.:

   mpirun  $COMM &> out.log

Alternatively, you can prepend “mpirun” on each line of your cases table:

  mpirun /path/to/mpi_code arg1 arg2
  mpirun /path/to/mpi_code arg1 arg2
  ...
  mpirun /path/to/mpi_code arg1 arg2

FORTRAN code example: using standard input

You have a FORTRAN serial code, “fcode”; each case needs to read a separate file from standard input – say “data.xxx” (in /work/user/IC directory), where xxx goes from 1 to N_cases. Place “fcode” on your $PATH (e.g., in ~/bin, make sure /home/$USER/bin is added to $PATH in .bashrc; alternatively, use a full path to your code in the cases table). Create the cases table (inside META directory) like this:

  fcode < /work/user/IC/data.1
  fcode < /work/user/IC/data.2
  ...
  fcode < /work/user/IC/data.N_cases

The task of creating the table can be greatly simplified if you use a BASH loop command, e.g.:

   [user@orc-login2 ~]$  for ((i=1; i<=10; i++)); do echo "fcode < /work/user/IC/data.$i"; done >table.dat

FORTRAN code example: copying files for each case

Finally, another typical FORTRAN code situation: you need to copy a file (say, /path/to/data.xxx) to each case subdirectory, before executing the code. Your cases table can look like this:

  /path/to/code
  /path/to/code
  ...

Add one line (first line in the example below) to your “single_case.run”:

   \cp /path/to/data.$ID .
   $COMM &> out.log
   STATUS=$?

Using all the columns in the cases table explicitly

The examples shown so far presume that each line in the cases table is an executable statement (except for the first column which is added automatically by the scripts and contains the line #), starting with either the code binary name (when the binary is on your $PATH) or full path to the binary, and then listing the code's command line arguments (if any) particular to that case, or something like " < input.$ID" if your code expects the initial conditions via standard input.

In the most general case, one wants to have the ultimate flexibility in being able to access all the columns in the table individually. That is easy to achieve by slightly modifying the "single_case.run" script:

...
# ++++++++++++  This part can be customized:  ++++++++++++++++
#  $ID contains the case id from the original table
#  $COMM is the line corresponding to the case $ID in the original table, without the ID field
mkdir RUN$ID
cd RUN$ID
 
# Converting $COMM to an array:
COMM=( $COMM )
# Number of columns in COMM:
Ncol=${#COMM[@]}
# Now one can access the columns individually, as ${COMM[i]} , where i=0...$Ncol-1
# A range of columns can be accessed as ${COMM[@]:i:n} , where i is the first column
# to display, and n is the number of columns to display
# Use the ${COMM[@]:i} syntax to display all the columns starting from the i-th column
# (use for codes with a variable number of command line arguments).
 
# Call the user code here.
...
 
# Exit status of the code:
STATUS=$?
cd ..
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
...

For example, you need to provide to your code both an initial conditions file (to be used via standard input), and a variable number of command line arguments. Your cases table will look like this:

  /path/to/IC.1 0.1
  /path/to/IC.2 0.2 10
  ...

The way to implement this in "single_case.run" is as follows:

# Call the user code here.
/path/to/code ${COMM[@]:1} < ${COMM[0]} &> out.log

How to use contributed systems?

Some contributed systems (like brown and redfin) apparently still accept 7 day jobs, so use them as any other cluster. Many contributed systems (perhaps all of them in the future) will only accept short – up to 4h – jobs. To take full advantage of the contributed systems, add the “-r 4h” argument to “(re)submit.run”, and either use the “-1” mode of “(re)submit.run” if each of your cases takes between 0.5 and 4 hours to run, or use the “many cases per job” mode if each case takes <0.5h.
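For example, a farm of many short cases targeted at 4-hour clusters could be submitted with something like the following (the table name and the number of meta-jobs are hypothetical):

   [user@orc-login2 ~]$  ./submit.run  cases.dat  64  -r 4h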

Word of caution

  • Please use common sense when using the “meta-cluster” mode of the scripts.

Most of the jobs you submit should be expected to run (don't submit many more jobs than you might need - “just in case”). The “$Overcommit_factor” variable defined in “submit.run” (=2) was introduced to force you to be reasonably compliant with the above point. After submitting a farm to multiple clusters, regularly check its state with “query.run” script. It will keep you informed about the overall “health” of your farm, and if things don't look right you can do something about it. As importantly, “query.run” script will run “prune.run” script internally when it detects that you won't need the jobs which are still queued (because you already hit the maximum number of running jobs limit).

  • Make sure your code only accesses /work or /home. This is critical when you use the "meta-cluster" mode, as /work and /home are the only SHARCNET's file systems which are accessible from any compute node on any cluster.
  • Always start with a much smaller test farm run, to make sure everything works, before submitting a large production run farm.
  • Stay away from orca (when using meta-cluster mode): it is very busy because of NRAC jobs. Don't include requin (the scripts don't work there).