|
1 # |
|
2 # Module which supports allocation of memory from an mmap |
|
3 # |
|
4 # multiprocessing/heap.py |
|
5 # |
|
6 # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt |
|
7 # |
|
8 |
|
9 import bisect |
|
10 import mmap |
|
11 import tempfile |
|
12 import os |
|
13 import sys |
|
14 import threading |
|
15 import itertools |
|
16 |
|
17 import _multiprocessing |
|
18 from multiprocessing.util import Finalize, info |
|
19 from multiprocessing.forking import assert_spawning |
|
20 |
|
21 __all__ = ['BufferWrapper'] |
|
22 |
|
23 # |
|
24 # Inheritable class which wraps an mmap, and from which blocks can be allocated |
|
25 # |
|
26 |
|
if sys.platform == 'win32':

    from ._multiprocessing import win32

    class Arena(object):
        """An anonymous mmap identified by a tagname, so it can be re-opened.

        Attributes:
            size: length in bytes of the mapping.
            name: tagname passed to ``mmap.mmap``; built from the creating
                process id and a per-class counter.
            buffer: the ``mmap.mmap`` object itself.
        """

        # Shared counter that makes each tagname created by this process unique.
        _counter = itertools.count()

        def __init__(self, size):
            """Create a fresh tagged mapping of ``size`` bytes."""
            self.size = size
            # Use the next() builtin rather than the iterator's .next()
            # method: the builtin exists from Python 2.6 onwards, whereas the
            # method was renamed to __next__ in Python 3.
            self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # A non-zero last-error here means the tagname was already taken.
            assert win32.GetLastError() == 0, 'tagname already in use'
            self._state = (self.size, self.name)

        def __getstate__(self):
            """Return ``(size, name)`` as the pickled state."""
            # NOTE(review): assert_spawning presumably rejects pickling
            # outside of process spawning -- confirm in multiprocessing.forking.
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            """Re-open the mapping described by a pickled ``(size, name)``."""
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # The mapping is expected to exist already (created by __init__
            # elsewhere), so the last error must report exactly that.
            assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS

else:

    class Arena(object):
        """An anonymous mmap of a given size.

        Attributes:
            buffer: the ``mmap.mmap`` object.
            size: length in bytes of the mapping.
            name: always ``None`` on this platform (no tagname is used).
        """

        def __init__(self, size):
            """Create an anonymous mapping of ``size`` bytes."""
            self.buffer = mmap.mmap(-1, size)
            self.size = size
            self.name = None
|
59 |
|
60 # |
|
61 # Class allowing allocation of chunks of memory from arenas |
|
62 # |
|
63 |
|
class Heap(object):
    """Allocator that carves ``(arena, start, stop)`` blocks out of arenas.

    A block is a 3-tuple: the arena it lives in and the half-open byte range
    ``[start, stop)`` inside that arena.  Free blocks are tracked by length
    (for best-fit lookup) and by both of their end points (so that adjacent
    free blocks can be merged when a block is released).
    """

    # Every requested size is rounded up to a multiple of this many bytes.
    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        """Set up an empty heap.

        Args:
            size: minimum length of the first arena that gets created.
        """
        self._lastpid = os.getpid()      # pid that owns this heap state
        self._lock = threading.Lock()    # guards malloc() and free()
        self._size = size                # minimum length of the next arena
        self._lengths = []               # sorted lengths having a free block
        self._len_to_seq = {}            # length -> list of free blocks
        self._start_to_block = {}        # (arena, start) -> free block
        self._stop_to_block = {}         # (arena, stop) -> free block
        self._allocated_blocks = set()   # blocks currently handed out
        self._arenas = []                # every arena created by this heap

    @staticmethod
    def _roundup(n, alignment):
        """Round ``n`` up to a multiple of ``alignment`` (a power of 2)."""
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        """Return a free block of at least ``size`` bytes.

        The block may be much larger than requested; trimming is left to
        the caller.  The returned block is removed from the free indexes.
        """
        # Smallest recorded free length that is >= size.
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            # Nothing big enough is free: create a new arena and hand back
            # all of it.  It was never registered as free, so there is
            # nothing to deregister.
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                # That was the last free block of this length.
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        """Register ``block`` as free, merging it with free neighbours."""
        (arena, start, stop) = block

        # A free block ending exactly where this one starts extends it left.
        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        # A free block starting exactly where this one stops extends it right.
        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            # First free block of this length: record the length as well.
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        """Deregister a free ``block`` so it can be merged with a neighbour.

        Returns:
            The ``(start, stop)`` offsets of the removed block.
        """
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def free(self, block):
        """Release a block previously returned by ``malloc()``.

        Raises:
            AssertionError: if called from a different process than the one
                recorded when the heap was (re)initialised.
            KeyError: if ``block`` is not currently allocated.
        """
        assert os.getpid() == self._lastpid
        with self._lock:
            self._allocated_blocks.remove(block)
            self._free(block)

    def malloc(self, size):
        """Allocate a block of ``size`` bytes (rounded up to ``_alignment``).

        A request for 0 bytes is treated as a request for 1 byte.

        Returns:
            An ``(arena, start, stop)`` tuple.
        """
        # sys.maxsize instead of sys.maxint: the latter does not exist on
        # Python 3, where it would turn every call into an AttributeError.
        assert 0 <= size < sys.maxsize
        if os.getpid() != self._lastpid:
            # Reinitialize after fork.  Called without arguments, so the
            # arena size falls back to its default.
            self.__init__()
        with self._lock:
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                # Give the unused tail of the block back to the free lists.
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
|
179 |
|
180 # |
|
181 # Class representing a chunk of an mmap -- can be inherited |
|
182 # |
|
183 |
|
class BufferWrapper(object):
    """A chunk of an mmap, allocated from a heap shared by all wrappers.

    The instance state is ``(block, size)``, where ``block`` is the
    ``(arena, start, stop)`` tuple returned by the heap and ``size`` is the
    number of bytes that was requested.
    """

    # Single heap shared by every BufferWrapper.
    _heap = Heap()

    def __init__(self, size):
        """Allocate ``size`` bytes from the shared heap."""
        # sys.maxsize instead of sys.maxint: the latter does not exist on
        # Python 3.
        assert 0 <= size < sys.maxsize
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        # NOTE(review): Finalize presumably hands the block back to the heap
        # once this wrapper is garbage collected -- confirm in
        # multiprocessing.util.
        Finalize(self, BufferWrapper._heap.free, args=(block,))

    def get_address(self):
        """Return the arena buffer's address plus this block's start offset."""
        (arena, start, _stop), size = self._state
        address, length = _multiprocessing.address_of_buffer(arena.buffer)
        # The requested size must fit inside the arena's buffer.
        assert size <= length
        return address + start

    def get_size(self):
        """Return the size in bytes that was requested at construction."""
        return self._state[1]