1843 Rate this article:
No rating

Using Spawn and Timers to Manage Concurrent Asynchronous Jobs

Brian Griglak

  A common complaint about IDL is its single threaded nature, and how you can’t peg all your CPUs by running a single PRO routine.  I’m not here to announce any new multi-threaded version of IDL, but instead discuss a very cool implementation that a colleague showed me that allows you to fork off multiple processes and manage their lifecycles from IDL.  You might argue that I could do this in Python, or even a simple shell script to a degree, but without IDL and ENVI’s rich support for data formats, it would difficult to implement a generic solution that can inspect your input data and determine how to partition the problem, nevermind reassemble to results into the final output product.

  The main component is IDL’s Spawn procedure, though the use is a little nonstandard.  The most common use of Spawn() is to build a command string, and then pass it into Spawn() along with positional parameters for stdout and maybe stderr.  The big problems here are that this is a blocking call and there is no parameter for stdin, so you end up writing a file to disk and adding redirection or a pipe to the command string.  The former may be overcome with the NOWAIT keyword, but that only works on Windows and I’m not entirely sure how you would capture the stdout or stderr output.  Instead we will use the UNIT keyword, which makes the call non-blocking and wraps a bidirectional pipe in a LUN, so you can easily send data to stdin and read the results of stdout.  Of course if we follow up the Spawn() call with a “while not EOF(lun)” loop then we gain nothing, as our code still effectively blocks until the external process has terminated and we’re done reading from its stdout.  Enter timers, which allow us to spawn multiple processes and revisit them until they are complete.  So my colleague created this class AsyncSpawner which will do just that, allowing you to enqueuer as many Spawn requests as you want, with a callback that will be invoked as the process spits stuff out on stdout.  We use a class to maintain a catalog of all the job requests that have been made and the state of the running processes.

The AsyncSpawn class has been trimmed down for this post, you would normally want a lot more error checking and configurability.  The “started” Boolean member indicates whether it should be running any timers to process events, which is controlled by the appropriately named method Start and Stop.  There is a configurable CONCURRENCY parameter passed into Init() that controls how many processes can run simultaneously.  The “queue” member is a List which stores the job requests in the order they are received.  There are two Hash members, one for the running jobs keyed on their shell’s process id and one for the timers used to query the processes’ stdout pipe, keyed on the timer id.

A client uses Enqueue to request a process to be spawned.  The first argument is the exact string you want passed to Spawn(), and the STDIN keyword is set to whatever string or string array you want written to the process’s stdin pipe.  The CALLBACK keyword specifies either a procedure name or object reference to a class with the ‘JobCallback’ prodedure.  In either case the callback procedure will be passed two arguments, a string message and a Dictionary containing the current job state.  The values of the input parameters are all stored in a Dictionary object (for case insensitivity and dot notation access to members), which is then added to the List member queue.

The main method is called ExecutiveLoop, which is initially invoked by Start and then subsequently by a timer using the special userdate value of -1.  If the “started” state is changed to False by the Stop method, then it won’t do anything.  When “started” is True it will dequeue as many job requests as it can and spawn them, until it hits the concurrency limit.  It will then set a new timer to call itself in 0.1 seconds.  When it dequeues a job request, it calls Spawn with the UNIT and PID keywords so that it can write and read from stdin and stdout, respectively, as well as to have the shell process id as a unique identifier.  Once any stdin data has been sent and flushed to stdin, a Dictionary is created to store the state of the job, which in turn is added to the “running” member Hash keying on the process id.  The job state includes to full command string requested, the stdin contents, the callback procedure name/object reference, the LUN of the process, the process id of the process, a List to contain all the stdout contents, and the start time of the process.  It is up to clients to not change any of this information when they implement the callback procedure, as that could break things.  After storing the job state, the callback procedure is invoked with the “spawned” message, and then a timer is started, using the job’s process id as the userdata.

