parallel.pool.DataQueue

Class that enables sending and listening for data between client and workers

Description

parallel.pool.DataQueue enables sending data or messages from workers back to the client in a parallel pool while a computation is carried out. For example, you can get intermediate values and an indication of the progress of the computation.

To send data from a parallel pool worker back to the client, first construct a DataQueue in the client. Pass this DataQueue into a parfor-loop or other parallel language construct, such as spmd. From the workers, call send to send data back to the client. At the client, register a function to be called each time data is received by using afterEach.

  • You can call send from the process that calls the constructor, if required.

  • You can construct the queue on the workers and send it back to the client to enable communication in the reverse direction. However, you cannot send a queue from one worker to another. Use spmd, labSend, or labReceive instead.

  • Unlike all other handle objects, DataQueue instances do remain connected when they are sent to workers.

Construction

q = parallel.pool.DataQueue

The constructor for a DataQueue takes no arguments and returns an object that can be used to send or listen for messages (or data) from different workers. You call the constructor only in the process where you want to receive the data. In the usual workflow, the workers should not be calling the constructor, but should be handed an existing DataQueue instance instead.

Properties

collapse all

Dependent property on the queue that indicates how many items of data are waiting to be removed from the queue.

q = parallel.pool.DataQueue;
% No messages in queue because nothing has been sent.
q.QueueLength
ans =

     0
q.send('sending a message')
% Now QueueLength = 1 because one message has been sent.
q.QueueLength
ans =

     1
% Add a callback to process the queue.
listener = q.afterEach(@disp);
sending a message
% Now QueueLength = 0 because there are no more pending messages.
q.QueueLength
ans =

     0
% Data sent now is immediately processed by the callback so that QueueLength remains 0.
q.send('sending message 2')
q.QueueLength
sending message 2

ans =

     0
% Deleting all callback listeners causes messages to build up in the queue again.
delete(listener)
q.send('sending message 3')
q.QueueLength
ans =

     1

Methods

A parallel.pool.DataQueue object has the following methods.

afterEachDefine a function to call when new data is received
sendSend data from worker to client using a data queue

Copy Semantics

Handle. To learn how handle classes affect copy operations, see Copying Objects (MATLAB).

Examples

collapse all

Construct a DataQueue, and call afterEach.

q = parallel.pool.DataQueue;
afterEach(q, @disp);
Start a parfor-loop, and send a message. The pending message is passed to the afterEach function, in this example @disp.

parfor i = 1:3
    send(q, i); 
end;
     1

     2

     3

For more details on listening for data using a DataQueue, see afterEach.

Create a DataQueue, and use afterEach to specify the function to execute each time the queue receives data. This example calls a subfunction that updates the wait bar.

Create a parfor-loop to carry out a computationally demanding task in MATLAB®. Use send to send some dummy data on each iteration of the parfor-loop. When the queue receives the data, afterEach calls nUpdateWaitbar in the client MATLAB, and you can observe the wait bar progress.

function a = parforWaitbar

D = parallel.pool.DataQueue;
h = waitbar(0, 'Please wait ...');
afterEach(D, @nUpdateWaitbar);

N = 200;
p = 1;

parfor i = 1:N
    a(i) = max(abs(eig(rand(400))));
    send(D, i);
end

    function nUpdateWaitbar(~)
        waitbar(p/N, h);
        p = p + 1;
    end
end

If you call afterEach and there are items on the queue waiting to be dispatched, these items are immediately dispatched to the function handle specified by afterEach. Call afterEach before sending data to the queue, to ensure that on send, the function handle @disp is called.

Construct a DataQueue and call afterEach.

q = parallel.pool.DataQueue;
afterEach(q, @disp);
If you then send messages to the queue, each message is passed to the function handle specified by afterEach immediately.

parfor i = 1:3
    send(q, i); 
end
send(q, 0);
     1

     3

     2

     0

If you send the data to the queue and then call afterEach, each of the pending messages are passed to the function handle specified by afterEach.

q = parallel.pool.DataQueue;
parfor i = 1:3
    send(q, i); 
end
afterEach(q, @disp);
       3

       1

       2

Introduced in R2017a

Was this topic helpful?