from pandac.PandaModules import PStatCollector from direct.directnotify.DirectNotifyGlobal import directNotify from direct.showbase.PythonUtil import Queue, invertDictLossless, makeFlywheelGen from direct.showbase.PythonUtil import itype, serialNum, safeRepr, fastRepr from direct.showbase.Job import Job import types, weakref, random, __builtin__ def _createContainerLeak(): def leakContainer(task=None): base = getBase() if not hasattr(base, 'leakContainer'): base.leakContainer = {} # use tuples as keys since they can't be weakref'd, and use an instance # since it can't be repr/eval'd # that will force the leak detector to hold a normal 'non-weak' reference class LeakKey: pass base.leakContainer[(LeakKey(),)] = {} # test the non-weakref object reference handling if random.random() < .01: key = random.choice(base.leakContainer.keys()) ContainerLeakDetector.notify.debug( 'removing reference to leakContainer key %s so it will be garbage-collected' % safeRepr(key)) del base.leakContainer[key] taskMgr.doMethodLater(10, leakContainer, 'leakContainer-%s' % serialNum()) if task: return task.done leakContainer() def _createTaskLeak(): leakTaskName = uniqueName('leakedTask') leakDoLaterName = uniqueName('leakedDoLater') def nullTask(task=None): return task.cont def nullDoLater(task=None): return task.done def leakTask(task=None, leakTaskName=leakTaskName): base = getBase() taskMgr.add(nullTask, uniqueName(leakTaskName)) taskMgr.doMethodLater(1 << 31, nullDoLater, uniqueName(leakDoLaterName)) taskMgr.doMethodLater(10, leakTask, 'doLeakTask-%s' % serialNum()) if task: return task.done leakTask() class NoDictKey: pass class Indirection: """ Represents the indirection that brings you from a container to an element of the container. Stored as a string to be used as part of an eval, or as a key to be looked up in a dict. Each dictionary dereference is individually eval'd since the dict key might have been garbage-collected TODO: store string components that are duplicates of strings in the actual system so that Python will keep one copy and reduce memory usage """ def __init__(self, evalStr=None, dictKey=NoDictKey): # if this is a dictionary lookup, pass dictKey instead of evalStr self.evalStr = evalStr self.dictKey = NoDictKey # is the dictKey a weak reference? self._isWeakRef = False self._refCount = 0 if dictKey is not NoDictKey: # if we can repr/eval the key, store it as an evalStr keyRepr = safeRepr(dictKey) useEval = False try: keyEval = eval(keyRepr) useEval = True except: pass if useEval: # check to make sure the eval succeeded if hash(keyEval) != hash(dictKey): useEval = False if useEval: # eval/repr succeeded, store as an evalStr self.evalStr = '[%s]' % keyRepr else: try: # store a weakref to the key self.dictKey = weakref.ref(dictKey) self._isWeakRef = True except TypeError, e: ContainerLeakDetector.notify.debug('could not weakref dict key %s' % keyRepr) self.dictKey = dictKey self._isWeakRef = False def destroy(self): # re-entrant self.dictKey = NoDictKey def acquire(self): self._refCount += 1 def release(self): self._refCount -= 1 if self._refCount == 0: self.destroy() def isDictKey(self): # is this an indirection through a dictionary? return self.dictKey is not NoDictKey def _getNonWeakDictKey(self): if not self._isWeakRef: return self.dictKey else: key = self.dictKey() if key is None: return '' return key def dereferenceDictKey(self, parentDict): # look ourselves up in parentDict key = self._getNonWeakDictKey() # objects in __builtin__ will have parentDict==None if parentDict is None: return key return parentDict[key] def getString(self, prevIndirection=None, nextIndirection=None): # return our contribution to the full name of an object instanceDictStr = '.__dict__' if self.evalStr is not None: # if we're an instance dict, skip over this one (obj.__dict__[keyName] == obj.keyName) if nextIndirection is not None and self.evalStr[-len(instanceDictStr):] == instanceDictStr: return self.evalStr[:-len(instanceDictStr)] # if the previous indirection was an instance dict, change our syntax from ['key'] to .key if prevIndirection is not None and prevIndirection.evalStr is not None: if prevIndirection.evalStr[-len(instanceDictStr):] == instanceDictStr: return '.%s' % self.evalStr[2:-2] return self.evalStr # we're stored as a dict key keyRepr = safeRepr(self._getNonWeakDictKey()) # if the previous indirection was an instance dict, change our syntax from ['key'] to .key if prevIndirection is not None and prevIndirection.evalStr is not None: if prevIndirection.evalStr[-len(instanceDictStr):] == instanceDictStr: return '.%s' % keyRepr return '[%s]' % keyRepr def __repr__(self): return self.getString() class ObjectRef: """ stores a reference to a container in a way that does not prevent garbage collection of the container if possible stored as a series of 'indirections' (obj.foo -> '.foo', dict[key] -> '[key]', etc.) """ notify = directNotify.newCategory("ObjectRef") class FailedEval(Exception): pass def __init__(self, indirection, objId, other=None): self._indirections = [] # are we building off of an existing ref? if other is not None: for ind in other._indirections: self._indirections.append(ind) # make sure we're not storing a reference to the actual object, # that could cause a memory leak assert type(objId) in (types.IntType, types.LongType) # prevent cycles (i.e. base.loader.base.loader) assert not self.goesThrough(objId=objId) self._indirections.append(indirection) # make sure our indirections don't get destroyed while we're using them for ind in self._indirections: ind.acquire() self.notify.debug(repr(self)) def destroy(self): for indirection in self._indirections: indirection.release() del self._indirections def getNumIndirections(self): return len(self._indirections) def goesThroughGen(self, obj=None, objId=None): if obj is None: assert type(objId) in (types.IntType, types.LongType) else: objId = id(obj) o = None evalStr = '' curObj = None # make sure the indirections don't go away on us indirections = self._indirections for indirection in indirections: yield None indirection.acquire() for indirection in indirections: yield None if not indirection.isDictKey(): # build up a string to be eval'd evalStr += indirection.getString() else: curObj = self._getContainerByEval(evalStr, curObj=curObj) if curObj is None: raise FailedEval(evalStr) # try to look up this key in the curObj dictionary curObj = indirection.dereferenceDictKey(curObj) evalStr = '' yield None o = self._getContainerByEval(evalStr, curObj=curObj) if id(o) == objId: break for indirection in indirections: yield None indirection.release() yield id(o) == objId def goesThrough(self, obj=None, objId=None): # since we cache the ids involved in this reference, # this isn't perfect, for example if base.myObject is reassigned # to a different object after this Ref was created this would return # false, allowing a ref to base.myObject.otherObject.myObject for goesThrough in self.goesThroughGen(obj=obj, objId=objId): pass return goesThrough def _getContainerByEval(self, evalStr, curObj=None): if curObj is not None: # eval('curObj.foo.bar.someDict') evalStr = 'curObj%s' % evalStr else: # this eval is not based off of curObj, use the global__builtin__ namespace # put __builtin__ at the start if it's not already there bis = '__builtin__' if evalStr[:len(bis)] != bis: evalStr = '%s.%s' % (bis, evalStr) try: container = eval(evalStr) except NameError, ne: return None except AttributeError, ae: return None except KeyError, ke: return None return container def getContainerGen(self, getInstance=False): # try to get a handle on the container by eval'ing and looking things # up in dictionaries, depending on the type of each indirection # if getInstance is True, will return instance instead of instance dict #import pdb;pdb.set_trace() evalStr = '' curObj = None # make sure the indirections don't go away on us indirections = self._indirections for indirection in indirections: indirection.acquire() for indirection in indirections: yield None if not indirection.isDictKey(): # build up a string to be eval'd evalStr += indirection.getString() else: curObj = self._getContainerByEval(evalStr, curObj=curObj) if curObj is None: raise FailedEval(evalStr) # try to look up this key in the curObj dictionary curObj = indirection.dereferenceDictKey(curObj) evalStr = '' for indirection in indirections: yield None indirection.release() if getInstance: lenDict = len('.__dict__') if evalStr[-lenDict:] == '.__dict__': evalStr = evalStr[:-lenDict] # TODO: check that this is still the object we originally pointed to yield self._getContainerByEval(evalStr, curObj=curObj) def getEvalStrGen(self, getInstance=False): str = '' prevIndirection = None curIndirection = None nextIndirection = None # make sure the indirections don't go away on us indirections = self._indirections for indirection in indirections: indirection.acquire() for i in xrange(len(indirections)): yield None if i > 0: prevIndirection = indirections[i-1] else: prevIndirection = None curIndirection = indirections[i] if i < len(indirections)-1: nextIndirection = indirections[i+1] else: nextIndirection = None str += curIndirection.getString(prevIndirection=prevIndirection, nextIndirection=nextIndirection) if getInstance: lenDict = len('.__dict__') if str[-lenDict:] == '.__dict__': str = str[:-lenDict] for indirection in indirections: yield None indirection.release() yield str def getFinalIndirectionStr(self): prevIndirection = None if len(self._indirections) > 1: prevIndirection = self._indirections[-2] return self._indirections[-1].getString(prevIndirection=prevIndirection) def __repr__(self): for result in self.getEvalStrGen(): pass return result class FindContainers(Job): """ Explore the Python graph, looking for objects that support __len__() """ def __init__(self, name, leakDetector): Job.__init__(self, name) self._leakDetector = leakDetector self._id2ref = self._leakDetector._id2ref # these hold objects that we should start traversals from often and not-as-often, # respectively self._id2baseStartRef = {} self._id2discoveredStartRef = {} # these are working copies so that our iterations aren't disturbed by changes to the # definitive ref sets self._baseStartRefWorkingList = ScratchPad(refGen=nullGen(), source=self._id2baseStartRef) self._discoveredStartRefWorkingList = ScratchPad(refGen=nullGen(), source=self._id2discoveredStartRef) self.notify = self._leakDetector.notify ContainerLeakDetector.addPrivateObj(self.__dict__) # set up the base containers, the ones that hold most objects ref = ObjectRef(Indirection(evalStr='__builtin__.__dict__'), id(__builtin__.__dict__)) self._id2baseStartRef[id(__builtin__.__dict__)] = ref # container for objects that want to make sure they are found by # the object exploration algorithm, including objects that exist # just to measure things such as C++ memory usage, scene graph size, # framerate, etc. See LeakDetectors.py if not hasattr(__builtin__, "leakDetectors"): __builtin__.leakDetectors = {} ref = ObjectRef(Indirection(evalStr='leakDetectors'), id(leakDetectors)) self._id2baseStartRef[id(leakDetectors)] = ref for i in self._addContainerGen(__builtin__.__dict__, ref): pass try: base except: pass else: ref = ObjectRef(Indirection(evalStr='base.__dict__'), id(base.__dict__)) self._id2baseStartRef[id(base.__dict__)] = ref for i in self._addContainerGen(base.__dict__, ref): pass try: simbase except: pass else: ref = ObjectRef(Indirection(evalStr='simbase.__dict__'), id(simbase.__dict__)) self._id2baseStartRef[id(simbase.__dict__)] = ref for i in self._addContainerGen(simbase.__dict__, ref): pass def destroy(self): ContainerLeakDetector.removePrivateObj(self.__dict__) Job.destroy(self) def getPriority(self): return Job.Priorities.Low @staticmethod def getStartObjAffinity(startObj): # how good of a starting object is this object for traversing the object graph? try: return len(startObj) except: return 1 def _isDeadEnd(self, obj, objName=None): if type(obj) in (types.BooleanType, types.BuiltinFunctionType, types.BuiltinMethodType, types.ComplexType, types.FloatType, types.IntType, types.LongType, types.NoneType, types.NotImplementedType, types.TypeType, types.CodeType, types.FunctionType, types.StringType, types.UnicodeType, types.TupleType): return True # if it's an internal object, ignore it if id(obj) in ContainerLeakDetector.PrivateIds: return True # prevent crashes in objects that define __cmp__ and don't handle strings if type(objName) == types.StringType and objName in ('im_self', 'im_class'): return True try: className = obj.__class__.__name__ except: pass else: # prevent infinite recursion in built-in containers related to methods if className == 'method-wrapper': return True return False def _hasLength(self, obj): return hasattr(obj, '__len__') def _addContainerGen(self, cont, objRef): contId = id(cont) # if this container is new, or the objRef repr is shorter than what we already have, # put it in the table if contId in self._id2ref: for existingRepr in self._id2ref[contId].getEvalStrGen(): yield None for newRepr in objRef.getEvalStrGen(): yield None if contId not in self._id2ref or len(newRepr) < len(existingRepr): if contId in self._id2ref: self._leakDetector.removeContainerById(contId) self._id2ref[contId] = objRef def _addDiscoveredStartRef(self, obj, ref): # we've discovered an object that can be used to start an object graph traversal objId = id(obj) if objId in self._id2discoveredStartRef: existingRef = self._id2discoveredStartRef[objId] if type(existingRef) not in (types.IntType, types.LongType): if (existingRef.getNumIndirections() >= ref.getNumIndirections()): # the ref that we already have is more concise than the new ref return if objId in self._id2ref: if (self._id2ref[objId].getNumIndirections() >= ref.getNumIndirections()): # the ref that we already have is more concise than the new ref return storedItem = ref # if we already are storing a reference to this object, don't store a second reference if objId in self._id2ref: storedItem = objId self._id2discoveredStartRef[objId] = storedItem def run(self): try: # this yields a different set of start refs every time we start a new traversal # force creation of a new workingListSelector inside the while loop right off the bat workingListSelector = nullGen() # this holds the current step of the current traversal curObjRef = None while True: # yield up here instead of at the end, since we skip back to the # top of the while loop from various points yield None #import pdb;pdb.set_trace() if curObjRef is None: # choose an object to start a traversal from try: startRefWorkingList = workingListSelector.next() except StopIteration: # do relative # of traversals on each set based on how many refs it contains baseLen = len(self._baseStartRefWorkingList.source) discLen = len(self._discoveredStartRefWorkingList.source) minLen = float(max(1, min(baseLen, discLen))) # this will cut down the traversals of the larger set by 2/3 minLen *= 3. workingListSelector = flywheel([self._baseStartRefWorkingList, self._discoveredStartRefWorkingList], [baseLen/minLen, discLen/minLen]) yield None continue # grab the next start ref from this sequence and see if it's still valid while True: yield None try: curObjRef = startRefWorkingList.refGen.next() break except StopIteration: # we've run out of refs, grab a new set if len(startRefWorkingList.source) == 0: # ref set is empty, choose another break # make a generator that yields containers a # of times that is # proportional to their length for fw in makeFlywheelGen( startRefWorkingList.source.values(), countFunc=lambda x: self.getStartObjAffinity(x), scale=.05): yield None startRefWorkingList.refGen = fw if curObjRef is None: # this ref set is empty, choose another # the base set should never be empty (__builtin__ etc.) continue # do we need to go look up the object in _id2ref? sometimes we do that # to avoid storing multiple redundant refs to a single item if type(curObjRef) in (types.IntType, types.LongType): startId = curObjRef curObjRef = None try: for containerRef in self._leakDetector.getContainerByIdGen(startId): yield None except: # ref is invalid self.notify.debug('invalid startRef, stored as id %s' % startId) self._leakDetector.removeContainerById(startId) continue curObjRef = containerRef try: for curObj in curObjRef.getContainerGen(): yield None except: self.notify.debug('lost current container, ref.getContainerGen() failed') # that container is gone, try again curObjRef = None continue self.notify.debug('--> %s' % curObjRef) #import pdb;pdb.set_trace() # store a copy of the current objRef parentObjRef = curObjRef # if we hit a dead end, start over from another container curObjRef = None if hasattr(curObj, '__dict__'): child = curObj.__dict__ hasLength = self._hasLength(child) notDeadEnd = not self._isDeadEnd(child) if hasLength or notDeadEnd: # prevent cycles in the references (i.e. base.loader.base) for goesThrough in parentObjRef.goesThroughGen(child): # don't yield, container might lose this element pass if not goesThrough: objRef = ObjectRef(Indirection(evalStr='.__dict__'), id(child), parentObjRef) yield None if hasLength: for i in self._addContainerGen(child, objRef): yield None if notDeadEnd: self._addDiscoveredStartRef(child, objRef) curObjRef = objRef continue if type(curObj) is types.DictType: key = None attr = None keys = curObj.keys() # we will continue traversing the object graph via one key of the dict, # choose it at random without taking a big chunk of CPU time numKeysLeft = len(keys) + 1 for key in keys: yield None numKeysLeft -= 1 try: attr = curObj[key] except KeyError, e: # this is OK because we are yielding during the iteration self.notify.debug('could not index into %s with key %s' % ( parentObjRef, safeRepr(key))) continue hasLength = self._hasLength(attr) notDeadEnd = False # if we haven't picked the next ref, check if this one is a candidate if curObjRef is None: notDeadEnd = not self._isDeadEnd(attr, key) if hasLength or notDeadEnd: # prevent cycles in the references (i.e. base.loader.base) for goesThrough in parentObjRef.goesThroughGen(curObj[key]): # don't yield, container might lose this element pass if not goesThrough: if curObj is __builtin__.__dict__: objRef = ObjectRef(Indirection(evalStr='%s' % key), id(curObj[key])) else: objRef = ObjectRef(Indirection(dictKey=key), id(curObj[key]), parentObjRef) yield None if hasLength: for i in self._addContainerGen(attr, objRef): yield None if notDeadEnd: self._addDiscoveredStartRef(attr, objRef) if curObjRef is None and random.randrange(numKeysLeft) == 0: curObjRef = objRef del key del attr continue try: childNames = dir(curObj) except: pass else: try: index = -1 attrs = [] while 1: yield None try: attr = itr.next() except: # some custom classes don't do well when iterated attr = None break attrs.append(attr) # we will continue traversing the object graph via one attr, # choose it at random without taking a big chunk of CPU time numAttrsLeft = len(attrs) + 1 for attr in attrs: yield None index += 1 numAttrsLeft -= 1 hasLength = self._hasLength(attr) notDeadEnd = False if curObjRef is None: notDeadEnd = not self._isDeadEnd(attr) if hasLength or notDeadEnd: # prevent cycles in the references (i.e. base.loader.base) for goesThrough in parentObjRef.goesThrough(curObj[index]): # don't yield, container might lose this element pass if not goesThrough: objRef = ObjectRef(Indirection(evalStr='[%s]' % index), id(curObj[index]), parentObjRef) yield None if hasLength: for i in self._addContainerGen(attr, objRef): yield None if notDeadEnd: self._addDiscoveredStartRef(attr, objRef) if curObjRef is None and random.randrange(numAttrsLeft) == 0: curObjRef = objRef del attr except StopIteration, e: pass del itr continue except Exception, e: print 'FindContainers job caught exception: %s' % e if __dev__: raise yield Job.Done class CheckContainers(Job): """ Job to check container sizes and find potential leaks; sub-job of ContainerLeakDetector """ ReprItems = 5 def __init__(self, name, leakDetector, index): Job.__init__(self, name) self._leakDetector = leakDetector self.notify = self._leakDetector.notify self._index = index ContainerLeakDetector.addPrivateObj(self.__dict__) def destroy(self): ContainerLeakDetector.removePrivateObj(self.__dict__) Job.destroy(self) def getPriority(self): return Job.Priorities.Normal def run(self): try: self._leakDetector._index2containerId2len[self._index] = {} ids = self._leakDetector.getContainerIds() # record the current len of each container for objId in ids: yield None try: for result in self._leakDetector.getContainerByIdGen(objId): yield None container = result except Exception, e: # this container no longer exists if self.notify.getDebug(): for contName in self._leakDetector.getContainerNameByIdGen(objId): yield None self.notify.debug( '%s no longer exists; caught exception in getContainerById (%s)' % ( contName, e)) self._leakDetector.removeContainerById(objId) continue if container is None: # this container no longer exists if self.notify.getDebug(): for contName in self._leakDetector.getContainerNameByIdGen(objId): yield None self.notify.debug('%s no longer exists; getContainerById returned None' % contName) self._leakDetector.removeContainerById(objId) continue try: cLen = len(container) except Exception, e: # this container no longer exists if self.notify.getDebug(): for contName in self._leakDetector.getContainerNameByIdGen(objId): yield None self.notify.debug( '%s is no longer a container, it is now %s (%s)' % (contName, safeRepr(container), e)) self._leakDetector.removeContainerById(objId) continue self._leakDetector._index2containerId2len[self._index][objId] = cLen # compare the current len of each container to past lens if self._index > 0: idx2id2len = self._leakDetector._index2containerId2len for objId in idx2id2len[self._index]: yield None if objId in idx2id2len[self._index-1]: diff = idx2id2len[self._index][objId] - idx2id2len[self._index-1][objId] """ # this check is too spammy if diff > 20: if diff > idx2id2len[self._index-1][objId]: minutes = (self._leakDetector._index2delay[self._index] - self._leakDetector._index2delay[self._index-1]) / 60. name = self._leakDetector.getContainerNameById(objId) if idx2id2len[self._index-1][objId] != 0: percent = 100. * (float(diff) / float(idx2id2len[self._index-1][objId])) try: for container in self._leakDetector.getContainerByIdGen(objId): yield None except: # TODO self.notify.debug('caught exception in getContainerByIdGen (1)') else: self.notify.warning( '%s (%s) grew %.2f%% in %.2f minutes (%s items at last measurement, current contents: %s)' % ( name, itype(container), percent, minutes, idx2id2len[self._index][objId], fastRepr(container, maxLen=CheckContainers.ReprItems))) yield None """ if (self._index > 2 and objId in idx2id2len[self._index-2] and objId in idx2id2len[self._index-3]): diff2 = idx2id2len[self._index-1][objId] - idx2id2len[self._index-2][objId] diff3 = idx2id2len[self._index-2][objId] - idx2id2len[self._index-3][objId] if self._index <= 4: if diff > 0 and diff2 > 0 and diff3 > 0: name = self._leakDetector.getContainerNameById(objId) try: for container in self._leakDetector.getContainerByIdGen(objId): yield None except: # TODO self.notify.debug('caught exception in getContainerByIdGen (2)') else: msg = ('%s (%s) consistently increased in size over the last ' '3 periods (%s items at last measurement, current contents: %s)' % (name, itype(container), idx2id2len[self._index][objId], fastRepr(container, maxLen=CheckContainers.ReprItems))) self.notify.warning(msg) yield None elif (objId in idx2id2len[self._index-4] and objId in idx2id2len[self._index-5]): # if size has consistently increased over the last 5 checks, # send out a warning diff4 = idx2id2len[self._index-3][objId] - idx2id2len[self._index-4][objId] diff5 = idx2id2len[self._index-4][objId] - idx2id2len[self._index-5][objId] if diff > 0 and diff2 > 0 and diff3 > 0 and diff4 > 0 and diff5 > 0: name = self._leakDetector.getContainerNameById(objId) try: for container in self._leakDetector.getContainerByIdGen(objId): yield None except: # TODO self.notify.debug('caught exception in getContainerByIdGen (3)') else: msg = ('leak detected: %s (%s) consistently increased in size over the last ' '5 periods (%s items at last measurement, current contents: %s)' % (name, itype(container), idx2id2len[self._index][objId], fastRepr(container, maxLen=CheckContainers.ReprItems))) self.notify.warning(msg) yield None messenger.send(self._leakDetector.getLeakEvent(), [container, name]) if config.GetBool('pdb-on-leak-detect', 0): import pdb;pdb.set_trace() pass except Exception, e: print 'CheckContainers job caught exception: %s' % e if __dev__: raise yield Job.Done class FPTObjsOfType(Job): def __init__(self, name, leakDetector, otn, doneCallback=None): Job.__init__(self, name) self._leakDetector = leakDetector self.notify = self._leakDetector.notify self._otn = otn self._doneCallback = doneCallback self._ldde = self._leakDetector._getDestroyEvent() self.accept(self._ldde, self._handleLDDestroy) ContainerLeakDetector.addPrivateObj(self.__dict__) def destroy(self): self.ignore(self._ldde) self._leakDetector = None self._doneCallback = None ContainerLeakDetector.removePrivateObj(self.__dict__) Job.destroy(self) def _handleLDDestroy(self): self.destroy() def getPriority(self): return Job.Priorities.High def run(self): ids = self._leakDetector.getContainerIds() try: for id in ids: getInstance = (self._otn.lower() not in 'dict') yield None try: for container in self._leakDetector.getContainerByIdGen( id, getInstance=getInstance): yield None except: pass else: if hasattr(container, '__class__'): cName = container.__class__.__name__ else: cName = container.__name__ if (self._otn.lower() in cName.lower()): try: for ptc in self._leakDetector.getContainerNameByIdGen( id, getInstance=getInstance): yield None except: pass else: print 'GPTC(' + self._otn + '):' + self.getJobName() + ': ' + ptc except Exception, e: print 'FPTObjsOfType job caught exception: %s' % e if __dev__: raise yield Job.Done def finished(self): if self._doneCallback: self._doneCallback(self) class FPTObjsNamed(Job): def __init__(self, name, leakDetector, on, doneCallback=None): Job.__init__(self, name) self._leakDetector = leakDetector self.notify = self._leakDetector.notify self._on = on self._doneCallback = doneCallback self._ldde = self._leakDetector._getDestroyEvent() self.accept(self._ldde, self._handleLDDestroy) ContainerLeakDetector.addPrivateObj(self.__dict__) def destroy(self): self.ignore(self._ldde) self._leakDetector = None self._doneCallback = None ContainerLeakDetector.removePrivateObj(self.__dict__) Job.destroy(self) def _handleLDDestroy(self): self.destroy() def getPriority(self): return Job.Priorities.High def run(self): ids = self._leakDetector.getContainerIds() try: for id in ids: yield None try: for container in self._leakDetector.getContainerByIdGen(id): yield None except: pass else: name = self._leakDetector._id2ref[id].getFinalIndirectionStr() if self._on.lower() in name.lower(): try: for ptc in self._leakDetector.getContainerNameByIdGen(id): yield None except: pass else: print 'GPTCN(' + self._on + '):' + self.getJobName() + ': ' + ptc except Exception, e: print 'FPTObjsNamed job caught exception: %s' % e if __dev__: raise yield Job.Done def finished(self): if self._doneCallback: self._doneCallback(self) class PruneObjectRefs(Job): """ Job to destroy any container refs that are no longer valid. Checks validity by asking for each container """ def __init__(self, name, leakDetector): Job.__init__(self, name) self._leakDetector = leakDetector self.notify = self._leakDetector.notify ContainerLeakDetector.addPrivateObj(self.__dict__) def destroy(self): ContainerLeakDetector.removePrivateObj(self.__dict__) Job.destroy(self) def getPriority(self): return Job.Priorities.Normal def run(self): try: ids = self._leakDetector.getContainerIds() for id in ids: yield None try: for container in self._leakDetector.getContainerByIdGen(id): yield None except: # reference is invalid, remove it self._leakDetector.removeContainerById(id) _id2baseStartRef = self._leakDetector._findContainersJob._id2baseStartRef ids = _id2baseStartRef.keys() for id in ids: yield None try: for container in _id2baseStartRef[id].getContainerGen(): yield None except: # reference is invalid, remove it del _id2baseStartRef[id] _id2discoveredStartRef = self._leakDetector._findContainersJob._id2discoveredStartRef ids = _id2discoveredStartRef.keys() for id in ids: yield None try: for container in _id2discoveredStartRef[id].getContainerGen(): yield None except: # reference is invalid, remove it del _id2discoveredStartRef[id] except Exception, e: print 'PruneObjectRefs job caught exception: %s' % e if __dev__: raise yield Job.Done class ContainerLeakDetector(Job): """ Low-priority Python object-graph walker that looks for leaking containers. To reduce memory usage, this does a random walk of the Python objects to discover containers rather than keep a set of all visited objects; it may visit the same object many times but eventually it will discover every object. Checks container sizes at ever-increasing intervals. """ notify = directNotify.newCategory("ContainerLeakDetector") # set of containers that should not be examined PrivateIds = set() def __init__(self, name, firstCheckDelay = None): Job.__init__(self, name) self._serialNum = serialNum() self._findContainersJob = None self._checkContainersJob = None self._pruneContainersJob = None if firstCheckDelay is None: firstCheckDelay = 60. * 15. # divide by two, since the first check just takes length measurements and # doesn't check for leaks self._nextCheckDelay = firstCheckDelay/2. self._checkDelayScale = config.GetFloat('leak-detector-check-delay-scale', 1.5) self._pruneTaskPeriod = config.GetFloat('leak-detector-prune-period', 60. * 30.) # main dict of id(container)->containerRef self._id2ref = {} # storage for results of check-container job self._index2containerId2len = {} self._index2delay = {} if config.GetBool('leak-container', 0): _createContainerLeak() if config.GetBool('leak-tasks', 0): _createTaskLeak() # don't check our own tables for leaks ContainerLeakDetector.addPrivateObj(ContainerLeakDetector.PrivateIds) ContainerLeakDetector.addPrivateObj(self.__dict__) self.setPriority(Job.Priorities.Min) jobMgr.add(self) def destroy(self): messenger.send(self._getDestroyEvent()) self.ignoreAll() if self._pruneContainersJob is not None: jobMgr.remove(self._pruneContainersJob) self._pruneContainersJob = None if self._checkContainersJob is not None: jobMgr.remove(self._checkContainersJob) self._checkContainersJob = None jobMgr.remove(self._findContainersJob) self._findContainersJob = None del self._id2ref del self._index2containerId2len del self._index2delay def _getDestroyEvent(self): # sent when leak detector is about to be destroyed return 'cldDestroy-%s' % self._serialNum def getLeakEvent(self): # sent when a leak is detected # passes description string as argument return 'containerLeakDetected-%s' % self._serialNum @classmethod def addPrivateObj(cls, obj): cls.PrivateIds.add(id(obj)) @classmethod def removePrivateObj(cls, obj): cls.PrivateIds.remove(id(obj)) def _getCheckTaskName(self): return 'checkForLeakingContainers-%s' % self._serialNum def _getPruneTaskName(self): return 'pruneLeakingContainerRefs-%s' % self._serialNum def getContainerIds(self): return self._id2ref.keys() def getContainerByIdGen(self, id, **kwArgs): # return a generator to look up a container return self._id2ref[id].getContainerGen(**kwArgs) def getContainerById(self, id): for result in self._id2ref[id].getContainerGen(): pass return result def getContainerNameByIdGen(self, id, **kwArgs): return self._id2ref[id].getEvalStrGen(**kwArgs) def getContainerNameById(self, id): if id in self._id2ref: return repr(self._id2ref[id]) return '' def removeContainerById(self, id): if id in self._id2ref: self._id2ref[id].destroy() del self._id2ref[id] def run(self): # start looking for containers self._findContainersJob = FindContainers( '%s-findContainers' % self.getJobName(), self) jobMgr.add(self._findContainersJob) self._scheduleNextLeakCheck() self._scheduleNextPruning() while True: yield Job.Sleep def getPathsToContainers(self, name, ot, doneCallback=None): j = FPTObjsOfType(name, self, ot, doneCallback) jobMgr.add(j) return j def getPathsToContainersNamed(self, name, on, doneCallback=None): j = FPTObjsNamed(name, self, on, doneCallback) jobMgr.add(j) return j def _scheduleNextLeakCheck(self): taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks, self._getCheckTaskName()) # delay between checks # fib: 1 1 2 3 5 8 13 21 34 55 89 # * 2.: 1 2 4 8 16 32 64 128 256 512 1024 # * 1.5: 1 1.5 2.3 3.4 5.1 7.6 11.4 17.1 25.6 38.4 57.7 # # delay from job start # fib: 1 2 4 7 12 20 33 54 88 143 232 # * 2.: 1 3 7 15 31 63 127 255 511 1023 2047 # * 1.5: 1 2.5 4.75 8.1 13.2 20.8 32.2 49.3 74.9 113.3 171 self._nextCheckDelay = self._nextCheckDelay * self._checkDelayScale def _checkForLeaks(self, task=None): self._index2delay[len(self._index2containerId2len)] = self._nextCheckDelay self._checkContainersJob = CheckContainers( '%s-checkForLeaks' % self.getJobName(), self, len(self._index2containerId2len)) self.acceptOnce(self._checkContainersJob.getFinishedEvent(), self._scheduleNextLeakCheck) jobMgr.add(self._checkContainersJob) return task.done def _scheduleNextPruning(self): taskMgr.doMethodLater(self._pruneTaskPeriod, self._pruneObjectRefs, self._getPruneTaskName()) def _pruneObjectRefs(self, task=None): self._pruneContainersJob = PruneObjectRefs( '%s-pruneObjectRefs' % self.getJobName(), self) self.acceptOnce(self._pruneContainersJob.getFinishedEvent(), self._scheduleNextPruning) jobMgr.add(self._pruneContainersJob) return task.done