Partition a Datastore in Parallel

Partitioning a datastore in parallel, with a portion of the datastore on each worker in a parallel pool, can provide benefits in many cases:

  • Perform some action on only one part of the whole datastore, or on several defined parts simultaneously.

  • Search for specific values in the data store, with all workers acting simultaneously on their own partitions.

  • Perform a reduction calculation on the workers across all partitions.

This example shows how to use partition to parallelize the reading of data from a datastore. It uses a small datastore of airline data provided in MATLAB®, and finds the mean of the non-NaN values from its 'ArrDelay' column.

A simple way to calculate the mean is to divide the sum of all the non-NaN values by the number of non-NaN values. The following code does this for the datastore first in a non-parallel way. To begin, you define a function to amass the count and sum. If you want to run this example, copy and save this function in a folder on the MATLAB command search path.


% Copyright 2015 The MathWorks, Inc.

function [total,count] = sumAndCountArrivalDelay(ds)
total = 0;
count = 0;
while hasdata(ds)
    data = read(ds);
    total = total + sum(data.ArrDelay,1,'OmitNaN');
    count = count + sum(~isnan(data.ArrDelay));
end
end

The following code creates a datastore, calls the function, and calculates the mean without any parallel execution. The tic and toc functions are used to time the execution, here and in the later parallel cases.

ds = datastore(repmat({'airlinesmall.csv'},20,1),'TreatAsMissing','NA');
ds.SelectedVariableNames = 'ArrDelay';

reset(ds);
tic
  [total,count] = sumAndCountArrivalDelay(ds)
sumtime = toc
mean = total/count
total =

    17211680


count =

     2417320


sumtime =

   12.2227


mean =

    7.1201

The partition function allows you to partition the datastore into smaller parts, each represented as a datastore itself. These smaller datastores work completely independently of each other, so that you can work with them inside of parallel language features such as parfor loops and spmd blocks.

The number of partitions in the following code is set by the numpartitions function, based on the datastore itself (ds) and the parallel pool (gcp) size. This does not necessarily equal the number of workers in the pool. In this case, the number of loop iterations is then set to the number of partitions (N).

The following code starts a parallel pool on a local cluster, then partitions the datastore among workers for iterating over the loop. Again, a separate function is called, which includes the parfor loop to amass the count and sum totals. Copy and save this function if you want to run the example.


% Copyright 2015 The MathWorks, Inc.

function [total, count] = parforSumAndCountArrivalDelay(ds)

N = numpartitions(ds,gcp);
total = 0;
count = 0;
parfor ii = 1:N
    % Get partition ii of the datastore.
    subds = partition(ds,N,ii);
    
    [localTotal,localCount] = sumAndCountArrivalDelay(subds);
    total = total + localTotal;
    count = count + localCount;
end
end

Now the MATLAB code calls this new function, so that the counting and summing of the non-NAN values can occur in parallel loop iterations.

p = parpool('local',4);

reset(ds);
tic
  [total,count] = parforSumAndCountArrivalDelay(ds)
parfortime = toc
mean = total/count
Starting parallel pool (parpool) using the 'local' profile ... connected to 4 workers.
Analyzing and transferring files to the workers ...done.

total =

    17211680


count =

     2417320


parfortime =

    7.5169


mean =

    7.1201

Rather than let the software calculate the number of partitions, you can explicitly set this value, so that the data can be appropriately partitioned to fit your algorithm. For example, to parallelize data from within an spmd block, you can specify the number of workers (numlabs) as the number of partitions to use. The following function uses an spmd block to perform a parallel read, and explicitly sets the number of partitions equal to the number of workers. To run this example, copy and save the function.


% Copyright 2015 The MathWorks, Inc.

function [total,count] = spmdSumAndCountArrivalDelay(ds)
spmd
    subds = partition(ds,numlabs,labindex);
    [total,count] = sumAndCountArrivalDelay(subds);
end
total = sum([total{:}]);
count = sum([count{:}]);
end

Now the MATLAB code calls the function that uses an spmd block.

reset(ds);
tic
  [total,count] = spmdSumAndCountArrivalDelay(ds)
spmdtime = toc
mean = total/count
Analyzing and transferring files to the workers ...done.

total =

    17211680


count =

     2417320


spmdtime =

    5.4566


mean =

    7.1201

delete(p);
Parallel pool using the 'local' profile is shutting down.

You might get some idea of modest performance improvements by comparing the times recorded in the variables sumtime, parfortime, and spmdtime. Your results might vary, as the performance can be affected by the datastore size, parallel pool size, hardware configuration, and other factors.

Was this topic helpful?