Files
ext4recovery/ext4db.py
2026-04-30 11:04:05 +00:00

216 lines
7.5 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
SQLite persistence layer for the ext4 recovery pipeline.
Schema:
  filesystem_meta  - superblock geometry + scan parameters
  inodes           - per-inode metadata (mode, timestamps, status, …)
  dir_entries      - directory name → child inode mappings
  scanned_groups   - which block groups have been fully scanned (for resume)
"""
import sqlite3
from datetime import datetime, timezone
# DDL script that open_db() runs through executescript() on every open.
# Every statement carries IF NOT EXISTS, so running it against a database
# that already has these objects creates nothing new.
#   filesystem_meta  text key/value pairs, keyed by `key`
#   inodes           one row per inode number (`inum` is the primary key)
#   dir_entries      one row per (parent_inum, name) pair
#   scanned_groups   one row per group number plus a text timestamp
# Secondary indexes cover inodes.itype, inodes.status and both inode-number
# columns of dir_entries.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS filesystem_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS inodes (
inum INTEGER PRIMARY KEY,
grp INTEGER NOT NULL,
mode INTEGER,
itype INTEGER,
uid INTEGER,
gid INTEGER,
size INTEGER,
atime INTEGER,
ctime INTEGER,
mtime INTEGER,
dtime INTEGER,
links INTEGER,
flags INTEGER,
status TEXT
);
CREATE TABLE IF NOT EXISTS dir_entries (
parent_inum INTEGER NOT NULL,
name TEXT NOT NULL,
child_inum INTEGER NOT NULL,
ftype INTEGER,
PRIMARY KEY (parent_inum, name)
);
CREATE TABLE IF NOT EXISTS scanned_groups (
grp INTEGER PRIMARY KEY,
ts TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_inodes_itype ON inodes(itype);
CREATE INDEX IF NOT EXISTS idx_inodes_status ON inodes(status);
CREATE INDEX IF NOT EXISTS idx_de_parent ON dir_entries(parent_inum);
CREATE INDEX IF NOT EXISTS idx_de_child ON dir_entries(child_inum);
"""
def open_db(path):
    """Open (or create) the recovery database and make sure the schema exists.

    Args:
        path: Database location, passed straight to ``sqlite3.connect``.

    Returns:
        A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``,
        so result columns can be read by name.

    Raises:
        sqlite3.Error: If the schema script cannot be applied (for example
            when *path* is not a valid SQLite database). The connection is
            closed before the error propagates.
    """
    db = sqlite3.connect(path)
    try:
        db.row_factory = sqlite3.Row
        db.executescript(_SCHEMA)
        db.commit()
    except BaseException:
        # Schema setup failed: release the handle instead of leaking it,
        # then let the caller see the original error.
        db.close()
        raise
    return db
# ── filesystem metadata ───────────────────────────────────────────────────────
def save_fs_meta(db, sb, device, backup_sb_block, zeroed_groups=0):
    """Write the scan parameters and superblock geometry to filesystem_meta.

    Every value is converted with ``str()`` and stored under a fixed key;
    a row that already exists for a key is replaced. The transaction is
    committed before returning.

    Args:
        db: Open SQLite connection holding the ``filesystem_meta`` table.
        sb: Mapping indexed by ``inodes_count``, ``blocks_count``,
            ``blocks_per_group``, ``inodes_per_group``, ``inode_size`` and
            ``desc_size``.
        device: Stored under the ``device`` key.
        backup_sb_block: Stored under the ``backup_sb_block`` key.
        zeroed_groups: Stored under the ``zeroed_groups`` key.
    """
    # Fields copied verbatim from the superblock mapping, in storage order.
    geometry_keys = (
        'inodes_count',
        'blocks_count',
        'blocks_per_group',
        'inodes_per_group',
        'inode_size',
        'desc_size',
    )
    pairs = [
        ('device', str(device)),
        ('backup_sb_block', str(backup_sb_block)),
        ('zeroed_groups', str(zeroed_groups)),
    ]
    pairs.extend((field, str(sb[field])) for field in geometry_keys)

    db.executemany("INSERT OR REPLACE INTO filesystem_meta VALUES (?, ?)", pairs)
    db.commit()
def get_fs_meta(db):
    """Load the whole filesystem_meta table into a plain dict.

    Values come back exactly as stored, i.e. as strings. Rows are read by
    column name, so the connection's row factory must support that.
    """
    meta = {}
    for record in db.execute("SELECT key, value FROM filesystem_meta"):
        meta[record['key']] = record['value']
    return meta
def get_fs_meta_int(db, key, default=0):
    """Fetch one filesystem_meta value and convert it with ``int()``.

    Args:
        db: Open SQLite connection whose rows can be indexed by column name.
        key: The metadata key to look up.
        default: Returned unchanged when no row exists for *key*.

    Raises:
        ValueError: If the stored text is not a valid integer literal.
    """
    cursor = db.execute("SELECT value FROM filesystem_meta WHERE key=?", (key,))
    hit = cursor.fetchone()
    if not hit:
        return default
    return int(hit['value'])
# ── inode table ───────────────────────────────────────────────────────────────
def _i64(v):
    """Fold an integer into the signed 64-bit range SQLite can store.

    The value is reduced modulo 2**64 and then reinterpreted as two's
    complement, so anything at or above 2**63 comes back negative. This keeps
    oversized values (e.g. a corrupted size built from size_lo | size_hi<<32)
    from raising OverflowError when bound as an SQLite INTEGER.
    """
    wrapped = int(v) % (1 << 64)
    if wrapped < (1 << 63):
        return wrapped
    return wrapped - (1 << 64)
