#!/usr/bin/python3 # pyright: strict # -------------------------------------------------------------------- # --- Cachegrind's annotator. cg_annotate.in --- # -------------------------------------------------------------------- # This file is part of Cachegrind, a Valgrind tool for cache # profiling programs. # # Copyright (C) 2002-2023 Nicholas Nethercote # njn@valgrind.org # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . # # The GNU General Public License is contained in the file COPYING. # This script reads Cachegrind output files and produces human-readable output. # # Use `make pyann` to "build" this script with `auxprogs/pybuild.rs` every time # it is changed. This runs the formatters, type-checkers, and linters on # `cg_annotate.in` and then generates `cg_annotate`. from __future__ import annotations import filecmp import os import re import sys from argparse import ArgumentParser, BooleanOptionalAction, Namespace from collections import defaultdict from typing import Callable, DefaultDict, NoReturn, TextIO def die(msg: str) -> NoReturn: print("cg_annotate: error:", msg, file=sys.stderr) sys.exit(1) SearchAndReplace = Callable[[str], str] # A typed wrapper for parsed args. class Args(Namespace): # None of these fields are modified after arg parsing finishes. diff: bool mod_filename: SearchAndReplace mod_funcname: SearchAndReplace show: list[str] sort: list[str] threshold: float # a percentage show_percs: bool annotate: bool context: int cgout_filename: list[str] @staticmethod def parse() -> Args: # We support Perl-style `s/old/new/flags` search-and-replace # expressions, because that's how this option was implemented in the # old Perl version of `cg_diff`. This requires conversion from # `s/old/new/` style to `re.sub`. The conversion isn't a perfect # emulation of Perl regexps (e.g. Python uses `\1` rather than `$1` for # using captures in the `new` part), but it should be close enough. The # only supported flags are `g` (global) and `i` (ignore case). def search_and_replace(regex: str | None) -> SearchAndReplace: if regex is None: return lambda s: s # Extract the parts of an `s/old/new/tail` regex. `(? list[str]: return values.split(",") def threshold(n: str) -> float: f = float(n) if 0 <= f <= 20: return f raise ValueError # Add a bool argument that defaults to true. # # Supports these forms: `--foo`, `--no-foo`, `--foo=yes`, `--foo=no`. # The latter two were the forms supported by the old Perl version of # `cg_annotate`, and are now deprecated. def add_bool_argument( p: ArgumentParser, new_name: str, old_name: str, help_: str ) -> None: new_flag = "--" + new_name old_flag = "--" + old_name dest = new_name.replace("-", "_") # Note: the default value is always printed with `BooleanOptionalAction`, # due to an argparse bug: https://github.com/python/cpython/issues/83137. p.add_argument( new_flag, default=True, action=BooleanOptionalAction, help=help_, ) p.add_argument( f"{old_flag}=yes", dest=dest, action="store_true", help=f"(deprecated) same as --{new_name}", ) p.add_argument( f"{old_flag}=no", dest=dest, action="store_false", help=f"(deprecated) same as --no-{new_name}", ) p = ArgumentParser(description="Process one or more Cachegrind output files.") p.add_argument("--version", action="version", version="%(prog)s-3.21.0") p.add_argument( "--diff", default=False, action="store_true", help="perform a diff between two Cachegrind output files", ) p.add_argument( "--mod-filename", type=search_and_replace, metavar="REGEX", default=search_and_replace(None), help="a search-and-replace regex applied to filenames, e.g. " "`s/prog[0-9]/progN/`", ) p.add_argument( "--mod-funcname", type=search_and_replace, metavar="REGEX", default=search_and_replace(None), help="like --mod-filename, but for function names", ) p.add_argument( "--show", type=comma_separated_list, metavar="A,B,C", help="only show figures for events A,B,C (default: all events)", ) p.add_argument( "--sort", type=comma_separated_list, metavar="A,B,C", help="sort functions by events A,B,C (default: event column order)", ) p.add_argument( "--threshold", type=threshold, default=0.1, metavar="N:[0,20]", help="only show file:function/function:file pairs with more than " "N%% of primary sort event counts (default: %(default)s)", ) add_bool_argument( p, "show-percs", "show-percs", "show a percentage for each non-zero count", ) add_bool_argument( p, "annotate", "auto", "annotate all source files containing functions that reached the " "event count threshold", ) p.add_argument( "--context", type=int, default=8, metavar="N", help="print N lines of context before and after annotated lines " "(default: %(default)s)", ) p.add_argument( "cgout_filename", nargs="+", metavar="cachegrind-out-file", help="file produced by Cachegrind", ) # `args0` name used to avoid shadowing the global `args`, which pylint # doesn't like. args0 = p.parse_args(namespace=Args()) if args0.diff and len(args0.cgout_filename) != 2: p.print_usage(file=sys.stderr) die("argument --diff: requires exactly two Cachegrind output files") return args0 # Args are stored in a global for easy access. args = Args.parse() # A single instance of this class is constructed, from `args` and the `events:` # line in the cgout file. class Events: # The event names. events: list[str] # Equal to `len(self.events)`. num_events: int # The order in which we must traverse events for --show. Can be shorter # than `events`. show_events: list[str] # Like `show_events`, but indices into `events`, rather than names. show_indices: list[int] # The order in which we must traverse events for --sort. Can be shorter # than `events`. sort_events: list[str] # Like `sort_events`, but indices into `events`, rather than names. sort_indices: list[int] def __init__(self) -> None: # All fields are left uninitialized here, and set instead in `init`. pass def init(self, text: str) -> None: self.events = text.split() self.num_events = len(self.events) # A temporary dict mapping events to indices, [0, n-1]. event_indices = {event: n for n, event in enumerate(self.events)} # If --show is given, check it is valid. If --show is not given, # default to all events in the standard order. if args.show: for event in args.show: if event not in event_indices: die(f"--show event `{event}` did not appear in `events:` line") self.show_events = args.show else: self.show_events = self.events self.show_indices = [event_indices[event] for event in self.show_events] # Likewise for --sort. if args.sort: for event in args.sort: if event not in event_indices: die(f"--sort event `{event}` did not appear in `events:` line") self.sort_events = args.sort else: self.sort_events = self.events self.sort_indices = [event_indices[event] for event in self.sort_events] # Raises a `ValueError` exception on syntax error. def mk_cc(self, str_counts: list[str]) -> Cc: # This is slightly faster than a list comprehension. counts = list(map(int, str_counts)) if len(counts) == self.num_events: pass elif len(counts) < self.num_events: # Add zeroes at the end for any missing numbers. counts.extend([0] * (self.num_events - len(counts))) else: raise ValueError return counts def mk_empty_cc(self) -> Cc: # This is much faster than a list comprehension. return [0] * self.num_events def mk_empty_dcc(self) -> Dcc: return Dcc(self.mk_empty_cc(), defaultdict(self.mk_empty_cc)) # A "cost centre", which is a dumb container for counts. Always the same length # as `Events.events`, but it doesn't even know event names. `Events.mk_cc` and # `Events.mk_empty_cc` are used for construction. # # This used to be a class with a single field `counts: list[int]`, but this # type is very hot and just using a type alias is much faster. Cc = list[int] # Add the counts in `a_cc` to `b_cc`. def add_cc_to_cc(a_cc: Cc, b_cc: Cc) -> None: for i, a_count in enumerate(a_cc): b_cc[i] += a_count # Subtract the counts in `a_cc` from `b_cc`. def sub_cc_from_cc(a_cc: Cc, b_cc: Cc) -> None: for i, a_count in enumerate(a_cc): b_cc[i] -= a_count # Unrolled version of `add_cc_to_cc`, for speed. def add_cc_to_ccs( a_cc: Cc, b_cc1: Cc, b_cc2: Cc, b_cc3: Cc, b_cc4: Cc, b_cc5: Cc, total_cc: Cc ) -> None: for i, a_count in enumerate(a_cc): b_cc1[i] += a_count b_cc2[i] += a_count b_cc3[i] += a_count b_cc4[i] += a_count b_cc5[i] += a_count total_cc[i] += a_count # Unrolled version of `sub_cc_from_cc`, for speed. Note that the last one, # `total_cc`, is added. def sub_cc_from_ccs( a_cc: Cc, b_cc1: Cc, b_cc2: Cc, b_cc3: Cc, b_cc4: Cc, b_cc5: Cc, total_cc: Cc ) -> None: for i, a_count in enumerate(a_cc): b_cc1[i] -= a_count b_cc2[i] -= a_count b_cc3[i] -= a_count b_cc4[i] -= a_count b_cc5[i] -= a_count total_cc[i] += a_count # Update `min_cc` and `max_cc` with `self`. def update_cc_extremes(self: Cc, min_cc: Cc, max_cc: Cc) -> None: for i, count in enumerate(self): if count > max_cc[i]: max_cc[i] = count elif count < min_cc[i]: min_cc[i] = count # Note: some abbrevations used below: # - Ofl/ofl: original filename, as mentioned in a cgout file. # - Ofn/ofn: original function name, as mentioned in a cgout file. # - Mfl/mfl: modified filename, the result of passing an Ofl through # `--mod-filename`. # - Mfn/mfn: modified function name, the result of passing an Ofn through # `--mod-funcname`. # - Mname/mname: modified name, used for what could be an Mfl or an Mfn. # A deep cost centre with a dict for the inner mnames and CCs. class Dcc: outer_cc: Cc inner_dict_mname_cc: DictMnameCc def __init__(self, outer_cc: Cc, inner_dict_mname_cc: DictMnameCc) -> None: self.outer_cc = outer_cc self.inner_dict_mname_cc = inner_dict_mname_cc # A deep cost centre with a list for the inner mnames and CCs. Used during # filtering and sorting. class Lcc: outer_cc: Cc inner_list_mname_cc: ListMnameCc def __init__(self, outer_cc: Cc, inner_list_mname_cc: ListMnameCc) -> None: self.outer_cc = outer_cc self.inner_list_mname_cc = inner_list_mname_cc # Per-Mfl/Mfn CCs. The list version is used during filtering and sorting. DictMnameCc = DefaultDict[str, Cc] ListMnameCc = list[tuple[str, Cc]] # Per-Mfl/Mfn DCCs. The outer Mnames are Mfls and the inner Mnames are Mfns, or # vice versa. The list version is used during filtering and sorting. DictMnameDcc = DefaultDict[str, Dcc] ListMnameLcc = list[tuple[str, Lcc]] # Per-line CCs, organised by Mfl and line number. DictLineCc = DefaultDict[int, Cc] DictMflDictLineCc = DefaultDict[str, DictLineCc] # A dictionary tracking how Ofls get mapped to Mfls by `--mod-filename`. If # `--mod-filename` isn't used, each entry will be the identity mapping: ("foo" # -> set(["foo"])). DictMflOfls = DefaultDict[str, set[str]] def read_cgout_file( cgout_filename: str, is_first_file: bool, descs: list[str], cmds: list[str], events: Events, dict_mfl_ofls: DictMflOfls, dict_mfl_dcc: DictMnameDcc, dict_mfn_dcc: DictMnameDcc, dict_mfl_dict_line_cc: DictMflDictLineCc, summary_cc: Cc, ) -> None: # The file format is described in Cachegrind's manual. try: cgout_file = open(cgout_filename, "r", encoding="utf-8") except OSError as err: die(f"{err}") with cgout_file: cgout_line_num = 0 def parse_die(msg: str) -> NoReturn: die(f"{cgout_file.name}:{cgout_line_num}: {msg}") def readline() -> str: nonlocal cgout_line_num cgout_line_num += 1 return cgout_file.readline() # Read "desc:" lines. desc = "" while line := readline(): if m := re.match(r"desc:\s+(.*)", line): desc += m.group(1) + "\n" else: break descs.append(desc) # Read "cmd:" line. (`line` is already set from the "desc:" loop.) if m := re.match(r"cmd:\s+(.*)", line): cmds.append(m.group(1)) else: parse_die("missing a `command:` line") # Read "events:" line. line = readline() if m := re.match(r"events:\s+(.*)", line): if is_first_file: events.init(m.group(1)) else: events2 = Events() events2.init(m.group(1)) if events.events != events2.events: die("events in data files don't match") else: parse_die("missing an `events:` line") def mk_empty_dict_line_cc() -> DictLineCc: return defaultdict(events.mk_empty_cc) # The current Mfl and Mfn. mfl = "" mfn = "" # These values are passed in by reference and are modified by this # function. But they can't be properly initialized until the `events:` # line of the first file is read and the number of events is known. So # we initialize them in an invalid state in `main`, and then # reinitialize them properly here, before their first use. if is_first_file: dict_mfl_dcc.default_factory = events.mk_empty_dcc dict_mfn_dcc.default_factory = events.mk_empty_dcc dict_mfl_dict_line_cc.default_factory = mk_empty_dict_line_cc summary_cc.extend(events.mk_empty_cc()) # These are refs into the dicts above, used to avoid repeated lookups. # They are all overwritten before first use. mfl_dcc = events.mk_empty_dcc() mfn_dcc = events.mk_empty_dcc() mfl_dcc_inner_mfn_cc = events.mk_empty_cc() mfn_dcc_inner_mfl_cc = events.mk_empty_cc() dict_line_cc = mk_empty_dict_line_cc() total_cc = events.mk_empty_cc() # When diffing, we negate the first cgout file's counts to effectively # achieve `cgout2 - cgout1`. if args.diff and is_first_file: combine_cc_with_cc = sub_cc_from_cc combine_cc_with_ccs = sub_cc_from_ccs else: combine_cc_with_cc = add_cc_to_cc combine_cc_with_ccs = add_cc_to_ccs summary_cc_present = False # Line matching is done in order of pattern frequency, for speed. while line := readline(): if line[0].isdigit(): split_line = line.split() try: line_num = int(split_line[0]) cc = events.mk_cc(split_line[1:]) except ValueError: parse_die("malformed or too many event counts") # Record this CC at various levels. combine_cc_with_ccs( cc, mfl_dcc.outer_cc, mfn_dcc.outer_cc, mfl_dcc_inner_mfn_cc, mfn_dcc_inner_mfl_cc, dict_line_cc[line_num], total_cc, ) elif line.startswith("fn="): ofn = line[3:-1] mfn = args.mod_funcname(ofn) # `mfl_dcc` is unchanged. mfn_dcc = dict_mfn_dcc[mfn] mfl_dcc_inner_mfn_cc = mfl_dcc.inner_dict_mname_cc[mfn] mfn_dcc_inner_mfl_cc = mfn_dcc.inner_dict_mname_cc[mfl] elif line.startswith("fl="): ofl = line[3:-1] mfl = args.mod_filename(ofl) dict_mfl_ofls[mfl].add(ofl) # A `fn=` line should follow, overwriting the function name. mfn = "" mfl_dcc = dict_mfl_dcc[mfl] mfn_dcc = dict_mfn_dcc[mfn] mfl_dcc_inner_mfn_cc = mfl_dcc.inner_dict_mname_cc[mfn] mfn_dcc_inner_mfl_cc = mfn_dcc.inner_dict_mname_cc[mfl] dict_line_cc = dict_mfl_dict_line_cc[mfl] elif m := re.match(r"summary:\s+(.*)", line): summary_cc_present = True try: this_summary_cc = events.mk_cc(m.group(1).split()) combine_cc_with_cc(this_summary_cc, summary_cc) # Check summary is correct. Note that `total_cc` doesn't # get negated for the first file in a diff, unlike the # other CCs, because it's only used here as a sanity check. if this_summary_cc != total_cc: msg = ( "`summary:` line doesn't match computed total\n" f"- summary: {this_summary_cc}\n" f"- computed: {total_cc}" ) parse_die(msg) except ValueError: parse_die("malformed or too many event counts") elif line == "\n" or line.startswith("#"): # Skip empty lines and comment lines. pass else: parse_die(f"malformed line: {line[:-1]}") # Check if summary line was present. if not summary_cc_present: parse_die("missing `summary:` line, aborting") # The width of a column, in three parts. class Width: # Width of the widest commified event count. count: int # Width of the widest first percentage, of the form ` (n.n%)` or ` (n.n%,`. perc1: int # Width of the widest second percentage, of the form ` n.n%)`. perc2: int def __init__(self, count: int, perc1: int, perc2: int) -> None: self.count = count self.perc1 = perc1 self.perc2 = perc2 class CcPrinter: # Note: every `CcPrinter` gets the same `Events` object. events: Events # Note: every `CcPrinter` gets the same summary CC. summary_cc: Cc # String to print before the event names. events_prefix: str # The widths of each event column. For simplicity, its length matches # `events.events`, even though not all events are necessarily shown. widths: list[Width] # Text of a missing CC, which can be computed in advance. missing_cc_str: str # Must call `init_ccs` or `init_list_mname_lcc` after this. def __init__(self, events: Events, summary_cc: Cc) -> None: self.events = events self.summary_cc = summary_cc # Other fields initialized in `init_*`. def init_ccs(self, ccs: list[Cc]) -> None: self.events_prefix = "" # Find min and max count for each event. One of them will be the widest # value. min_cc = self.events.mk_empty_cc() max_cc = self.events.mk_empty_cc() for cc in ccs: update_cc_extremes(cc, min_cc, max_cc) self.init_widths(min_cc, max_cc, None, None) def init_list_mname_lcc(self, list_mname_lcc: ListMnameLcc) -> None: self.events_prefix = " " cumul_cc = self.events.mk_empty_cc() # Find min and max value for each event. One of them will be the widest # value. Likewise for the cumulative counts. min_cc = self.events.mk_empty_cc() max_cc = self.events.mk_empty_cc() min_cumul_cc = self.events.mk_empty_cc() max_cumul_cc = self.events.mk_empty_cc() for _, lcc in list_mname_lcc: # Consider both outer and inner CCs for `count` and `perc1`. update_cc_extremes(lcc.outer_cc, min_cc, max_cc) for _, inner_cc in lcc.inner_list_mname_cc: update_cc_extremes(inner_cc, min_cc, max_cc) # Consider only outer CCs for `perc2`. add_cc_to_cc(lcc.outer_cc, cumul_cc) update_cc_extremes(cumul_cc, min_cumul_cc, max_cumul_cc) self.init_widths(min_cc, max_cc, min_cumul_cc, max_cumul_cc) def init_widths( self, min_cc1: Cc, max_cc1: Cc, min_cc2: Cc | None, max_cc2: Cc | None ) -> None: self.widths = [Width(0, 0, 0)] * self.events.num_events for i in range(len(self.events.events)): # Get count and percs widths of the min and max CCs. (min_count, min_perc1, min_perc2) = self.count_and_percs_strs( min_cc1, min_cc2, i ) (max_count, max_perc1, max_perc2) = self.count_and_percs_strs( max_cc1, max_cc2, i ) self.widths[i] = Width( max(len(min_count), len(max_count)), max(len(min_perc1), len(max_perc1)), max(len(min_perc2), len(max_perc2)), ) self.missing_cc_str = "" for i in self.events.show_indices: self.missing_cc_str += self.count_and_percs_str(i, ".", "", "") # Get the count and perc string for `cc1[i]` and the perc string for # `cc2[i]`. (Unless `cc2` is `None`, in which case `perc2` will be "".) def count_and_percs_strs( self, cc1: Cc, cc2: Cc | None, i: int ) -> tuple[str, str, str]: count = f"{cc1[i]:,d}" # commify if args.show_percs: summary_count = self.summary_cc[i] if cc2 is None: # A plain or inner CC, with a single percentage. if cc1[i] == 0: # Don't show percentages for "0" entries, it's just clutter. perc1 = "" elif summary_count == 0: # Avoid dividing by zero. perc1 = " (n/a)" else: perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%)" perc2 = "" else: # An outer CC, with two percentages. if summary_count == 0: # Avoid dividing by zero. perc1 = " (n/a," perc2 = " n/a)" else: perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%," perc2 = f" {cc2[i] * 100 / summary_count:.1f}%)" else: perc1 = "" perc2 = "" return (count, perc1, perc2) def count_and_percs_str(self, i: int, count: str, perc1: str, perc2: str) -> str: event_w = len(self.events.events[i]) count_w = self.widths[i].count perc1_w = self.widths[i].perc1 perc2_w = self.widths[i].perc2 pre_w = max(0, event_w - count_w - perc1_w - perc2_w) return f"{'':>{pre_w}}{count:>{count_w}}{perc1:>{perc1_w}}{perc2:>{perc2_w}} " def print_events(self, suffix: str) -> None: print(self.events_prefix, end="") for i in self.events.show_indices: event = self.events.events[i] event_w = len(event) count_w = self.widths[i].count perc1_w = self.widths[i].perc1 perc2_w = self.widths[i].perc2 print(f"{event:_<{max(event_w, count_w + perc1_w + perc2_w)}} ", end="") print(suffix) def print_lcc(self, indent: str, lcc: Lcc, outer_mname: str, cumul_cc: Cc) -> None: print(indent, end="") if ( len(lcc.inner_list_mname_cc) == 1 and lcc.outer_cc == lcc.inner_list_mname_cc[0][1] ): # There is only one inner CC, it met the threshold, and it is equal # to the outer CC. Print the inner CC and outer CC in a single # line, because they are the same. inner_mname = lcc.inner_list_mname_cc[0][0] self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_mname}:{inner_mname}") else: # There are multiple inner CCs, and at least one met the threshold. # Print the outer CC and then the inner CCs, indented. self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_mname}:") for inner_mname, inner_cc in lcc.inner_list_mname_cc: print(" ", end="") self.print_cc(inner_cc, None, f" {inner_mname}") print() # If `cc2` is `None`, it's a vanilla CC or inner CC. Otherwise, it's an # outer CC. def print_cc(self, cc: Cc, cc2: Cc | None, suffix: str) -> None: for i in self.events.show_indices: (count, perc1, perc2) = self.count_and_percs_strs(cc, cc2, i) print(self.count_and_percs_str(i, count, perc1, perc2), end="") print("", suffix) def print_missing_cc(self, suffix: str) -> None: print(self.missing_cc_str, suffix) # Used in various places in the output. def print_fancy(text: str) -> None: fancy = "-" * 80 print(fancy) print("--", text) print(fancy) def print_metadata(descs: list[str], cmds: list[str], events: Events) -> None: print_fancy("Metadata") def all_the_same(strs: list[str]) -> bool: for i in range(len(strs) - 1): if strs[i] != strs[i + 1]: return False return True print("Invocation: ", *sys.argv) # When there are multiple descriptions, they are usually all the same. Only # print the description once in that case. if all_the_same(descs): print(descs[0], end="") else: for i, desc in enumerate(descs): print(f"Description {i+1}:") print(desc, end="") # Commands are sometimes the same, sometimes not. Always print them # individually, but refer to the previous one when appropriate. if len(cmds) == 1: print("Command: ", cmds[0]) else: for i, cmd in enumerate(cmds): if i > 0 and cmds[i - 1] == cmd: print(f"Command {i+1}: (same as Command {i})") else: print(f"Command {i+1}: ", cmd) print("Events recorded: ", *events.events) print("Events shown: ", *events.show_events) print("Event sort order:", *events.sort_events) print("Threshold: ", args.threshold, "%", sep="") print("Annotation: ", "on" if args.annotate else "off") print() def print_summary(events: Events, summary_cc: Cc) -> None: printer = CcPrinter(events, summary_cc) printer.init_ccs([summary_cc]) print_fancy("Summary") printer.print_events("") print() printer.print_cc(summary_cc, None, "PROGRAM TOTALS") print() def print_mname_summary( kind: str, indent: str, events: Events, dict_mname_dcc: DictMnameDcc, summary_cc: Cc ) -> set[str]: # The primary sort event is used for the threshold. threshold_index = events.sort_indices[0] # Convert the threshold from a percentage to an event count. threshold = args.threshold * abs(summary_cc[threshold_index]) / 100 def meets_threshold(mname_and_cc: tuple[str, Cc]) -> bool: cc = mname_and_cc[1] return abs(cc[threshold_index]) >= threshold # Create a list with the outer CC counts in sort order, so that # left-to-right list comparison does the right thing. Plus the outer name # at the end for deterministic output when all the event counts are # identical in two CCs. def key_mname_and_lcc(mname_and_lcc: tuple[str, Lcc]) -> tuple[list[int], str]: (outer_mname, lcc) = mname_and_lcc return ( [abs(lcc.outer_cc[i]) for i in events.sort_indices], outer_mname, ) # Similar to `key_mname_and_lcc`. def key_mname_and_cc(mname_and_cc: tuple[str, Cc]) -> tuple[list[int], str]: (mname, cc) = mname_and_cc return ([abs(cc[i]) for i in events.sort_indices], mname) # This is a `filter_map` operation, which Python doesn't directly support. list_mname_lcc: ListMnameLcc = [] for outer_mname, dcc in dict_mname_dcc.items(): # Filter out inner CCs for which the primary sort event count is below the # threshold, and sort the remainder. inner_list_mname_cc = sorted( filter(meets_threshold, dcc.inner_dict_mname_cc.items()), key=key_mname_and_cc, reverse=True, ) # If no inner CCs meet the threshold, ignore the entire DCC, even if # the outer CC meets the threshold. if len(inner_list_mname_cc) == 0: continue list_mname_lcc.append((outer_mname, Lcc(dcc.outer_cc, inner_list_mname_cc))) list_mname_lcc = sorted(list_mname_lcc, key=key_mname_and_lcc, reverse=True) printer = CcPrinter(events, summary_cc) printer.init_list_mname_lcc(list_mname_lcc) print_fancy(kind + " summary") printer.print_events(" " + kind.lower()) print() # Print LCCs. threshold_mnames = set([]) cumul_cc = events.mk_empty_cc() for mname, lcc in list_mname_lcc: add_cc_to_cc(lcc.outer_cc, cumul_cc) printer.print_lcc(indent, lcc, mname, cumul_cc) threshold_mnames.add(mname) return threshold_mnames class AnnotatedCcs: line_nums_known_cc: Cc line_nums_unknown_cc: Cc non_identical_cc: Cc unreadable_cc: Cc below_threshold_cc: Cc files_unknown_cc: Cc labels = [ " annotated: files known & above threshold & readable, line numbers known", " annotated: files known & above threshold & readable, line numbers unknown", "unannotated: files known & above threshold & two or more non-identical", "unannotated: files known & above threshold & unreadable ", "unannotated: files known & below threshold", "unannotated: files unknown", ] def __init__(self, events: Events) -> None: self.line_nums_known_cc = events.mk_empty_cc() self.line_nums_unknown_cc = events.mk_empty_cc() self.non_identical_cc = events.mk_empty_cc() self.unreadable_cc = events.mk_empty_cc() self.below_threshold_cc = events.mk_empty_cc() self.files_unknown_cc = events.mk_empty_cc() def ccs(self) -> list[Cc]: return [ self.line_nums_known_cc, self.line_nums_unknown_cc, self.non_identical_cc, self.unreadable_cc, self.below_threshold_cc, self.files_unknown_cc, ] def mk_warning(msg: str) -> str: return f"""\ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ {msg}\ @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ """ def warn_ofls_are_all_newer(ofls: list[str], cgout_filename: str) -> None: s = "".join([f"@ - {ofl}\n" for ofl in ofls]) msg = f"""\ @ Original source files are all newer than data file '{cgout_filename}': {s}@ Annotations may not be correct. """ print(mk_warning(msg)) def warn_bogus_lines(src_filename: str) -> None: msg = f"""\ @@ Information recorded about lines past the end of '{src_filename}'. """ print(mk_warning(msg), end="") def print_annotated_src_file( events: Events, dict_line_cc: DictLineCc, src_file: TextIO, annotated_ccs: AnnotatedCcs, summary_cc: Cc, ) -> None: printer = CcPrinter(events, summary_cc) printer.init_ccs(list(dict_line_cc.values())) # The starting fancy has already been printed by the caller. printer.print_events("") print() # The CC for line 0 is special, holding counts attributed to the source # file but not to any particular line (due to incomplete debug info). # Annotate the start of the file with this info, if present. line0_cc = dict_line_cc.pop(0, None) if line0_cc: suffix = "" printer.print_cc(line0_cc, None, suffix) add_cc_to_cc(line0_cc, annotated_ccs.line_nums_unknown_cc) print() # Find interesting line ranges: all lines with a CC, and all lines within # `args.context` lines of a line with a CC. line_nums = list(sorted(dict_line_cc.keys())) pairs: list[tuple[int, int]] = [] n = len(line_nums) i = 0 context = args.context while i < n: lo = max(line_nums[i] - context, 1) # `max` to prevent negatives while i < n - 1 and line_nums[i] + 2 * context >= line_nums[i + 1]: i += 1 hi = line_nums[i] + context pairs.append((lo, hi)) i += 1 def print_lines(pairs: list[tuple[int, int]]) -> None: line_num = 0 while pairs: src_line = "" (lo, hi) = pairs.pop(0) while line_num < lo - 1: src_line = src_file.readline() line_num += 1 if not src_line: return # EOF # Print line number, unless start of file. if lo != 1: print("-- line", lo, "-" * 40) while line_num < hi: src_line = src_file.readline() line_num += 1 if not src_line: return # EOF if line_nums and line_num == line_nums[0]: printer.print_cc(dict_line_cc[line_num], None, src_line[:-1]) add_cc_to_cc( dict_line_cc[line_num], annotated_ccs.line_nums_known_cc ) del line_nums[0] else: printer.print_missing_cc(src_line[:-1]) # Print line number. print("-- line", hi, "-" * 40) # Annotate chosen lines, tracking total annotated counts. if pairs: print_lines(pairs) # If there was info on lines past the end of the file, warn. if line_nums: print() for line_num in line_nums: printer.print_cc( dict_line_cc[line_num], None, f"" ) add_cc_to_cc(dict_line_cc[line_num], annotated_ccs.line_nums_known_cc) print() warn_bogus_lines(src_file.name) print() # This partially consumes `dict_mfl_dict_line_cc`, and fully consumes # `dict_mfl_olfs`. def print_annotated_src_files( ann_mfls: set[str], events: Events, dict_mfl_ofls: DictMflOfls, dict_mfl_dict_line_cc: DictMflDictLineCc, summary_cc: Cc, ) -> AnnotatedCcs: annotated_ccs = AnnotatedCcs(events) def add_dict_line_cc_to_cc(dict_line_cc: DictLineCc, accum_cc: Cc) -> None: for line_cc in dict_line_cc.values(): add_cc_to_cc(line_cc, accum_cc) # Exclude the unknown ("???") file, which is unannotatable. ann_mfls.discard("???") if "???" in dict_mfl_dict_line_cc: dict_line_cc = dict_mfl_dict_line_cc.pop("???") add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.files_unknown_cc) def print_ann_fancy(mfl: str) -> None: print_fancy(f"Annotated source file: {mfl}") # This can raise an `OSError`. def all_ofl_contents_identical(ofls: list[str]) -> bool: for i in range(len(ofls) - 1): if not filecmp.cmp(ofls[i], ofls[i + 1], shallow=False): return False return True for mfl in sorted(ann_mfls): ofls = sorted(dict_mfl_ofls.pop(mfl)) first_ofl = ofls[0] try: if all_ofl_contents_identical(ofls): # All the Ofls that map to this Mfl are identical, which means we # can annotate, and it doesn't matter which Ofl we use. with open(first_ofl, "r", encoding="utf-8") as src_file: dict_line_cc = dict_mfl_dict_line_cc.pop(mfl) print_ann_fancy(mfl) # Because all the Ofls are identical, we can treat their # mtimes as if they are all as early as the earliest one. # Therefore, we warn only if the earliest source file is # more recent than the cgout file. min_ofl_st_mtime_ns = min( [os.stat(ofl).st_mtime_ns for ofl in ofls] ) for cgout_filename in args.cgout_filename: if min_ofl_st_mtime_ns > os.stat(cgout_filename).st_mtime_ns: warn_ofls_are_all_newer(ofls, cgout_filename) print_annotated_src_file( events, dict_line_cc, src_file, annotated_ccs, summary_cc, ) else: dict_line_cc = dict_mfl_dict_line_cc.pop(mfl) add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.non_identical_cc) # We could potentially do better here. # - Annotate until the first line where the src files diverge. # - Also, heuristic resyncing, e.g. by looking for matching # lines (of sufficient complexity) after a divergence. print_ann_fancy(mfl) print( "Unannotated because two or more of these original files are not " "identical:", *ofls, sep="\n- ", ) print() except OSError: dict_line_cc = dict_mfl_dict_line_cc.pop(mfl) add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.unreadable_cc) print_ann_fancy(mfl) print( "Unannotated because one or more of these original files are " "unreadable:", *ofls, sep="\n- ", ) print() # Sum the CCs remaining in `dict_mfl_dict_line_cc`, which are all in files # below the threshold. for dict_line_cc in dict_mfl_dict_line_cc.values(): add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.below_threshold_cc) return annotated_ccs def print_annotation_summary( events: Events, annotated_ccs: AnnotatedCcs, summary_cc: Cc, ) -> None: # Show how many events were covered by annotated lines above. printer = CcPrinter(events, summary_cc) printer.init_ccs(annotated_ccs.ccs()) print_fancy("Annotation summary") printer.print_events("") print() total_cc = events.mk_empty_cc() for (cc, label) in zip(annotated_ccs.ccs(), AnnotatedCcs.labels): printer.print_cc(cc, None, label) add_cc_to_cc(cc, total_cc) print() # Internal sanity check. if summary_cc != total_cc: msg = ( "`summary:` line doesn't match computed annotated counts\n" f"- summary: {summary_cc}\n" f"- annotated: {total_cc}" ) die(msg) def main() -> None: # Metadata, initialized to empty states. descs: list[str] = [] cmds: list[str] = [] events = Events() # For tracking original filenames to modified filenames. dict_mfl_ofls: DictMflOfls = defaultdict(set) # Different places where we accumulate CC data. Initialized to invalid # states prior to the number of events being known. dict_mfl_dcc: DictMnameDcc = defaultdict(None) dict_mfn_dcc: DictMnameDcc = defaultdict(None) dict_mfl_dict_line_cc: DictMflDictLineCc = defaultdict(None) summary_cc: Cc = [] for n, filename in enumerate(args.cgout_filename): is_first_file = n == 0 read_cgout_file( filename, is_first_file, descs, cmds, events, dict_mfl_ofls, dict_mfl_dcc, dict_mfn_dcc, dict_mfl_dict_line_cc, summary_cc, ) # Each of the following calls prints a section of the output. print_metadata(descs, cmds, events) print_summary(events, summary_cc) ann_mfls = print_mname_summary( "File:function", "< ", events, dict_mfl_dcc, summary_cc ) print_mname_summary("Function:file", "> ", events, dict_mfn_dcc, summary_cc) if args.annotate: annotated_ccs = print_annotated_src_files( ann_mfls, events, dict_mfl_ofls, dict_mfl_dict_line_cc, summary_cc ) print_annotation_summary(events, annotated_ccs, summary_cc) if __name__ == "__main__": main()