From 17c496c3af636da6addf857c05676cf7e791937c Mon Sep 17 00:00:00 2001 From: Riff Date: Mon, 18 Dec 2023 08:56:52 -0800 Subject: [PATCH] Dump memory usage when listing arenas, and add summary option (#1024) --- docs/commands/heap.md | 8 ++++ gef.py | 93 ++++++++++++++++++++++++++++++++++-------- tests/commands/heap.py | 8 ++++ 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/docs/commands/heap.md b/docs/commands/heap.md index c6992358d..835312656 100644 --- a/docs/commands/heap.md +++ b/docs/commands/heap.md @@ -62,6 +62,14 @@ re-aligns the chunks data start addresses to match Glibc's behavior. To be able chunks as well, you can disable this with the `--allow-unaligned` flag. Note that this might result in incorrect output. +To get a higher-level overview of the chunks, you can also use the `--summary` flag. + +```text +gef➤ heap chunks --summary +``` + +![heap-chunks-summary](https://i.imgur.com/3HTgtwX.png) + ### `heap chunk` command This command gives visual information of a Glibc malloc-ed chunked. 
Simply provide the address to diff --git a/gef.py b/gef.py index dd10a526e..382abcb23 100644 --- a/gef.py +++ b/gef.py @@ -1375,7 +1375,8 @@ def __eq__(self, other: "GlibcArena") -> bool: def __str__(self) -> str: properties = f"base={self.__address:#x}, top={self.top:#x}, " \ - f"last_remainder={self.last_remainder:#x}, next={self.next:#x}" + f"last_remainder={self.last_remainder:#x}, next={self.next:#x}, " \ + f"mem={self.system_mem}, mempeak={self.max_system_mem}" return (f"{Color.colorify('Arena', 'blue bold underline')}({properties})") def __repr__(self) -> str: @@ -6287,13 +6288,58 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None: return +class GlibcHeapChunkSummary: + def __init__(self): + self.count = 0 + self.total_bytes = 0 + + def process_chunk(self, chunk: GlibcChunk) -> None: + self.count += 1 + self.total_bytes += chunk.size + + +class GlibcHeapArenaSummary: + def __init__(self) -> None: + self.size_distribution = {} + self.flag_distribution = { + "PREV_INUSE": GlibcHeapChunkSummary(), + "IS_MMAPPED": GlibcHeapChunkSummary(), + "NON_MAIN_ARENA": GlibcHeapChunkSummary() + } + + def process_chunk(self, chunk: GlibcChunk) -> None: + per_size_summary = self.size_distribution.get(chunk.size, None) + if per_size_summary is None: + per_size_summary = GlibcHeapChunkSummary() + self.size_distribution[chunk.size] = per_size_summary + per_size_summary.process_chunk(chunk) + + if chunk.has_p_bit(): + self.flag_distribution["PREV_INUSE"].process_chunk(chunk) + if chunk.has_m_bit(): + self.flag_distribution["IS_MMAPPED"].process_chunk(chunk) + if chunk.has_n_bit(): + self.flag_distribution["NON_MAIN_ARENA"].process_chunk(chunk) + + def print(self) -> None: + gef_print("== Chunk distribution by size ==") + gef_print("{:<10s}\t{:<10s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes")) + for chunk_size, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True): + gef_print("{:<10d}\t{:<10d}\t{: None: 
self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)") return - @parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True}) + @parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, ("--summary", "-s"): True}) @only_if_gdb_running def do_invoke(self, _: List[str], **kwargs: Any) -> None: args = kwargs["arguments"] if args.all or not args.arena_address: for arena in gef.heap.arenas: - self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned) + self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, summary=args.summary) if not args.all: return try: arena_addr = parse_address(args.arena_address) arena = GlibcArena(f"*{arena_addr:#x}") - self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned) + self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, summary=args.summary) except gdb.error: err("Invalid arena") return - def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False) -> None: + def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, summary: bool = False) -> None: heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned) if heap_addr is None: err("Could not find heap for arena") @@ -6328,31 +6374,42 @@ def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_ gef_print(str(arena)) if arena.is_main_arena(): heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size - self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned) + self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, summary=summary) else: heap_info_structs = arena.get_heap_info_list() or [] for heap_info in heap_info_structs: - if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, 
allow_unaligned=allow_unaligned): + if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, summary=summary): break return - def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False) -> bool: + def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, summary: bool = False) -> bool: nb = self["peek_nb_byte"] chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned) + heap_summary = GlibcHeapArenaSummary() for chunk in chunk_iterator: - if chunk.base_address == arena.top: - gef_print( - f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}") - break + heap_corrupted = chunk.base_address > end + + if not summary: + if chunk.base_address == arena.top: + gef_print( + f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}") + break - if chunk.base_address > end: + if heap_corrupted: err("Corrupted heap, cannot continue.") return False - line = str(chunk) - if nb: - line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]" - gef_print(line) + if summary: + heap_summary.process_chunk(chunk) + else: + line = str(chunk) + if nb: + line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]" + gef_print(line) + + if summary: + heap_summary.print() + return True diff --git a/tests/commands/heap.py b/tests/commands/heap.py index 983d4a0ea..0f4d4ac1f 100644 --- a/tests/commands/heap.py +++ b/tests/commands/heap.py @@ -94,6 +94,14 @@ def test_cmd_heap_chunks_mult_heaps(self): self.assertIn("Chunk(addr=", res) self.assertIn("top chunk", res) + def test_cmd_heap_chunks_summary(self): + cmd = "heap chunks --summary" + target = _target("heap") + self.assertFailIfInactiveSession(gdb_run_cmd(cmd, target=target)) + res = gdb_run_silent_cmd(cmd, target=target) + self.assertNoException(res) + self.assertIn("== Chunk distribution by size", res) + 
self.assertIn("== Chunk distribution by flag", res) def test_cmd_heap_bins_fast(self): cmd = "heap bins fast"