#!/usr/bin/python3
# pyright: strict
# --------------------------------------------------------------------
# --- Cachegrind's annotator. cg_annotate.in ---
# --------------------------------------------------------------------
# This file is part of Cachegrind, a Valgrind tool for cache
# profiling programs.
#
# Copyright (C) 2002-2023 Nicholas Nethercote
# njn@valgrind.org
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see .
#
# The GNU General Public License is contained in the file COPYING.
# This script reads Cachegrind output files and produces human-readable output.
#
# Use `make pyann` to "build" this script with `auxprogs/pybuild.rs` every time
# it is changed. This runs the formatters, type-checkers, and linters on
# `cg_annotate.in` and then generates `cg_annotate`.
from __future__ import annotations
import filecmp
import os
import re
import sys
from argparse import ArgumentParser, BooleanOptionalAction, Namespace
from collections import defaultdict
from typing import Callable, DefaultDict, NoReturn, TextIO
def die(msg: str) -> NoReturn:
print("cg_annotate: error:", msg, file=sys.stderr)
sys.exit(1)
SearchAndReplace = Callable[[str], str]
# A typed wrapper for parsed args.
class Args(Namespace):
# None of these fields are modified after arg parsing finishes.
diff: bool
mod_filename: SearchAndReplace
mod_funcname: SearchAndReplace
show: list[str]
sort: list[str]
threshold: float # a percentage
show_percs: bool
annotate: bool
context: int
cgout_filename: list[str]
@staticmethod
def parse() -> Args:
# We support Perl-style `s/old/new/flags` search-and-replace
# expressions, because that's how this option was implemented in the
# old Perl version of `cg_diff`. This requires conversion from
# `s/old/new/` style to `re.sub`. The conversion isn't a perfect
# emulation of Perl regexps (e.g. Python uses `\1` rather than `$1` for
# using captures in the `new` part), but it should be close enough. The
# only supported flags are `g` (global) and `i` (ignore case).
def search_and_replace(regex: str | None) -> SearchAndReplace:
if regex is None:
return lambda s: s
# Extract the parts of an `s/old/new/tail` regex. `(? list[str]:
return values.split(",")
def threshold(n: str) -> float:
f = float(n)
if 0 <= f <= 20:
return f
raise ValueError
# Add a bool argument that defaults to true.
#
# Supports these forms: `--foo`, `--no-foo`, `--foo=yes`, `--foo=no`.
# The latter two were the forms supported by the old Perl version of
# `cg_annotate`, and are now deprecated.
def add_bool_argument(
p: ArgumentParser, new_name: str, old_name: str, help_: str
) -> None:
new_flag = "--" + new_name
old_flag = "--" + old_name
dest = new_name.replace("-", "_")
# Note: the default value is always printed with `BooleanOptionalAction`,
# due to an argparse bug: https://github.com/python/cpython/issues/83137.
p.add_argument(
new_flag,
default=True,
action=BooleanOptionalAction,
help=help_,
)
p.add_argument(
f"{old_flag}=yes",
dest=dest,
action="store_true",
help=f"(deprecated) same as --{new_name}",
)
p.add_argument(
f"{old_flag}=no",
dest=dest,
action="store_false",
help=f"(deprecated) same as --no-{new_name}",
)
p = ArgumentParser(description="Process one or more Cachegrind output files.")
p.add_argument("--version", action="version", version="%(prog)s-3.21.0")
p.add_argument(
"--diff",
default=False,
action="store_true",
help="perform a diff between two Cachegrind output files",
)
p.add_argument(
"--mod-filename",
type=search_and_replace,
metavar="REGEX",
default=search_and_replace(None),
help="a search-and-replace regex applied to filenames, e.g. "
"`s/prog[0-9]/progN/`",
)
p.add_argument(
"--mod-funcname",
type=search_and_replace,
metavar="REGEX",
default=search_and_replace(None),
help="like --mod-filename, but for function names",
)
p.add_argument(
"--show",
type=comma_separated_list,
metavar="A,B,C",
help="only show figures for events A,B,C (default: all events)",
)
p.add_argument(
"--sort",
type=comma_separated_list,
metavar="A,B,C",
help="sort functions by events A,B,C (default: event column order)",
)
p.add_argument(
"--threshold",
type=threshold,
default=0.1,
metavar="N:[0,20]",
help="only show file:function/function:file pairs with more than "
"N%% of primary sort event counts (default: %(default)s)",
)
add_bool_argument(
p,
"show-percs",
"show-percs",
"show a percentage for each non-zero count",
)
add_bool_argument(
p,
"annotate",
"auto",
"annotate all source files containing functions that reached the "
"event count threshold",
)
p.add_argument(
"--context",
type=int,
default=8,
metavar="N",
help="print N lines of context before and after annotated lines "
"(default: %(default)s)",
)
p.add_argument(
"cgout_filename",
nargs="+",
metavar="cachegrind-out-file",
help="file produced by Cachegrind",
)
# `args0` name used to avoid shadowing the global `args`, which pylint
# doesn't like.
args0 = p.parse_args(namespace=Args())
if args0.diff and len(args0.cgout_filename) != 2:
p.print_usage(file=sys.stderr)
die("argument --diff: requires exactly two Cachegrind output files")
return args0
# Args are stored in a global for easy access.
args = Args.parse()
# A single instance of this class is constructed, from `args` and the `events:`
# line in the cgout file.
class Events:
# The event names.
events: list[str]
# Equal to `len(self.events)`.
num_events: int
# The order in which we must traverse events for --show. Can be shorter
# than `events`.
show_events: list[str]
# Like `show_events`, but indices into `events`, rather than names.
show_indices: list[int]
# The order in which we must traverse events for --sort. Can be shorter
# than `events`.
sort_events: list[str]
# Like `sort_events`, but indices into `events`, rather than names.
sort_indices: list[int]
def __init__(self) -> None:
# All fields are left uninitialized here, and set instead in `init`.
pass
def init(self, text: str) -> None:
self.events = text.split()
self.num_events = len(self.events)
# A temporary dict mapping events to indices, [0, n-1].
event_indices = {event: n for n, event in enumerate(self.events)}
# If --show is given, check it is valid. If --show is not given,
# default to all events in the standard order.
if args.show:
for event in args.show:
if event not in event_indices:
die(f"--show event `{event}` did not appear in `events:` line")
self.show_events = args.show
else:
self.show_events = self.events
self.show_indices = [event_indices[event] for event in self.show_events]
# Likewise for --sort.
if args.sort:
for event in args.sort:
if event not in event_indices:
die(f"--sort event `{event}` did not appear in `events:` line")
self.sort_events = args.sort
else:
self.sort_events = self.events
self.sort_indices = [event_indices[event] for event in self.sort_events]
# Raises a `ValueError` exception on syntax error.
def mk_cc(self, str_counts: list[str]) -> Cc:
# This is slightly faster than a list comprehension.
counts = list(map(int, str_counts))
if len(counts) == self.num_events:
pass
elif len(counts) < self.num_events:
# Add zeroes at the end for any missing numbers.
counts.extend([0] * (self.num_events - len(counts)))
else:
raise ValueError
return counts
def mk_empty_cc(self) -> Cc:
# This is much faster than a list comprehension.
return [0] * self.num_events
def mk_empty_dcc(self) -> Dcc:
return Dcc(self.mk_empty_cc(), defaultdict(self.mk_empty_cc))
# A "cost centre", which is a dumb container for counts. Always the same length
# as `Events.events`, but it doesn't even know event names. `Events.mk_cc` and
# `Events.mk_empty_cc` are used for construction.
#
# This used to be a class with a single field `counts: list[int]`, but this
# type is very hot and just using a type alias is much faster.
Cc = list[int]
# Add the counts in `a_cc` to `b_cc`.
def add_cc_to_cc(a_cc: Cc, b_cc: Cc) -> None:
for i, a_count in enumerate(a_cc):
b_cc[i] += a_count
# Subtract the counts in `a_cc` from `b_cc`.
def sub_cc_from_cc(a_cc: Cc, b_cc: Cc) -> None:
for i, a_count in enumerate(a_cc):
b_cc[i] -= a_count
# Unrolled version of `add_cc_to_cc`, for speed.
def add_cc_to_ccs(
a_cc: Cc, b_cc1: Cc, b_cc2: Cc, b_cc3: Cc, b_cc4: Cc, b_cc5: Cc, total_cc: Cc
) -> None:
for i, a_count in enumerate(a_cc):
b_cc1[i] += a_count
b_cc2[i] += a_count
b_cc3[i] += a_count
b_cc4[i] += a_count
b_cc5[i] += a_count
total_cc[i] += a_count
# Unrolled version of `sub_cc_from_cc`, for speed. Note that the last one,
# `total_cc`, is added.
def sub_cc_from_ccs(
a_cc: Cc, b_cc1: Cc, b_cc2: Cc, b_cc3: Cc, b_cc4: Cc, b_cc5: Cc, total_cc: Cc
) -> None:
for i, a_count in enumerate(a_cc):
b_cc1[i] -= a_count
b_cc2[i] -= a_count
b_cc3[i] -= a_count
b_cc4[i] -= a_count
b_cc5[i] -= a_count
total_cc[i] += a_count
# Update `min_cc` and `max_cc` with `self`.
def update_cc_extremes(self: Cc, min_cc: Cc, max_cc: Cc) -> None:
for i, count in enumerate(self):
if count > max_cc[i]:
max_cc[i] = count
elif count < min_cc[i]:
min_cc[i] = count
# Note: some abbrevations used below:
# - Ofl/ofl: original filename, as mentioned in a cgout file.
# - Ofn/ofn: original function name, as mentioned in a cgout file.
# - Mfl/mfl: modified filename, the result of passing an Ofl through
# `--mod-filename`.
# - Mfn/mfn: modified function name, the result of passing an Ofn through
# `--mod-funcname`.
# - Mname/mname: modified name, used for what could be an Mfl or an Mfn.
# A deep cost centre with a dict for the inner mnames and CCs.
class Dcc:
outer_cc: Cc
inner_dict_mname_cc: DictMnameCc
def __init__(self, outer_cc: Cc, inner_dict_mname_cc: DictMnameCc) -> None:
self.outer_cc = outer_cc
self.inner_dict_mname_cc = inner_dict_mname_cc
# A deep cost centre with a list for the inner mnames and CCs. Used during
# filtering and sorting.
class Lcc:
outer_cc: Cc
inner_list_mname_cc: ListMnameCc
def __init__(self, outer_cc: Cc, inner_list_mname_cc: ListMnameCc) -> None:
self.outer_cc = outer_cc
self.inner_list_mname_cc = inner_list_mname_cc
# Per-Mfl/Mfn CCs. The list version is used during filtering and sorting.
DictMnameCc = DefaultDict[str, Cc]
ListMnameCc = list[tuple[str, Cc]]
# Per-Mfl/Mfn DCCs. The outer Mnames are Mfls and the inner Mnames are Mfns, or
# vice versa. The list version is used during filtering and sorting.
DictMnameDcc = DefaultDict[str, Dcc]
ListMnameLcc = list[tuple[str, Lcc]]
# Per-line CCs, organised by Mfl and line number.
DictLineCc = DefaultDict[int, Cc]
DictMflDictLineCc = DefaultDict[str, DictLineCc]
# A dictionary tracking how Ofls get mapped to Mfls by `--mod-filename`. If
# `--mod-filename` isn't used, each entry will be the identity mapping: ("foo"
# -> set(["foo"])).
DictMflOfls = DefaultDict[str, set[str]]
def read_cgout_file(
cgout_filename: str,
is_first_file: bool,
descs: list[str],
cmds: list[str],
events: Events,
dict_mfl_ofls: DictMflOfls,
dict_mfl_dcc: DictMnameDcc,
dict_mfn_dcc: DictMnameDcc,
dict_mfl_dict_line_cc: DictMflDictLineCc,
summary_cc: Cc,
) -> None:
# The file format is described in Cachegrind's manual.
try:
cgout_file = open(cgout_filename, "r", encoding="utf-8")
except OSError as err:
die(f"{err}")
with cgout_file:
cgout_line_num = 0
def parse_die(msg: str) -> NoReturn:
die(f"{cgout_file.name}:{cgout_line_num}: {msg}")
def readline() -> str:
nonlocal cgout_line_num
cgout_line_num += 1
return cgout_file.readline()
# Read "desc:" lines.
desc = ""
while line := readline():
if m := re.match(r"desc:\s+(.*)", line):
desc += m.group(1) + "\n"
else:
break
descs.append(desc)
# Read "cmd:" line. (`line` is already set from the "desc:" loop.)
if m := re.match(r"cmd:\s+(.*)", line):
cmds.append(m.group(1))
else:
parse_die("missing a `command:` line")
# Read "events:" line.
line = readline()
if m := re.match(r"events:\s+(.*)", line):
if is_first_file:
events.init(m.group(1))
else:
events2 = Events()
events2.init(m.group(1))
if events.events != events2.events:
die("events in data files don't match")
else:
parse_die("missing an `events:` line")
def mk_empty_dict_line_cc() -> DictLineCc:
return defaultdict(events.mk_empty_cc)
# The current Mfl and Mfn.
mfl = ""
mfn = ""
# These values are passed in by reference and are modified by this
# function. But they can't be properly initialized until the `events:`
# line of the first file is read and the number of events is known. So
# we initialize them in an invalid state in `main`, and then
# reinitialize them properly here, before their first use.
if is_first_file:
dict_mfl_dcc.default_factory = events.mk_empty_dcc
dict_mfn_dcc.default_factory = events.mk_empty_dcc
dict_mfl_dict_line_cc.default_factory = mk_empty_dict_line_cc
summary_cc.extend(events.mk_empty_cc())
# These are refs into the dicts above, used to avoid repeated lookups.
# They are all overwritten before first use.
mfl_dcc = events.mk_empty_dcc()
mfn_dcc = events.mk_empty_dcc()
mfl_dcc_inner_mfn_cc = events.mk_empty_cc()
mfn_dcc_inner_mfl_cc = events.mk_empty_cc()
dict_line_cc = mk_empty_dict_line_cc()
total_cc = events.mk_empty_cc()
# When diffing, we negate the first cgout file's counts to effectively
# achieve `cgout2 - cgout1`.
if args.diff and is_first_file:
combine_cc_with_cc = sub_cc_from_cc
combine_cc_with_ccs = sub_cc_from_ccs
else:
combine_cc_with_cc = add_cc_to_cc
combine_cc_with_ccs = add_cc_to_ccs
summary_cc_present = False
# Line matching is done in order of pattern frequency, for speed.
while line := readline():
if line[0].isdigit():
split_line = line.split()
try:
line_num = int(split_line[0])
cc = events.mk_cc(split_line[1:])
except ValueError:
parse_die("malformed or too many event counts")
# Record this CC at various levels.
combine_cc_with_ccs(
cc,
mfl_dcc.outer_cc,
mfn_dcc.outer_cc,
mfl_dcc_inner_mfn_cc,
mfn_dcc_inner_mfl_cc,
dict_line_cc[line_num],
total_cc,
)
elif line.startswith("fn="):
ofn = line[3:-1]
mfn = args.mod_funcname(ofn)
# `mfl_dcc` is unchanged.
mfn_dcc = dict_mfn_dcc[mfn]
mfl_dcc_inner_mfn_cc = mfl_dcc.inner_dict_mname_cc[mfn]
mfn_dcc_inner_mfl_cc = mfn_dcc.inner_dict_mname_cc[mfl]
elif line.startswith("fl="):
ofl = line[3:-1]
mfl = args.mod_filename(ofl)
dict_mfl_ofls[mfl].add(ofl)
# A `fn=` line should follow, overwriting the function name.
mfn = ""
mfl_dcc = dict_mfl_dcc[mfl]
mfn_dcc = dict_mfn_dcc[mfn]
mfl_dcc_inner_mfn_cc = mfl_dcc.inner_dict_mname_cc[mfn]
mfn_dcc_inner_mfl_cc = mfn_dcc.inner_dict_mname_cc[mfl]
dict_line_cc = dict_mfl_dict_line_cc[mfl]
elif m := re.match(r"summary:\s+(.*)", line):
summary_cc_present = True
try:
this_summary_cc = events.mk_cc(m.group(1).split())
combine_cc_with_cc(this_summary_cc, summary_cc)
# Check summary is correct. Note that `total_cc` doesn't
# get negated for the first file in a diff, unlike the
# other CCs, because it's only used here as a sanity check.
if this_summary_cc != total_cc:
msg = (
"`summary:` line doesn't match computed total\n"
f"- summary: {this_summary_cc}\n"
f"- computed: {total_cc}"
)
parse_die(msg)
except ValueError:
parse_die("malformed or too many event counts")
elif line == "\n" or line.startswith("#"):
# Skip empty lines and comment lines.
pass
else:
parse_die(f"malformed line: {line[:-1]}")
# Check if summary line was present.
if not summary_cc_present:
parse_die("missing `summary:` line, aborting")
# The width of a column, in three parts.
class Width:
# Width of the widest commified event count.
count: int
# Width of the widest first percentage, of the form ` (n.n%)` or ` (n.n%,`.
perc1: int
# Width of the widest second percentage, of the form ` n.n%)`.
perc2: int
def __init__(self, count: int, perc1: int, perc2: int) -> None:
self.count = count
self.perc1 = perc1
self.perc2 = perc2
class CcPrinter:
# Note: every `CcPrinter` gets the same `Events` object.
events: Events
# Note: every `CcPrinter` gets the same summary CC.
summary_cc: Cc
# String to print before the event names.
events_prefix: str
# The widths of each event column. For simplicity, its length matches
# `events.events`, even though not all events are necessarily shown.
widths: list[Width]
# Text of a missing CC, which can be computed in advance.
missing_cc_str: str
# Must call `init_ccs` or `init_list_mname_lcc` after this.
def __init__(self, events: Events, summary_cc: Cc) -> None:
self.events = events
self.summary_cc = summary_cc
# Other fields initialized in `init_*`.
def init_ccs(self, ccs: list[Cc]) -> None:
self.events_prefix = ""
# Find min and max count for each event. One of them will be the widest
# value.
min_cc = self.events.mk_empty_cc()
max_cc = self.events.mk_empty_cc()
for cc in ccs:
update_cc_extremes(cc, min_cc, max_cc)
self.init_widths(min_cc, max_cc, None, None)
def init_list_mname_lcc(self, list_mname_lcc: ListMnameLcc) -> None:
self.events_prefix = " "
cumul_cc = self.events.mk_empty_cc()
# Find min and max value for each event. One of them will be the widest
# value. Likewise for the cumulative counts.
min_cc = self.events.mk_empty_cc()
max_cc = self.events.mk_empty_cc()
min_cumul_cc = self.events.mk_empty_cc()
max_cumul_cc = self.events.mk_empty_cc()
for _, lcc in list_mname_lcc:
# Consider both outer and inner CCs for `count` and `perc1`.
update_cc_extremes(lcc.outer_cc, min_cc, max_cc)
for _, inner_cc in lcc.inner_list_mname_cc:
update_cc_extremes(inner_cc, min_cc, max_cc)
# Consider only outer CCs for `perc2`.
add_cc_to_cc(lcc.outer_cc, cumul_cc)
update_cc_extremes(cumul_cc, min_cumul_cc, max_cumul_cc)
self.init_widths(min_cc, max_cc, min_cumul_cc, max_cumul_cc)
def init_widths(
self, min_cc1: Cc, max_cc1: Cc, min_cc2: Cc | None, max_cc2: Cc | None
) -> None:
self.widths = [Width(0, 0, 0)] * self.events.num_events
for i in range(len(self.events.events)):
# Get count and percs widths of the min and max CCs.
(min_count, min_perc1, min_perc2) = self.count_and_percs_strs(
min_cc1, min_cc2, i
)
(max_count, max_perc1, max_perc2) = self.count_and_percs_strs(
max_cc1, max_cc2, i
)
self.widths[i] = Width(
max(len(min_count), len(max_count)),
max(len(min_perc1), len(max_perc1)),
max(len(min_perc2), len(max_perc2)),
)
self.missing_cc_str = ""
for i in self.events.show_indices:
self.missing_cc_str += self.count_and_percs_str(i, ".", "", "")
# Get the count and perc string for `cc1[i]` and the perc string for
# `cc2[i]`. (Unless `cc2` is `None`, in which case `perc2` will be "".)
def count_and_percs_strs(
self, cc1: Cc, cc2: Cc | None, i: int
) -> tuple[str, str, str]:
count = f"{cc1[i]:,d}" # commify
if args.show_percs:
summary_count = self.summary_cc[i]
if cc2 is None:
# A plain or inner CC, with a single percentage.
if cc1[i] == 0:
# Don't show percentages for "0" entries, it's just clutter.
perc1 = ""
elif summary_count == 0:
# Avoid dividing by zero.
perc1 = " (n/a)"
else:
perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%)"
perc2 = ""
else:
# An outer CC, with two percentages.
if summary_count == 0:
# Avoid dividing by zero.
perc1 = " (n/a,"
perc2 = " n/a)"
else:
perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%,"
perc2 = f" {cc2[i] * 100 / summary_count:.1f}%)"
else:
perc1 = ""
perc2 = ""
return (count, perc1, perc2)
def count_and_percs_str(self, i: int, count: str, perc1: str, perc2: str) -> str:
event_w = len(self.events.events[i])
count_w = self.widths[i].count
perc1_w = self.widths[i].perc1
perc2_w = self.widths[i].perc2
pre_w = max(0, event_w - count_w - perc1_w - perc2_w)
return f"{'':>{pre_w}}{count:>{count_w}}{perc1:>{perc1_w}}{perc2:>{perc2_w}} "
def print_events(self, suffix: str) -> None:
print(self.events_prefix, end="")
for i in self.events.show_indices:
event = self.events.events[i]
event_w = len(event)
count_w = self.widths[i].count
perc1_w = self.widths[i].perc1
perc2_w = self.widths[i].perc2
print(f"{event:_<{max(event_w, count_w + perc1_w + perc2_w)}} ", end="")
print(suffix)
def print_lcc(self, indent: str, lcc: Lcc, outer_mname: str, cumul_cc: Cc) -> None:
print(indent, end="")
if (
len(lcc.inner_list_mname_cc) == 1
and lcc.outer_cc == lcc.inner_list_mname_cc[0][1]
):
# There is only one inner CC, it met the threshold, and it is equal
# to the outer CC. Print the inner CC and outer CC in a single
# line, because they are the same.
inner_mname = lcc.inner_list_mname_cc[0][0]
self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_mname}:{inner_mname}")
else:
# There are multiple inner CCs, and at least one met the threshold.
# Print the outer CC and then the inner CCs, indented.
self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_mname}:")
for inner_mname, inner_cc in lcc.inner_list_mname_cc:
print(" ", end="")
self.print_cc(inner_cc, None, f" {inner_mname}")
print()
# If `cc2` is `None`, it's a vanilla CC or inner CC. Otherwise, it's an
# outer CC.
def print_cc(self, cc: Cc, cc2: Cc | None, suffix: str) -> None:
for i in self.events.show_indices:
(count, perc1, perc2) = self.count_and_percs_strs(cc, cc2, i)
print(self.count_and_percs_str(i, count, perc1, perc2), end="")
print("", suffix)
def print_missing_cc(self, suffix: str) -> None:
print(self.missing_cc_str, suffix)
# Used in various places in the output.
def print_fancy(text: str) -> None:
fancy = "-" * 80
print(fancy)
print("--", text)
print(fancy)
def print_metadata(descs: list[str], cmds: list[str], events: Events) -> None:
print_fancy("Metadata")
def all_the_same(strs: list[str]) -> bool:
for i in range(len(strs) - 1):
if strs[i] != strs[i + 1]:
return False
return True
print("Invocation: ", *sys.argv)
# When there are multiple descriptions, they are usually all the same. Only
# print the description once in that case.
if all_the_same(descs):
print(descs[0], end="")
else:
for i, desc in enumerate(descs):
print(f"Description {i+1}:")
print(desc, end="")
# Commands are sometimes the same, sometimes not. Always print them
# individually, but refer to the previous one when appropriate.
if len(cmds) == 1:
print("Command: ", cmds[0])
else:
for i, cmd in enumerate(cmds):
if i > 0 and cmds[i - 1] == cmd:
print(f"Command {i+1}: (same as Command {i})")
else:
print(f"Command {i+1}: ", cmd)
print("Events recorded: ", *events.events)
print("Events shown: ", *events.show_events)
print("Event sort order:", *events.sort_events)
print("Threshold: ", args.threshold, "%", sep="")
print("Annotation: ", "on" if args.annotate else "off")
print()
def print_summary(events: Events, summary_cc: Cc) -> None:
printer = CcPrinter(events, summary_cc)
printer.init_ccs([summary_cc])
print_fancy("Summary")
printer.print_events("")
print()
printer.print_cc(summary_cc, None, "PROGRAM TOTALS")
print()
def print_mname_summary(
kind: str, indent: str, events: Events, dict_mname_dcc: DictMnameDcc, summary_cc: Cc
) -> set[str]:
# The primary sort event is used for the threshold.
threshold_index = events.sort_indices[0]
# Convert the threshold from a percentage to an event count.
threshold = args.threshold * abs(summary_cc[threshold_index]) / 100
def meets_threshold(mname_and_cc: tuple[str, Cc]) -> bool:
cc = mname_and_cc[1]
return abs(cc[threshold_index]) >= threshold
# Create a list with the outer CC counts in sort order, so that
# left-to-right list comparison does the right thing. Plus the outer name
# at the end for deterministic output when all the event counts are
# identical in two CCs.
def key_mname_and_lcc(mname_and_lcc: tuple[str, Lcc]) -> tuple[list[int], str]:
(outer_mname, lcc) = mname_and_lcc
return (
[abs(lcc.outer_cc[i]) for i in events.sort_indices],
outer_mname,
)
# Similar to `key_mname_and_lcc`.
def key_mname_and_cc(mname_and_cc: tuple[str, Cc]) -> tuple[list[int], str]:
(mname, cc) = mname_and_cc
return ([abs(cc[i]) for i in events.sort_indices], mname)
# This is a `filter_map` operation, which Python doesn't directly support.
list_mname_lcc: ListMnameLcc = []
for outer_mname, dcc in dict_mname_dcc.items():
# Filter out inner CCs for which the primary sort event count is below the
# threshold, and sort the remainder.
inner_list_mname_cc = sorted(
filter(meets_threshold, dcc.inner_dict_mname_cc.items()),
key=key_mname_and_cc,
reverse=True,
)
# If no inner CCs meet the threshold, ignore the entire DCC, even if
# the outer CC meets the threshold.
if len(inner_list_mname_cc) == 0:
continue
list_mname_lcc.append((outer_mname, Lcc(dcc.outer_cc, inner_list_mname_cc)))
list_mname_lcc = sorted(list_mname_lcc, key=key_mname_and_lcc, reverse=True)
printer = CcPrinter(events, summary_cc)
printer.init_list_mname_lcc(list_mname_lcc)
print_fancy(kind + " summary")
printer.print_events(" " + kind.lower())
print()
# Print LCCs.
threshold_mnames = set([])
cumul_cc = events.mk_empty_cc()
for mname, lcc in list_mname_lcc:
add_cc_to_cc(lcc.outer_cc, cumul_cc)
printer.print_lcc(indent, lcc, mname, cumul_cc)
threshold_mnames.add(mname)
return threshold_mnames
class AnnotatedCcs:
line_nums_known_cc: Cc
line_nums_unknown_cc: Cc
non_identical_cc: Cc
unreadable_cc: Cc
below_threshold_cc: Cc
files_unknown_cc: Cc
labels = [
" annotated: files known & above threshold & readable, line numbers known",
" annotated: files known & above threshold & readable, line numbers unknown",
"unannotated: files known & above threshold & two or more non-identical",
"unannotated: files known & above threshold & unreadable ",
"unannotated: files known & below threshold",
"unannotated: files unknown",
]
def __init__(self, events: Events) -> None:
self.line_nums_known_cc = events.mk_empty_cc()
self.line_nums_unknown_cc = events.mk_empty_cc()
self.non_identical_cc = events.mk_empty_cc()
self.unreadable_cc = events.mk_empty_cc()
self.below_threshold_cc = events.mk_empty_cc()
self.files_unknown_cc = events.mk_empty_cc()
def ccs(self) -> list[Cc]:
return [
self.line_nums_known_cc,
self.line_nums_unknown_cc,
self.non_identical_cc,
self.unreadable_cc,
self.below_threshold_cc,
self.files_unknown_cc,
]
def mk_warning(msg: str) -> str:
return f"""\
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
{msg}\
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
"""
def warn_ofls_are_all_newer(ofls: list[str], cgout_filename: str) -> None:
s = "".join([f"@ - {ofl}\n" for ofl in ofls])
msg = f"""\
@ Original source files are all newer than data file '{cgout_filename}':
{s}@ Annotations may not be correct.
"""
print(mk_warning(msg))
def warn_bogus_lines(src_filename: str) -> None:
msg = f"""\
@@ Information recorded about lines past the end of '{src_filename}'.
"""
print(mk_warning(msg), end="")
def print_annotated_src_file(
events: Events,
dict_line_cc: DictLineCc,
src_file: TextIO,
annotated_ccs: AnnotatedCcs,
summary_cc: Cc,
) -> None:
printer = CcPrinter(events, summary_cc)
printer.init_ccs(list(dict_line_cc.values()))
# The starting fancy has already been printed by the caller.
printer.print_events("")
print()
# The CC for line 0 is special, holding counts attributed to the source
# file but not to any particular line (due to incomplete debug info).
# Annotate the start of the file with this info, if present.
line0_cc = dict_line_cc.pop(0, None)
if line0_cc:
suffix = ""
printer.print_cc(line0_cc, None, suffix)
add_cc_to_cc(line0_cc, annotated_ccs.line_nums_unknown_cc)
print()
# Find interesting line ranges: all lines with a CC, and all lines within
# `args.context` lines of a line with a CC.
line_nums = list(sorted(dict_line_cc.keys()))
pairs: list[tuple[int, int]] = []
n = len(line_nums)
i = 0
context = args.context
while i < n:
lo = max(line_nums[i] - context, 1) # `max` to prevent negatives
while i < n - 1 and line_nums[i] + 2 * context >= line_nums[i + 1]:
i += 1
hi = line_nums[i] + context
pairs.append((lo, hi))
i += 1
def print_lines(pairs: list[tuple[int, int]]) -> None:
line_num = 0
while pairs:
src_line = ""
(lo, hi) = pairs.pop(0)
while line_num < lo - 1:
src_line = src_file.readline()
line_num += 1
if not src_line:
return # EOF
# Print line number, unless start of file.
if lo != 1:
print("-- line", lo, "-" * 40)
while line_num < hi:
src_line = src_file.readline()
line_num += 1
if not src_line:
return # EOF
if line_nums and line_num == line_nums[0]:
printer.print_cc(dict_line_cc[line_num], None, src_line[:-1])
add_cc_to_cc(
dict_line_cc[line_num], annotated_ccs.line_nums_known_cc
)
del line_nums[0]
else:
printer.print_missing_cc(src_line[:-1])
# Print line number.
print("-- line", hi, "-" * 40)
# Annotate chosen lines, tracking total annotated counts.
if pairs:
print_lines(pairs)
# If there was info on lines past the end of the file, warn.
if line_nums:
print()
for line_num in line_nums:
printer.print_cc(
dict_line_cc[line_num], None, f""
)
add_cc_to_cc(dict_line_cc[line_num], annotated_ccs.line_nums_known_cc)
print()
warn_bogus_lines(src_file.name)
print()
# This partially consumes `dict_mfl_dict_line_cc`, and fully consumes
# `dict_mfl_olfs`.
def print_annotated_src_files(
ann_mfls: set[str],
events: Events,
dict_mfl_ofls: DictMflOfls,
dict_mfl_dict_line_cc: DictMflDictLineCc,
summary_cc: Cc,
) -> AnnotatedCcs:
annotated_ccs = AnnotatedCcs(events)
def add_dict_line_cc_to_cc(dict_line_cc: DictLineCc, accum_cc: Cc) -> None:
for line_cc in dict_line_cc.values():
add_cc_to_cc(line_cc, accum_cc)
# Exclude the unknown ("???") file, which is unannotatable.
ann_mfls.discard("???")
if "???" in dict_mfl_dict_line_cc:
dict_line_cc = dict_mfl_dict_line_cc.pop("???")
add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.files_unknown_cc)
def print_ann_fancy(mfl: str) -> None:
print_fancy(f"Annotated source file: {mfl}")
# This can raise an `OSError`.
def all_ofl_contents_identical(ofls: list[str]) -> bool:
for i in range(len(ofls) - 1):
if not filecmp.cmp(ofls[i], ofls[i + 1], shallow=False):
return False
return True
for mfl in sorted(ann_mfls):
ofls = sorted(dict_mfl_ofls.pop(mfl))
first_ofl = ofls[0]
try:
if all_ofl_contents_identical(ofls):
# All the Ofls that map to this Mfl are identical, which means we
# can annotate, and it doesn't matter which Ofl we use.
with open(first_ofl, "r", encoding="utf-8") as src_file:
dict_line_cc = dict_mfl_dict_line_cc.pop(mfl)
print_ann_fancy(mfl)
# Because all the Ofls are identical, we can treat their
# mtimes as if they are all as early as the earliest one.
# Therefore, we warn only if the earliest source file is
# more recent than the cgout file.
min_ofl_st_mtime_ns = min(
[os.stat(ofl).st_mtime_ns for ofl in ofls]
)
for cgout_filename in args.cgout_filename:
if min_ofl_st_mtime_ns > os.stat(cgout_filename).st_mtime_ns:
warn_ofls_are_all_newer(ofls, cgout_filename)
print_annotated_src_file(
events,
dict_line_cc,
src_file,
annotated_ccs,
summary_cc,
)
else:
dict_line_cc = dict_mfl_dict_line_cc.pop(mfl)
add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.non_identical_cc)
# We could potentially do better here.
# - Annotate until the first line where the src files diverge.
# - Also, heuristic resyncing, e.g. by looking for matching
# lines (of sufficient complexity) after a divergence.
print_ann_fancy(mfl)
print(
"Unannotated because two or more of these original files are not "
"identical:",
*ofls,
sep="\n- ",
)
print()
except OSError:
dict_line_cc = dict_mfl_dict_line_cc.pop(mfl)
add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.unreadable_cc)
print_ann_fancy(mfl)
print(
"Unannotated because one or more of these original files are "
"unreadable:",
*ofls,
sep="\n- ",
)
print()
# Sum the CCs remaining in `dict_mfl_dict_line_cc`, which are all in files
# below the threshold.
for dict_line_cc in dict_mfl_dict_line_cc.values():
add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.below_threshold_cc)
return annotated_ccs
def print_annotation_summary(
events: Events,
annotated_ccs: AnnotatedCcs,
summary_cc: Cc,
) -> None:
# Show how many events were covered by annotated lines above.
printer = CcPrinter(events, summary_cc)
printer.init_ccs(annotated_ccs.ccs())
print_fancy("Annotation summary")
printer.print_events("")
print()
total_cc = events.mk_empty_cc()
for (cc, label) in zip(annotated_ccs.ccs(), AnnotatedCcs.labels):
printer.print_cc(cc, None, label)
add_cc_to_cc(cc, total_cc)
print()
# Internal sanity check.
if summary_cc != total_cc:
msg = (
"`summary:` line doesn't match computed annotated counts\n"
f"- summary: {summary_cc}\n"
f"- annotated: {total_cc}"
)
die(msg)
def main() -> None:
# Metadata, initialized to empty states.
descs: list[str] = []
cmds: list[str] = []
events = Events()
# For tracking original filenames to modified filenames.
dict_mfl_ofls: DictMflOfls = defaultdict(set)
# Different places where we accumulate CC data. Initialized to invalid
# states prior to the number of events being known.
dict_mfl_dcc: DictMnameDcc = defaultdict(None)
dict_mfn_dcc: DictMnameDcc = defaultdict(None)
dict_mfl_dict_line_cc: DictMflDictLineCc = defaultdict(None)
summary_cc: Cc = []
for n, filename in enumerate(args.cgout_filename):
is_first_file = n == 0
read_cgout_file(
filename,
is_first_file,
descs,
cmds,
events,
dict_mfl_ofls,
dict_mfl_dcc,
dict_mfn_dcc,
dict_mfl_dict_line_cc,
summary_cc,
)
# Each of the following calls prints a section of the output.
print_metadata(descs, cmds, events)
print_summary(events, summary_cc)
ann_mfls = print_mname_summary(
"File:function", "< ", events, dict_mfl_dcc, summary_cc
)
print_mname_summary("Function:file", "> ", events, dict_mfn_dcc, summary_cc)
if args.annotate:
annotated_ccs = print_annotated_src_files(
ann_mfls, events, dict_mfl_ofls, dict_mfl_dict_line_cc, summary_cc
)
print_annotation_summary(events, annotated_ccs, summary_cc)
if __name__ == "__main__":
main()