1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
""" Library that contains custom Synergy functionalities, e.g.:
    * Snapshotter that can snapshot unfrozen baselines.
    * Threaded snapshotter.
"""
24 import ccm
25 import os
26 import shutil
27 import threading
28 import threadpool
29 import traceback
30 import sys
31 import logging
32 import xml.dom.minidom
33 from xml.dom.minidom import getDOMImplementation, parse
34
35
36
37 _logger = logging.getLogger('ccm.extra')
38
40 """ Exception raised by the methods of this module. """
44
45
46
def Snapshot(project, targetdir, dir=None):
    """ Snapshot anything from Synergy, even prep/working projects.

    Walks the directory tree of ``project`` recursively and writes every
    member below ``targetdir``.

    :param project: project object; must not be None and its ``type``
        attribute must be ``"project"``.
    :param targetdir: parent directory; a sub-directory named after the
        directory being snapshotted is created below it.
    :param dir: directory object to start from; when omitted the walk
        starts at ``project.root_dir()``.
    :raises AssertionError: if ``project`` is None or not of project type.
    :raises OSError: propagated from ``os.makedirs``, e.g. when the target
        sub-directory already exists.
    """
    # The condition and the message must not be wrapped together in
    # parentheses: that builds a non-empty tuple, which is always true,
    # and the assertion can then never fail.
    assert project is not None, "a project object must be supplied"
    assert project.type == "project", "project must be of project type"
    if not dir:
        dir = project.root_dir()
    targetdir = os.path.join(targetdir, dir.name)
    os.makedirs(targetdir)
    for member in dir.children(project):
        if member.type == 'dir':
            Snapshot(project, targetdir, member)
        elif member.type == 'project':
            # A sub-project is snapshotted starting from its own root dir.
            Snapshot(member, targetdir)
        else:
            member.to_file(os.path.join(targetdir, member.name))
62
65 self.object = object
66 self.targetdir = targetdir
68 _logger.info("Getting %s (%s)" % (os.path.join(self.targetdir, self.object.name), self.object))
69 self.object.to_file(os.path.join(self.targetdir, self.object.name))
70 return "Getting %s (%s)" % (os.path.join(self.targetdir, self.object.name), self.object)
71
77 _logger.info("Snapshotting '%s' under '%s'" % (self.project, self.targetdir))
78 status = self.project['status']
79 if status == 'released' or status == 'sqa' or status == 'test' or status == 'integrate':
80 _logger.info("Using Synergy wa_snapshot")
81 return self.project.snapshot(self.targetdir, True)
82 else:
83 _logger.info("Non static project, using custom snapshot.")
84 Snapshot(self.project, self.targetdir)
85 return ""
86
def __FastSnapshot(pool, project, targetdir, callback, exc_hld, dir=None):
    """ Recursive helper of the threaded snapshot.

    Creates the directory for ``dir`` (the project root when ``dir`` is not
    given) below ``targetdir`` and then handles each child:

    * ``ccm.Dir`` children are walked recursively in the calling thread,
    * ``ccm.Project`` children are queued on ``pool`` as a
      ``__ProjectSnapshot`` job, with ``callback`` and ``exc_hld`` attached
      as result and exception callbacks,
    * any other child is written to disk directly with ``to_file``.
    """
    current = dir if dir else project.root_dir()
    targetdir = os.path.join(targetdir, current.name)
    os.makedirs(targetdir)
    for member in current.children(project):
        if isinstance(member, ccm.Dir):
            __FastSnapshot(pool, project, targetdir, callback, exc_hld, member)
            continue
        if isinstance(member, ccm.Project):
            pool.addWork(__ProjectSnapshot(member, targetdir), callback=callback, exc_callback=exc_hld)
            continue
        # Plain object: fetch its content straight into the target directory.
        destination = os.path.join(targetdir, member.name)
        _logger.info("Getting %s (%s)" % (destination, member))
        member.to_file(destination)
