|
1 # |
|
2 # Module to allow spawning of processes on foreign host |
|
3 # |
|
4 # Depends on `multiprocessing` package -- tested with `processing-0.60` |
|
5 # |
|
6 |
|
7 __all__ = ['Cluster', 'Host', 'get_logger', 'current_process'] |
|
8 |
|
9 # |
|
10 # Imports |
|
11 # |
|
12 |
|
13 import sys |
|
14 import os |
|
15 import tarfile |
|
16 import shutil |
|
17 import subprocess |
|
18 import logging |
|
19 import itertools |
|
20 import Queue |
|
21 |
|
22 try: |
|
23 import cPickle as pickle |
|
24 except ImportError: |
|
25 import pickle |
|
26 |
|
27 from multiprocessing import Process, current_process, cpu_count |
|
28 from multiprocessing import util, managers, connection, forking, pool |
|
29 |
|
30 # |
|
31 # Logging |
|
32 # |
|
33 |
|
def get_logger():
    '''
    Return the logger used by this module (named 'distributing')
    '''
    return _logger

# The module logger gets its own stream handler below, so propagation to
# ancestor loggers is switched off to keep their handlers from emitting
# the same records a second time.
_logger = logging.getLogger('distributing')
_logger.propagate = 0

_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Short aliases used by the rest of the module
info = _logger.info
debug = _logger.debug
|
47 |
|
48 # |
|
49 # Get number of cpus |
|
50 # |
|
51 |
|
# Default number of slots for a host whose `slots` was not given.
# `cpu_count()` raises NotImplementedError when the number of cpus cannot
# be determined; fall back to a single slot in that case.
try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
|
56 |
|
57 # |
|
58 # Manager type which spawns subprocesses |
|
59 # |
|
60 |
|
class HostManager(managers.SyncManager):
    '''
    Manager whose `Process()` method creates processes through the
    manager's server, i.e. on the (presumably foreign) host it runs on
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # placeholder until `from_address()` fills in the real name
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process via the registered `_RemoteProcess` type

        `target`, `args` and `kwargs` are pickled before being handed
        over; `group` is accepted but not used.  When `name` is None a
        name is derived from the host name and the process identity.
        '''
        main_module = sys.modules['__main__']
        main_path = None
        if hasattr(main_module, '__file__'):
            # only the base name of the main script is passed on
            main_path = os.path.basename(main_module.__file__)
        payload = pickle.dumps((target, args, kwargs))
        proc = self._RemoteProcess(payload, main_path)
        if name is None:
            template = self._name.split('Host-')[-1] + '/Process-%s'
            identity = ':'.join(map(str, proc.get_identity()))
            name = template % identity
        proc.set_name(name)
        return proc

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Return a manager object for a server already listening at `address`
        '''
        host = cls(address, authkey)
        # NOTE(review): presumably a no-op request that just checks the
        # server can be reached with this authkey -- confirm
        managers.transact(address, authkey, 'dummy')
        host._state.value = managers.State.STARTED
        host._name = 'Host-%s:%s' % host.address
        # calling `shutdown()` (or interpreter exit) asks the server to stop
        host.shutdown = util.Finalize(
            host, HostManager._finalize_host,
            args=(host._address, host._authkey, host._name),
            exitpriority=-10
            )
        return host

    @staticmethod
    def _finalize_host(address, authkey, name):
        # `name` is part of the finalizer arguments but is not needed here
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
|
101 |
|
102 # |
|
103 # Process subclass representing a process on (possibly) a remote machine |
|
104 # |
|
105 |
|
class RemoteProcess(Process):
    '''
    Represents a process started on a remote host

    `data` is a pickled `(target, args, kwargs)` tuple which is only
    unpickled when the process bootstraps; `main_path` is the bare file
    name of the main script (or None).
    '''
    def __init__(self, data, main_path):
        # a path with a directory component is rejected
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        # NOTE(review): unpickling can run arbitrary code, so `data` must
        # only ever come from a trusted (authenticated) peer
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        '''
        Return the identity tuple of this process
        '''
        return self._identity

    def set_name(self, name):
        '''
        Set the name of this process

        Provided as a method because `HostManager.Process` calls
        `set_name()` on the object it creates, while `Process` itself
        only offers the `name` property.
        '''
        self.name = name
|
123 |
|
# Register `RemoteProcess` with `HostManager` under the type id
# '_RemoteProcess'; `HostManager.Process` creates processes through it.
HostManager.register('_RemoteProcess', RemoteProcess)
|
125 |
|
126 # |
|
127 # A Pool class that uses a cluster |
|
128 # |
|
129 |
|
class DistributedPool(pool.Pool):
    '''
    A pool whose worker processes and queues are created through a cluster
    '''
    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        self.Process = cluster.Process
        # default to one worker per slot of the cluster
        size = processes or len(cluster)
        pool.Pool.__init__(self, size, initializer, initargs)

    def _setup_queues(self):
        # both queues are created by the cluster manager
        inqueue = self._cluster._SettableQueue()
        outqueue = self._cluster._SettableQueue()
        self._inqueue = inqueue
        self._outqueue = outqueue
        self._quick_put = inqueue.put
        self._quick_get = outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # replace whatever is queued by `size` None entries;
        # `task_handler` is not used
        sentinels = [None] * size
        inqueue.set_contents(sentinels)
|
147 |
|
148 # |
|
149 # Manager type which starts host managers on other machines |
|
150 # |
|
151 |
|
def LocalProcess(**kwds):
    '''
    Return a `Process` created with `kwds` whose name is prefixed
    with 'localhost/'
    '''
    p = Process(**kwds)
    # `Process` exposes its name as the `name` property; it has no
    # `set_name()` method
    p.name = 'localhost/' + p.name
    return p
|
156 |
|
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.

    `hostlist` is a sequence of host objects (see `Host`) and `modules`
    is a list of names of already imported modules whose source files
    are passed to every host when the cluster is started.  This module
    itself is always included.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        # work on a copy so that the caller's list is left untouched
        self._modules = list(modules)
        if __name__ not in self._modules:
            self._modules.append(__name__)
        self._files = []
        for name in self._modules:
            path = sys.modules[name].__file__
            # ship the source file rather than the compiled one
            if path.endswith(('.pyc', '.pyo')):
                path = path[:-4] + '.py'
            self._files.append(os.path.abspath(path))

    def start(self):
        '''
        Start the cluster manager and a host manager for every host
        '''
        managers.SyncManager.start(self)

        listener = connection.Listener(family='AF_INET',
                                       authkey=self._authkey)
        try:
            for i, host in enumerate(self._hostlist):
                host._start_manager(i, self._authkey, listener.address,
                                    self._files)

            for host in self._hostlist:
                if host.hostname != 'localhost':
                    # Accept one report per non-local host.  The report
                    # carries the index of the host that sent it, which
                    # need not be the host of the current iteration.
                    conn = listener.accept()
                    try:
                        i, address, cpus = conn.recv()
                    finally:
                        conn.close()
                    other_host = self._hostlist[i]
                    other_host.manager = HostManager.from_address(
                        address, self._authkey
                        )
                    other_host.slots = other_host.slots or cpus
                    other_host.Process = other_host.manager.Process
                else:
                    host.slots = host.slots or slot_count
                    host.Process = LocalProcess
        finally:
            # the listener is only needed while the hosts report back
            listener.close()

        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # Keep the instance-level `shutdown` under another name and remove
        # it, so that the `shutdown()` method below becomes visible again.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        '''
        Shut down the managers of all non-local hosts, then the cluster
        '''
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process on the next slot, cycling through all slots
        '''
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''
        Return a `DistributedPool` which uses this cluster
        '''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
|
228 |
|
229 # |
|
230 # Queue subclass used by distributed pool |
|
231 # |
|
232 |
|
class SettableQueue(Queue.Queue):
    '''
    Queue whose whole contents can be replaced in one operation
    '''
    def empty(self):
        return len(self.queue) == 0

    def full(self):
        if self.maxsize <= 0:
            # a non-positive maxsize means the queue never counts as full
            return False
        return len(self.queue) == self.maxsize

    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        with self.not_empty:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notify_all()
|
248 |
|
# Register `SettableQueue` with `Cluster` under the type id
# '_SettableQueue'; `DistributedPool._setup_queues` creates its queues
# through it.
Cluster.register('_SettableQueue', SettableQueue)
|
250 |
|
251 # |
|
252 # Class representing a notional cpu in the cluster |
|
253 # |
|
254 |
|
class Slot(object):
    '''
    One notional cpu of `host`; processes for the slot are created
    with the host's own `Process` callable
    '''
    def __init__(self, host):
        self.host, self.Process = host, host.Process
|
259 |
|
260 # |
|
261 # Host |
|
262 # |
|
263 |
|
class Host(object):
    '''
    A machine which takes part in a cluster as a node.

    `hostname` names the machine.  Any name other than "localhost" is
    reached through ssh; a name of the form "username@somewhere.org"
    logs in as that user.

    `slots` is the number of process slots on the machine and so
    determines how often it is chosen when processes are allocated.
    Normally it should equal the number of cpus of the machine.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        # nothing needs to be started for the local machine
        if self.hostname == 'localhost':
            return

        tempdir = copy_to_remote_temporary_directory(self.hostname, files)
        debug('startup files copied to %s:%s', self.hostname, tempdir)

        # program for the remote python; the double quotes are part of
        # the argument passed to ssh
        command = (
            '"import os; os.chdir(%r); '
            'from distributing import main; main()"' % tempdir
            )
        p = subprocess.Popen(
            ['ssh', self.hostname, 'python', '-c', command],
            stdin=subprocess.PIPE
            )

        # start-up information, sent pickled over the child's stdin
        data = {
            'name': 'BoostrappingHost',
            'index': index,
            'dist_log_level': _logger.getEffectiveLevel(),
            'dir': tempdir,
            'authkey': str(authkey),
            'parent_address': address,
            }
        pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
        p.stdin.close()