The AsyncSpawn class implements a HandleTimerEvent method, which is what the timers invoke when they go off.  This method takes two arguments, the timer id and userdata, which is either the process id returned from Spawn or -1 to indicate ExecutiveLoop should be invoked again.  While it would be cleaner to have one timer and select on all the job LUNs, the FILE_POLL_INPUT() method doesn’t seem to work the way I’d like, at least on Windows.  So we resorted to separate timers for each job LUN, as well as one for ExecuteLoop.  When a positive value is passed in for userdata, then we invoke PollJobStatus with that process id.

The PollJobStatus method does just what it sounds like it should.  It gets the job state Dictionary from the “running” member, and checks if that job’s stdout LUN has reached its EOF marker.  If so then we free the LUN, remove the job from “running”, and invoke the callback with the “done” message and the final job state.  If we haven’t reached EOF, then we verify if there is activity on the pipe using FILE_POLL_INPUT(), and read a string from stdout.  That string is added to the job’s stdout state, and then the callback is invoked with an “updated” message and the current job state.

That pretty much describes the minimalist implementation of AsyncSpawn, here is the code:

function AsyncSpawn::Init, CONCURRENCY=concurrency
  compile_opt idl2
  self.started = !False
  self.concurrency = (N_Elements(concurrency) eq 1) ? (1>concurrency<20) : 1
  self.timers = Hash()
  self.running = Hash()
  self.queue = List()
  return, 1
end
 
pro AsyncSpawn::Cleanup
  compile_opt idl2
  self.Stop
end
 
pro AsyncSpawn::Start
  compile_opt idl2
  Timer.Block
  if (~self.started) then begin
    self.started = !True
    self.ExecutiveLoop
  endif
  Timer.Unblock
end
 
pro AsyncSpawn::Stop
  compile_opt idl2
  Timer.Block
  self.started = !False
  ; kill existing timers
  foreach t, self.timers.Keys() do !null = Timer.Cancel(t)
  self.timers = Hash()
  ; free luns for pipes to active jobs
  foreach job, self.running, pid do Free_Lun, job.unit
  Timer.Unblock
end
 
pro AsyncSpawn::Enqueue, cmd, STDIN=stdin, CALLBACK=callback
  compile_opt idl2
  if (~ISA(callback, /SCALAR, /STRING) && ~Obj_Valid(callback)) $
   
then Message, 'CALLBACK must be scalar string or object'
  if (~ISA(stdin, /STRING)) $
         then Message, 'STDIN must be string or string array'
  self.queue.Add, Dictionary('cmd', cmd, 'stdin', stdin, $
                                                                     'callback', callback)
end
 
pro AsyncSpawn::ExecutiveLoop
  compile_opt idl2
  ; this is the main loop, it runs queued jobs and manages the
  ; polling timers that watch for results on the jobs
  while (self.started && ~self.queue.IsEmpty() && $
         (self.running.Count() lt self.concurrency)) do begin
    req = self.queue.Remove(0)
    Spawn, req.cmd, UNIT=u, PID=pid, /NOSHELL, /HIDE
    ; print to the pipe, it will be read by stdin in the spawned process
    printf, u, req.stdin
    flush, u
    ; add job to running catalog
    job = Dictionary('cmd', req.cmd, 'stdin', req.stdin, 'unit', u, $
                     'callback', req.callback, 'pid', pid, $
                     'stdout', List(), 'start', SysTime(/Seconds))

    self.running[pid] = job

    if (Obj_Valid(req.callback)) then begin
      Call_Method, 'JobCallback', req.callback, 'spawned', job
    endif else begin
      Call_Procedure, req.callback, 'spawned', job
    endelse
    ; if not stopped, set a timer to inspect this new job
    if (self.started) then self.timers[Timer.Set(0.1, self, pid)] = !True
  endwhile
  ; if not stopped, set a timer to call myself
  if (self.started) then self.timers[Timer.Set(0.1, self, -1)] = !True
end
 
