#!/usr/bin/python3

# pyright: strict

# --------------------------------------------------------------------
# --- Cachegrind's merger.                             cg_merge.in ---
# --------------------------------------------------------------------

# This file is part of Cachegrind, a Valgrind tool for cache
# profiling programs.
#
# Copyright (C) 2002-2023 Nicholas Nethercote
#    njn@valgrind.org
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
# The GNU General Public License is contained in the file COPYING.

# This script merges Cachegrind output files.
#
# Use `make pymerge` to "build" this script every time it is changed. This runs
# the formatters, type-checkers, and linters on `cg_merge.in` and then
# generates `cg_merge`.
#
# This is a cut-down version of `cg_annotate.in`.

from __future__ import annotations

import re
import sys
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from typing import DefaultDict, NoReturn, TextIO


class Args(Namespace):
    """A typed wrapper for parsed command-line args.

    None of these fields are modified after arg parsing finishes.
    """

    # Output filename from `-o`; unset means write to stdout (see `main`).
    output: str
    # One or more Cachegrind output files to merge.
    cgout_filename: list[str]

    @staticmethod
    def parse() -> Args:
        """Parse `sys.argv` into an `Args`; argparse exits on bad usage."""
        desc = (
            "Merge multiple Cachegrind output files. Deprecated; use "
            "`cg_annotate` with multiple Cachegrind output files instead."
        )
        p = ArgumentParser(description=desc)

        p.add_argument("--version", action="version", version="%(prog)s-3.21.0")
        p.add_argument(
            "-o",
            dest="output",
            type=str,
            metavar="FILE",
            help="output file (default: stdout)",
        )
        p.add_argument(
            "cgout_filename",
            nargs="+",
            metavar="cachegrind-out-file",
            help="file produced by Cachegrind",
        )

        return p.parse_args(namespace=Args())


# Args are stored in a global for easy access. NOTE: parsing happens at import
# time, so importing this module consumes `sys.argv`.
args = Args.parse()


class Events:
    """A single instance of this class is constructed, from `args` and the
    `events:` line in the cgout file."""

    # The event names, in the order they appear on the `events:` line.
    events: list[str]
    # Cached length of `events`; consulted on every counts line parsed.
    num_events: int

    def __init__(self, text: str) -> None:
        self.events = text.split()
        self.num_events = len(self.events)

    def mk_cc(self, str_counts: list[str]) -> Cc:
        """Build a `Cc` from the string counts of one parsed line.

        Lines with fewer counts than events are padded with trailing zeroes.
        Raises a `ValueError` exception on syntax error: a non-integer count,
        or more counts than there are events.
        """
        # This is slightly faster than a list comprehension.
        counts = list(map(int, str_counts))

        if len(counts) == self.num_events:
            pass
        elif len(counts) < self.num_events:
            # Add zeroes at the end for any missing numbers.
            counts.extend([0] * (self.num_events - len(counts)))
        else:
            raise ValueError

        return counts

    def mk_empty_cc(self) -> Cc:
        """Return an all-zero `Cc` with one slot per event."""
        # This is much faster than a list comprehension.
        return [0] * self.num_events


# A "cost centre", which is a dumb container for counts. Always the same length
# as `Events.events`, but it doesn't even know event names. `Events.mk_cc` and
# `Events.mk_empty_cc` are used for construction.
#
# This used to be a class with a single field `counts: list[int]`, but this
# type is very hot and just using a type alias is much faster.
Cc = list[int]


# Add the counts in `a_cc` to `b_cc`, element-wise, in place.
def add_cc_to_cc(a_cc: Cc, b_cc: Cc) -> None:
    for i, a_count in enumerate(a_cc):
        b_cc[i] += a_count


# Per-line CCs, organised by filename, function name, and line number.
DictLineCc = DefaultDict[int, Cc]
DictFnDictLineCc = DefaultDict[str, DictLineCc]
DictFlDictFnDictLineCc = DefaultDict[str, DictFnDictLineCc]


def die(msg: str) -> NoReturn:
    """Print an error message to stderr and exit with status 1."""
    print("cg_merge: error:", msg, file=sys.stderr)
    sys.exit(1)


