Files
ext4recovery/scan_inodes.py
2026-04-30 11:04:05 +00:00

136 lines
4.9 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Stage 1: Scan ext4 inode tables and persist everything to a SQLite database.
This is the slow stage that reads the raw device sequentially. Run it once;
subsequent pipeline stages read from the database and never touch the device.
The scan is resumable: already-scanned groups are skipped on re-run.
Usage:
python3 scan_inodes.py [options]
Options:
--device DEV Block device or image file [/dev/dm-0]
--backup-sb BLOCK Block number of the backup superblock [32768]
--db PATH Output SQLite database [inodes.db]
--zeroed-groups N First N block groups are damaged/zeroed (skip scan,
mark parents in them as orphan triggers) [13]
--start-group N Start scanning from group N (override resume logic)
--end-group N Stop after group N (for partial scans / testing)
"""
import argparse, sys, time
import ext4lib
import ext4db
# Default value of the --device option (block device or image file to scan).
DEFAULT_DEV = '/dev/dm-0'
# Default value of the --backup-sb option (block number of the backup superblock).
DEFAULT_BACKUP_SB = 32768
# The main scan loop commits the database whenever the group number is a
# multiple of this value.
COMMIT_INTERVAL = 20
def scan_group(f, sb, gdt_data, grp, db):
    """Scan the inode table of one block group and store what is found.

    Every inode slot of the group is parsed; slots that fail to parse or whose
    mode is 0 are skipped. Each remaining inode is classified and saved, and
    for directories with a non-zero link count the directory entries are read
    from the device and saved as well. The group is always marked as scanned
    before returning, even if some of its blocks could not be read.

    Args:
        f: Open binary file object for the device or image being scanned.
        sb: Superblock mapping; the keys 'desc_size', 'inode_size' and
            'inodes_per_group' are read here.
        gdt_data: Raw group descriptor table, indexed by grp * desc_size.
        grp: Index of the block group to scan.
        db: Database handle passed through to the ext4db helpers.

    Returns:
        The number of inodes saved for this group.
    """
    desc_size = sb['desc_size']
    inode_table_block = ext4lib.parse_gdt_entry(
        gdt_data, grp * desc_size, desc_size)
    if inode_table_block == 0:
        # The descriptor gives no inode table location: nothing to read, but
        # record the group so a resumed run does not look at it again.
        ext4db.mark_group_scanned(db, grp)
        return 0

    block_size = ext4lib.BLOCK
    inode_size = sb['inode_size']
    inodes_per_group = sb['inodes_per_group']
    inodes_per_block = block_size // inode_size
    # Round up so a partially filled final block is still read.
    num_inode_blocks = (inodes_per_group * inode_size + block_size - 1) // block_size
    # Inode numbers are 1-based and contiguous across groups.
    first_inum = grp * inodes_per_group + 1

    found = 0
    for blk_off in range(num_inode_blocks):
        block_no = inode_table_block + blk_off
        try:
            idata = ext4lib.read_at(f, block_no * block_size, block_size)
        except OSError as exc:
            # Best effort: an unreadable block must not abort the scan, but
            # the loss is reported instead of being dropped silently.
            print(f"warning: group {grp}: cannot read inode table block "
                  f"{block_no}: {exc}", file=sys.stderr)
            continue

        first_slot = blk_off * inodes_per_block
        # The last block may hold fewer than inodes_per_block inodes of this
        # group; scanning past inodes_per_group would produce inode numbers
        # that belong to the next group.
        slots_in_block = min(inodes_per_block, inodes_per_group - first_slot)
        for slot_idx in range(slots_in_block):
            ino_off = slot_idx * inode_size
            abs_inum = first_inum + first_slot + slot_idx
            inode = ext4lib.parse_inode_full(idata, ino_off, sb)
            if inode is None or inode['mode'] == 0:
                continue
            status = ext4lib.classify_inode(idata, ino_off)
            ext4db.save_inode(db, abs_inum, grp, inode, status)
            found += 1

            # For directories with links, also read dir entries from disk.
            if inode['type'] == ext4lib.ITYPE_DIR and inode['links'] > 0:
                try:
                    entries = ext4lib.read_dir_entries_raw(f, idata, ino_off)
                    for name, (child_inum, ftype) in entries.items():
                        ext4db.save_dir_entry(db, abs_inum, name, child_inum, ftype)
                except Exception as exc:
                    # Deliberately broad: damaged directory data can fail in
                    # many ways and the inode itself is already saved. Report
                    # the failure and keep scanning.
                    print(f"warning: inode {abs_inum}: cannot read directory "
                          f"entries: {exc!r}", file=sys.stderr)

    ext4db.mark_group_scanned(db, grp)
    return found
def main():
    """Command-line entry point for the Stage 1 inode scan.

    Parses the options, opens the database and the device, loads the
    filesystem geometry via the backup superblock, then scans each block
    group in the selected range that is not already recorded as scanned.
    The database is committed periodically and once more at the end, after
    which summary statistics are printed.
    """
    parser = argparse.ArgumentParser(description='Scan ext4 inodes into SQLite DB (Stage 1)')
    parser.add_argument('--device', default=DEFAULT_DEV)
    parser.add_argument('--backup-sb', type=int, default=DEFAULT_BACKUP_SB)
    parser.add_argument('--db', default='inodes.db')
    parser.add_argument('--zeroed-groups', type=int, default=13,
                        help='First N groups are damaged; skip scan, flag parents there as orphans')
    parser.add_argument('--start-group', type=int, default=None,
                        help='Force start at this group (ignores resume state)')
    parser.add_argument('--end-group', type=int, default=None)
    args = parser.parse_args()

    db = ext4db.open_db(args.db)
    with open(args.device, 'rb') as f:
        sb, gdt_data, num_groups = ext4lib.load_fs(f, args.backup_sb)
        print(f"Geometry: {sb['blocks_per_group']} blk/grp, "
              f"{sb['inodes_per_group']} ino/grp, "
              f"inode_size={sb['inode_size']}, desc_size={sb['desc_size']}")
        # Show the zeroed groups as an inclusive range, or "none" when the
        # count is zero (a range ending at -1 would be meaningless).
        if args.zeroed_groups > 0:
            zeroed_desc = f"0-{args.zeroed_groups - 1}"
        else:
            zeroed_desc = "none"
        print(f"Total groups: {num_groups} | zeroed groups: {zeroed_desc}")
        ext4db.save_fs_meta(db, sb, args.device, args.backup_sb, args.zeroed_groups)

        scanned = ext4db.get_scanned_groups(db)

        # --end-group is documented as "stop after group N", so N itself is
        # included; clamp so the loop never goes past the last real group.
        if args.end_group is not None:
            end = min(args.end_group + 1, num_groups)
        else:
            end = num_groups
        # Without an explicit start, begin right after the zeroed groups.
        # NOTE(review): groups already recorded as scanned are skipped below
        # even when --start-group is given, although its help text says resume
        # state is ignored; confirm which behavior is intended.
        if args.start_group is not None:
            start = args.start_group
        else:
            start = args.zeroed_groups

        total_found = 0
        groups_done = 0  # groups actually scanned in this run (skips excluded)
        t0 = time.monotonic()
        for grp in range(start, end):
            if grp in scanned:
                continue
            total_found += scan_group(f, sb, gdt_data, grp, db)
            groups_done += 1
            # Commit by amount of work done, so the cadence holds even when
            # many group numbers are skipped on a resumed run.
            if groups_done % COMMIT_INTERVAL == 0:
                db.commit()
            if grp % 100 == 0:
                elapsed = time.monotonic() - t0
                # Base the rate on scanned groups only; counting skipped ones
                # would inflate the rate and shrink the ETA after a resume.
                rate = groups_done / elapsed if elapsed > 0 else 0
                eta = (end - grp) / rate if rate > 0 else 0
                print(f" group {grp:6d}/{end} inodes={total_found:,} "
                      f"{rate:.1f} grp/s ETA {eta:.0f}s",
                      end='\r', flush=True)
        # Flush whatever was saved since the last periodic commit.
        db.commit()

    print("\nScan complete.")
    ext4db.print_stats(db)
# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()