100
101
102
104 """ Create snapshot running by running sbsnapshot concurently. """
105 assert (threads > 0, "Number of threads must be > 0.")
106 assert (project != None, "a project object must be supplied.")
107 assert (project.type == "project", "project must be of project type.")
108
109
110 exceptions = []
111 results = []
112 def handle_exception(request, exc_info):
113 _logger.error( "Exception occured in request #%s: %s" % (request.requestID, exc_info[1]))
114 exceptions.append(exc_info[1])
115
116 def handle_result(request, result):
117 results.append(result)
118
119 pool = threadpool.ThreadPool(threads)
120 __FastSnapshot(pool, project, targetdir, handle_result, handle_exception)
121 pool.wait()
122
123 if len(exceptions):
124 raise CCMExtraException("Errors occured during snapshot.", exceptions)
125
126 return "\n".join(results)
127
128
129
def FastMaintainWorkArea(project, path, pst=None, threads=4, wat=False):
    """ Maintain the workarea of a project in parallel.

    The work area of the top-level project is set up first, in the calling
    thread, through ``project.work_area(True, False, True, path, pst,
    wat=wat)``. Every sub-project returned by
    ``project.get_members(type="project")`` is then maintained by a job
    queued on a thread pool.

    :param project: top-level project; must be a ``ccm.Project`` instance.
    :param path: forwarded to ``project.work_area``.
    :param pst: forwarded to ``project.work_area``.
    :param threads: size of the thread pool; must be > 0.
    :param wat: forwarded to every ``work_area`` call.
    :return: the outputs of the sub-project jobs joined with newlines.
    :raises AssertionError: if ``threads`` is not positive or ``project``
        is not a ``ccm.Project``.
    :raises CCMExtraException: if at least one job raised; the collected
        exceptions are passed as second argument.
    """
    # Condition and message must not be wrapped together in parentheses:
    # asserting a non-empty tuple always succeeds.
    assert threads > 0, "Number of threads must be > 0."
    assert isinstance(project, ccm.Project), "a valid project object must be supplied."

    exceptions = []
    results = []

    def handle_exception(request, exc_info):
        # format_exception() returns a list of lines; join them so the log
        # shows a readable traceback instead of the list representation.
        trace = "".join(traceback.format_exception(exc_info[0], exc_info[1], exc_info[2]))
        _logger.error("Exception occured in request #%s: %s\n%s" % (request.requestID, exc_info[1], trace))
        exceptions.append(exc_info[1])

    def handle_result(request, result):
        results.append(result)

    class _MaintainProject:
        """ Job maintaining the work area of one sub-project. """

        def __init__(self, subproject, toplevel, wat=False):
            self.subproject = subproject
            self.toplevel = toplevel
            self.wat = wat

        def __call__(self):
            output = ""
            _logger.info("Maintaining project %s" % self.subproject)
            for use in self.subproject.finduse():
                # Only the usage inside the top-level project is relevant:
                # the sub-project work area is placed below the top-level
                # one, at the path reported by finduse().
                if use['project'] == self.toplevel:
                    self.subproject['wa_path'] = os.path.join(self.toplevel['wa_path'], use['path'])
                    self.subproject["project_subdir_template"] = ""
                    _logger.info("Maintaining project %s under %s" % (self.subproject, self.subproject['wa_path']))
                    output = self.subproject.work_area(True, True, True, wat=self.wat)
            _logger.info("Project %s maintained" % self.subproject)
            return output

    pool = threadpool.ThreadPool(threads)
    project.work_area(True, False, True, path, pst, wat=wat)
    for subproject in project.get_members(type="project"):
        _logger.info("Adding project %s" % subproject)
        pool.addWork(_MaintainProject(subproject, project, wat), callback=handle_result, exc_callback=handle_exception)
    pool.wait()

    if exceptions:
        raise CCMExtraException("Errors occured during work area maintenance.", exceptions)

    return "\n".join(results)
174
175
176
186
187
190 self._opener = opener
191 if self._opener is None:
192 self._opener = ccm.open_session
193
def get(self, username=None, password=None, engine=None, dbpath=None, database=None, reuse=True):
    """ Open a session by handing all arguments over to the configured opener. """
    _logger.debug("SessionProvider: Creating a new session.")
    opener = self._opener
    return opener(username, password, engine, dbpath, database, reuse)
197
199 _logger.info("Deleting the session provider.")
200 self.close()
201
204
205
207 """
208 <sessions>
209 <session database="foobar" ccmaddr="xxxx"/>
210 <session database="foobarx" ccmaddr="xxxx"/>
211 </sessions>
212 """
213
def __init__(self, opener=None, cache=None):
    """ Build a CachedSessionProvider.

    ``opener`` is handed over to ``SessionProvider.__init__``; ``cache`` is
    stored as ``cacheXml`` and may be None.
    """
    SessionProvider.__init__(self, opener=opener)
    _logger.info("Using CachedSessionProvider.")
    # All bookkeeping attributes have to exist before load() is called,
    # because load() takes the lock and fills cacheFree.
    self.cacheXml = cache
    self.cacheFree = {}
    self.cacheUsed = []
    self._lock = threading.Lock()
    self.__closed = False
    self.load()
