"""
Parse and analyze valgrind output and suppressions.
"""
import collections
import os
import re
import sys
# Every pattern below starts with the "==...== " prefix found at the start of
# log lines.  The prefix is matched non-greedily so that a later "== " inside
# the line's own text (for example in the traced command line) is never
# swallowed as part of the prefix.
# Matches the "Checked N bytes" progress line.
CHECKED_BYTES_RE = re.compile(r'^==.*?== Checked [\d,]+ bytes')
# Matches only the line prefix; used to strip it from a log line.
LOG_LINE_RE = re.compile(r'^==.*?== ')
# Matches the "Command:" line; group 1 is the command text.
COMMAND_RE = re.compile(r'^==.*?== Command:\W(.*)')
# Maps a fragment of an error description (key) to the short summary used
# for it (value).  Keys are tried in insertion order and the first one found
# in a description is used.
DESCRIPTION_TO_SUMMARY = {
    'definitely lost': 'memory definitely lost',
    'possibly lost': 'memory possibly lost',
    'still reachable': 'memory still reachable',
    'indirectly lost': 'memory indirectly lost',
    'Conditional jump or move': 'uninitialized value used in logic',
    'Use of uninitialized': 'uninitialized value used',
}
class Suppression:
    """Represent a suppression for a leak found by valgrind.

    Ordering and equality look only at the ``suppressions`` frame list; the
    title and error type are ignored.  Two suppressions compare equal when
    their frames agree over the length of the shorter list, i.e. when one
    list is a prefix of the other.
    """

    # Prefix-based equality is not transitive, so no consistent hash exists.
    # Defining __eq__ already disables hashing; this makes it explicit.
    __hash__ = None

    @classmethod
    def read(cls, fh, ignore_existing_title=True):
        """Read one suppression block from an iterator of lines.

        The opening ``{`` must already have been consumed.  Lines are
        stripped and collected until a line consisting of ``}`` is found;
        the first collected line is the title, the second the error type
        and the rest are the suppression frames.

        :param fh: iterator of text lines, positioned just after ``{``
        :param ignore_existing_title: if true, the title is replaced by ``''``
        :return: the new instance, or None if ``fh`` is exhausted before a
            closing ``}`` is seen
        :raises IndexError: if fewer than two lines precede the ``}``
        """
        lines = []
        for line in fh:
            line = line.strip()
            if line == '}':
                if ignore_existing_title:
                    lines[0] = ''
                return cls(lines[1], lines[2:], lines[0])
            lines.append(line)
        return None

    def __init__(self, error_type, suppressions, title=''):
        """Store the error type, the list of frame lines and the title."""
        self.error_type = error_type
        self.suppressions = suppressions
        self.title = title

    def _first_difference(self, other):
        """Return the first pair of differing frames as ``(mine, theirs)``.

        Returns None when the frames agree over the length of the shorter
        list.
        """
        for mine, theirs in zip(self.suppressions, other.suppressions):
            if mine != theirs:
                return mine, theirs
        return None

    def __lt__(self, other):
        """Used in sorting."""
        if not isinstance(other, Suppression):
            return NotImplemented
        differing = self._first_difference(other)
        if differing is not None:
            mine, theirs = differing
            return mine < theirs
        # One list is a prefix of the other: the shorter one sorts first.
        return len(self.suppressions) < len(other.suppressions)

    def __le__(self, other):
        """Used in sorting."""
        if not isinstance(other, Suppression):
            return NotImplemented
        differing = self._first_difference(other)
        if differing is not None:
            mine, theirs = differing
            return mine < theirs
        return len(self.suppressions) <= len(other.suppressions)

    def __eq__(self, other):
        """Used in discarding duplicates."""
        if not isinstance(other, Suppression):
            return NotImplemented
        return self._first_difference(other) is None

    def __repr__(self):
        """Return the suppression as a ``{ ... }`` block of text."""
        header = ['{', self.title, self.error_type]
        # Unpacking accepts any iterable of frames, not only a list.
        return '\n '.join([*header, *self.suppressions]) + '\n}'