def read_cgout_file(
    cgout_filename: str,
    is_first_file: bool,
    cumul_dict_fl_dict_fn_dict_line_cc: DictFlDictFnDictLineCc,
    cumul_summary_cc: Cc,
) -> tuple[list[str], str, Events]:
    """Parse one Cachegrind output file, adding its counts to the `cumul_*`
    accumulators (which are modified in place).

    Returns the file's `desc:` lines, its `cmd:` line, and its `Events`.
    Dies (exits the process) on any I/O error or syntax error.
    """
    # The file format is described in Cachegrind's manual.
    try:
        cgout_file = open(cgout_filename, "r", encoding="utf-8")
    except OSError as err:
        die(f"{err}")

    with cgout_file:
        cgout_line_num = 0

        def parse_die(msg: str) -> NoReturn:
            die(f"{cgout_file.name}:{cgout_line_num}: {msg}")

        def readline() -> str:
            nonlocal cgout_line_num
            cgout_line_num += 1
            return cgout_file.readline()

        # Read "desc:" lines.
        desc: list[str] = []
        while line := readline():
            if m := re.match(r"desc:\s+(.*)", line):
                desc.append(m.group(1))
            else:
                break

        # Read "cmd:" line. (`line` is already set from the "desc:" loop.)
        if m := re.match(r"cmd:\s+(.*)", line):
            cmd = m.group(1)
        else:
            # The message names the `cmd:` prefix the format actually uses.
            parse_die("missing a `cmd:` line")

        # Read "events:" line.
        line = readline()
        if m := re.match(r"events:\s+(.*)", line):
            events = Events(m.group(1))
        else:
            parse_die("missing an `events:` line")

        def mk_empty_dict_line_cc() -> DictLineCc:
            return defaultdict(events.mk_empty_cc)

        def mk_empty_dict_fn_dict_line_cc() -> DictFnDictLineCc:
            return defaultdict(mk_empty_dict_line_cc)

        summary_cc_present = False

        fl = ""
        fn = ""

        # The `cumul_*` values are passed in by reference and are modified by
        # this function. But they can't be properly initialized until the
        # `events:` line of the first file is read and the number of events is
        # known. So we initialize them in an invalid state, and then
        # reinitialize them properly here, before their first use.
        if is_first_file:
            cumul_dict_fl_dict_fn_dict_line_cc.default_factory = (
                mk_empty_dict_fn_dict_line_cc
            )
            cumul_summary_cc.extend(events.mk_empty_cc())

        # Line matching is done in order of pattern frequency, for speed.
        while line := readline():
            if line[0].isdigit():
                split_line = line.split()
                try:
                    line_num = int(split_line[0])
                    cc = events.mk_cc(split_line[1:])
                except ValueError:
                    parse_die("malformed or too many event counts")

                # Record this CC at the file/func/line level.
                add_cc_to_cc(cc, cumul_dict_fl_dict_fn_dict_line_cc[fl][fn][line_num])

            elif line.startswith("fn="):
                fn = line[3:-1]

            elif line.startswith("fl="):
                fl = line[3:-1]
                # A `fn=` line should follow, overwriting the "???".
                fn = "???"

            elif m := re.match(r"summary:\s+(.*)", line):
                summary_cc_present = True
                try:
                    add_cc_to_cc(events.mk_cc(m.group(1).split()), cumul_summary_cc)
                except ValueError:
                    parse_die("malformed or too many event counts")

            elif line == "\n" or line.startswith("#"):
                # Skip empty lines and comment lines.
                pass

            else:
                parse_die(f"malformed line: {line[:-1]}")

        # Check if summary line was present.
        if not summary_cc_present:
            parse_die("missing `summary:` line, aborting")

    # In `cg_annotate.in` and `cg_diff.in` we check that the file's summary CC
    # matches the totals of the file's individual CCs, but not here. That's
    # because in this script we don't collect the file's CCs in isolation,
    # instead we just add them to the accumulated CCs, for speed. This makes it
    # difficult to do the per-file checking.

    return (desc, cmd, events)


def main() -> None:
    """Read and accumulate every input file, then write the merged output."""
    desc1: list[str] | None = None
    cmd1 = None
    events1 = None

    # Different places where we accumulate CC data. Initialized to invalid
    # states prior to the number of events being known; `read_cgout_file`
    # finishes the initialization when handling the first file.
    cumul_dict_fl_dict_fn_dict_line_cc: DictFlDictFnDictLineCc = defaultdict(None)
    cumul_summary_cc: Cc = []

    for n, filename in enumerate(args.cgout_filename):
        is_first_file = n == 0
        (desc_n, cmd_n, events_n) = read_cgout_file(
            filename,
            is_first_file,
            cumul_dict_fl_dict_fn_dict_line_cc,
            cumul_summary_cc,
        )
        # We reuse the description and command from the first file, like the
        # old C version of `cg_merge`.
        if is_first_file:
            desc1 = desc_n
            cmd1 = cmd_n
            events1 = events_n
        else:
            assert events1 is not None
            if events1.events != events_n.events:
                die("events in data files don't match")

    def write_output(f: TextIO) -> None:
        # These assertions hold because the loop above executes at least once
        # (argparse's `nargs="+"` requires one or more input files). Note that
        # `desc1` may legitimately be an empty list (a file with no `desc:`
        # lines), so test against `None` rather than truthiness.
        assert desc1 is not None
        assert events1 is not None
        assert cumul_dict_fl_dict_fn_dict_line_cc is not None
        assert cumul_summary_cc is not None

        for desc_line in desc1:
            print("desc:", desc_line, file=f)
        print("cmd:", cmd1, file=f)
        print("events:", *events1.events, sep=" ", file=f)

        for fl, dict_fn_dict_line_cc in cumul_dict_fl_dict_fn_dict_line_cc.items():
            print(f"fl={fl}", file=f)
            for fn, dict_line_cc in dict_fn_dict_line_cc.items():
                print(f"fn={fn}", file=f)
                for line, cc in dict_line_cc.items():
                    print(line, *cc, file=f)

        print("summary:", *cumul_summary_cc, sep=" ", file=f)

    if args.output:
        try:
            with open(args.output, "w", encoding="utf-8") as f:
                write_output(f)
        except OSError as err:
            die(f"{err}")
    else:
        write_output(sys.stdout)


if __name__ == "__main__":
    main()