Partitioning a datastore in parallel, with a portion of the datastore on each worker in a parallel pool, can provide benefits in many cases:
- Perform some action on only one part of the whole datastore, or on several defined parts simultaneously.
- Search for specific values in the datastore, with all workers acting simultaneously on their own partitions.
- Perform a reduction calculation on the workers across all partitions.
This example shows how to use partition to parallelize the reading of data from a datastore. It uses a small datastore of airline data provided in MATLAB®, and finds the mean of the non-NaN values from its 'ArrDelay' column.
A simple way to calculate the mean is to divide the sum of all the non-NaN values by the number of non-NaN values. The following code first does this for the datastore serially, without any parallel execution. To begin, define a function that amasses the count and sum. To run this example, copy this function and save it as sumAndCountArrivalDelay.m in a folder on the MATLAB search path.
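% Copyright 2015 The MathWorks, Inc.
function [total,count] = sumAndCountArrivalDelay(ds)
    total = 0;
    count = 0;
    % Read the datastore one chunk at a time until no data remain.
    while hasdata(ds)
        data = read(ds);
        % Add this chunk's sum of non-NaN delays and its count of non-NaN entries.
        total = total + sum(data.ArrDelay,1,'OmitNaN');
        count = count + sum(~isnan(data.ArrDelay));
    end
end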
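The sum-and-count approach works because the mean of the non-NaN values is exactly their sum divided by their count. The following minimal in-memory sketch performs the same two accumulations, assuming a small hypothetical vector arrDelay in place of the ArrDelay column (the values are made up, not taken from airlinesmall.csv). It masks out NaN values with isnan rather than using the 'OmitNaN' flag; both give the same sum.

```matlab
% Hypothetical sample values standing in for the ArrDelay column;
% NaN marks a missing ('NA') entry.
arrDelay = [5; NaN; -3; 12; NaN; 0; 7];

% Accumulate the same two quantities as sumAndCountArrivalDelay.
valid = ~isnan(arrDelay);          % logical mask of non-NaN entries
total = sum(arrDelay(valid));      % sum of the non-NaN values
count = sum(valid);                % number of non-NaN values

% Mean of the non-NaN values (named avgDelay to avoid shadowing mean).
avgDelay = total/count;
```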
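The following code creates a datastore, calls the function, and calculates the mean without any parallel execution. The datastore lists airlinesmall.csv 20 times to create a larger workload, treats 'NA' entries as missing (NaN), and reads only the ArrDelay variable. The tic and toc functions time the execution, here and in the later parallel cases.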
ds = datastore(repmat({'airlinesmall.csv'},20,1),'TreatAsMissing','NA');
ds.SelectedVariableNames = 'ArrDelay';
reset(ds);
tic
[total,count] = sumAndCountArrivalDelay(ds)
sumtime = toc
mean = total/count
total = 17211680
count = 2417320
sumtime = 12.2227
mean = 7.1201
The partition function splits the datastore into smaller parts, each represented as a datastore itself. These smaller datastores work completely independently of each other, so you can work with them inside parallel language features such as parfor loops and spmd blocks.
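Conceptually, partition(ds,N,ii) returns the ii-th of N disjoint pieces that together cover the whole datastore. The sketch below imitates that idea on an in-memory vector, assuming contiguous, roughly equal blocks. The names data, N, edges, and parts are hypothetical, and the real partition function chooses its own split points for a datastore, so its boundaries may differ.

```matlab
% Hypothetical in-memory stand-in for a datastore's records.
data = (1:10)';
N = 3;                             % number of partitions

% Boundaries of N contiguous, roughly equal blocks (an assumption;
% partition chooses its own split points for a real datastore).
edges = round(linspace(0, numel(data), N+1));

parts = cell(N,1);
for ii = 1:N
    parts{ii} = data(edges(ii)+1 : edges(ii+1));   % block ii
end
```

Together the blocks contain every record exactly once, which is what lets each partition be processed independently.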
In the following code, the numpartitions function sets the number of partitions based on the datastore itself (ds) and the size of the current parallel pool (gcp). This number does not necessarily equal the number of workers in the pool. The number of loop iterations is then set to the number of partitions (N).
The following code starts a parallel pool on the local cluster, then partitions the datastore among the workers and iterates over the partitions in a parfor loop. As before, a separate function, which contains the parfor loop, amasses the count and sum totals. To run the example, copy and save this function.
% Copyright 2015 The MathWorks, Inc.
function [total, count] = parforSumAndCountArrivalDelay(ds)
    % Choose a number of partitions suited to the datastore and the current pool.
    N = numpartitions(ds,gcp);
    total = 0;
    count = 0;
    parfor ii = 1:N
        % Get partition ii of the datastore.
        subds = partition(ds,N,ii);
        [localTotal,localCount] = sumAndCountArrivalDelay(subds);
        % total and count are parfor reduction variables.
        total = total + localTotal;
        count = count + localCount;
    end
end
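In the parfor loop, total and count are reduction variables: each iteration adds its partition's partial results, and because addition is associative and commutative, the order in which iterations finish does not change the result (up to floating-point rounding for non-integer data). The following serial sketch performs that reduction on hypothetical in-memory data, using contiguous blocks as a stand-in for partition (an assumption) and deliberately visiting them in reverse order.

```matlab
% Hypothetical data with missing values (NaN), split into N blocks.
x = [4; NaN; 10; -2; NaN; 6; 1; 3; NaN; 8];
N = 4;
% Contiguous block boundaries: a stand-in for partition (assumption).
edges = round(linspace(0, numel(x), N+1));

total = 0;
count = 0;
for ii = N:-1:1                          % visit blocks in reverse order on purpose
    chunk = x(edges(ii)+1 : edges(ii+1));
    valid = ~isnan(chunk);
    total = total + sum(chunk(valid));   % add partial sum of block ii
    count = count + sum(valid);          % add partial count of block ii
end

% The combined results match a single pass over the whole vector.
allValid = ~isnan(x);
assert(total == sum(x(allValid)) && count == sum(allValid));
```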
Now the MATLAB code calls this new function, so that the counting and summing of the non-NaN values can occur in parallel loop iterations.
p = parpool('local',4);
reset(ds);
tic
[total,count] = parforSumAndCountArrivalDelay(ds)
parfortime = toc
mean = total/count
Starting parallel pool (parpool) using the 'local' profile ... connected to 4 workers.
Analyzing and transferring files to the workers ...done.
total = 17211680
count = 2417320
parfortime = 7.5169
mean = 7.1201
Rather than let the software calculate the number of partitions, you can set this value explicitly, so that the data is partitioned to fit your algorithm. For example, to parallelize reading data from within an spmd block, you can use the number of workers (numlabs) as the number of partitions. The following function uses an spmd block to perform a parallel read, and explicitly sets the number of partitions equal to the number of workers. To run this example, copy and save the function.
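% Copyright 2015 The MathWorks, Inc.
function [total,count] = spmdSumAndCountArrivalDelay(ds)
    spmd
        % Each worker reads its own partition: one partition per worker.
        subds = partition(ds,numlabs,labindex);
        [total,count] = sumAndCountArrivalDelay(subds);
    end
    % Outside spmd, total and count are Composite objects holding one value
    % per worker; gather and add them on the client.
    total = sum([total{:}]);
    count = sum([count{:}]);
end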
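After the spmd block, total and count are Composite objects with one value per worker, and {:} indexing turns them into a list that can be concatenated and summed on the client. The sketch below repeats that combine step with ordinary cell arrays standing in for the Composite values; the per-worker numbers and variable names are hypothetical.

```matlab
% Hypothetical per-worker results; cell arrays stand in for the
% Composite values that spmd returns (one entry per worker).
totalPerWorker = {10, 20, 30, 40};
countPerWorker = {2, 3, 4, 1};

% Same combine step as spmdSumAndCountArrivalDelay: concatenate, then sum.
total = sum([totalPerWorker{:}]);
count = sum([countPerWorker{:}]);
avgDelay = total/count;
```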
Now the MATLAB code calls the function that uses an spmd block.
reset(ds);
tic
[total,count] = spmdSumAndCountArrivalDelay(ds)
spmdtime = toc
mean = total/count
Analyzing and transferring files to the workers ...done.
total = 17211680
count = 2417320
spmdtime = 5.4566
mean = 7.1201
delete(p);
Parallel pool using the 'local' profile is shutting down.
Comparing the times recorded in the variables sumtime, parfortime, and spmdtime gives some idea of the modest performance improvements. Your results might vary, because performance depends on the datastore size, parallel pool size, hardware configuration, and other factors.
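One way to compare the recorded times is as a speedup relative to the serial run: sumtime divided by the parallel time, where values above 1 mean the parallel version was faster. The sketch below uses the times from the example run shown above; substitute your own values, which will differ.

```matlab
% Times recorded in the example run above (seconds); yours will differ.
sumtime    = 12.2227;
parfortime = 7.5169;
spmdtime   = 5.4566;

% Speedup relative to the serial run.
parforSpeedup = sumtime/parfortime;
spmdSpeedup   = sumtime/spmdtime;
fprintf('parfor speedup: %.2fx\nspmd speedup:   %.2fx\n', parforSpeedup, spmdSpeedup);
```

With 4 workers, both speedups in this run fall well short of 4, consistent with the modest improvements described above.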