Files
ext4recovery/ext4lib.py
2026-04-30 11:04:05 +00:00

344 lines
13 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
EXT4 low-level filesystem primitives.
All functions that need a file handle expect it open in 'rb' mode.
"""
import os, stat, struct, sys
# Size in bytes of one filesystem block; every block read in this module
# is addressed and sized in units of BLOCK.
BLOCK = 4 * 1024

# File-type codes compared against the type byte of a directory entry.
FTYPE_REG, FTYPE_DIR, FTYPE_SYM = 1, 2, 7

# File-type values compared against an inode's mode masked with 0xF000.
ITYPE_DIR = 0o040000
ITYPE_REG = 0o100000
ITYPE_SYM = 0o120000
# ── block I/O ─────────────────────────────────────────────────────────────────
def read_at(f, offset, size):
    """Seek ``f`` to byte ``offset`` and return the result of reading ``size`` bytes.

    The file position of ``f`` is left just past the bytes that were read.
    """
    f.seek(offset)
    payload = f.read(size)
    return payload
# ── superblock / GDT ──────────────────────────────────────────────────────────
def parse_superblock(data):
    """Decode the superblock fields used by this module.

    data  raw superblock bytes, starting at the superblock's first byte;
          at least 256 bytes are required.

    Returns a dict with the keys inodes_count, blocks_count,
    blocks_per_group, inodes_per_group, inode_size, magic,
    feature_incompat and desc_size.  desc_size falls back to 32 when the
    stored value is zero.
    Raises struct.error if data is too short for the fixed fields.
    """
    def field(fmt, offset):
        return struct.unpack_from(fmt, data, offset)[0]

    sb = {
        'inodes_count': field('<I', 0),
        'blocks_count': field('<I', 4),
        'blocks_per_group': field('<I', 32),
        'inodes_per_group': field('<I', 40),
        'inode_size': field('<H', 88),
        'magic': field('<H', 56),
        'feature_incompat': field('<I', 96),
        'desc_size': field('<H', 254) or 32,
    }
    # On filesystems with the 64bit incompat feature (0x80) the block count
    # is 64 bits wide; its upper half lives at offset 336.  Without it the
    # group count computed from blocks_count is too small on very large
    # filesystems.  Callers passing only the first 256 bytes keep working:
    # the high word is simply not consulted for them.
    if sb['feature_incompat'] & 0x80 and len(data) >= 340:
        sb['blocks_count'] |= field('<I', 336) << 32
    return sb
def parse_gdt_entry(gdt_data, offset, desc_size):
    """Return the inode-table block number stored in one group descriptor.

    gdt_data   buffer holding the group descriptor table
    offset     byte offset of the descriptor within gdt_data
    desc_size  descriptor size in bytes; the upper 32 bits of the block
               number are only read when this is at least 64
    """
    (table_lo,) = struct.unpack_from('<I', gdt_data, offset + 8)
    if desc_size < 64:
        return table_lo
    (table_hi,) = struct.unpack_from('<I', gdt_data, offset + 40)
    return (table_hi << 32) | table_lo
def load_fs(f, backup_sb_block):
    """Read superblock and full GDT from backup location.

    f                image file handle open in 'rb' mode
    backup_sb_block  block number (in BLOCK units) where the superblock
                     copy starts; the GDT is read from the following block

    Returns (sb, gdt_data, num_groups).
    Raises AssertionError if the magic number is wrong.
    """
    sb_data = read_at(f, backup_sb_block * BLOCK, 1024)
    sb = parse_superblock(sb_data)
    # Raise explicitly instead of using an `assert` statement: asserts are
    # removed when Python runs with -O, which would let a bogus superblock
    # through.  The exception type stays AssertionError for existing callers.
    if sb['magic'] != 0xef53:
        raise AssertionError(f"Bad superblock magic: {sb['magic']:#x}")
    per_group = sb['blocks_per_group']
    # Ceiling division: a trailing partial group still counts as a group.
    num_groups = (sb['blocks_count'] + per_group - 1) // per_group
    gdt_data = read_at(f, (backup_sb_block + 1) * BLOCK, num_groups * sb['desc_size'])
    return sb, gdt_data, num_groups
# ── extent tree ───────────────────────────────────────────────────────────────
def read_extent_tree_blocks(f, data, inode_offset):
    """Return sorted list of (logical_block, phys_block) pairs for an inode.

    f             image file handle, used to read tree nodes below the root
    data          the BLOCK-sized buffer that contains the inode
    inode_offset  byte offset of the inode within data

    Returns [] when the inode's block area does not start with a valid
    extent header (wrong magic, truncated buffer, or implausible depth).
    """
    base = inode_offset + 40  # the extent root sits 40 bytes into the inode
    if len(data) - base < 12:
        return []
    magic, _entries, _max, depth = struct.unpack_from('<HHHH', data, base)
    if magic != 0xF30A:
        return []
    # ext4 extent trees are at most 5 levels deep; a larger value means the
    # header is corrupt and following it could recurse without bound.
    if depth > 5:
        return []
    return sorted(_walk_extent_node(f, data, base, depth))


def _walk_extent_node(f, data, base, depth):
    """Collect (logical_block, phys_block) pairs from one extent-tree node.

    data, base  buffer and byte offset of the node's 12-byte header
    depth       levels remaining below this node; 0 means the entries are
                leaf extents, otherwise they point at child nodes

    Child nodes that cannot be read, are truncated, or lack the extent
    magic contribute nothing instead of aborting the walk.
    """
    if len(data) - base < 12:
        # Short read (e.g. a pointer past the end of the image).
        return []
    magic, entries, _, _ = struct.unpack_from('<HHHH', data, base)
    if magic != 0xF30A:
        return []
    # Do not trust the on-disk count beyond what the buffer actually holds.
    entries = min(entries, (len(data) - base - 12) // 12)
    result = []
    if depth == 0:
        for i in range(entries):
            o = base + 12 + i * 12
            l_block, ee_len, start_hi, start_lo = struct.unpack_from('<IHHI', data, o)
            phys = (start_hi << 32) | start_lo
            if phys == 0:
                continue
            # ee_len up to and including 32768 is the length of an
            # initialized extent; larger values mark an unwritten extent
            # whose real length is ee_len - 32768.  Masking with 0x7FFF
            # would turn a full 32768-block extent into zero blocks.
            length = ee_len if ee_len <= 0x8000 else ee_len - 0x8000
            result.extend((l_block + b, phys + b) for b in range(length))
    else:
        for i in range(entries):
            o = base + 12 + i * 12
            _first_logical, leaf_lo, leaf_hi = struct.unpack_from('<IIH', data, o)
            leaf_block = (leaf_hi << 32) | leaf_lo
            try:
                child_data = read_at(f, leaf_block * BLOCK, BLOCK)
                result.extend(_walk_extent_node(f, child_data, 0, depth - 1))
            except OSError:
                # Best effort: an unreadable subtree is skipped.
                pass
    return result
# ── inode ─────────────────────────────────────────────────────────────────────
def read_inode(f, sb, gdt_data, inum):
    """Locate inode ``inum`` and load the table block that holds it.

    Returns (block_data, offset_within_block): the BLOCK-sized read from the
    inode table and the byte offset of the inode inside that read.
    """
    # Inode numbers are 1-based; split into (group, index inside group).
    group, index_in_group = divmod(inum - 1, sb['inodes_per_group'])
    table_start = parse_gdt_entry(gdt_data, group * sb['desc_size'], sb['desc_size'])
    # Split the byte position inside the table into (block, offset in block).
    block_in_table, slot = divmod(index_in_group * sb['inode_size'], BLOCK)
    return read_at(f, (table_start + block_in_table) * BLOCK, BLOCK), slot
def parse_inode_full(data, offset, sb):
    """Parse all fields from a raw inode buffer.

    data    buffer containing the inode
    offset  byte offset of the inode within data
    sb      superblock dict; kept for interface compatibility, no field of
            it is needed to decode the base inode

    Returns a dict with the keys mode, type, uid, gid, size, atime, ctime,
    mtime, dtime, links and flags, or None if fewer than 128 bytes are
    available at offset.
    """
    if len(data) - offset < 128:
        return None
    mode, uid_lo, size_lo, atime, ctime, mtime, dtime, gid_lo, links = \
        struct.unpack_from('<HHIIIIIHH', data, offset)
    flags = struct.unpack_from('<I', data, offset + 32)[0]
    # The high half of the size is at offset 108, inside the base 128-byte
    # inode, so it is available regardless of the filesystem's inode_size.
    # Gating it on inode_size >= 256 truncated sizes above 4 GiB on
    # filesystems with 128-byte inodes.
    size_hi = struct.unpack_from('<I', data, offset + 108)[0]
    uid_hi, gid_hi = struct.unpack_from('<HH', data, offset + 120)
    return {
        'mode': mode,
        'type': mode & 0xF000,
        'uid': uid_lo | (uid_hi << 16),
        'gid': gid_lo | (gid_hi << 16),
        'size': size_lo | (size_hi << 32),
        'atime': atime,
        'ctime': ctime,
        'mtime': mtime,
        'dtime': dtime,
        'links': links,
        'flags': flags,
    }
def classify_inode(idata, slot):
    """Classify the inode at ``slot`` from its link count and deletion time.

    Returns 'deleted' (dtime set, no links), 'corrupt' (dtime set but still
    linked), 'unallocated' (neither set) or 'active' (linked, no dtime).
    """
    (link_count,) = struct.unpack_from('<H', idata, slot + 26)
    (deleted_at,) = struct.unpack_from('<I', idata, slot + 20)
    is_linked = link_count > 0
    if deleted_at != 0:
        return 'corrupt' if is_linked else 'deleted'
    return 'active' if is_linked else 'unallocated'
def get_inode_meta(idata, slot, sb):
    """Return (permissions, uid, gid, atime, mtime) from a raw inode buffer.

    idata  buffer containing the inode
    slot   byte offset of the inode within idata
    sb     superblock dict; 'inode_size' (default 128) decides whether the
           extended timestamp fields are looked at

    atime and mtime are seconds; on inodes that carry the extra timestamp
    fields, the two low bits of each extra word extend the value past 32
    bits.
    """
    mode = struct.unpack_from('<H', idata, slot + 0)[0]
    uid_lo = struct.unpack_from('<H', idata, slot + 2)[0]
    gid_lo = struct.unpack_from('<H', idata, slot + 24)[0]
    atime = struct.unpack_from('<I', idata, slot + 8)[0]
    mtime = struct.unpack_from('<I', idata, slot + 16)[0]
    uid_hi, gid_hi = struct.unpack_from('<HH', idata, slot + 120)
    uid = uid_lo | (uid_hi << 16)
    gid = gid_lo | (gid_hi << 16)
    available = len(idata) - slot
    if sb.get('inode_size', 128) >= 256 and available >= 130:
        # The word at 128 says how many bytes past the base 128 this inode
        # really uses; fields beyond that (or beyond the buffer) are not
        # valid and must be ignored.
        extra_isize = struct.unpack_from('<H', idata, slot + 128)[0]
        usable = min(128 + extra_isize, available)
        # Layout after the 4-byte extra header: ctime_extra at 132,
        # mtime_extra at 136, atime_extra at 140.
        if usable >= 140:
            mtime_extra = struct.unpack_from('<I', idata, slot + 136)[0]
            mtime |= (mtime_extra & 0x3) << 32
        if usable >= 144:
            atime_extra = struct.unpack_from('<I', idata, slot + 140)[0]
            atime |= (atime_extra & 0x3) << 32
    return stat.S_IMODE(mode), uid, gid, atime, mtime
# ── directory entries ─────────────────────────────────────────────────────────
def read_dir_entries_raw(f, idata, inode_offset):
    """Read directory entries given raw inode data (already in memory).

    f             image file handle open in 'rb' mode
    idata         buffer containing the directory's inode
    inode_offset  byte offset of the inode within idata

    Returns dict of name -> (child_inum, ftype).  Blocks that cannot be
    read are skipped; a block that comes back short is parsed only as far
    as the bytes that were actually read.
    """
    entries = {}
    for _logical, phys in sorted(read_extent_tree_blocks(f, idata, inode_offset)):
        try:
            bdata = read_at(f, phys * BLOCK, BLOCK)
        except OSError:
            continue
        # A read near the end of a truncated image returns fewer than BLOCK
        # bytes; bounding the walk by the real length avoids a struct.error
        # that would otherwise discard the entries already collected.
        limit = min(len(bdata), BLOCK)
        offset = 0
        while offset < limit - 8:
            e_ino, rec_len, name_len, ftype = struct.unpack_from('<IHBB', bdata, offset)
            # rec_len below the 8-byte entry header, or running past the
            # block, means the chain is broken: stop parsing this block.
            if rec_len < 8 or offset + rec_len > limit:
                break
            if e_ino != 0 and name_len > 0:
                name = bdata[offset + 8:offset + 8 + name_len].decode('utf-8', errors='replace')
                entries[name] = (e_ino, ftype)
            offset += rec_len
    return entries
def read_dir_entries(f, sb, gdt_data, inum):
    """Look up inode ``inum`` and return its directory listing.

    The result is a dict of name -> (child_inum, ftype), as produced by
    read_dir_entries_raw for the loaded inode.
    """
    return read_dir_entries_raw(f, *read_inode(f, sb, gdt_data, inum))
# ── file extraction ───────────────────────────────────────────────────────────
def dump_file(f, sb, gdt_data, inum, dest_path):
    """Extract a regular file by inode number to dest_path.

    f, sb, gdt_data  image handle and filesystem metadata as returned by
                     load_fs
    inum             inode number of the file
    dest_path        output path; created or overwritten

    Returns True on success, False if an OSError occurred.  Logical blocks
    without a mapping are left as holes and the output is sized to the
    inode's recorded size.
    """
    try:
        idata, slot = read_inode(f, sb, gdt_data, inum)
        size_lo = struct.unpack_from('<I', idata, slot + 4)[0]
        size_hi = struct.unpack_from('<I', idata, slot + 108)[0]
        size = size_lo | (size_hi << 32)
        flags = struct.unpack_from('<I', idata, slot + 32)[0]
        if flags & 0x10000000:
            # Inline-data inode: the content is stored in the 60-byte block
            # area at offset 40.  Anything beyond 60 bytes is kept outside
            # that area, so slicing further would copy unrelated inode
            # fields into the output.
            if size > 60:
                print(f"  WARN inline data for {dest_path!r} truncated to 60 bytes",
                      file=sys.stderr)
            inline = idata[slot + 40:slot + 40 + min(size, 60)]
            with open(dest_path, 'wb') as out:
                out.write(inline)
            return True
        blocks = sorted(read_extent_tree_blocks(f, idata, slot))
        with open(dest_path, 'wb') as out:
            for logical, phys in blocks:
                position = logical * BLOCK
                remaining = size - position
                if remaining <= 0:
                    # Blocks are sorted, so everything after this one is
                    # past the end of the file as well.
                    break
                chunk = read_at(f, phys * BLOCK, BLOCK)
                # Always place the block at its own logical offset; relying
                # on a running byte counter misplaces every later block
                # after a short read or a duplicated logical block.
                out.seek(position)
                out.write(chunk[:min(BLOCK, remaining)])
            out.truncate(size)
        return True
    except OSError:
        return False
def dump_symlink(f, sb, gdt_data, inum, dest_path):
    """Create a symlink at dest_path from the symlink inode.

    f, sb, gdt_data  image handle and filesystem metadata as returned by
                     load_fs
    inum             inode number of the symlink
    dest_path        path of the link to create

    Returns True on success or when dest_path already exists; returns False
    (after a warning on stderr) when the target cannot be read or is not a
    plausible path.
    """
    try:
        idata, slot = read_inode(f, sb, gdt_data, inum)
        size = struct.unpack_from('<I', idata, slot + 4)[0]
        # Only targets shorter than 60 bytes are stored inside the inode's
        # 60-byte block area (room is needed for the terminating NUL).  A
        # target of exactly 60 bytes lives in a data block, and the block
        # area then holds the extent tree rather than text.
        if size < 60:
            target = idata[slot + 40:slot + 40 + size].decode('utf-8', errors='replace')
        else:
            extents = read_extent_tree_blocks(f, idata, slot)
            if not extents:
                return False
            bdata = read_at(f, extents[0][1] * BLOCK, BLOCK)
            target = bdata[:size].decode('utf-8', errors='replace')
        target = target.split('\x00')[0].strip()
        if not target or any(ord(c) < 32 for c in target):
            print(f"  WARN invalid symlink target for {dest_path!r}: {target!r}", file=sys.stderr)
            return False
        if os.path.lexists(dest_path):
            # Something is already there (e.g. from a previous run); leave it.
            return True
        os.symlink(target, dest_path)
        return True
    except (OSError, IndexError) as e:
        print(f"  WARN symlink {dest_path}: {e}", file=sys.stderr)
        return False
def dump_tree(f, sb, gdt_data, inum, dest_dir, db=None, depth=0, visited=None):
    """Recursively extract a directory tree.

    f, sb, gdt_data  image handle and filesystem metadata as returned by
                     load_fs
    inum             inode number of the directory to extract
    dest_dir         output directory; created if missing
    db               optional ext4db connection; when given, directory
                     entries are read from the database instead of from
                     disk — much faster for subsequent runs
    depth            current recursion level, passed on to child calls
    visited          set of directory inode numbers already extracted, used
                     to break cycles; created on the first call

    Failures on individual entries are reported on stderr and do not stop
    the extraction.
    """
    if visited is None:
        visited = set()
    if inum in visited:
        return
    visited.add(inum)
    if db is not None:
        import ext4db
        entries = ext4db.get_dir_entries(db, inum)
    else:
        try:
            entries = read_dir_entries(f, sb, gdt_data, inum)
        except Exception:
            return
    os.makedirs(dest_dir, exist_ok=True)
    for name, (child_inum, ftype) in entries.items():
        if name in ('.', '..'):
            continue
        safe_name = name.replace('/', '_').replace('\x00', '')
        # Re-check after sanitising: a name such as "\x00.." collapses to
        # ".." (or to an empty string), which would make dest point at the
        # parent directory or at dest_dir itself.
        if safe_name in ('', '.', '..'):
            print(f"  WARN skipping unsafe name {name!r} in {dest_dir}", file=sys.stderr)
            continue
        dest = os.path.join(dest_dir, safe_name)
        try:
            if ftype == 0:
                # The entry carries no type: fall back to the inode's mode.
                idata, slot = read_inode(f, sb, gdt_data, child_inum)
                itype = struct.unpack_from('<H', idata, slot)[0] & 0xF000
                if itype == ITYPE_DIR:
                    ftype = FTYPE_DIR
                elif itype == ITYPE_REG:
                    ftype = FTYPE_REG
                elif itype == ITYPE_SYM:
                    ftype = FTYPE_SYM
            if ftype == FTYPE_DIR:
                dump_tree(f, sb, gdt_data, child_inum, dest, db=db,
                          depth=depth + 1, visited=visited)
            elif ftype == FTYPE_REG:
                dump_file(f, sb, gdt_data, child_inum, dest)
            elif ftype == FTYPE_SYM:
                dump_symlink(f, sb, gdt_data, child_inum, dest)
        except Exception as e:
            print(f"  WARN {dest}: {e}", file=sys.stderr)