Stick a Fork in It

Process data faster using ESE and fork-join

Doug Edmundson

Last time I blogged on the ESE API, which makes it easy for IDL programs to make use of the ENVI Services Engine.  (See the March 26, 2015 post titled “The ESE API for IDL”.)  Typically one will want to use ESE to run PRO code remotely, on a cluster that is close to the data and has a lot of computational power.  However, ESE can run on a single-user machine just the same.

As you’ll recall, the ESE API makes it easy to write PRO code to run ESE tasks, monitor them and get results.  All the complexity of making HTTP requests and such is handled by the API.  This leads to PRO code that feels natural.  For example:

oTask = ESE.findTask( 'penny64', 'ese_addition' )
oJob = oTask.run( a = 1, b = 2 )
print, oJob.answer

This runs the addition task on a remote machine named “penny64”.  Simple.  So let’s try something less simple.  Let’s use ESE to parallelize a computation, thereby finding the answer faster.

We are going to use the fork-join coding pattern.  This involves:

1) Splitting the data into independent chunks

2) Processing chunks in parallel

3) Merging the results
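
To make the three steps concrete before ESE enters the picture, here is a minimal local sketch in plain IDL.  It is not part of the ESE API and it runs nothing in parallel; the function name chunked_total and its nChunks argument are made up for illustration.  It only shows how the data flows through split, process and merge.

```idl
; Hypothetical helper (not from ESE): the fork-join data flow, run serially.
function chunked_total, data, nChunks
  compile_opt idl2

  n = n_elements( data )
  if ( n mod nChunks ) ne 0 then $
    message, 'The data length must be a multiple of nChunks.'

  ; 1) split: each column chunks[ *, j ] holds one contiguous chunk
  chunks = reform( data, n / nChunks, nChunks )

  ; 2) process: sum every chunk independently (dimension 1 runs along a chunk)
  partial = total( chunks, 1 )

  ; 3) merge: combine the per-chunk results into one answer
  return, total( partial )
end
```

For example, chunked_total( indgen( 20 ), 4 ) should agree with total( indgen( 20 ) ), which is 190.  In the ESE version, step 2 is the part that gets farmed out to workers.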

The example we are going to use is trivial, but illustrates the point.  Instead of using IDL’s “total” function, we are going to write our own that uses ESE for the parallelization part of fork-join.  First, consider this task:

pro sum, a = a, b = b, sum = sum
  sum = a + b
end

It’s simple, so just imagine that this is an amazingly brilliant, time-consuming algorithm suited to the kind of data you work with.  This is the one that will get you the Nobel Prize... in computer science.  Shucks.

Now, examine the client code that computes the total, using the ESE API, running the task in parallel, on chunks of the data:

pro fork_join_total

  ; generate the data
  n = 20      ; an even number, please
  m = n / 2
  data = indgen( n )

  ; get the task that simply adds two numbers
  oTask = ESE.findTask( 'penny64', 'sum' )

  ; run the task on pairs of numbers
  ;
  ; Several pairs are summed at the same time.  This is where we get our
  ; performance improvement over doing them one at a time, linearly.
  oJobs = objarr( m )
  for i = 0, m - 1 do begin
    x = 2 * i
    oJobs[ i ] = oTask.run( a = data[ x ], b = data[ x + 1 ], /async )
  endfor

  ; wait for all jobs to complete
  status = ESE.join( oJobs )

  ; merge the results from all the jobs that ran independently
  sumTotal = 0
  for i = 0, m - 1 do begin
    sumTotal += oJobs[ i ].sum
  endfor

  print, sumTotal

end

 

The key bits of code that should grab your eye are:

oJobs[ i ] = oTask.run( a = data[ x ], b = data[ x + 1 ], /async )

and

status = ESE.join( oJobs )

That first line comes from the loop that hands off chunks of data to the “sum” algorithm.  Note the “/async”.  This allows the loop to continue iterating over the chunks of data, firing off more calls to the algorithm before the prior ones have finished.

That other line blocks the client application until all externally running jobs are completed.  There are options to ignore failures (and keep processing) and to report job status via a callback mechanism.  Those features are described in the documentation.  Note that we’re not doing error checking, which the “status” variable could help with.

There is actually another important line that should draw your attention:

sumTotal += oJobs[ i ].sum

This comes from the loop that merges the results of the independent jobs.  This is the standard pattern in the fork-join model.  Split the data.  Run an algorithm independently on the chunks.  Wait.  Merge the results into something more meaningful.
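
A serial baseline is handy for checking that the merged answer is right and for having something to compare timings against.  The sketch below is an assumption on my part, not something from the ESE docs: it compiles the “sum” task locally and calls it directly on the same pairs, one after another.  The name serial_total is hypothetical.

```idl
; The task from this post, compiled locally instead of uploaded to ESE.
pro sum, a = a, b = b, sum = sum
  compile_opt idl2
  sum = a + b
end

; Hypothetical baseline: same pairing as fork_join_total, but no ESE
; and no parallelism.  Assumes data has an even number of elements.
function serial_total, data
  compile_opt idl2

  sumTotal = 0
  for i = 0, n_elements( data ) / 2 - 1 do begin
    ; call the task directly; its result comes back in pairSum
    sum, a = data[ 2 * i ], b = data[ 2 * i + 1 ], sum = pairSum
    sumTotal += pairSum
  endfor

  return, sumTotal
end
```

With data = indgen( 20 ), serial_total( data ) should return 190, the same value the ESE client prints.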

You may have noticed that fork-join sounds a lot like map-reduce (Hadoop style).  There are differences, but the basic idea is the same: divide and conquer.  If your data, algorithm and computer system allow for it, fork-join should decrease the total time to perform a computation.

When might it not pay off?  If concurrently running tasks compete too heavily for system resources then one may not see benefits.  This is highly system dependent, as machines will vary in disk, network and CPU performance.  Performance is also dependent upon the algorithm.  For example, if the computation spends most of its time reading from and writing to disk (it’s I/O bound) then the disk and network will be the limiting factors.

To show how performance might improve with parallelization, our task has been modified to invoke FFT.  This uses more of the CPUs, slowing down the computation, and hence better simulates a real-world computation:

pro sum, a = a, b = b, sum = sum
  f = fft( randomu( s, 1729, 1729, 5 ) )
  sum = a + b
end

The following graph shows how parallelization can greatly improve performance and yet how throwing more ESE workers at a computation has diminishing returns:

In this case, the sweet spot is about 4 ESE workers.  A different computation, on a different system, might have a different graph and hence a different optimal number of ESE workers.
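
To find the sweet spot on your own system you need timings.  Here is one way to collect them, as a rough sketch that uses only core IDL (systime and call_procedure).  The names time_procedure and demo_workload are hypothetical, and demo_workload is just a small local stand-in so the sketch runs without ESE.

```idl
; Hypothetical stand-in workload: a small FFT so there is something to time.
pro demo_workload
  compile_opt idl2
  f = fft( randomu( s, 256, 256 ) )
end

; Hypothetical helper: run a procedure by name nRuns times and return
; the elapsed wall-clock time of each run, in seconds.
function time_procedure, procName, nRuns
  compile_opt idl2

  elapsed = dblarr( nRuns )
  for i = 0, nRuns - 1 do begin
    t0 = systime( /seconds )
    call_procedure, procName
    elapsed[ i ] = systime( /seconds ) - t0
  endfor

  return, elapsed
end
```

Something like print, median( time_procedure( 'fork_join_total', 5 ) ) would then give one data point.  Repeat it for each number of ESE workers you want to compare, changing the worker count however your ESE installation is configured, and you can draw your own version of the graph.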

If you have a copy of the ENVI Services Engine then you can run this example.  See the attached task and client code.  Be sure to change the machine name from “penny64” to the name or IP address of the machine that hosts your ESE installation.  Also, don’t forget to upload the task.  Finally, note that despite the name, ESE can run plain old IDL code, sans ENVI.  Moreover, a local installation of ESE is not required.

For more information see the on-line IDL and ESE documentation: http://www.exelisvis.com/docs/.  The ESE API docs are part of the IDL documentation.
