updating dictionary values in mpi4py

How can we update one global dictionary in MPI (specifically mpi4py) across different processors? The issue that I am encountering now after broadcasting is that different processors cannot see the changes (updates) made to the dictionary by the other processors.

For example, the input data is as follows:

   col1  col2
   -----------
    a      1
    a      1
    b      2
    c      3
    c      1

The output dictionary should be as follows:

  {'a': 2, 'b': 2, 'c': 4}

which means the col2 values in the input are summed per key (col1). The dictionary is initially empty and gets updated by all the processors during the parallel processing (at least, this is what we are trying to do).
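(For clarity, the plain serial version of what we want is just a grouped sum; the row tuples below are made up to match the table above:)

```python
# Serial version of the aggregation: sum col2 values grouped by col1.
rows = [('a', 1), ('a', 1), ('b', 2), ('c', 3), ('c', 1)]

totals = {}
for col1, col2 in rows:
    totals[col1] = totals.get(col1, 0) + col2

print(totals)  # → {'a': 2, 'b': 2, 'c': 4}
```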

Answer

How can we update one global dictionary in MPI (specifically mpi4py) across different processors? The issue that I am encountering now after broadcasting is that different processors cannot see the changes (updates) made to the dictionary by the other processors.

First, you need to understand that in MPI, each MPI process runs a complete copy of the program. Consequently, all the data allocated by the program is private to the process that allocated it.

Let us look at the following example:

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    dictionary = {'a': 1, 'c': 3}
    # Receive each other process's data and merge it into the dictionary.
    for i in range(1, size, 1):
        data = comm.recv(source=i, tag=11)
        for key in data:
            if key in dictionary:
                dictionary[key] = dictionary[key] + data[key]
            else:
                dictionary[key] = data[key]
    print(dictionary)
else:
    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)

In this code, the process with rank=0 allocates a dictionary, which is private to that process, in the same way that data = {'a': 1, 'b': 2, 'c': 1} is private to each of the other processes. If (for instance) a process changes the variable size, that change will not be visible to the other processes.

In this code, each process other than rank 0 sends its local data:

    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)

to process 0, which calls comm.recv once for each of the other processes:

for i in range(1, size, 1):
    data = comm.recv(source=i, tag=11)

and merges the data received (from the other processes) into its own dictionary:

    for key in data:
        if key in dictionary:
            dictionary[key] = dictionary[key] + data[key]
        else:
            dictionary[key] = data[key]

In the end, only process 0 has the complete dictionary. The same happened to you when you did the broadcasting. Nevertheless, MPI does have routines (e.g., comm.Allgather for buffer-like objects, or the lowercase comm.allgather for arbitrary Python objects) that would allow you to have the entire dictionary in all the processes.

An example of such code (you would just need to adapt it to a dictionary):

from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendBuffer = numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(size, dtype=bool)

print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allgather([sendBuffer,  MPI.BOOL],[recvBuffer, MPI.BOOL])
print("After Allgather  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))

The dictionary initially is empty and is getting updated during the parallel processing by all the processors (at least this is what we’re trying to do).

With the aforementioned model (i.e., the distributed-memory paradigm), you would need to explicitly communicate among all the processes every time one of them changes the dictionary. This means that you would have to know beforehand the points in the code where those communications should happen.

However, based on your text, you seem to want a shared-memory approach, where a process would update the dictionary, for instance as follows:

    if key in dictionary:
       dictionary[key] = dictionary[key] + data[key]
    else:
       dictionary[key] = data[key] 

and those changes would be immediately visible to all processes, just as happens in multithreaded code.

MPI 3.0 introduced shared-memory windows, with which one can actually achieve that.

Here is an example using arrays:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

size = 1000
itemsize = MPI.DOUBLE.Get_size()
if comm.Get_rank() == 0:
    nbytes = size * itemsize
else:
    nbytes = 0

# Collectively allocate a shared-memory window; only rank 0 provides the bytes.
win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=comm)

# Every rank queries the base address of rank 0's segment and wraps it
# in a NumPy array, so all ranks view the same memory.
buf, itemsize = win.Shared_query(0)
assert itemsize == MPI.DOUBLE.Get_size()
buf = np.array(buf, dtype='B', copy=False)
ary = np.ndarray(buffer=buf, dtype='d', shape=(size,))

if comm.rank == 1:
    ary[:5] = np.arange(5)

comm.Barrier()
if comm.rank == 0:
    print(ary[:10])

The code is not mine; it comes from here.