pro AsyncSpawn::PollJobStatus, pid
  compile_opt idl2
  job = self.running[pid]
  if (EOF(job.unit)) then begin
    Free_Lun, job.unit
    self.running.Remove, pid
    if (Obj_Valid(job.callback)) then begin
      Call_Method, 'JobCallback', job.callback, 'done', job
    endif else begin
      Call_Procedure, job.callback, 'done', job
    endelse
    return
  endif
  ; check if there is content on this job's stdout
  if (File_Poll_Input(job.unit, TIMEOUT=0) gt 0) then begin
    s = ''
    readf, job.unit, s
    job.stdout.Add, s
    if (Obj_Valid(job.callback)) then begin
      Call_Method, 'JobCallback', job.callback, 'updated', job
    endif else begin
      Call_Procedure, job.callback, 'updated', job
    endelse
  endif
  ; if not stopped, set a timer to reinspect this job
  if (self.started) then self.timers[Timer.Set(0.1, self, pid)] = !True
end
 
pro AsyncSpawn::HandleTimerEvent, id, pid
  compile_opt idl2
  self.timers.Remove, id
  if (pid eq -1) then begin
    self.ExecutiveLoop
  endif else begin
    self.PollJobStatus, pid
  endelse
end
 
pro AsyncSpawn__define
  compile_opt idl2
  !null = {AsyncSpawn,      $
           started: !False, $
           concurrency: 0L, $
           timers: Hash(),  $
           running: Hash(), $
           queue: List()    $
          }
end

