Benchmarking Independent Jobs on the Cluster

In this example, we show how to benchmark an application using independent jobs on the cluster, and we analyze the results in some detail. In particular, we:

  • Show how to benchmark a mixture of sequential code and task parallel code.

  • Explain strong and weak scaling.

  • Discuss some of the potential bottlenecks, both on the client and on the cluster.

Note: If you run this example on a large cluster, it might take an hour to run.

The code shown in this example can be found in this function:

function paralleldemo_distribjob_bench

Check the Cluster Profile

Before we interact with the cluster, we verify that the MATLAB® client is configured according to our needs. Calling parcluster will give us a cluster using the default profile or will throw an error if the default is not usable.

myCluster = parcluster;
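
Optionally, before timing anything, we can display a couple of the cluster properties as a quick sanity check. This is a minimal sketch and not part of the benchmark; Profile and NumWorkers are standard properties of the cluster object returned by parcluster.

% Quick sanity check (sketch): show which profile is in use and how many
% workers the profile reports.
fprintf('Using profile ''%s'' with NumWorkers = %d\n', ...
    myCluster.Profile, myCluster.NumWorkers);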

Timing

We time all operations separately to allow us to inspect them in detail. We will need all those detailed timings to understand where the time is spent, and to isolate the potential bottlenecks. For the purposes of the example, the actual function we benchmark is not very important; in this case we simulate hands of the card game blackjack or 21.

We write all of the operations to be as efficient as possible. For example, we use vectorized task creation. We use tic and toc for measuring the elapsed time of all the operations instead of using the job and task properties CreateTime, StartTime, FinishTime, etc., because tic and toc give us sub-second granularity. Note that we have also instrumented the task function so that it returns the time spent executing our benchmark computations.
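
The source of pctdemo_task_blackjack itself is not shown here; it ships with the product. As a hypothetical illustration of the instrumentation pattern it uses, a task function that reports its own execution time as a second output might look like the following sketch.

function [result, taskTime] = exampleInstrumentedTask(numIterations)
    % Hypothetical sketch of an instrumented task function: the second
    % output is the elapsed execution time, measured with tic/toc, just
    % as the blackjack task function returns its own execution time.
    taskStart = tic;
    result = 0;
    for k = 1:numIterations
        result = result + rand();  % stand-in for the real simulation work
    end
    taskTime = toc(taskStart);
end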

function [times, description] = timeJob(myCluster, numTasks, numHands)
    % The code that creates the job and its tasks executes sequentially in
    % the MATLAB client, starting here.
    % We first measure how long it takes to create a job.
    timingStart = tic;
    start = tic;
    job = createJob(myCluster);
    times.jobCreateTime = toc(start);
    description.jobCreateTime = 'Job creation time';

    % Create all the tasks in one call to createTask, and measure how long
    % that takes.
    start = tic;
    taskArgs = repmat({{numHands, 1}}, numTasks, 1);
    createTask(job, @pctdemo_task_blackjack, 2, taskArgs);
    times.taskCreateTime = toc(start);
    description.taskCreateTime = 'Task creation time';

    % Measure how long it takes to submit the job to the cluster.
    start = tic;
    submit(job);
    times.submitTime = toc(start);
    description.submitTime = 'Job submission time';

    % Once the job has been submitted, we hope all its tasks execute in
    % parallel.  We measure how long it takes for all the tasks to start
    % and to run to completion.
    start = tic;
    wait(job);
    times.jobWaitTime = toc(start);
    description.jobWaitTime = 'Job wait time';

    % Tasks have now completed, so we are again executing sequential code
    % in the MATLAB client.  We measure how long it takes to retrieve all
    % the job results.
    start = tic;
    results = fetchOutputs(job);
    times.resultsTime = toc(start);
    description.resultsTime = 'Result retrieval time';

    % Verify that the job ran without any errors.
    if ~isempty([job.Tasks.Error])
        taskErrorMsgs = pctdemo_helper_getUniqueErrors(job);
        delete(job);
        error('pctexample:distribjobbench:JobErrored',  ...
          ['The following error(s) occurred during task ' ...
          'execution:\n\n%s'], taskErrorMsgs);
    end

    % Get the execution time of the tasks.  Our task function returns this
    % as its second output argument.
    times.exeTime = max([results{:,2}]);
    description.exeTime = 'Task execution time';

    % Measure how long it takes to delete the job and all its tasks.
    start = tic;
    delete(job);
    times.deleteTime = toc(start);
    description.deleteTime = 'Job deletion time';

    % Measure the total time elapsed from creating the job up to this
    % point.
    times.totalTime = toc(timingStart);
    description.totalTime = 'Total time';

    times.numTasks = numTasks;
    description.numTasks = 'Number of tasks';
end
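
For reference, a single timing run can be invoked as in the following sketch; the argument values here are illustrative only.

% Illustrative call (sketch): time one job with 4 tasks, each simulating
% 1.2 million hands, and print the measured job wait time.
[t, d] = timeJob(myCluster, 4, 1.2e6);
fprintf('%s: %f seconds\n', d.jobWaitTime, t.jobWaitTime);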

We look at some of the details of what we are measuring:

  • Job creation time: The time it takes to create a job. For an MJS cluster, this involves a remote call, and the MJS allocates space in its database. For other cluster types, job creation involves writing a few files to disk.

  • Task creation time: The time it takes to create and save the task information. The MJS saves this in its database, whereas other cluster types save it in files on the file system.

  • Job submission time: The time it takes to submit the job. For an MJS cluster, we tell it to start executing the job it has in its database. We ask other cluster types to execute all the tasks we have created.

  • Job wait time: The time we wait after the job submission until job completion. This includes all the activities that take place between job submission and job completion, such as: the cluster may need to start all the workers and send them the task information; the workers read the task information and execute the task function. In the case of an MJS cluster, the workers then send the task results to the MJS, which writes them to its database; for the other cluster types, the workers write the task results to disk.

  • Task execution time: The time spent simulating blackjack. We instrument the task function to accurately measure this time. This time is also included in the job wait time.

  • Results retrieval time: The time it takes to bring the job results into the MATLAB client. For the MJS, we obtain them from its database. For other cluster types, we read them from the file system.

  • Job deletion time: The time it takes to delete all the job and task information. The MJS deletes it from its database. For the other cluster types, we delete the files from the file system.

  • Total time: The time it takes to perform all of the above.
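
Because the two output structures of timeJob share their field names, each measurement can be listed next to its label in a single loop. A small sketch, where t and d denote one pair of outputs from timeJob (for example, from the illustrative call above):

% Sketch: list every measured value next to its description.
flds = fieldnames(t);
for k = 1:numel(flds)
    fprintf('%-25s %g\n', d.(flds{k}), t.(flds{k}));
end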

Choosing Problem Size

We know that most clusters are designed for batch execution of medium or long running jobs, so we deliberately try to have our benchmark calculations fall within that range. Yet, we do not want this example to take hours to run, so we choose the problem size so that each task takes approximately 1 minute on our hardware, and we then repeat the timing measurements a few times for increased accuracy. As a rule of thumb, if your calculations in a task take much less than a minute, you should consider whether parfor meets your low-latency needs better than jobs and tasks.

numHands = 1.2e6;
numReps = 5;
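
The value of numHands above suits our hardware. To calibrate it for different hardware, one possibility is to time a small probe run of the task function and scale it to the roughly one-minute target used in this example; a hedged sketch follows (the probe size of 1e5 hands is an arbitrary choice).

% Sketch: calibrate numHands so that one task takes roughly 60 seconds
% on the current hardware.
probeHands = 1e5;                       % small, arbitrary probe run
probeStart = tic;
pctdemo_task_blackjack(probeHands, 1);
probeTime = toc(probeStart);
calibratedNumHands = round(60/probeTime*probeHands);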

We explore speedup by running on a different number of workers, starting with 1, 2, 4, 8, 16, etc., and ending with as many workers as we can possibly use. In this example, we assume that we have dedicated access to the cluster for the benchmarking, and that the cluster's NumWorkers property has been set correctly. Assuming that to be the case, each task will execute right away on a dedicated worker, so we can equate the number of tasks we submit with the number of workers that execute them.

numWorkers = myCluster.NumWorkers;
if isinf(numWorkers) || (numWorkers == 0)
    error('pctexample:distribjobbench:InvalidNumWorkers', ...
          ['Cannot deduce the number of workers from the cluster.  ' ...
           'Set the NumWorkers on your default profile to be ' ...
           'a value other than 0 or inf.']);
end

numTasks = [pow2(0:ceil(log2(numWorkers) - 1)), numWorkers];
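
For example, with numWorkers equal to 256 (the cluster used to produce the results below), pow2(0:7) evaluates to [1 2 4 8 16 32 64 128], so numTasks becomes [1 2 4 8 16 32 64 128 256] and each scaling run submits numReps*length(numTasks) = 5*9 = 45 jobs.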

Weak Scaling Measurements

We vary the number of tasks in a job, and have each task perform a fixed amount of work. This is called weak scaling, and it is what we care most about, because we usually scale up to the cluster to solve larger problems. It should be compared with the strong scaling benchmarks shown later in this example. Speedup based on weak scaling is also known as scaled speedup.

fprintf(['Starting weak scaling timing.  ' ...
         'Submitting a total of %d jobs.\n'], numReps*length(numTasks));
for j = 1:length(numTasks)
    n = numTasks(j);
    for itr = 1:numReps
        [rep(itr), description] = timeJob(myCluster, n, numHands);  %#ok<AGROW>
    end
    % Retain the iteration with the lowest total time.
    totalTime = [rep.totalTime];
    fastest = find(totalTime == min(totalTime), 1);
    weak(j) = rep(fastest);  %#ok<AGROW>
    fprintf('Job wait time with %d task(s): %f seconds\n', ...
            n, weak(j).jobWaitTime);
end
Starting weak scaling timing.  Submitting a total of 45 jobs.
Job wait time with 1 task(s): 59.631733 seconds
Job wait time with 2 task(s): 60.717059 seconds
Job wait time with 4 task(s): 61.343568 seconds
Job wait time with 8 task(s): 60.759119 seconds
Job wait time with 16 task(s): 63.016560 seconds
Job wait time with 32 task(s): 64.615484 seconds
Job wait time with 64 task(s): 66.581806 seconds
Job wait time with 128 task(s): 91.043285 seconds
Job wait time with 256 task(s): 150.411704 seconds

Sequential Execution

We measure the sequential execution time of the computations. Note that this time should be compared to the execution time on the cluster only if they have the same hardware and software configuration.

seqTime = inf;
for itr = 1:numReps
    start = tic;
    pctdemo_task_blackjack(numHands, 1);
    seqTime = min(seqTime, toc(start));
end
fprintf('Sequential execution time: %f seconds\n', seqTime);
Sequential execution time: 84.771630 seconds

Speedup Based on Weak Scaling and Total Execution Time

We first look at the overall speedup achieved by running on different numbers of workers. The speedup is based on the total time used for the computations, so it includes both the sequential and the parallel portions of our code.

This speedup curve represents the combined capabilities of multiple components, each with an unknown weight: the cluster hardware, the cluster software, the client hardware, the client software, and the connection between the client and the cluster. Therefore, the speedup curve does not represent any one of these, but all of them taken together.

If the speedup curve meets your desired performance targets, you know that all the aforementioned factors work well together in this particular benchmark. However, if the speedup curve fails to meet your targets, you do not know which of the many factors listed above is the most to blame. It could even be that the approach taken in the parallelization of the application is to blame rather than either the other software or hardware.

All too often, novices believe that this single graph gives the complete picture of the performance of their cluster hardware or software. This is indeed not the case, and one always needs to be aware that this graph does not allow us to draw any conclusions about potential performance bottlenecks.

titleStr = sprintf(['Speedup based on total execution time\n' ...
                    'Note: This graph does not identify performance ' ...
                    'bottlenecks']);
pctdemo_plot_distribjob('speedup', [weak.numTasks], [weak.totalTime], ...
    weak(1).totalTime, titleStr);
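
If the raw numbers are of interest in addition to the graph, the scaled speedup can also be computed directly from the measurements. The following sketch assumes the usual definition of scaled speedup, n*T(1)/T(n); the plotting helper itself ships with the product and is not reproduced here.

% Sketch: scaled speedup computed directly from the weak scaling data,
% assuming the usual definition n*T(1)/T(n).
scaledSpeedup = [weak.numTasks].*weak(1).totalTime./[weak.totalTime];
disp([[weak.numTasks]', scaledSpeedup']);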

Detailed Graphs, Part 1

We dig a little bit deeper and look at the times spent in the various steps of our code. We benchmarked weak scaling, that is, the more tasks we create, the more work we perform. Therefore, the size of the task output data increases as we increase the number of tasks. With that in mind, we expect the following to take longer the more tasks we create:

  • Task creation

  • Retrieval of job output arguments

  • Job deletion

We have no reason to believe that the following increases with the number of tasks:

  • Job creation time

After all, the job is created before we define any of its tasks, so there is no reason why it should vary with the number of tasks. We might expect to see only some random fluctuations in the job creation time.

pctdemo_plot_distribjob('fields', weak, description, ...
    {'jobCreateTime', 'taskCreateTime',  'resultsTime', 'deleteTime'}, ...
    'Time in seconds');

Normalized Times

We already concluded that task creation time is expected to increase as we increase the number of tasks, as does the time to retrieve job output arguments and to delete the job. However, this increase is due to the fact that we are performing more work as we increase the number of workers/tasks. It is therefore meaningful to measure the efficiency of these three activities by looking at the time it takes to perform these operations and normalizing it by the number of tasks. This way, we can see whether any of the following times stay constant, increase, or decrease as we vary the number of tasks:

  • The time it takes to create a single task

  • The time it takes to retrieve output arguments from a single task

  • The time it takes to delete a task in a job

The normalized times in this graph represent the capabilities of the MATLAB client and the portion of the cluster hardware or software that it might interact with. It is generally considered good if these curves stay flat, and excellent if they are decreasing.

pctdemo_plot_distribjob('normalizedFields', weak, description, ...
    {'taskCreateTime', 'resultsTime', 'deleteTime'});
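
Equivalently, the normalized values plotted here are simply the measured times divided by the number of tasks in the job, as in this sketch based on the weak scaling results.

% Sketch: per-task (normalized) times from the weak scaling results.
perTaskCreate  = [weak.taskCreateTime]./[weak.numTasks];
perTaskResults = [weak.resultsTime]./[weak.numTasks];
perTaskDelete  = [weak.deleteTime]./[weak.numTasks];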

These graphs sometimes show that the time spent retrieving the results per task goes down as the number of tasks increases. That is undeniably good: We become more efficient the more work we perform. This might happen if there is a fixed amount of overhead for the operation and if it takes a fixed amount of time per task in the job.

We cannot expect a speedup curve based on total execution time to look particularly good if it includes a significant amount of time spent on sequential activities such as the above, where the time spent increases with the number of tasks. In that case, the sequential activities will dominate once there are sufficiently many tasks.

Detailed Graphs, Part 2

It is possible that the time spent in each of the following steps varies with the number of tasks, but we hope it does not:

  • Job submission time.

  • Task execution time. This captures the time spent simulating blackjack. Nothing more, nothing less.

In both cases, we look at the elapsed time, also referred to as wall clock time. We look at neither the total CPU time on the cluster nor the normalized time.

pctdemo_plot_distribjob('fields', weak, description, ...
    {'submitTime', 'exeTime'});

There are situations where each of the times shown above could increase with the number of tasks. For example:

  • With some third-party cluster types, the job submission involves one system call for each task in the job, or the job submission involves copying files across the network. In those cases, the job submission time may increase linearly with the number of tasks.

  • The graph of the task execution time is the most likely to expose hardware limitations and resource contention. For example, the task execution time could increase if we are executing multiple workers on the same computer, due to contention for limited memory bandwidth. Another example of resource contention is if the task function were to read or write large data files using a single, shared file system. The task function in this example, however, does not access the file system at all. These types of hardware limitations are covered in great detail in the example Resource Contention in Task Parallel Problems.

Speedup Based on Weak Scaling and Job Wait Time

Now that we have dissected the times spent in the various stages of our code, we want to create a speedup curve that more accurately reflects the capabilities of our cluster hardware and software. We do this by calculating a speedup curve based on the job wait time.

When calculating this speedup curve based on the job wait time, we first compare it to the time it takes to execute a job with a single task on the cluster.

titleStr = 'Speedup based on job wait time compared to one task';
pctdemo_plot_distribjob('speedup', [weak.numTasks], [weak.jobWaitTime], ...
    weak(1).jobWaitTime, titleStr);

Job wait time might include the time to start all the MATLAB workers. It is therefore possible that this time is bounded by the IO capabilities of a shared file system. The job wait time also includes the average task execution time, so any deficiencies seen there also apply here. If we do not have dedicated access to the cluster, we could expect the speedup curve based on job wait time to suffer significantly.

Next, we compare the job wait time to the sequential execution time, assuming that the hardware of the client computer is comparable to the compute nodes. If the client is not comparable to the cluster nodes, this comparison is meaningless. If your cluster has a substantial time lag when assigning tasks to workers, e.g., by assigning tasks to workers only once per minute, this graph will be heavily affected because the sequential execution time does not suffer this lag. Note that this graph has the same shape as the previous graph; the two differ only by a constant, multiplicative factor.

titleStr = 'Speedup based on job wait time compared to sequential time';
pctdemo_plot_distribjob('speedup', [weak.numTasks], [weak.jobWaitTime], ...
    seqTime, titleStr);
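
As a sketch of that constant factor: if the speedup is computed as the number of tasks times the baseline time divided by the job wait time, the two curves differ exactly by the ratio of the baseline times, which in the run shown here is roughly 84.8/59.6, or about 1.4.

% Sketch: the constant factor between the two job-wait-time speedup
% curves is the ratio of their baseline times.
curveRatio = seqTime/weak(1).jobWaitTime;   % roughly 1.4 in this run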

Comparing Job Wait Time with Task Execution Time

As we have mentioned before, the job wait time consists of the task execution time plus scheduling, wait time in the cluster's queue, MATLAB startup time, etc. On an idle cluster, the difference between the job wait time and the task execution time should remain constant, at least for a small number of tasks. As the number of tasks grows into the tens, hundreds, or thousands, we are bound to eventually run into some limitations. For example, once we have sufficiently many tasks/workers, the cluster cannot tell all the workers simultaneously to start executing their tasks, or if the MATLAB workers all use the same file system, they might end up saturating the file server.

titleStr = 'Difference between job wait time and task execution time';
pctdemo_plot_distribjob('barTime', [weak.numTasks], ...
    [weak.jobWaitTime] - [weak.exeTime], titleStr);

Strong Scaling Measurements

We now measure the execution time of a fixed-size problem, while varying the number of workers we use to solve the problem. This is called strong scaling, and it is well known that if an application has any sequential parts, there is an upper limit to the speedup that can be achieved with strong scaling. This is formalized in Amdahl's law, which has been widely discussed and debated over the years.

You can easily run into the limits of speedup with strong scaling when submitting jobs to the cluster. If the task execution has a fixed overhead (which it ordinarily does), even if it is as little as one second, the execution time of our application will never go below one second. In our case, we start with an application that executes in approximately 60 seconds on one MATLAB worker. If we divide the computations among 60 workers, it might take as little as one second for each worker to compute its portion of the overall problem. However, the hypothetical task execution overhead of one second has become a major contributor to the overall execution time.
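
To make the one-second-overhead argument concrete, the following sketch models an idealized strong scaling run with a hypothetical fixed overhead of one second per task and 60 seconds of total work. The numbers are illustrative, not measurements.

% Sketch: effect of a hypothetical 1-second fixed overhead per task on
% strong scaling of a 60-second problem.
overhead = 1;                        % seconds of fixed overhead per task
work     = 60;                       % seconds of total computation
n        = [1 2 4 8 16 32 60];       % number of workers/tasks
strongTime    = overhead + work./n;  % idealized execution time
strongSpeedup = (overhead + work)./strongTime;
% With 60 workers the time is about 2 seconds, a speedup of only about 30.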

Unless your application runs for a long time, jobs and tasks are usually not the way to achieve good results with strong scaling. If the overhead of task execution is close to the execution time of your application, you should investigate whether parfor meets your requirements. Even in the case of parfor, there is a fixed amount of overhead, albeit much smaller than with regular jobs and tasks, and that overhead limits the speedup that can be achieved with strong scaling. Your problem size relative to your cluster size may or may not be so large that you experience those limitations.

As a general rule of thumb, it is only possible to achieve strong scaling of small problems on large numbers of processors with specialized hardware and a great deal of programming effort.

fprintf(['Starting strong scaling timing.  ' ...
         'Submitting a total of %d jobs.\n'], numReps*length(numTasks));
for j = 1:length(numTasks)
    n = numTasks(j);
    strongNumHands = ceil(numHands/n);
    for itr = 1:numReps
        rep(itr) = timeJob(myCluster, n, strongNumHands);
    end
    ind = find([rep.totalTime] == min([rep.totalTime]), 1);
    strong(j) = rep(ind); %#ok<AGROW>
    fprintf('Job wait time with %d task(s): %f seconds\n', ...
            n, strong(j).jobWaitTime);
end
Starting strong scaling timing.  Submitting a total of 45 jobs.
Job wait time with 1 task(s): 60.531446 seconds
Job wait time with 2 task(s): 31.745135 seconds
Job wait time with 4 task(s): 18.367432 seconds
Job wait time with 8 task(s): 11.172390 seconds
Job wait time with 16 task(s): 8.155608 seconds
Job wait time with 32 task(s): 6.298422 seconds
Job wait time with 64 task(s): 5.253394 seconds
Job wait time with 128 task(s): 5.302715 seconds
Job wait time with 256 task(s): 49.428909 seconds

Speedup Based on Strong Scaling and Total Execution Time

As we have already discussed, speedup curves that depict the sum of the time spent executing sequential code in the MATLAB client and time executing parallel code on the cluster can be very misleading. The following graph shows this information in the worst-case scenario of strong scaling. We deliberately chose the original problem to be so small relative to our cluster size that the speedup curve would look bad. Neither the cluster hardware nor software was designed with this kind of a use in mind.

titleStr = sprintf(['Speedup based on total execution time\n' ...
                    'Note: This graph does not identify performance ' ...
                    'bottlenecks']);
pctdemo_plot_distribjob('speedup', [strong.numTasks], ...
    [strong.totalTime].*[strong.numTasks], strong(1).totalTime, titleStr);

Alternative for Short Tasks: PARFOR

The strong scaling results did not look good because we deliberately used jobs and tasks to execute calculations of short duration. We now look at how parfor applies to that same problem. Note that we do not include the time it takes to open the pool in our time measurements.

pool = parpool(numWorkers);
parforTime = inf;
strongNumHands = ceil(numHands/numWorkers);
for itr = 1:numReps
    start = tic;
    r = cell(1, numWorkers);
    parfor i = 1:numWorkers
        r{i} = pctdemo_task_blackjack(strongNumHands, 1); %#ok<PFOUS>
    end
    parforTime = min(parforTime, toc(start));
end
delete(pool);
Starting parallel pool (parpool) using the 'bigMJS' profile ... connected to 256 workers.
Analyzing and transferring files to the workers ...done.

Speedup Based on Strong Scaling with PARFOR

The original, sequential calculations took approximately one minute, so each worker needs to perform only a few seconds of computations on a large cluster. We therefore expect strong scaling performance to be much better with parfor than with jobs and tasks.

fprintf('Execution time with parfor using %d workers: %f seconds\n', ...
    numWorkers, parforTime);
fprintf(['Speedup based on strong scaling with parfor using ', ...
    '%d workers: %f\n'], numWorkers, seqTime/parforTime);
Execution time with parfor using 256 workers: 1.126914 seconds
Speedup based on strong scaling with parfor using 256 workers: 75.224557

Summary

We have seen the difference between weak and strong scaling, and discussed why we prefer to look at weak scaling: It measures our ability to solve larger problems on the cluster (more simulations, more iterations, more data, etc.). The large number of graphs and the amount of detail in this example should also be a testament to the fact that benchmarks cannot be boiled down to a single number or a single graph. We need to look at the whole picture to understand whether the application performance can be attributed to the application, the cluster hardware or software, or a combination of both.

We have also seen that for short calculations, parfor can be a great alternative to jobs and tasks. For more benchmarking results using parfor, see the example Simple Benchmarking of PARFOR Using Blackjack.

end