class Leak:
    """Represent a Valgrind Leak."""

    @classmethod
    def read(cls, description, fh, command=None, filename=None):
        """Read one error's backtrace and suppression from a log.

        Lines are taken from ``fh`` with their log prefix removed.
        Non-empty lines are collected as the backtrace until a line
        consisting of ``{`` is found; the suppression block that follows
        is then read with :meth:`Suppression.read`.

        :param description: the error's description line
        :param fh: iterator of log lines, positioned after the description
        :param command: the command valgrind was running, if known
        :param filename: path of the log file, if known
        :return: the new instance, or None if ``fh`` is exhausted before
            any ``{`` line is seen
        """
        backtrace = []
        for line in fh:
            line = LOG_LINE_RE.sub('', line).strip()
            if line == '{':
                suppression = Suppression.read(fh)
                return cls(filename, command, description, backtrace,
                           suppression)
            if line:
                backtrace.append(line)
        return None

    def __init__(self,
                 filename,
                 command,
                 description,
                 backtrace=None,
                 suppression=None):
        """Store the leak's origin, description, backtrace and suppression.

        :param filename: path of the log file, or None if unknown
        :param command: the command valgrind was running
        :param description: the error's description line
        :param backtrace: sequence of backtrace lines; an empty tuple is
            stored when omitted or empty
        :param suppression: the suppression read for this error, if any
        """
        # read() defaults filename to None, which os.path.normpath rejects
        # with a TypeError, so an unknown file name is stored as None.
        if filename is None:
            self.filename = None
            self.directory = None
        else:
            self.filename = os.path.normpath(filename)
            self.directory = os.path.dirname(self.filename)
        self.command = command
        self.description = description
        self.short_description = self._getShortDescription()
        self.backtrace = backtrace or ()
        self.suppression = suppression
        # Filled in by callers with leaks judged to duplicate this one.
        self.duplicates = []

    def _getShortDescription(self):
        """Return the summary for the description.

        The first DESCRIPTION_TO_SUMMARY key contained in the description
        selects the summary; with no match the lower-cased description
        itself is returned.
        """
        for search, summary in DESCRIPTION_TO_SUMMARY.items():
            if search in self.description:
                return summary
        # I believe this is only:
        # Invalid write
        # Invalid read
        # Mismatched free() / delete / delete []
        return self.description.lower()
def read_valgrind_log(filename, fh):
    """Parse the errors reported in one valgrind log.

    :param filename: name recorded on every leak that is read
    :param fh: iterator of log lines; it is shared with ``Leak.read``,
        which consumes the lines belonging to each error
    :return: list of leaks, excluding those summarized as
        'memory still reachable'
    """
    found = []
    command = None
    in_summary_block = False
    lines_to_skip = 0
    # There may be up to one block of errors per pthread, and we should
    # parse them all, skipping any lines outside these blocks.
    for raw in fh:
        if lines_to_skip > 0:
            lines_to_skip -= 1
            continue
        if 'HEAP SUMMARY:' in raw:
            # Skip the summary
            lines_to_skip = 2
            continue
        if 'LEAK SUMMARY:' in raw:
            # This summary may vary in length, depending on the types of
            # 'still reachable' allocation. It should always end with a
            # "ERROR SUMMARY".
            in_summary_block = True
            continue
        if 'ERROR SUMMARY:' in raw:
            in_summary_block = False
            continue
        if "Searching for pointers to" in raw:
            continue
        if CHECKED_BYTES_RE.search(raw):
            continue

        command_match = COMMAND_RE.search(raw)
        if command_match:
            command = command_match.group(1)
            # Skip the pid line
            lines_to_skip = 1
            continue
        if not command or in_summary_block:
            continue

        # Only a non-blank line with no leading space starts an error.
        text = LOG_LINE_RE.sub('', raw)
        if not text.strip() or text[0] == ' ':
            continue
        leak = Leak.read(text.strip(), fh, command, filename)
        # We want valgrind to report 'Still reachable' leaks, but we don't
        # want them to be considered as errors.
        if leak and leak.short_description != 'memory still reachable':
            found.append(leak)
    return found
def discover_leaks(directory, verbose=False):
    """
    Walk a directory tree and parse every valgrind log file in it.

    A file is treated as a valgrind log when its name contains 'valgrind'
    and ends with 'log'.

    :param directory: root of the tree to search
    :param verbose: if true, print the relative name of each log parsed
    :rtype: dict
    :return: Key: directory path relative to `directory`, Value: list of
        leaks read from the logs in that directory
    """
    found = collections.defaultdict(list)
    for root, _subdirs, names in os.walk(directory):
        rel_root = os.path.relpath(root, directory)
        log_names = (name for name in names
                     if 'valgrind' in name and name.endswith('log'))
        for name in log_names:
            log_path = os.path.join(root, name)
            relname = os.path.relpath(log_path, directory)
            if verbose:
                print(f'finding leaks in {relname}')
            with open(log_path) as fh:
                parsed = read_valgrind_log(relname, fh)
            # Only create an entry for directories that have leaks.
            if parsed:
                found[rel_root].extend(parsed)
    return found
