HOW-TO Process data in a distributed (parallel) way

Summary

Here are some of the actions that one may want to do when Processing using the Distributed Processing Unit (DPU). These actions are described in this HOW-TO.

  • Submit you own jobs to the queue (either from AWE or by using a webservice)
  • Inspect the status of your jobs or a DPU in general
  • Request which tasks (sequence of tasks) the dpu recognizes
  • Request which options a task (sequence of tasks) recognizes.
  • Using your local (changed) code when processing remotely
  • Cancel jobs
  • Obtain the logs of your jobs

At the AWE prompt an object is available which is used to run jobs on the DPU. Public methods for this object are:

  • dpu.get_jobids(): Return the list of identifiers of your jobs known to the DPU
  • dpu.run(<arguments>): Submit jobs
  • dpu.get_dpu_identifiers(): Returns a list of all known DPU’s.
  • dpu.show_dpu_identifiers(): Print the above, more verbose
  • dpu.select_dpu(): Select a different DPU
  • dpu.get_sequence_identifiers(): Returns a list of known task sequences
  • dpu.show_sequence_identifiers(): Print the above, more verbose
  • dpu.get_sequence_options(<sequence identifier>): Returns a list of all arguments that are recognized for the given sequence identifier
  • dpu.show_sequence_identifiers(sequence identifier): Print the above, more verbose
  • dpu.get_job_result(<jobid>): If results have been committed, returns a list of the main created objects
  • dpu.get_logs([<job identifier(s)>]): Return all (no argument) logs or the logs of the specified job(s)
  • dpu.cancel_job(<jobid>): Cancels a job (as long as it is not yet running)

In the Environment with key dpu_name it is defined which DPU to use. This can be changed to select a different DPU as default. To change the DPU in an AWE session use the method dpu.select_dpu().

Viewing the queue

For each Distributed Processing Unit (DPU) there is a webpage available which displays its queue. These pages can be found from the homepage, under Astro-WISE Information System > Processing Grid > Cluster Queues.

On the page you can inspect the status of your jobs. Shown are among others, user information, job status and running time. Here is the webpage for the DPU in Groningen:

(click on the links in the lower left part of the screen to view the queue)

https://dpu.hpc.rug.astro-wise.org/

Processing in AWE

From the awe-prompt it is possible to process your data remotely and in a distributed fashion. When you start up the interpreter an instance of the class used for this (the Processor class), is automatically generated and assigned to the variable “dpu”. So, when you start AWE you will see something like this:

Python 3.5.1 (default, Oct 15 2017, 09:47:37)
[GCC 4.8.5] on linux

Type "help", "copyright", "credits" or "license" for more information.


               Welcome to the Astro-WISE Environment


Importing Astro-WISE packages. Please wait...

Initializing Distributed Processing Unit...

Current profile:
- username : <username>
- database : db.astro.rug.astro-wise.org
- project  : <project>
- current privileges : 2 (Project)

awe>

The message about the Distributed Processing Unit indicates that a Processor instance (called “dpu”) has been created.

The dpu can be asked which sequence identifiers it recognises:

awe> dpu.get_sequence_identifiers()
['ReadNoise', 'DarkCurrent', 'Bias', 'HotPixels', 'Gain', 'Background',
'TwilightFlat', 'Shutter', 'NightSkyFlat', 'FringeFlat', 'QuickCheck',
'DomeFlat', 'MasterFlat', 'ColdPixels', 'Photom', 'Variability',
'Reduce>Photcat', 'Photcat>Photom', 'Monitoring', 'MDia', 'Reduce>Photom',
'Photcat', 'SourceList', 'Reduce>Astrometry', 'Reduce>Regrid',
'AssociateList', 'Reduce>GAstrometry>Coadd', 'Reduce>GPhotometry>Regrid',
'Coadd', 'GPhotometry', 'Reduce>Coadd', 'GAstrometry',
'Reduce>GAstrometry>Regrid', 'GAstrom', 'Reduce>GPhotometry>Coadd',
'Astrometry', 'GPhotom', 'Regrid>Coadd', 'GAstromSL', 'Regrid', 'Reduce',
'Reference', 'CrossTalk', 'PSF', 'KidsCatMatched', 'Convolve',
'Gaussianize', 'Null>Null', 'GalPhot', 'ExportForESO',
'CrossTalkCorrectedFrame', 'ESOID', 'KidsCat', 'Null', 'Gaussianize>GaAP',
'RawFitsInspect', 'ApertureCorrection', 'RawFrame', 'Null>Null>Null',
'GaAP', 'GalFit', 'BPZ', 'Extinction', 'Target', 'EuclidLIS',
'EuclidExport', 'EuclidImport', 'EuclidLISTest']