|
299 |
|
300 # |
|
301 # Copy files to remote directory, returning name of directory |
|
302 # |
|
303 |
|
# Program run by the remote python: it unpacks the gzipped tar stream
# read from stdin into a new temporary directory and prints the name of
# that directory.  The surrounding double quotes are part of the argument
# passed to ssh.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''

def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` to a new temporary directory on `host` using ssh

    Each file is stored under its base name.  Returns the output of the
    remote program with trailing whitespace removed, i.e. the name of
    the directory it created.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    try:
        tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
        try:
            for name in files:
                tf.add(name, os.path.basename(name))
        finally:
            tf.close()
        # end of input for the remote program
        p.stdin.close()
        return p.stdout.read().rstrip()
    finally:
        # release both pipes and reap the ssh process; closing a pipe
        # which is already closed is harmless
        p.stdin.close()
        p.stdout.close()
        p.wait()
|
325 |
|
326 # |
|
327 # Code which runs a host manager |
|
328 # |
|
329 |
|
def main():
    '''
    Run a host manager server, configured by data read from stdin

    The parent pickles a dictionary to this process's stdin.  The server
    address and the number of cpus are reported back to the address
    given by its 'parent_address' item, and the directory given by its
    'dir' item is removed again by a finalizer.
    '''
    # get data from parent over stdin
    # NOTE(review): unpickling runs whatever the sender supplies, so stdin
    # must only ever be fed by the trusted parent process
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    try:
        conn.send((data['index'], server.address, slot_count))
    finally:
        conn.close()

    # set name etc (`name` is a property; there is no `set_name()`)
    current_process().name = 'Host-%s:%s' % server.address
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()