Files
xb-seed-catalog/xb_seed_status.py
constantprojects 16eb35f095 Fix seeding field overwrite bug and hardcode NocoDB field names
NocoDB returns records keyed by field titles, but the code was looking
up values by field IDs, always getting None. This caused every update
to overwrite existing seeders instead of appending.

Hardcode "Id" and "Seeding Users" field names as class constants and
remove the --id-field and --seeding-field CLI args.
2026-02-17 15:21:57 -07:00

505 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Seed Tracker - Match torrent files against qBittorrent session.
Compares {id}.torrent files against qBittorrent's BT_backup folder to determine
which torrents are actively seeding, then updates a NocoDB database with the
seeding user's name. Used for tracking who is seeding shared backup torrents.
Matching is done by:
1. Info hash (exact match)
2. Name + size fallback (for re-created torrents with different hashes)
Requirements:
- Python 3.10+
- No external dependencies (uses only stdlib)
Usage:
python xb_seed_status.py --id-folder ./torrents --bt-backup /path/to/BT_backup \\
--nocodb-url https://noco.example.com --table-id tblXXX --api-token xc-xxx
To find NocoDB IDs:
- Table ID: Click ... next to table name → Copy Table ID
"""
import argparse
import csv
import hashlib
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# --- Pure Python Bencode Decoder ---
def decode_bencode(data: bytes):
    """Decode bencoded data and return the decoded value.

    Handles the four bencode types: integers (i<n>e), byte strings
    (<len>:<bytes>), lists (l...e) and dictionaries (d...e).
    Dictionary keys are decoded to str; byte-string values stay bytes.
    Trailing bytes after the first complete value are ignored.
    """
    def parse(buf: bytes, pos: int):
        # Parse one element starting at pos; return (value, next_pos).
        if pos >= len(buf):
            raise ValueError("Unexpected end of data")
        marker = buf[pos]
        if marker == ord('i'):
            # Integer: i<number>e
            close = buf.index(b'e', pos)
            return int(buf[pos + 1:close]), close + 1
        if marker == ord('l'):
            # List: l<items>e
            items = []
            pos += 1
            while buf[pos] != ord('e'):
                element, pos = parse(buf, pos)
                items.append(element)
            return items, pos + 1
        if marker == ord('d'):
            # Dictionary: d<key><value>...e — keys become str
            mapping = {}
            pos += 1
            while buf[pos] != ord('e'):
                key, pos = parse(buf, pos)
                if isinstance(key, bytes):
                    key = key.decode('utf-8', errors='replace')
                mapping[key], pos = parse(buf, pos)
            return mapping, pos + 1
        if ord('0') <= marker <= ord('9'):
            # Byte string: <length>:<content>
            sep = buf.index(b':', pos)
            size = int(buf[pos:sep])
            begin = sep + 1
            return buf[begin:begin + size], begin + size
        raise ValueError(f"Invalid bencode at position {pos}: {chr(marker)}")
    value, _ = parse(data, 0)
    return value
def encode_bencode(data) -> bytes:
    """Encode data to bencode format.

    Accepts int, bytes, str (encoded as UTF-8), list, and dict.
    Dictionary keys are sorted by their raw byte representation, as
    required by BEP 3. Sorting the str form (as the previous version
    did) can disagree with byte order for non-ASCII keys — which would
    silently change the re-encoded info dict and thus the computed
    info hash — and raises TypeError on mixed str/bytes keys.

    Raises:
        TypeError: for any unsupported value type.
    """
    if isinstance(data, int):
        return f"i{data}e".encode()
    elif isinstance(data, bytes):
        return f"{len(data)}:".encode() + data
    elif isinstance(data, str):
        encoded = data.encode('utf-8')
        return f"{len(encoded)}:".encode() + encoded
    elif isinstance(data, list):
        return b'l' + b''.join(encode_bencode(item) for item in data) + b'e'
    elif isinstance(data, dict):
        # Normalize every key to bytes first, then sort by byte value so
        # the output is canonical regardless of key type or code points.
        pairs = []
        for key, value in data.items():
            key_bytes = key.encode('utf-8') if isinstance(key, str) else key
            pairs.append((key_bytes, value))
        result = b'd'
        for key_bytes, value in sorted(pairs, key=lambda kv: kv[0]):
            result += encode_bencode(key_bytes)
            result += encode_bencode(value)
        result += b'e'
        return result
    else:
        raise TypeError(f"Cannot encode type: {type(data)}")
# --- Core Functions ---
def get_torrent_info(torrent_path: Path) -> tuple[str | None, str | None, int | None]:
    """Extract info_hash, name, and total size from a .torrent file.

    Returns (None, None, None) if the file is unreadable, fails to
    decode, or lacks an 'info' dict; a warning goes to stderr on
    read/decode errors.
    """
    try:
        metadata = decode_bencode(torrent_path.read_bytes())
        if 'info' not in metadata:
            return None, None, None
        info = metadata['info']
        # Display name is stored as raw bytes in the torrent file.
        torrent_name = info.get('name')
        if isinstance(torrent_name, bytes):
            torrent_name = torrent_name.decode('utf-8', errors='replace')
        # Single-file torrents carry 'length'; multi-file torrents list
        # per-file lengths under 'files'.
        if 'length' in info:
            size = info['length']
        elif 'files' in info:
            size = sum(entry['length'] for entry in info['files'])
        else:
            size = None
        # Info hash = SHA-1 over the bencoded 'info' dict, uppercased to
        # match qBittorrent's filename convention.
        digest = hashlib.sha1(encode_bencode(info)).hexdigest().upper()
        return digest, torrent_name, size
    except Exception as exc:
        print(f" Warning: Could not read {torrent_path.name}: {exc}", file=sys.stderr)
        return None, None, None
def get_bt_backup_data(bt_backup_path: Path) -> tuple[set[str], dict[tuple[str, int], str]]:
    """Get info hashes and name+size lookup from qBittorrent's BT_backup folder.

    Returns:
        A pair of:
        - set of uppercase info hashes (for fast exact matching), and
        - dict mapping (name, total_size) -> hash (fallback matching
          for torrents re-created with a different hash).
    """
    hash_set: set[str] = set()
    by_name_size: dict[tuple[str, int], str] = {}
    if not bt_backup_path.exists():
        print(f"Warning: BT_backup path does not exist: {bt_backup_path}", file=sys.stderr)
        return hash_set, by_name_size
    # Pass 1 (cheap): in BT_backup the filename stem IS the info hash.
    for entry in bt_backup_path.iterdir():
        if entry.suffix in ('.torrent', '.fastresume'):
            hash_set.add(entry.stem.upper())
    # Pass 2: parse each .torrent to build the name+size fallback index.
    for entry in bt_backup_path.iterdir():
        if entry.suffix != '.torrent':
            continue
        try:
            decoded = decode_bencode(entry.read_bytes())
            if 'info' not in decoded:
                continue
            info = decoded['info']
            torrent_name = info.get('name')
            if isinstance(torrent_name, bytes):
                torrent_name = torrent_name.decode('utf-8', errors='replace')
            if 'length' in info:
                size = info['length']
            elif 'files' in info:
                size = sum(part['length'] for part in info['files'])
            else:
                continue
            if torrent_name and size:
                by_name_size[(torrent_name, size)] = entry.stem.upper()
        except Exception:
            # Best-effort index: skip anything unreadable or malformed.
            continue
    return hash_set, by_name_size
# --- NocoDB API Functions ---
class NocoDBClient:
"""Simple NocoDB API client."""
ID_FIELD = "Id"
SEEDING_FIELD = "Seeding Users"
def __init__(self, base_url: str, table_id: str, api_token: str,
debug: bool = False):
self.base_url = base_url.rstrip('/')
self.table_id = table_id
self.api_token = api_token
self.debug = debug
self.endpoint = f"{self.base_url}/api/v2/tables/{table_id}/records"
def _request(self, method: str, url: str | None = None, data: dict | None = None, params: dict | None = None) -> dict:
"""Make an API request."""
if url is None:
url = self.endpoint
if params:
query = "&".join(f"{k}={urllib.request.quote(str(v))}" for k, v in params.items())
url = f"{url}?{query}"
headers = {
"xc-token": self.api_token,
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "application/json",
}
body = json.dumps(data).encode('utf-8') if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
if self.debug:
print(f" DEBUG: {method} {url}", file=sys.stderr)
if body:
print(f" DEBUG: Body: {body.decode()}", file=sys.stderr)
try:
with urllib.request.urlopen(req, timeout=30) as response:
result = json.loads(response.read().decode('utf-8'))
if self.debug:
print(f" DEBUG: Response: {json.dumps(result)[:500]}", file=sys.stderr)
return result
except urllib.error.HTTPError as e:
error_body = e.read().decode('utf-8') if e.fp else ""
raise Exception(f"HTTP {e.code}: {error_body}")
def get_record(self, record_id: int | str) -> dict | None:
"""Get a single record by ID."""
try:
params = {"where": f"({self.ID_FIELD},eq,{record_id})", "limit": "1"}
result = self._request("GET", params=params)
records = result.get("list", [])
return records[0] if records else None
except Exception as e:
if self.debug:
print(f" DEBUG: get_record error: {e}", file=sys.stderr)
return None
def update_record(self, row_id: int, value: str) -> bool:
"""Update the seeding_users field on a record."""
try:
data = {"Id": row_id, self.SEEDING_FIELD: value}
self._request("PATCH", data=data)
return True
except Exception as e:
print(f" API error: {e}", file=sys.stderr)
return False
def parse_multiselect(value) -> set[str]:
    """Parse NocoDB multi-select field into a set of values.

    Accepts a list of values or a comma-separated string (items are
    whitespace-stripped). None, blank strings, and any other type all
    yield an empty set.
    """
    if isinstance(value, list):
        return set(value)
    if isinstance(value, str):
        if not value.strip():
            return set()
        return {part.strip() for part in value.split(',')}
    # Covers None and any unexpected type.
    return set()
def format_multiselect(values: set[str]) -> str:
    """Format a set of values as a NocoDB multi-select string.

    Values are sorted for a deterministic result; an empty set
    produces an empty string.
    """
    return ",".join(sorted(values)) if values else ""
def main():
    """CLI entry point.

    Scans a folder of {id}.torrent files, matches each against the
    qBittorrent session (by info hash first, then by name+size), and
    records the current user as a seeder — either directly via the
    NocoDB API or by writing a CSV for manual import.
    """
    parser = argparse.ArgumentParser(
        description='Match torrent files against qBittorrent session and update NocoDB.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# API mode - update NocoDB directly:
%(prog)s --id-folder ./torrents --bt-backup ~/.local/share/qBittorrent/BT_backup \\
--nocodb-url https://noco.example.com --table-id tblXXXXX --api-token xc-xxxx
# CSV mode - just output a file:
%(prog)s --id-folder ./torrents --bt-backup /path/to/BT_backup --csv-only
"""
    )
    parser.add_argument('--id-folder', required=True, type=Path,
                        help='Path to folder containing {id}.torrent files')
    parser.add_argument('--bt-backup', required=True, type=Path,
                        help="Path to qBittorrent's BT_backup folder")
    # NocoDB API options
    parser.add_argument('--nocodb-url', type=str, default=None,
                        help='NocoDB base URL (e.g., http://localhost:8080)')
    parser.add_argument('--table-id', type=str, default=None,
                        help='NocoDB table ID (starts with "tbl")')
    parser.add_argument('--api-token', type=str, default=None,
                        help='NocoDB API token (xc-token)')
    # CSV fallback
    parser.add_argument('--csv-only', action='store_true',
                        help='Skip API, just output CSV')
    parser.add_argument('--output', type=Path, default=Path('seeding_update.csv'),
                        help='Output CSV path (default: seeding_update.csv)')
    parser.add_argument('--debug', action='store_true',
                        help='Print debug info for API calls')
    args = parser.parse_args()

    # Validate paths up front so we fail before prompting for anything.
    if not args.id_folder.exists():
        print(f"Error: ID folder does not exist: {args.id_folder}", file=sys.stderr)
        sys.exit(1)
    if not args.bt_backup.exists():
        print(f"Error: BT_backup folder does not exist: {args.bt_backup}", file=sys.stderr)
        sys.exit(1)

    # Determine mode
    use_api = not args.csv_only

    # Show banner
    print("=" * 50)
    print("Seed Tracker")
    print("=" * 50)
    print(f"Mode: {'NocoDB API' if use_api else 'CSV output'}")

    if use_api:
        if not all([args.nocodb_url, args.table_id, args.api_token]):
            print("Error: API mode requires --nocodb-url, --table-id, and --api-token", file=sys.stderr)
            print(" Use --csv-only to skip API and just output CSV", file=sys.stderr)
            sys.exit(1)
        noco = NocoDBClient(
            args.nocodb_url, args.table_id, args.api_token,
            args.debug
        )
        # Smoke-test the connection before doing any real work.
        print("\nTesting NocoDB connection...")
        try:
            test_result = noco._request("GET", params={"limit": "1"})
            records = test_result.get("list", [])
            if records:
                print(f" ✓ Connected!")
            else:
                print(" ✓ Connected (table empty)")
        except Exception as e:
            print(f" ✗ Connection failed: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        noco = None

    # Prompt for the username that will be recorded as the seeder.
    username = input("\nEnter your username: ").strip()
    if not username:
        print("Error: Username cannot be empty.", file=sys.stderr)
        sys.exit(1)
    print(f"\nRunning as: {username}")

    # Index the qBittorrent session.
    print(f"\nScanning BT_backup: {args.bt_backup}")
    session_hashes, name_size_lookup = get_bt_backup_data(args.bt_backup)
    print(f"Found {len(session_hashes)} torrents in qBittorrent session")
    print(f"Built name+size index for {len(name_size_lookup)} torrents")

    # Scan the ID folder and match each torrent against the session.
    print(f"\nScanning ID folder: {args.id_folder}")
    print()
    matches = []
    stats = {'total': 0, 'hash_match': 0, 'name_size_match': 0, 'not_found': 0, 'hash_error': 0}
    for torrent_file in sorted(args.id_folder.glob('*.torrent')):
        stats['total'] += 1
        # The filename stem (minus .torrent) is the NocoDB record ID.
        torrent_id = torrent_file.stem
        info_hash, name, size = get_torrent_info(torrent_file)
        display_name = name or "(unknown)"
        if info_hash is None:
            stats['hash_error'] += 1
            print(f"{torrent_id}: {display_name} [read error]")
            continue
        # Exact hash match first; name+size catches re-created torrents.
        if info_hash in session_hashes:
            stats['hash_match'] += 1
            matches.append(torrent_id)
            print(f"{torrent_id}: {display_name}")
        elif name and size and (name, size) in name_size_lookup:
            stats['name_size_match'] += 1
            matches.append(torrent_id)
            print(f"{torrent_id}: {display_name} [name+size match]")
        else:
            stats['not_found'] += 1
            print(f" - {torrent_id}: {display_name}")

    # Update NocoDB or write CSV
    if use_api:
        print(f"\nUpdating {len(matches)} records in NocoDB...")
        api_stats = {'updated': 0, 'already_seeding': 0, 'failed': 0, 'not_found': 0}
        for i, torrent_id in enumerate(matches, 1):
            record = noco.get_record(torrent_id)
            if record is None:
                print(f" ? {torrent_id}: record not found in NocoDB")
                api_stats['not_found'] += 1
                continue
            # Append to the existing seeder list rather than overwriting it.
            current_seeders = parse_multiselect(record.get(noco.SEEDING_FIELD))
            if username in current_seeders:
                print(f" = {torrent_id}: already listed")
                api_stats['already_seeding'] += 1
                continue
            current_seeders.add(username)
            new_value = format_multiselect(current_seeders)
            # Use the class constant instead of a hardcoded "Id" literal so
            # the field title is defined in exactly one place (NocoDBClient).
            if noco.update_record(record[noco.ID_FIELD], new_value):
                print(f" + {torrent_id}: added {username}")
                api_stats['updated'] += 1
            else:
                print(f" ! {torrent_id}: update failed")
                api_stats['failed'] += 1
            # Rate limit: NocoDB allows 5 req/sec, we'll do ~3/sec to be safe
            if i % 3 == 0:
                time.sleep(1)
        # Summary
        print("\n" + "=" * 50)
        print("Summary")
        print("=" * 50)
        print(f" Total .torrent files scanned: {stats['total']}")
        print(f" Matched by hash: {stats['hash_match']}")
        print(f" Matched by name+size: {stats['name_size_match']}")
        print(f" Total matched: {stats['hash_match'] + stats['name_size_match']}")
        print(f" Not in session: {stats['not_found']}")
        print(f" Hash extraction errors: {stats['hash_error']}")
        print()
        print(f" API: Records updated: {api_stats['updated']}")
        print(f" API: Already seeding: {api_stats['already_seeding']}")
        print(f" API: Not found in NocoDB: {api_stats['not_found']}")
        print(f" API: Update failed: {api_stats['failed']}")
        print(f"\nDone!")
    else:
        # CSV-only mode: emit rows for manual import into NocoDB.
        print(f"\nWriting {len(matches)} matches to: {args.output}")
        with open(args.output, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Id', 'seeding_users'])
            for torrent_id in matches:
                writer.writerow([torrent_id, username])
        # Summary
        print("\n" + "=" * 50)
        print("Summary")
        print("=" * 50)
        print(f" Total .torrent files scanned: {stats['total']}")
        print(f" Matched by hash: {stats['hash_match']}")
        print(f" Matched by name+size: {stats['name_size_match']}")
        print(f" Total matched: {stats['hash_match'] + stats['name_size_match']}")
        print(f" Not in session: {stats['not_found']}")
        print(f" Hash extraction errors: {stats['hash_error']}")
        print(f"\nDone! Import {args.output} into NocoDB.")


if __name__ == '__main__':
    main()