def save_inode(db, inum, grp, inode, status):
    """Insert or replace the row for one inode. Does not commit.

    Args:
        db: Open SQLite connection holding the ``inodes`` table.
        inum: Inode number (primary key of the row).
        grp: Value for the ``grp`` column.
        inode: Mapping with a ``.get`` method; the keys read are mode, type,
            uid, gid, size, atime, ctime, mtime, dtime, links and flags.
            A missing key is stored as 0, and ``type`` feeds the ``itype``
            column.
        status: Text stored in the ``status`` column.
    """
    # Order must line up with the column list in the INSERT statement.
    fields = (
        'mode', 'type', 'uid', 'gid', 'size',
        'atime', 'ctime', 'mtime', 'dtime', 'links', 'flags',
    )
    params = [inum, grp]
    # Each numeric field is clamped into SQLite's signed 64-bit range.
    params.extend(_i64(inode.get(field, 0)) for field in fields)
    params.append(status)
    db.execute(
        """INSERT OR REPLACE INTO inodes
(inum, grp, mode, itype, uid, gid, size, atime, ctime, mtime, dtime, links, flags, status)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
        tuple(params),
    )
def get_inode(db, inum):
    """Fetch the ``inodes`` row for *inum*; None when no such row exists."""
    cursor = db.execute("SELECT * FROM inodes WHERE inum=?", (inum,))
    return cursor.fetchone()
def get_all_dir_inums(db, include_deleted=False):
    """List the inode numbers of directory inodes.

    Args:
        db: Open SQLite connection whose rows can be indexed by column name.
        include_deleted: When false (the default) only rows with status
            'active' are returned; when true, every row whose status is
            anything other than 'unallocated' is returned.

    Returns:
        A list of ``inum`` values for rows whose ``itype`` equals 0x4000.
    """
    ITYPE_DIR = 0x4000
    query = (
        "SELECT inum FROM inodes WHERE itype=? AND status != 'unallocated'"
        if include_deleted
        else "SELECT inum FROM inodes WHERE itype=? AND status='active'"
    )
    return [hit['inum'] for hit in db.execute(query, (ITYPE_DIR,))]
# ── directory entries ─────────────────────────────────────────────────────────
def save_dir_entry(db, parent_inum, name, child_inum, ftype):
    """Insert or replace one directory entry. Does not commit.

    An existing row with the same (parent_inum, name) pair is replaced.

    Args:
        db: Open SQLite connection holding the ``dir_entries`` table.
        parent_inum: Inode number of the directory containing the entry.
        name: Entry name within that directory.
        child_inum: Inode number the entry points to.
        ftype: Value for the ``ftype`` column.
    """
    sql = "INSERT OR REPLACE INTO dir_entries (parent_inum, name, child_inum, ftype) VALUES (?,?,?,?)"
    params = (parent_inum, name, child_inum, ftype)
    db.execute(sql, params)
def get_dir_entries(db, parent_inum):
    """Collect the entries recorded for one directory.

    Returns:
        A dict mapping each entry name to a ``(child_inum, ftype)`` tuple;
        empty when the directory has no recorded entries.
    """
    cursor = db.execute(
        "SELECT name, child_inum, ftype FROM dir_entries WHERE parent_inum=?",
        (parent_inum,),
    )
    listing = {}
    for entry in cursor:
        listing[entry['name']] = (entry['child_inum'], entry['ftype'])
    return listing
def get_dotdot(db, inum):
    """Look up the '..' entry of directory *inum*.

    Returns:
        The inode number that entry points to (the recorded parent), or
        None when the directory has no '..' entry stored.
    """
    cursor = db.execute(
        "SELECT child_inum FROM dir_entries WHERE parent_inum=? AND name='..'",
        (inum,),
    )
    hit = cursor.fetchone()
    if not hit:
        return None
    return hit['child_inum']
def get_dot(db, inum):
    """Look up the '.' entry of directory *inum*.

    Returns:
        The inode number that entry points to, or None when the directory
        has no '.' entry stored.
    """
    cursor = db.execute(
        "SELECT child_inum FROM dir_entries WHERE parent_inum=? AND name='.'",
        (inum,),
    )
    hit = cursor.fetchone()
    if not hit:
        return None
    return hit['child_inum']
# ── scan progress ─────────────────────────────────────────────────────────────
def mark_group_scanned(db, grp):
    """Record that group *grp* has been scanned. Does not commit.

    The row is stamped with the current UTC time in ISO-8601 text form;
    marking the same group again replaces the earlier row.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    db.execute("INSERT OR REPLACE INTO scanned_groups VALUES (?, ?)", (grp, stamp))
def get_scanned_groups(db):
    """Return the group numbers recorded in scanned_groups, as a set."""
    done = set()
    for record in db.execute("SELECT grp FROM scanned_groups"):
        done.add(record['grp'])
    return done
# ── summary stats ─────────────────────────────────────────────────────────────
def print_stats(db):
    """Print a three-line summary of row counts to stdout.

    Reports the total number of inode rows (with the active and directory
    subsets), the number of directory entries, and the number of scanned
    groups. Counts are right-aligned and use thousands separators.
    """
    def tally(sql):
        # Each query is a single COUNT(*), so the first column is the answer.
        return db.execute(sql).fetchone()[0]

    total = tally("SELECT COUNT(*) FROM inodes")
    active = tally("SELECT COUNT(*) FROM inodes WHERE status='active'")
    dirs = tally("SELECT COUNT(*) FROM inodes WHERE itype=0x4000")
    scanned = tally("SELECT COUNT(*) FROM scanned_groups")
    dentries = tally("SELECT COUNT(*) FROM dir_entries")

    report = (
        f" inodes scanned : {total:>10,} (active={active:,}, dirs={dirs:,})",
        f" dir entries : {dentries:>10,}",
        f" groups scanned : {scanned:>10,}",
    )
    for line in report:
        print(line)