Files
ext4recovery/extract_tree.py
2026-04-30 11:04:05 +00:00

211 lines
7.9 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Stage 3: Extract directory trees from the device to a destination path.
Reads the inode→file mapping from the SQLite database (Stage 1 output) so
directory traversal never needs to re-read the inode tables from disk.
Only actual file data blocks are read from the device.
After extraction, optionally restores permissions/ownership/timestamps from
the inode metadata stored in the database (or read live from disk).
Usage:
python3 extract_tree.py [options] <inode> [<inode> ...]
# Extract all roots from Stage 2 output file
python3 extract_tree.py --from-file orphan_roots.txt [options]
Options:
--device DEV Block device [/dev/dm-0]
--backup-sb BLOCK Backup superblock block number [32768]
--db PATH SQLite database [inodes.db]
--dest DIR Destination base directory [/mnt/recovered]
--restore-meta Apply uid/gid/mode/timestamps after extraction
--skip-existing Skip an inode if its destination already exists
--active-only Skip inodes whose DB status is not 'active' [default]
--include-deleted Also extract deleted/corrupt inodes
--from-file FILE Read inode list from file (one inode per line,
extra columns ignored)
"""
import argparse, os, stat, sys
import ext4lib
import ext4db
# Fallback values for the command-line options defined in main().
DEFAULT_DEV = '/dev/dm-0'  # default for --device (block device to read)
DEFAULT_BACKUP_SB = 32768  # default for --backup-sb (backup superblock block number)
DEFAULT_DEST = '/mnt/recovered'  # default for --dest (base directory for extracted trees)
# ── metadata restore ──────────────────────────────────────────────────────────
import ctypes, ctypes.util
# Handle to the C library located through ctypes.util.find_library('c').
# use_errno=True makes ctypes keep a private copy of errno for calls made
# through this handle (readable with ctypes.get_errno()).
_libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
class _Timeval(ctypes.Structure):
    """ctypes structure holding a seconds / microseconds pair as two C longs."""

    _fields_ = [
        ('tv_sec', ctypes.c_long),
        ('tv_usec', ctypes.c_long),
    ]
def _lutimes(path, atime, mtime):
    """Set access and modification times on *path* without following symlinks.

    Uses os.utime(..., follow_symlinks=False) so that:
      * failures are reported as exceptions instead of being silently dropped,
      * file names that are not valid UTF-8 (surrogate-escaped str) are
        encoded the same way the rest of the os module encodes them,
      * both integer and float timestamps are accepted.

    Args:
        path: File system path (str or bytes) of the entry to update.
        atime: Access time, seconds since the epoch.
        mtime: Modification time, seconds since the epoch.

    Raises:
        OSError: The timestamps could not be applied (missing path, no
            permission, ...).
        NotImplementedError: The platform cannot set times on a symlink itself.
    """
    os.utime(path, (atime, mtime), follow_symlinks=False)
def restore_meta_from_disk(f, sb, gdt_data, inum, dest_path):
    """Apply metadata read from the on-disk inode *inum* to *dest_path*.

    The inode is fetched with ext4lib.read_inode(), decoded with
    ext4lib.get_inode_meta() into (perms, uid, gid, atime, mtime) and handed
    to _apply_meta().  Any exception raised along the way is reported as a
    warning on stderr and otherwise ignored.

    Args:
        f: Open device file object passed through to ext4lib.
        sb: Superblock object passed through to ext4lib.
        gdt_data: Group descriptor data passed through to ext4lib.
        inum: Inode number whose metadata should be applied.
        dest_path: Extracted path that receives the metadata.
    """
    try:
        inode_bytes, inode_slot = ext4lib.read_inode(f, sb, gdt_data, inum)
        meta = ext4lib.get_inode_meta(inode_bytes, inode_slot, sb)
        perms, uid, gid, atime, mtime = meta
        _apply_meta(dest_path, perms, uid, gid, atime, mtime)
    except Exception as err:
        print(f" WARN meta {dest_path}: {err}", file=sys.stderr)
def restore_meta_from_db(db, inum, dest_path):
    """Apply metadata stored in the database for inode *inum* to *dest_path*.

    Does nothing when ext4db.get_inode() returns None for the inode.
    Otherwise the permission bits are taken from the row's 'mode' column via
    stat.S_IMODE() and applied together with the 'uid', 'gid', 'atime' and
    'mtime' columns.  Failures while reading the row or applying the values
    are reported as a warning on stderr and otherwise ignored.

    Args:
        db: Database handle passed through to ext4db.
        inum: Inode number to look up.
        dest_path: Extracted path that receives the metadata.
    """
    record = ext4db.get_inode(db, inum)
    if record is not None:
        try:
            _apply_meta(
                dest_path,
                stat.S_IMODE(record['mode']),
                record['uid'],
                record['gid'],
                record['atime'],
                record['mtime'],
            )
        except Exception as err:
            print(f" WARN meta {dest_path}: {err}", file=sys.stderr)
def _apply_meta(dest_path, perms, uid, gid, atime, mtime):
    """Best-effort application of ownership, mode and timestamps to a path.

    The steps run in a fixed order: lchown, then chmod (skipped for
    symlinks), then timestamps.  Each step is independent: an OSError from
    lchown/chmod, or any Exception from the timestamp step, is ignored so the
    remaining steps still run.

    Args:
        dest_path: Path to modify; symlinks are not followed for ownership.
        perms: Permission bits for os.chmod().
        uid: Owner user id.
        gid: Owner group id.
        atime: Access time handed to _lutimes().
        mtime: Modification time handed to _lutimes().
    """
    # Each entry: (deferred action, exception type that is tolerated for it).
    steps = [(lambda: os.lchown(dest_path, uid, gid), OSError)]
    if not os.path.islink(dest_path):
        # Mode bits are only applied to non-symlink targets.
        steps.append((lambda: os.chmod(dest_path, perms), OSError))
    steps.append((lambda: _lutimes(dest_path, atime, mtime), Exception))

    for action, tolerated in steps:
        try:
            action()
        except tolerated:
            pass
# ── tree walker ───────────────────────────────────────────────────────────────
def extract_tree(f, sb, gdt_data, db, inum, dest_dir,
                 restore_meta=False, include_deleted=False,
                 visited=None):
    """Recursively extract the directory inode *inum* into *dest_dir*.

    Directory entries are taken from the database (ext4db.get_dir_entries);
    when the database has none, they are read from the device with
    ext4lib.read_dir_entries.  Entries are a mapping of
    name -> (child inode number, file type).  Sub-directories are extracted
    recursively, regular files and symlinks are dumped through ext4lib;
    entries of any other type are skipped.

    Args:
        f: Open device file object passed through to ext4lib.
        sb: Superblock object passed through to ext4lib.
        gdt_data: Group descriptor data passed through to ext4lib.
        db: Database handle passed through to ext4db.
        inum: Inode number of the directory to extract.
        dest_dir: Directory that receives the contents (created if missing).
        restore_meta: When true, apply database metadata to every extracted
            entry and, last of all, to *dest_dir* itself.
        include_deleted: When false, children whose database status is
            neither 'active' nor None are skipped.
        visited: Set of directory inode numbers already processed; shared
            across the recursion so each directory inode is walked only once.

    Per-child failures are reported on stderr and do not abort the walk.
    """
    if visited is None:
        visited = set()
    if inum in visited:
        return
    visited.add(inum)

    entries = ext4db.get_dir_entries(db, inum)
    if not entries:
        # Fallback: read from disk (dir entries not in DB)
        try:
            entries = ext4lib.read_dir_entries(f, sb, gdt_data, inum)
        except Exception as e:
            # Report the lost subtree instead of dropping it silently.
            print(f" WARN {dest_dir}: cannot read directory inode {inum}: {e}",
                  file=sys.stderr)
            return

    os.makedirs(dest_dir, exist_ok=True)

    for name, (child_inum, ftype) in entries.items():
        if name in ('.', '..'):
            continue

        safe_name = name.replace('/', '_').replace('\x00', '')
        # Names come from possibly corrupt on-disk data: after stripping NULs
        # a name may collapse to '', '.' or '..', which would make the
        # destination escape or alias dest_dir.  Refuse those.
        if safe_name in ('', '.', '..'):
            print(f" WARN {dest_dir}: skipping unsafe entry name {name!r} "
                  f"(inode {child_inum})", file=sys.stderr)
            continue

        # Status check
        row = None
        if not include_deleted:
            row = ext4db.get_inode(db, child_inum)
            if row and row['status'] not in ('active', None):
                continue

        dest = os.path.join(dest_dir, safe_name)
        try:
            # Derive ftype from DB itype if not set in dir entry
            if ftype == 0:
                if include_deleted:
                    # Row was not fetched by the status check above.
                    row = ext4db.get_inode(db, child_inum)
                if row:
                    itype = row['itype']
                    if itype == ext4lib.ITYPE_DIR:
                        ftype = ext4lib.FTYPE_DIR
                    elif itype == ext4lib.ITYPE_REG:
                        ftype = ext4lib.FTYPE_REG
                    elif itype == ext4lib.ITYPE_SYM:
                        ftype = ext4lib.FTYPE_SYM

            if ftype == ext4lib.FTYPE_DIR:
                extract_tree(f, sb, gdt_data, db, child_inum, dest,
                             restore_meta=restore_meta,
                             include_deleted=include_deleted,
                             visited=visited)
            elif ftype == ext4lib.FTYPE_REG:
                ext4lib.dump_file(f, sb, gdt_data, child_inum, dest)
            elif ftype == ext4lib.FTYPE_SYM:
                ext4lib.dump_symlink(f, sb, gdt_data, child_inum, dest)

            # lexists() so that dangling symlinks still get their metadata.
            if restore_meta and os.path.lexists(dest):
                restore_meta_from_db(db, child_inum, dest)
        except Exception as e:
            print(f" WARN {dest}: {e}", file=sys.stderr)

    # Restore directory metadata last (writing children updates parent mtime)
    if restore_meta:
        restore_meta_from_db(db, inum, dest_dir)
# ── main ──────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: extract one or more directory inodes.

    Inode numbers come from the positional arguments and/or from the file
    given with --from-file (first whitespace-separated column of every
    non-empty line that does not start with '#').  Each inode is extracted
    into <dest>/<inode number>.

    Exits through argparse (status 2) when no inode is given or when the
    inode list file contains a line whose first column is not an integer.
    """
    parser = argparse.ArgumentParser(description='Extract orphaned ext4 trees (Stage 3)')
    parser.add_argument('inodes', nargs='*', type=int)
    parser.add_argument('--device', default=DEFAULT_DEV)
    parser.add_argument('--backup-sb', type=int, default=DEFAULT_BACKUP_SB)
    parser.add_argument('--db', default='inodes.db')
    parser.add_argument('--dest', default=DEFAULT_DEST)
    parser.add_argument('--restore-meta', action='store_true')
    parser.add_argument('--skip-existing', action='store_true')
    # --active-only is documented in the module docstring as the default
    # behaviour; accept it explicitly so documented invocations do not fail.
    status_group = parser.add_mutually_exclusive_group()
    status_group.add_argument('--active-only', action='store_true',
                              help="Skip inodes whose DB status is not 'active' (default)")
    status_group.add_argument('--include-deleted', action='store_true')
    parser.add_argument('--from-file', metavar='FILE',
                        help='Read inode list from file (first column = inode number)')
    args = parser.parse_args()

    inodes = list(args.inodes)
    if args.from_file:
        with open(args.from_file) as fh:
            for lineno, line in enumerate(fh, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                first_col = line.split()[0]
                try:
                    inodes.append(int(first_col))
                except ValueError:
                    # Point at the offending line instead of a bare traceback.
                    parser.error(f"{args.from_file}:{lineno}: "
                                 f"invalid inode number {first_col!r}")
    if not inodes:
        parser.error('Provide at least one inode or use --from-file')

    db = ext4db.open_db(args.db)
    with open(args.device, 'rb') as f:
        sb, gdt_data, _ = ext4lib.load_fs(f, args.backup_sb)
        for inum in inodes:
            dest = os.path.join(args.dest, str(inum))
            # Only a non-empty destination directory counts as "already done".
            if args.skip_existing and os.path.isdir(dest) and os.listdir(dest):
                print(f"Skipping {inum} -> {dest} (already exists)")
                continue
            # Status filter
            if not args.include_deleted:
                row = ext4db.get_inode(db, inum)
                if row and row['status'] not in ('active', None):
                    print(f"Skipping {inum} (status={row['status']})")
                    continue
            print(f"Extracting inode {inum} -> {dest}")
            extract_tree(f, sb, gdt_data, db, inum, dest,
                         restore_meta=args.restore_meta,
                         include_deleted=args.include_deleted)
            print(f" done inode {inum}")
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()