Как разбросать и собрать список python объектов в mpi4py

1

У меня есть список из 100 000 объектов python, которые я хотел бы разбросать и собрать в mpi4py.

Когда я пытаюсь использовать 8 процессоров, я получаю:

SystemError: отрицательный размер передан в PyBytes_FromStringAndSize

о рассеянии.

Когда я пытаюсь использовать 64 процессора, я получаю ту же ошибку, но на сборке.

Когда я пытаюсь сделать массив объектов из списка и использовать Gather и Scatter, я получаю ошибку, которая в основном утверждает, что dtype массива не может быть объектом.

Как я могу заставить это работать? Или что-нибудь еще, что я могу использовать, кроме MPI?
Я запускаю это на компьютере с 8 узлами и 64-ппп.

Теги:
parallel-processing
cluster-computing
hpc
mpi4py

2 ответа

2

Используя разброс и сбор, пример разделения массива numpy с 100000 элементами.

import numpy as np
from mpi4py import MPI
from pprint import pprint
comm = MPI.COMM_WORLD

pprint("-" * 78)
pprint(" Running on %d cores" % comm.size)
pprint("-" * 78)

N = 100000
my_N = N // 8

if comm.rank == 0:
    A = np.arange(N, dtype=np.float64)
else:
    A = np.empty(N, dtype=np.float64)

my_A = np.empty(my_N, dtype=np.float64)

# Scatter data 
comm.Scatter([A, MPI.DOUBLE], [my_A, MPI.DOUBLE])

pprint("After Scatter:")
for r in range(comm.size):
    if comm.rank == r:
        print("[%d] %s" % (comm.rank, len(my_A)))
    comm.Barrier()

# Allgather data into A
comm.Allgather([my_A, MPI.DOUBLE], [A, MPI.DOUBLE])

pprint("After Allgather:")
for r in range(comm.size):
    if comm.rank == r:
        print("[%d] %s" % (comm.rank, len(A)))
    comm.Barrier()

Также вы можете проверить scatterv и gatherv, больше примеров здесь и здесь.

0

Я не уверен, что это ответ, и я не уверен, что вы все еще ищете ответ, но...

Таким образом, у вас есть 100 000 объектов python. Если эти объекты являются регулярными данными (наборами данных), а не экземпляром какого-либо класса, передают данные как строку json. Что-то вроде этого:

#!/usr/bin/env python

import json
import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD

if comm.rank == 0:
    tasks = [
        json.dumps( { 'a':1,'x':2,'b':3 } ),
        json.dumps( { 'a':3,'x':1,'b':2 } ),
        json.dumps( { 'a':2,'x':3,'b':1 } )
    ]
else:
    tasks = None


# Scatter paramters arrays
unit = comm.scatter(tasks, root=0)

p = json.loads(unit)
print "-"*18
print("-- I'm rank %d in %d size task" % (comm.rank,comm.size) )
print("-- My paramters are: {}".format(p))
print "-"*18

comm.Barrier()

calc = p['a']*p['x']**2+p['b']

# gather results
result = comm.gather(calc, root=0)
# do something with result

if comm.rank == 0:
    print "the result is ", result
else:
    result = None

обратите внимание, что если у вас всего 8 узлов/ядер, вам нужно создать 8 записей в списке tasks и последовательно разбросать и собрать все 100 000 наборов данных. Если весь ваш набор данных находится в списке ALLDATA, код может выглядеть так:

def calc(a=0,x=0,b=0):
    return a*x**2+b

if comm.rank == 0: collector = []
for xset in zip(*(iter(ALLDATA),) * comm.size):
    task = [ json.dumps(s) for s in xset ]
    comm.Barrier()
    unit = comm.scatter(task if comm.rank == 0 else None, root=0)
    p = json.loads(unit)
    res = json.dumps( calc(**p) )
    totres = comm.gather(res, root=0)
    if comm.rank == 0:
        collector += [ json.loads(x) for x in  totres  ]



if comm.rank == 0:
    print "the result is ", collector

Ещё вопросы

Сообщество Overcoder
Наверх
Меню