def simplify(suppression):
    """Wildcard absolute object paths and cut the suppression at test code.

    Frames starting with 'obj:/' are replaced in place by 'obj:*'.  At the
    first frame recognized as test or program-entry code, the frame list is
    replaced by a new list holding only the frames before it.

    :param suppression: object whose ``suppressions`` list is modified
    """
    # Substrings identifying frames at or below the test entry point.
    entry_point_markers = (
        'fun:_ZN5boost9unit_test9ut_detail7invokerINS1_6unusedEE6invokeIPFvvEEES3_RT_',
        'fun:main',
        'fun:__libc_start_main',
        'fun:_ZN13MM_TestDriver4testEiPPcP15MM_TestCallback',
    )
    frames = suppression.suppressions
    for position, frame in enumerate(frames):
        # wildcard absolute paths
        if frame.startswith('obj:/'):
            frames[position] = 'obj:*'
        # Ignore test code
        is_test_code = frame.endswith('test_methodEv') or any(
            marker in frame for marker in entry_point_markers)
        if is_test_code:
            suppression.suppressions = frames[:position]
            return
def uniquify_leaks(leaks):
    """
    Remove duplicate Leak objects from a list, in place.

    Each leak's suppression is simplified, the list is sorted by
    suppression, and every leak whose suppression compares equal to that of
    the previously kept leak is moved onto that leak's ``duplicates`` list
    together with its own duplicates.  Because of how suppressions compare,
    duplicates include leaks that are wholly included in another leak.

    :param leaks: list of Leak objects; sorted and pruned in place
    """
    for entry in leaks:
        simplify(entry.suppression)
    leaks.sort(key=lambda entry: entry.suppression)

    kept = []
    for entry in leaks:
        if kept and entry.suppression == kept[-1].suppression:
            survivor = kept[-1]
            survivor.duplicates.append(entry)
            survivor.duplicates.extend(entry.duplicates)
        else:
            kept.append(entry)
    # Slice assignment keeps the caller's list object.
    leaks[:] = kept
def uniquify_suppression_titles(leaks):
    """
    Make sure that each suppression has a unique title.

    Leaks are processed in order.  A title that has already been seen is
    given a numeric suffix: if it already ends in an integer, counting
    resumes from that integer, otherwise from 0.  The suffix is increased
    until the title is unused.

    :param leaks: iterable of objects whose ``suppression.title`` is
        rewritten in place where needed
    """
    titles = set()
    for leak in leaks:
        suppression = leak.suppression
        if suppression.title in titles:
            split_title = suppression.title.split()
            try:
                index = int(split_title[-1])
                prefix = ' '.join(split_title[:-1])
            except (ValueError, IndexError):
                # ValueError: the last word is not a number.
                # IndexError: the title is empty or blank, so there is no
                # last word at all.
                index = 0
                prefix = suppression.title
            title = f'{prefix} {index}'
            while title in titles:
                index += 1
                title = f'{prefix} {index}'
            suppression.title = title
        titles.add(suppression.title)
def read_and_uniquify(filename):
    """Read a suppression file and return its suppressions without duplicates.

    Every line starting with '{' opens a suppression block, which is read
    with its existing title, simplified and wrapped in a Leak.  Progress
    counts are written to stderr.

    :param filename: path of the suppression file
    :return: list of Leak objects holding the unique suppressions, in the
        order of the original file and with unique titles
    """
    leaks = []
    with open(filename) as fh:
        for line in fh:
            if not line.startswith('{'):
                continue
            suppression = Suppression.read(fh, False)
            simplify(suppression)
            # Command is overloaded to hold the suppression's position in
            # the file, which equals the number of leaks read so far.
            leaks.append(
                Leak(filename, len(leaks), '', suppression=suppression))

    sys.stderr.write('{} suppressions found\n'.format(len(leaks)))
    uniquify_leaks(leaks)
    sys.stderr.write('{} unique suppressions found\n'.format(len(leaks)))

    # This puts the unique suppressions in the order of the original
    # suppressions.
    leaks.sort(key=lambda leak: leak.command)
    uniquify_suppression_titles(leaks)
    return leaks