Source code for callgraph.recorder

"""Record function calls into a Graphviz diagraph."""

import functools
from functools import wraps
from itertools import starmap

from graphviz import Digraph

[docs]class CallGraphRecorder(object): """Record function calls into a Graphviz diagraph. Attributes ---------- graph : Digraph A :class:`graphviz.Digraph`. """ def __init__(self, equal=False, label_returns=False, graph_attrs=None): self.graph = Digraph(format='svg', strict=True) if graph_attrs: self.graph.graph_attr.update(**graph_attrs) self._options = {'equal': equal, 'label_returns': label_returns} self._next_call_idx = 0 self._callers = []
[docs] def wrap(self, fn): """Wrap fn with instrumentation to record calls to it. You probably want :func:`decorator` instead. """ @wraps(fn) def wrapper(*args, **kwargs): with self.record(fn, args, kwargs) as record_return: result = fn(*args, **kwargs) record_return(result) return result return wrapper
def _record(self, caller_id, call_id, fn, args, kwargs, result): """Record a function call.""" graph = self.graph label_returns = self._options['label_returns'] label = "{}({}{}{})".format(fn.__name__, ', '.join(map(repr, args)), ', ' if args and kwargs else '', ', '.join(starmap("{}={}".format, kwargs.items()))) if not (label_returns and caller_id): label += " ↦ {}".format(result) graph.node(call_id, label=label) if caller_id: if label_returns: graph.edge(caller_id, call_id, label=str(result), dir='back') else: graph.edge(caller_id, call_id) else: graph.node(call_id, penwidth='3')
[docs] def record(self, fn, args, kwargs): """Return a context manager that records a function call. Returns ------- CallGraphCallRecorder A context manager that records a function call. Examples -------- :: with recorder.record(fn, args, kwargs) as record_return: result = fn(*args, **kwargs) record_return(result) """ return CallGraphCallRecorder(self, fn, args, kwargs)
def _next_call_id(self, fn, args, kwargs): # if it's memoized, use the function name and arguments as a node id. if self._options['equal'] or isinstance(fn, functools._lru_cache_wrapper): return '{}{}{}'.format(getattr(fn, '__name__', str(fn)), args, kwargs) self._next_call_idx += 1 return str(self._next_call_idx)
class CallGraphCallRecorder(object): """Context manager for record callgraph function calls. A context manager that records a function call on its associated CallGraphRecorder. """ __slots__ = ['_recorder', '_fn', '_args', '_kwargs'] def __init__(self, recorder, fn, args, kwargs): self._recorder = recorder self._fn, self._args, self._kwargs = fn, args, kwargs def __enter__(self): fn, args, kwargs = self._fn, self._args, self._kwargs recorder = self._recorder stack = recorder._callers caller_id = (stack[-1] if stack else None) call_id = recorder._next_call_id(fn, args, kwargs) stack.append(call_id) def record_return(result): recorder._record(caller_id, call_id, fn, args, kwargs, result) return record_return def __exit__(self, _type, _value, _traceback): self._recorder._callers.pop()