Package SloppyCell :: Module RunInParallel
[hide private]

Source Code for Module SloppyCell.RunInParallel

 1  import logging 
 2  logger = logging.getLogger('Parallel') 
 3   
 4  import sys, traceback 
 5   
 6  import SloppyCell 
 7  import Collections 
 8   
 9  from SloppyCell import num_procs, my_rank, my_host, HAVE_PYPAR 
10  if HAVE_PYPAR: 
11      import pypar 
12   
13  import SloppyCell.Utility as Utility 
14   
15 -class Statement:
16 """ 17 Class for sending Python statements to workers. 18 """
19 - def __init__(self, statement, locals={}):
20 self.statement = statement 21 self.locals = locals
22 23 while my_rank != 0: 24 # Wait for a message 25 message = pypar.receive(source=0) 26 27 # If the message is a SystemExit exception, exit the code. 28 if isinstance(message, SystemExit): 29 sys.exit() 30 31 # Exception handling: 32 # If we catch a SloppyCellException during a eval(), it's probably just 33 # a numerical issue so we just pass it back to the master to deal with. 34 # Note that we don't catch SloppyCellExceptions for exec'd things. 35 # This is because exec'd things shouldn't return anything, thus the 36 # master won't be waiting for a reply. 37 # If we catch any other exception, it's probably a bug in the code. Print 38 # a nice traceback, save results, and exit the code. 39 try: 40 if isinstance(message, Statement): 41 command, msg_locals = message.statement, message.locals 42 locals().update(msg_locals) 43 exec(command) 44 else: 45 command, msg_locals = message 46 locals().update(msg_locals) 47 try: 48 result = eval(command) 49 pypar.send(result, 0) 50 except Utility.SloppyCellException, X: 51 pypar.send(X, 0) 52 except: 53 # Assemble and print a nice traceback 54 tb = traceback.format_exception(sys.exc_type, sys.exc_value, 55 sys.exc_traceback) 56 logger.critical(('node %i:'%my_rank).join(tb)) 57 save_to = '.SloppyCell/node_%i_crash.bp' % my_rank 58 logger.critical("node %i: Command being run was: %s." 59 % (my_rank, command)) 60 Utility.save(msg_locals, save_to) 61 logger.critical("node %i: Corresponding locals saved to %s." 62 % (my_rank, save_to)) 63 sys.exit() 64
65 -def stop_workers():
66 """ 67 Send all workers the command to exit the program. 68 """ 69 for worker in range(1, num_procs): 70 pypar.send(SystemExit(), worker)
71 72 if my_rank == 0: 73 import atexit 74 atexit.register(stop_workers) 75
76 -def statement_to_all_workers(statement, locals={}):
77 """ 78 Send a Python statement to all workers for execution. 79 """ 80 for worker in range(1, num_procs): 81 pypar.send(Statement(statement, locals), worker)
82