The next step is to show this class in action.  I created a simple IDL procedure which can be run from the command line using “idl –e” syntax (http://www.exelisvis.com/docs/command_line_options_for.html#-e).  It’s called ProcessRasterTile and it has three input keywords: URI is a raster filename to load, SUB_RECT is a spatial subset to process, and INDEX is the spectral index name to calculate.  It will launch ENVI, load the raster, spatially subset it, and calculate the requested spectral index.  It then gets a temporary filename and exports the spectral index raster to it, and prints the filename and a special “OUTPUT=” prefix that can be identified by the spawn callbacks.  This isn’t a process you really need parallelize, but this is just a short example of what is possible with this approach.  I did add some error handling, where any error message from any API call is printed to stdout before returning.

pro ProcessRasterTile, URI=uri, SUB_RECT=subRect, INDEX=index

  compile_opt idl2
 
  nv = ENVI(/HEADLESS)
  oRaster = nv.OpenRaster(uri, ERROR=error)
  if (StrLen(error) gt 0) then begin
    print, 'Error opening raster: ' + error
    return
  endif
 
  oSubRaster = ENVISubsetRaster(oRaster, SUB_RECT=subRect, ERROR=error)
  if (StrLen(error) gt 0) then begin
    print, 'Error subsetting raster: ' + error
    return
  endif
 
  oIndexRaster = ENVISpectralIndexRaster(oSubRaster, INDEX=index, ERROR=error)
  if (StrLen(error) gt 0) then begin
    print, 'Error calculating spectral index: ' + error
    return
  endif
 
  outFilename = nv.GetTemporaryFilename('dat')
  oIndexRaster.Export, outFilename, 'ENVI'
 
  print, 'OUTPUT=' + outFilename
  nv.close
end

The next part is callback, which I chose to implement as a class since I need to maintain state of multiple processes until they are all complete, and I didn’t want to use common block variables to do that.  I called this class SpawnCallbackHandler, since that is its job.  This class has a Boolean “done” state, which was needed to avoid garbage collection in my simple test driver.  It also has two Hash members, “jobs” is for active jobs and “finishedJobs” is for jobs that have completed.  The main purposed of this class is its JobCallback method, which is invoked by the SpawnAsync class.  When it gets a “spawned” message it adds new job to its “jobs” hash, using the PID as the key.  When it gets an “updated” message it updates the job Dictionary in the “jobs” hash, and when it gets a “done” message it moves the job Dictionary from the “jobs” hash to the “finishedJobs” hash.  There is some error checking to make sure the PIDs are unique and only in the correct hash.  It then checks if there are no running jobs, only finished jobs, and in that case it calls its AssembleFinalProduct method, a more robust version would make this a callback procedure that takes the “finishedJobs” hash as input.

The AssembleFinalProduct method iterates over all the finished jobs, looking for the line in their stdout list that has that special “OUTPUT=” prefix.  Each of these are catalogued, so they can then be passed into ENVI::OpenRaster() to load all the NDVI tiles.  I then create an ENVIMosaicRaster(http://www.exelisvis.com/docs/ENVIMosaicRaster.html), which is exported to another temporary filename that is printed out so you know what it is.

function SpawnCallbackHandler::Init

  compile_opt idl2
  self.done = !False
  self.jobs = Hash()
  self.finishedJobs = Hash()
  return, 1
end
 
pro SpawnCallbackHandler::Cleanup
  compile_opt idl2
  foo = 1
end
 
function SpawnCallbackHandler::Done
  compile_opt idl2
  return, self.done
end
 
pro SpawnCallbackHandler::JobCallback, msg, job
  compile_opt idl2
  case msg.ToLower() of
    'spawned': begin
      if (self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin
        Message, 'Already know about job with PID ' + StrTrim(job.pid, 2)
      endif
      self.jobs[job.pid] = job
    end
    'updated': begin
      if (~self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin
        Message, 'Update to unknown job with PID ' + StrTrim(job.pid, 2)
      endif
      self.jobs[job.pid] = job
    end
    'done': begin
      if (~self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin
        Message, 'Completion of unknown job with PID ' + StrTrim(job.pid, 2)
      endif
      self.jobs.Remove, job.pid
      self.finishedJobs[job.pid] = job
    end
  endcase
  ; are they all done yet?
  if (self.jobs.IsEmpty() && ~self.finishedJobs.IsEmpty()) then begin
    self.AssembleFinalProduct
  endif
end
 
pro SpawnCallbackHandler::AssembleFinalProduct
  compile_opt idl2
  jobUrls = List()
  foreach job, self.finishedJobs do begin
    foreach line, job.stdout do begin
      if (line.StartsWith('OUTPUT=')) then begin
        jobUrls.Add, (line.Substring(7)).Trim()
      endif
    endforeach
  endforeach
  ; launch ENVI and load all the output rasters
  nv = ENVI(/HEADLESS)
  tiles = List()
  foreach url, jobUrls do begin
    tiles.Add, nv.OpenRaster(url)
  endforeach
  ; mosaic the rasters and export the result
  mosaic = ENVIMosaicRaster(tiles.ToArray())
  outFilename = nv.GetTemporaryFilename('dat')
  mosaic.Export, outFilename, 'ENVI'
  print, outFilename
  nv.close
  self.done = !True
end
 
pro SpawnCallbackHandler__define
  compile_opt idl2
 
  !null = {SpawnCallbackHandler,  $
           done: !False,          $
           jobs: Hash(),          $
           finishedJobs: Hash()   $
          }

end

My test driver code for this is as follows.  Since this example doesn’t have any persistence, I had to use the SpawnCallbackHandler::Done() method to keep it and the AsyncSpawn object alive until everything was complete.  If you used an approach like this in a widget based application you wouldn’t need that method.

pro test_parallelRasterTile

  compile_opt idl2
 
  filename = 'C:\Program Files\Exelis\ENVI53\data\qb_boulder_msi'
  subRect = [[0, 0, 511, 511], [512, 0, 1023, 511], $
             [0, 512, 511, 1023], [512, 512, 1023, 1023]]
  index = 'NDVI'
 
  spawner = AsyncSpawn(CONCURRENCY=4)
  handler = SpawnCallbackHandler()
 
  for i = 0, 3 do begin
    cmd = 'C:\Program Files\Exelis\IDL85\bin\bin.x86_64\idl.exe -e ' + $
          '"ProcessRasterTile, URI=''' + filename + ''', SUB_RECT=[' + $
          StrJoin(StrTrim(Reform(subRect[*,i]), 2), ',') + '], INDEX=''' + index + '''"'
    spawner.Enqueue, cmd, STDIN='', CALLBACK=handler
  endfor
  spawner.Start
  while (~handler.Done()) do begin
    Wait, 0.5
  endwhile
end

Please login or register to post comments.