The dpu can also be asked which options a particular task takes:

awe> dpu.get_sequence_options('HotPixels')
[['i', 'c', 'd', 'tpl', 'p', 'C', 'instrument', 'chip', 'date', 'template', 'pars', 'commit']]

More verbose:

awe> dpu.show_sequence_options('HotPixels')
Recognized options for the astro.recipes.HotPixels OptionsParser are:
"i" or "instrument" (default: )
"c" or "chip" (default: )
"d" or "date" (default: )
"tpl" or "template" (default: None)
"p" or "pars" (default: {})
"C" or "commit" (default: False)

Examples of running tasks on the DPU:

awe> dpu.run('DomeFlat', i='WFI', d='2000-04-28', f='#842')                     # all CCDs
awe> dpu.run('DomeFlat', i='WFI', d='2000-04-28', f='#842', c='ccd50')          # one CCD
awe> dpu.run('DomeFlat', i='WFI', d='2000-04-28', f='#842', C=1)                # commit results
awe> dpu.run('Reduce', i='WFI', o='CDF4_B_?', d='2000-04-29', f='#845')         # OBJECT queries support wildcard
awe> dpu.run('Astrometry', i='OMEGACAM', red_filenames=['filename1.fits'],      # process params
             pars={'AstrometricParameters.process_params.REFCAT': 'USNO-B1.0'})

Tasks may have processing parameter options (pars). See HOW-TO Configure process parameters for how to use these.

The run() method will print a job identifier for every job submitted to the queue of the Distributed Processing Unit (DPU):

[schmidt] 09:55:55 - Calling: Processor.run(ReadNoise, instrument= , i=WFI ,
 c=ccd50 , d=2000-04-28)
[schmidt] 09:55:55 - Estimated process time : 10 seconds
[schmidt] 09:55:55 - Sending job with identifier 82382 to DPU

These jobids can be used to retrieve logs of finished jobs (see the next section). Jobids of jobs you submitted can be requested with this command:

awe> dpu.get_jobids()
[84820]

A concise status overview for your jobs can be requested as well:

awe> dpu.get_status()
[schmidt] 13:37:11 - Jobid 84820 has status FINISHED N/E/A/S/U 75/0/0/0/0

Using your local (changed) code when processing remotely

When you run a job using the DPU and you have a local checkout of the code, this code is shipped to the computing cluster and used there, except when the option send_code=False is given to the dpu.run method.

Options

Several options can be specified for dpu.run:

  • dpu_time: override any dpu_time (expected processing time) derived by the tasks themselves
  • dpu_mem: specifying the memory required may influence which nodes are used
  • dpu_aweversion: version (master or develop) of binaries to use on the compute cluster
  • send_code: send your own version of the Python code (located in directory AWEPIPE) to use on the DPU
  • return_jobid: return jobid for accounting purposes

Logs and job identifiers

In an active AWE session it is possible to ask the Processor to return logs:

awe> dpu.get_logs()
12:22:16 - ++++++++++++++++++++++++++++++++++++++++++++
12:22:16 - job_status = 0
12:22:16 - ++++++++++++++++++++++++++++++++++++++++++++
12:22:16 - 12:10:00 - Querying database for instances of class RawBiasFrame
12:10:22 - Running : imcopy 'OCAM.2005-09-07T07:43:44.905_3.fits[1]' 'tmpdTqMxN.fits'
12:10:22 - Running : imcopy 'OCAM.2005-09-07T07:44:36.165_3.fits[1]' 'tmpz6bkOr.fits'
12:10:23 - Making ReadNoise object
12:10:23 - Using RawBiasFrame OCAM.2005-09-07T07:43:44.905_3.fits
12:10:23 - Using RawBiasFrame OCAM.2005-09-07T07:44:36.165_3.fits
12:10:23 - Computing difference...
12:10:23 - Estimating rms iteratively...
12:10:23 - Maximum number of iterations : 5
12:10:23 - Rejection threshold :    5.0 (sigma)
12:10:24 - The read noise (ADU) for ccd ESO_CCD_#67 is :   2.34
12:10:24 - Difference between biases is :  -0.077 (mean),   0.00 (median)

The returned lines are also written to a single (“date+time”.log) file in your local directory per awe-prompt session.

Cancelling jobs

Jobs can be cancelled from the awe-prompt using the following command:

awe> dpu.cancel_job(313)

or

awe> for jobid in dpu.get_jobids():
         dpu.cancel_job(jobid)

I.e. the argument is the job identifier of the job you want to cancel.