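# Home > Code Library > [Personal backup] mpi.py: distribute files to each machine for execution
# [Personal backup] mpi.py: distribute files to each machine for execution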
#!/usr/bin/env python3
# e.g. : mpirun -ppn 4 -f hostfile mpi.py -t i
import mpi4py.MPI as mpi
import time
import sys
import getopt
import datetime
import subprocess
import os
import glob
class mpiComm(mpi.Intracomm):
    """Intracommunicator whose ``recv`` polls for a message before receiving.

    The instance is built from a duplicate (``Dup``) of the communicator it
    is given.  ``recv`` repeatedly probes with ``Iprobe`` and sleeps between
    probes; presumably this is meant to keep idle ranks from spinning inside
    a blocking receive -- confirm against the MPI implementation in use.
    """

    def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1):
        """Create the communicator.

        comm      -- communicator to duplicate (default: ``COMM_WORLD``).
        recvSleep -- seconds to sleep between two probes in ``recv``.
        """
        self = super(mpiComm, cls).__new__(cls, comm.Dup())
        self._recvSleep = recvSleep
        return self

    def recv(self, source=0, tag=0, status=None):
        """Wait for a message matching *source*/*tag* and return its payload.

        source -- rank to receive from (default 0).
        tag    -- message tag to match (default 0).
        status -- optional ``Status`` object, forwarded to the real receive.
        """
        sts = mpi.Status()
        # Poll instead of blocking: sleep until a matching message is pending.
        while not self.Iprobe(source=source, tag=tag, status=sts):
            time.sleep(self._recvSleep)
        # Receive exactly the message that was probed, so a wildcard source
        # or tag cannot end up matching a different message.
        return super(mpiComm, self).recv(source=sts.source, tag=sts.tag, status=status)
def UpdateProgress(t, r):
    """Rewrite the one-line progress indicator on standard output.

    t -- first number shown (callers pass the total number of items).
    r -- second number shown (callers pass the number finished so far).

    The text starts with a carriage return and has no trailing newline, so
    each call overwrites the previous one on a terminal; stdout is flushed
    so the update is visible immediately.
    """
    sys.stdout.write('\rEvaluated : %d - %d' % (t, r))
    sys.stdout.flush()
def mpiRUN(exe):
    """Process one work item named *exe* in the current working directory.

    What happens depends on the module-level ``runType``:

    'i' -- skip when ``exe.out``, ``MAX-exe.out`` or ``ERR-exe.out`` already
           exists; otherwise execute ``exe``.
    'l' -- skip when ``../H/exe.ini`` already exists; otherwise execute
           ``exe``.
    'm' -- run ``./mpimake exe`` through the shell.
    Any other value does nothing.

    When the module-level ``toLog`` is true, the executable's stdout and
    stderr go to ``exe.log`` and that file is removed once the run has
    finished; otherwise stdout is sent to ``/dev/null``.

    exe -- file name of the work item, relative to the current directory.

    Always returns None.
    """
    if runType == 'i':
        done_markers = (exe + '.out', 'MAX-' + exe + '.out', 'ERR-' + exe + '.out')
    elif runType == 'l':
        done_markers = (os.getcwd() + '/../H/' + exe + '.ini',)
    elif runType == 'm':
        os.system('./mpimake ' + exe)
        return None
    else:
        return None

    # A marker file means this item was already handled; do not run it again.
    if any(os.path.isfile(marker) for marker in done_markers):
        return None

    if not os.path.isfile(exe):
        print('Error: ', exe, 'Not Existed!')
        return None

    if toLog:
        log_name = exe + '.log'
        # The context manager closes the log even if the launch itself fails.
        with open(log_name, 'w') as flog:
            run = subprocess.Popen(os.getcwd() + '/' + exe,
                                   stdout=flog.fileno(), stderr=flog.fileno())
            run.wait()
        # The log is only kept while the run is in progress (or if it never
        # completed); presumably leftovers flag interrupted jobs -- confirm.
        if os.path.isfile(log_name):
            os.remove(log_name)
    else:
        os.system('./' + exe + ' > /dev/null')
    return None
#==============================================
# Main Run #
#==============================================
# Module-level settings read by mpiRUN().
runType = 'i'    # 'i' - IntPack, 'l' - Lambda, 'm' - Make (see the -t option)
toLog = False    # when true, mpiRUN() captures each run's output in <exe>.log

comm = mpiComm()
num_procs = comm.Get_size()
rank = comm.Get_rank()
stat = mpi.Status()

# Rank 0 only dispatches work, so at least one more rank is needed to run it.
if num_procs < 2:
    print("2 Processes at least!")
    os._exit(0)

# Parse the command line.  getopt.getopt() is the call that raises
# GetoptError, so it has to be inside the try block.
try:
    opts, args = getopt.getopt(sys.argv[1:], "t:h")
except getopt.GetoptError as err:
    print('Option ERROR: ' + str(err))
    os._exit(0)
for o, v in opts:
    if o == '-t':
        runType = v
    elif o == '-h':
        print('-h : To Show This Message')
        print('-t : l - Lambda, i - IntPack, m - Make')
        os._exit(0)

if rank == 0:
    # ---- Master: build the work list and hand items out one at a time ----
    print("mpi.py - Created By Feng")
    print('-----------------------------------')
    print('@', datetime.datetime.now())
    if runType == 'l':
        if not os.path.exists('../H'):
            os.makedirs('../H')

    if runType in ('i', 'l'):
        # Work items are files named like "*-*" that have no dot in the name.
        RunList = [fn for fn in glob.glob('*-*') if '.' not in fn]
    elif runType == 'm':
        RunList = glob.glob('*.f90')
    else:
        # Unknown run type: use an empty list so every worker is still told
        # to exit, instead of crashing here and leaving the workers blocked.
        print('Unknown run type: ' + runType)
        RunList = []
    RunList.sort()

    TotalIndex = len(RunList)
    UpdateProgress(TotalIndex, 0)

    # Seed every worker with one item, or with 'exit' if none are left.
    RunIndex = 0
    for i in range(1, num_procs):
        if RunIndex < TotalIndex:
            comm.send(RunList[RunIndex], dest=i)
            RunIndex = RunIndex + 1
        else:
            comm.send('exit', dest=i)

    # Each 'OK' reply is answered with the next item, or with 'exit'.
    RunCount = 0
    while RunCount < TotalIndex:
        res = comm.recv(source=mpi.ANY_SOURCE, status=stat)
        if res == 'OK':
            RunCount = RunCount + 1
            UpdateProgress(TotalIndex, RunCount)
            if RunIndex < TotalIndex:
                comm.send(RunList[RunIndex], dest=stat.source)
                RunIndex = RunIndex + 1
            else:
                comm.send('exit', dest=stat.source)
    UpdateProgress(TotalIndex, RunCount)

    # Final tag-1 message releases the workers waiting after their loop.
    for i in range(1, num_procs):
        comm.send(None, dest=i, tag=1)
    print('')
    print('@', datetime.datetime.now())
else:
    # ---- Worker: run items until told to exit, acknowledging each one ----
    while True:
        exe = comm.recv(source=0)
        if exe == 'exit':
            break
        try:
            mpiRUN(exe)
        except Exception as err:
            # Report and carry on: the master counts one 'OK' per item and
            # would wait forever if this worker stopped without replying.
            print('Error: ', exe, err)
        comm.send('OK', dest=0)
    # Wait for the master's final tag-1 message before finishing.
    comm.recv(source=0, tag=1)
# [Personal backup] mpi.py: distribute files to each machine for execution