Files
ext4recovery/misc_tools/scan_zeros.py
2026-04-30 07:51:39 -04:00

169 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Scan for zeroed regions on the raw disks (via RAID0 striping).
Scans the md0 virtual byte space in the vicinity of LV_START
(where the ext4 filesystem begins) and reports contiguous zero regions.
Reads at SCAN_BLOCK granularity, reports transitions zero→nonzero and vice versa.
Usage:
sudo python3 misc_tools/scan_zeros.py
sudo python3 misc_tools/scan_zeros.py --start-mb 0 --end-mb 50 # whole pre-LV area
sudo python3 misc_tools/scan_zeros.py --full-lv # scan entire ext4 FS
"""
import sys, os, argparse
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Member disks of the striped array, listed in stripe order: md0 chunk k is
# read from DISKS[k % NUM_DISKS].
DISKS = ['/dev/sda', '/dev/sde', '/dev/sdd', '/dev/sdc']
# Derived from DISKS so the stripe arithmetic cannot drift out of sync with
# the disk list when it is edited.
NUM_DISKS = len(DISKS)
CHUNK_BYTES = 128 * 512  # stripe chunk: 128 sectors of 512 bytes = 64 KiB
# Byte offset on each member disk where chunk 0 begins; main() overwrites
# this from --disk-offset.
DISK_DATA_START = 0
LV_START_BYTES = 5120000 * 512  # 2,621,440,000 bytes (~2.44 GiB into md0)
LV_SIZE_BYTES = 9365766144 * 512  # 4,795,272,265,728 bytes (~4.36 TiB / ~4.80 TB)
SCAN_BLOCK = 64 * 1024  # scan granularity in bytes; main() overwrites from --block-kb
# Open binary file objects, one per entry of DISKS and in the same order;
# filled in by main() and indexed by md0_read().
DISK_FDS = []
def md0_read(md0_offset: int, length: int) -> bytes:
    """Read ``length`` bytes starting at ``md0_offset`` in the striped md0 space.

    The virtual space is split into CHUNK_BYTES-sized chunks. Chunk ``k`` is
    read from ``DISK_FDS[k % NUM_DISKS]`` at byte
    ``DISK_DATA_START + (k // NUM_DISKS) * CHUNK_BYTES`` on that disk. A
    request that crosses chunk boundaries is served one chunk segment at a
    time.

    Args:
        md0_offset: Absolute byte offset in the md0 virtual space.
        length: Number of bytes to read.

    Returns:
        Exactly ``length`` bytes.

    Raises:
        OSError: If a member disk returns fewer bytes than requested (for
            example when reading past its end), or on any underlying I/O
            error. A short read must not be padded silently: the buffer is
            pre-filled with zeros, so missing data would otherwise look like
            a zeroed region to the caller.
    """
    result = bytearray(length)
    pos = md0_offset
    remaining = length
    while remaining > 0:
        # Locate the stripe chunk containing `pos` and the offset inside it.
        chunk_num, intra = divmod(pos, CHUNK_BYTES)
        # Chunks are dealt round-robin across the member disks.
        chunk_on_disk, disk_idx = divmod(chunk_num, NUM_DISKS)
        # Never read across a chunk boundary: the next chunk is on another disk.
        seg_len = min(CHUNK_BYTES - intra, remaining)
        disk_byte = DISK_DATA_START + chunk_on_disk * CHUNK_BYTES + intra
        fd = DISK_FDS[disk_idx]
        fd.seek(disk_byte)
        data = fd.read(seg_len)
        if len(data) != seg_len:
            raise OSError(
                f'short read on disk {disk_idx} at byte {disk_byte}: '
                f'wanted {seg_len}, got {len(data)}'
            )
        dst_off = pos - md0_offset
        result[dst_off:dst_off + seg_len] = data
        pos += seg_len
        remaining -= seg_len
    return bytes(result)
def fmt_bytes(b):
    """Format a byte count as a short string in binary GB, MB, or KB.

    Values of at least 1024**3 are shown in GB with three decimals, values of
    at least 1024**2 in MB with two decimals, and everything else in KB with
    one decimal.
    """
    # Largest unit first; the first threshold the value reaches wins.
    scales = (
        (1024**3, 3, 'GB'),
        (1024**2, 2, 'MB'),
    )
    for threshold, decimals, unit in scales:
        if b >= threshold:
            return f'{b/threshold:.{decimals}f} {unit}'
    return f'{b/1024:.1f} KB'
def fmt_md0(md0_off):
    """Render an md0 offset as absolute bytes plus its signed distance from LV_START."""
    delta = md0_off - LV_START_BYTES
    # The distance is formatted unsigned; the direction is carried by the marker.
    marker = '-'
    if delta >= 0:
        marker = '+'
    distance = fmt_bytes(abs(delta))
    return f'md0:{md0_off} (LV_START{marker}{distance})'
def scan(start_md0: int, end_md0: int, label: str):
    """Scan md0 bytes ``[start_md0, end_md0)`` and report contiguous zero runs.

    The range is read through md0_read() in SCAN_BLOCK-sized steps. A block
    counts as zero only when every byte in it is zero; adjacent zero blocks
    are merged into one run. Each finished run and a progress line roughly
    every 64 MB are printed as the scan proceeds, followed by a summary.

    A block that cannot be read is reported and skipped. It also ends any
    zero run in progress, because an unreadable block cannot be confirmed to
    be zero and must not be counted as such.

    Args:
        start_md0: First md0 byte offset to scan (inclusive).
        end_md0: md0 byte offset at which to stop (exclusive).
        label: Human-readable name of the range, used in the header line.

    Returns:
        List of ``(run_start, run_end)`` md0 byte offsets (end exclusive),
        one tuple per zero run, in scan order.

    Raises:
        ValueError: If SCAN_BLOCK is not positive (the loop could never
            advance).
    """
    if SCAN_BLOCK <= 0:
        raise ValueError(f'SCAN_BLOCK must be positive, got {SCAN_BLOCK}')

    print(f'\nScanning {label}')
    print(f' Range: {fmt_bytes(start_md0)} {fmt_bytes(end_md0)} in md0')
    print(f' ({fmt_bytes(end_md0 - start_md0)} total, {SCAN_BLOCK//1024}KB blocks)')
    print()

    zero_runs = []

    def _record_run(run_begin, run_end, note=''):
        """Store one finished zero run and print its report line."""
        zero_runs.append((run_begin, run_end))
        print(f' ZERO {fmt_md0(run_begin)} len={fmt_bytes(run_end - run_begin)}{note}')

    run_start = None  # start offset of the zero run in progress, or None
    last_report = start_md0
    pos = start_md0
    while pos < end_md0:
        length = min(SCAN_BLOCK, end_md0 - pos)
        try:
            data = md0_read(pos, length)
        except Exception as e:
            # Deliberately broad: the scan is best-effort and keeps going past
            # unreadable blocks. Close the open run first so the unreadable
            # block is not silently counted as zeros.
            if run_start is not None:
                _record_run(run_start, pos, ' (ends at read error)')
                run_start = None
            print(f' READ ERROR at {fmt_md0(pos)}: {e}')
            pos += length
            continue

        is_zero = not any(data)
        if is_zero and run_start is None:
            run_start = pos
        elif not is_zero and run_start is not None:
            _record_run(run_start, pos)
            run_start = None

        # Progress every 64 MB
        if pos - last_report >= 64 * 1024 * 1024:
            pct = 100 * (pos - start_md0) / (end_md0 - start_md0)
            print(f' ... {fmt_bytes(pos)} ({pct:.0f}%)', flush=True)
            last_report = pos
        pos += length

    # A run still open here reached the end of the requested range.
    if run_start is not None:
        _record_run(run_start, end_md0, ' (extends to scan end)')

    print()
    if zero_runs:
        total_zero = sum(e - s for s, e in zero_runs)
        print(f' Summary: {len(zero_runs)} zero region(s), {fmt_bytes(total_zero)} total')
    else:
        print(' Summary: no zero regions found in this range')
    return zero_runs
def main():
    """Parse the command line, open the member disks read-only, and run a scan.

    With ``--full-lv`` the whole LV range
    ``[LV_START_BYTES, LV_START_BYTES + LV_SIZE_BYTES)`` is scanned. Otherwise
    the range comes from ``--start-mb`` / ``--end-mb`` (md0 offsets in MB),
    defaulting to 64 MB on either side of LV_START_BYTES.

    Side effects: sets the module globals DISK_DATA_START and SCAN_BLOCK from
    the options, and fills DISK_FDS for the duration of the scan. Invalid
    options end the program through ``parser.error`` before any disk is
    opened.
    """
    global DISK_DATA_START, DISK_FDS, SCAN_BLOCK
    parser = argparse.ArgumentParser()
    parser.add_argument('--disk-offset', type=int, default=0)
    parser.add_argument('--start-mb', type=float, default=None,
                        help='scan start as MD0 offset in MB (default: 64MB before LV_START)')
    parser.add_argument('--end-mb', type=float, default=None,
                        help='scan end as MD0 offset in MB (default: LV_START + 64MB)')
    parser.add_argument('--full-lv', action='store_true',
                        help='scan the entire ext4 LV (slow — ~4.5TB)')
    parser.add_argument('--block-kb', type=int, default=64,
                        help='scan block size in KB (default 64)')
    args = parser.parse_args()

    # A non-positive block size would keep the scan loop from ever advancing.
    if args.block_kb <= 0:
        parser.error('--block-kb must be a positive integer')
    DISK_DATA_START = args.disk_offset
    SCAN_BLOCK = args.block_kb * 1024

    # Resolve the scan range before touching any disk, so a bad range is
    # rejected without opening devices.
    if args.full_lv:
        start = LV_START_BYTES
        end = LV_START_BYTES + LV_SIZE_BYTES
        label = 'full ext4 LV'
    else:
        if args.start_mb is not None:
            start = int(args.start_mb * 1024 * 1024)
        else:
            # Default: 64MB before LV_START to catch anything before the FS
            start = max(0, LV_START_BYTES - 64 * 1024 * 1024)
        if args.end_mb is not None:
            end = int(args.end_mb * 1024 * 1024)
        else:
            # Default: 64MB into the ext4 FS (covers first ~16 block groups)
            end = LV_START_BYTES + 64 * 1024 * 1024
        label = 'LV boundary region'
    if start < 0:
        parser.error('--start-mb must not be negative')
    if end <= start:
        parser.error('scan end must be greater than scan start')

    print('Opening disks (read-only)...')
    try:
        for path in DISKS:
            DISK_FDS.append(open(path, 'rb'))
            print(f' {path} OK')
        scan(start, end, label)
    finally:
        # Runs on success, on a failed open partway through the list, and on
        # Ctrl-C during a long scan, so no device handle is left open.
        for fd in DISK_FDS:
            fd.close()
        # Drop the closed handles so a later call starts from a clean list.
        DISK_FDS.clear()
# Run the scan only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()