226
227
229 """ Closing the SessionProvider. """
230 _logger.info("Closing the CachedSessionProvider.")
231 self.save()
232 if self.cacheXml == None:
233 _logger.info("Cleaning up opened sessions.")
234 self._lock.acquire()
235 for dbname in self.cacheFree.keys():
236 while len(self.cacheFree[dbname]) > 0:
237 session = self.cacheFree[dbname].pop()
238 session.close_on_exit = True
239 session.close()
240 while len(self.cacheUsed) > 0:
241 session = self.cacheUsed.pop()
242 session.close_on_exit = True
243 self._lock.release()
244 self.__closed = True
245
247 if self.cacheXml is not None and not self.__closed:
248 _logger.info("Writing %s" % self.cacheXml)
249 impl = getDOMImplementation()
250 sessions = impl.createDocument(None, "sessions", None)
251 top_element = sessions.documentElement
252 self._lock.acquire()
253 def add_session(dbname, session):
254 sessionNode = sessions.createElement("session")
255 sessionNode.setAttribute("database", dbname)
256 sessionNode.setAttribute("ccmaddr", session.addr())
257 top_element.appendChild(sessionNode)
258 for dbname in self.cacheFree.keys():
259 for session in self.cacheFree[dbname]:
260 add_session(dbname, session)
261 for session in self.cacheUsed:
262 add_session(session.database(), session)
263 self._lock.release()
264 o = open(self.cacheXml, "w+")
265 o.write(sessions.toprettyxml())
266 o.close()
267 _logger.debug(sessions.toprettyxml())
268
269
271 if self.cacheXml is not None and os.path.exists(self.cacheXml):
272 _logger.info("Loading %s" % self.cacheXml)
273 doc = parse(open(self.cacheXml, 'r'))
274 sessions = doc.documentElement
275 self._lock.acquire()
276 try:
277 for child in sessions.childNodes:
278 if child.nodeType == child.ELEMENT_NODE and child.tagName == "session" and child.hasAttribute('database') and child.hasAttribute('ccmaddr'):
279 if child.getAttribute('database') not in self.cacheFree:
280 self.cacheFree[child.getAttribute('database')] = []
281 if ccm.session_exists(child.getAttribute('ccmaddr'), child.getAttribute('database')):
282 _logger.info(" + Session: database=%s, ccmaddr=%s" % (child.getAttribute('database'), child.getAttribute('ccmaddr')))
283 self.cacheFree[child.getAttribute('database')].append(ccm.Session(None, None, None, ccm_addr=child.getAttribute('ccmaddr'), close_on_exit=False))
284 else:
285 _logger.info(" - Session database=%s, ccmaddr=%s doesn't seem to be valid anymore." % (child.getAttribute('database'), child.getAttribute('ccmaddr')))
286 finally:
287 self._lock.release()
288
289
def get(self, username=None, password=None, engine=None, dbpath=None, database=None, reuse=True):
    """ Return a session wrapped in a CachedProxySession.

    When ``database`` is given and a free session is cached for it, that
    session is moved to the used list and handed out again. Otherwise a new
    session is opened through ``SessionProvider.get`` (always with its
    ``reuse`` argument set to False; the ``reuse`` parameter of this method
    is accepted but not consulted), marked ``close_on_exit = False`` and
    recorded as used.

    :raises Exception: if the provider has already been closed.
    """
    if self.__closed:
        raise Exception("Could not create further session the provider is closed.")
    _logger.debug("CachedSessionProvider: Getting a session.")

    cached = None
    if database is not None:
        # Test for a free session and take it under the same lock, so two
        # threads cannot both see the last free session and then fail on
        # pop(). try/finally guarantees the lock is released on error.
        self._lock.acquire()
        try:
            free_sessions = self.cacheFree.get(database)
            if free_sessions:
                cached = free_sessions.pop()
                self.cacheUsed.append(cached)
        finally:
            self._lock.release()
    if cached is not None:
        _logger.info("CachedSessionProvider: Reusing session.")
        return CachedProxySession(self, cached)

    _logger.debug("CachedSessionProvider: Creating new session.")
    session = SessionProvider.get(self, username, password, engine, dbpath, database, False)
    session.close_on_exit = False
    s = CachedProxySession(self, session)
    db = s.database()
    self._lock.acquire()
    try:
        # Make sure the database has a free-list entry for later reuse.
        if db not in self.cacheFree:
            self.cacheFree[db] = []
        self.cacheUsed.append(session)
    finally:
        self._lock.release()
    return s
313
314 - def free(self, session):
323
325 """ Proxy session which will cleanup the session and free it from the provider """
326
328 """ Constructor. """
329 self.__session = session
330 self.__provider = provider
331
333 """ Delegate attributes to the session object. """
334 _logger.debug("CachedProxySession.__getattr__(%s)" % attrib)
335 if attrib == "close":
336 return self.__close
337 return getattr(self.__session, attrib)
338
340 """ Overriding the session closing. """
341 _logger.debug("CachedProxySession.__close")
342 self.__provider.free(self.__session)
343 self.__session.close()
344
346 """ Free the session on destruction. """
347 _logger.debug("CachedProxySession.__del__")
348 self.__close()
349