Files
xb-seed-catalog/xb_seed_status.py
constantprojects b260aa3bde initial commit!
2026-02-17 13:52:28 -07:00

513 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Seed Tracker - Match torrent files against qBittorrent session.
Compares {id}.torrent files against qBittorrent's BT_backup folder to determine
which torrents are actively seeding, then updates a NocoDB database with the
seeding user's name. Used for tracking who is seeding shared backup torrents.
Matching is done by:
1. Info hash (exact match)
2. Name + size fallback (for re-created torrents with different hashes)
Requirements:
- Python 3.10+
- No external dependencies (uses only stdlib)
Usage:
python seed_tracker.py --id-folder ./torrents --bt-backup /path/to/BT_backup \\
--nocodb-url https://noco.example.com --table-id tblXXX --api-token xc-xxx \\
--id-field cXXX --seeding-field cYYY
To find NocoDB IDs:
- Table ID: Click ... next to table name → Copy Table ID
- Field IDs: Click field header dropdown → Copy Field ID
"""
import argparse
import csv
import hashlib
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# --- Pure Python Bencode Decoder ---
def decode_bencode(data: bytes):
    """Decode bencoded data and return the decoded value.

    Supports the four bencode types: integers (``i<n>e``), byte strings
    (``<len>:<bytes>``), lists (``l...e``), and dictionaries (``d...e``).
    Dictionary keys are decoded to str (UTF-8 with replacement) for
    convenient access; string values are left as bytes.

    Raises:
        ValueError: if the data is malformed or truncated (including a
            byte string whose declared length runs past the end of the
            buffer, or an unterminated list/dict — the original code
            silently returned a shortened string / raised IndexError).
    """
    def decode_next(data: bytes, index: int):
        if index >= len(data):
            raise ValueError("Unexpected end of data")
        char = chr(data[index])
        # Integer: i<number>e
        if char == 'i':
            end = data.index(b'e', index)
            return int(data[index + 1:end]), end + 1
        # List: l<items>e
        elif char == 'l':
            result = []
            index += 1
            while index < len(data) and chr(data[index]) != 'e':
                item, index = decode_next(data, index)
                result.append(item)
            if index >= len(data):
                raise ValueError("Unterminated list")
            return result, index + 1
        # Dictionary: d<key><value>...e
        elif char == 'd':
            result = {}
            index += 1
            while index < len(data) and chr(data[index]) != 'e':
                key, index = decode_next(data, index)
                if isinstance(key, bytes):
                    key = key.decode('utf-8', errors='replace')
                value, index = decode_next(data, index)
                result[key] = value
            if index >= len(data):
                raise ValueError("Unterminated dictionary")
            return result, index + 1
        # String: <length>:<content>
        elif char.isdigit():
            colon = data.index(b':', index)
            length = int(data[index:colon])
            start = colon + 1
            end = start + length
            if end > len(data):
                # Was silently truncated before; that corrupts info hashes.
                raise ValueError(f"Truncated string at position {index}")
            return data[start:end], end
        else:
            raise ValueError(f"Invalid bencode at position {index}: {char}")
    result, _ = decode_next(data, 0)
    return result
def encode_bencode(data) -> bytes:
    """Serialize *data* into bencode wire format.

    Accepts ints, bytes, str (UTF-8 encoded), lists, and dicts.  Dict keys
    are emitted in sorted order, as the bencode spec requires; str keys are
    encoded to UTF-8 before serialization.

    Raises:
        TypeError: for any unsupported value type.
    """
    if isinstance(data, int):
        return b"i%de" % data
    if isinstance(data, bytes):
        return str(len(data)).encode() + b":" + data
    if isinstance(data, str):
        raw = data.encode('utf-8')
        return str(len(raw)).encode() + b":" + raw
    if isinstance(data, list):
        return b"l" + b"".join(encode_bencode(entry) for entry in data) + b"e"
    if isinstance(data, dict):
        pieces = [b"d"]
        for key in sorted(data.keys()):
            raw_key = key.encode('utf-8') if isinstance(key, str) else key
            pieces.append(encode_bencode(raw_key))
            pieces.append(encode_bencode(data[key]))
        pieces.append(b"e")
        return b"".join(pieces)
    raise TypeError(f"Cannot encode type: {type(data)}")
# --- Core Functions ---
def get_torrent_info(torrent_path: Path) -> tuple[str | None, str | None, int | None]:
    """Extract (info_hash, name, total_size) from a .torrent file.

    The info hash is the uppercase hex SHA-1 of the re-bencoded ``info``
    dict.  Returns (None, None, None) — with a warning on stderr — if the
    file cannot be read or parsed, or without a warning if it has no
    ``info`` dict.
    """
    try:
        with open(torrent_path, 'rb') as handle:
            raw = handle.read()
        metadata = decode_bencode(raw)
        if 'info' not in metadata:
            return None, None, None
        info = metadata['info']
        # Torrent display name (stored as bytes in the file)
        torrent_name = info.get('name')
        if isinstance(torrent_name, bytes):
            torrent_name = torrent_name.decode('utf-8', errors='replace')
        # Total payload size: single-file torrents carry 'length',
        # multi-file torrents carry a 'files' list of per-file lengths.
        if 'length' in info:
            size_total = info['length']
        elif 'files' in info:
            size_total = sum(entry['length'] for entry in info['files'])
        else:
            size_total = None
        # Hash the canonical re-encoding of the info dict.
        digest = hashlib.sha1(encode_bencode(info)).hexdigest().upper()
        return digest, torrent_name, size_total
    except Exception as e:
        print(f" Warning: Could not read {torrent_path.name}: {e}", file=sys.stderr)
        return None, None, None
def get_bt_backup_data(bt_backup_path: Path) -> tuple[set[str], dict[tuple[str, int], str]]:
    """Get info hashes and a name+size lookup from qBittorrent's BT_backup folder.

    BT_backup stores one ``{hash}.torrent`` / ``{hash}.fastresume`` pair
    per torrent, so the filename stem IS the info hash.

    Returns:
        - Set of uppercase info hashes (for fast exact-hash matching).
        - Dict of (name, total_size) -> uppercase hash (fallback matching
          for torrents re-created with a different hash).
    """
    hashes: set[str] = set()
    name_size_lookup: dict[tuple[str, int], str] = {}
    if not bt_backup_path.exists():
        print(f"Warning: BT_backup path does not exist: {bt_backup_path}", file=sys.stderr)
        return hashes, name_size_lookup
    # Single directory scan (the original iterated the folder twice):
    # the filename stem supplies the hash; .torrent contents supply the
    # name+size fallback key.
    for file in bt_backup_path.iterdir():
        if file.suffix not in ('.torrent', '.fastresume'):
            continue
        hashes.add(file.stem.upper())
        if file.suffix != '.torrent':
            continue
        try:
            with open(file, 'rb') as f:
                data = f.read()
            decoded = decode_bencode(data)
            if 'info' not in decoded:
                continue
            info = decoded['info']
            name = info.get('name')
            if isinstance(name, bytes):
                name = name.decode('utf-8', errors='replace')
            if 'length' in info:
                total_size = info['length']  # single-file torrent
            elif 'files' in info:
                total_size = sum(f['length'] for f in info['files'])  # multi-file
            else:
                continue  # no size info; cannot build a fallback key
            if name and total_size:
                name_size_lookup[(name, total_size)] = file.stem.upper()
        except Exception:
            # Unreadable/corrupt .torrent: best-effort — the hash taken
            # from the filename above still counts for exact matching.
            continue
    return hashes, name_size_lookup
# --- NocoDB API Functions ---
class NocoDBClient:
"""Simple NocoDB API client."""
def __init__(self, base_url: str, table_id: str, api_token: str,
id_field: str, seeding_field: str, debug: bool = False):
self.base_url = base_url.rstrip('/')
self.table_id = table_id
self.api_token = api_token
self.id_field = id_field
self.seeding_field = seeding_field
self.debug = debug
self.endpoint = f"{self.base_url}/api/v2/tables/{table_id}/records"
def _request(self, method: str, data: dict | None = None, params: dict | None = None) -> dict:
"""Make an API request."""
url = self.endpoint
if params:
query = "&".join(f"{k}={urllib.request.quote(str(v))}" for k, v in params.items())
url = f"{url}?{query}"
headers = {
"xc-token": self.api_token,
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "application/json",
}
body = json.dumps(data).encode('utf-8') if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
if self.debug:
print(f" DEBUG: {method} {url}", file=sys.stderr)
if body:
print(f" DEBUG: Body: {body.decode()}", file=sys.stderr)
try:
with urllib.request.urlopen(req, timeout=30) as response:
result = json.loads(response.read().decode('utf-8'))
if self.debug:
print(f" DEBUG: Response: {json.dumps(result)[:500]}", file=sys.stderr)
return result
except urllib.error.HTTPError as e:
error_body = e.read().decode('utf-8') if e.fp else ""
raise Exception(f"HTTP {e.code}: {error_body}")
def get_record(self, record_id: int | str) -> dict | None:
"""Get a single record by ID."""
try:
params = {"where": f"({self.id_field},eq,{record_id})", "limit": "1"}
result = self._request("GET", params=params)
records = result.get("list", [])
return records[0] if records else None
except Exception as e:
if self.debug:
print(f" DEBUG: get_record error: {e}", file=sys.stderr)
return None
def update_record(self, record_id: int | str, value: str) -> bool:
"""Update the seeding_users field on a record."""
try:
data = {self.id_field: int(record_id), self.seeding_field: value}
self._request("PATCH", data=data)
return True
except Exception as e:
print(f" API error: {e}", file=sys.stderr)
return False
def parse_multiselect(value) -> set[str]:
    """Parse a NocoDB multi-select field value into a set of strings.

    Accepts None (empty field), a list of values, or a comma-separated
    string; any other type yields an empty set.

    Fix: empty tokens produced by trailing or doubled commas
    ("alice,bob," previously yielded {'alice', 'bob', ''}) are dropped,
    so the empty string can no longer leak into the stored value.
    """
    if value is None:
        return set()
    if isinstance(value, list):
        return set(value)
    if isinstance(value, str):
        # Split on commas, trimming whitespace and discarding empty tokens.
        return {v.strip() for v in value.split(',') if v.strip()}
    return set()
def format_multiselect(values: set[str]) -> str:
    """Serialize a set of values into NocoDB's comma-separated multi-select
    string, sorted for a deterministic result; empty set becomes ""."""
    return ",".join(sorted(values)) if values else ""
def main():
    """Command-line entry point.

    Parses arguments, scans the ID folder and qBittorrent's BT_backup,
    matches torrents (by info hash first, then by name+size fallback),
    and either updates NocoDB via its v2 REST API or writes matches to a
    CSV file for manual import.
    """
    parser = argparse.ArgumentParser(
        description='Match torrent files against qBittorrent session and update NocoDB.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# API mode - update NocoDB directly:
%(prog)s --id-folder ./torrents --bt-backup ~/.local/share/qBittorrent/BT_backup \\
--nocodb-url https://noco.example.com --table-id tblXXXXX --api-token xc-xxxx \\
--id-field cXXXXX --seeding-field cYYYYY
# CSV mode - just output a file:
%(prog)s --id-folder ./torrents --bt-backup /path/to/BT_backup --csv-only
To find field IDs in NocoDB: click field header dropdown → Copy Field ID (starts with "c")
"""
    )
    parser.add_argument('--id-folder', required=True, type=Path,
                        help='Path to folder containing {id}.torrent files')
    parser.add_argument('--bt-backup', required=True, type=Path,
                        help="Path to qBittorrent's BT_backup folder")
    # NocoDB API options (all five are required together in API mode)
    parser.add_argument('--nocodb-url', type=str, default=None,
                        help='NocoDB base URL (e.g., http://localhost:8080)')
    parser.add_argument('--table-id', type=str, default=None,
                        help='NocoDB table ID (starts with "tbl")')
    parser.add_argument('--api-token', type=str, default=None,
                        help='NocoDB API token (xc-token)')
    parser.add_argument('--id-field', type=str, default=None,
                        help='Field ID for the Id column (starts with "c")')
    parser.add_argument('--seeding-field', type=str, default=None,
                        help='Field ID for the seeding_users column (starts with "c")')
    # CSV fallback
    parser.add_argument('--csv-only', action='store_true',
                        help='Skip API, just output CSV')
    parser.add_argument('--output', type=Path, default=Path('seeding_update.csv'),
                        help='Output CSV path (default: seeding_update.csv)')
    parser.add_argument('--debug', action='store_true',
                        help='Print debug info for API calls')
    args = parser.parse_args()
    # Validate paths before doing any work
    if not args.id_folder.exists():
        print(f"Error: ID folder does not exist: {args.id_folder}", file=sys.stderr)
        sys.exit(1)
    if not args.bt_backup.exists():
        print(f"Error: BT_backup folder does not exist: {args.bt_backup}", file=sys.stderr)
        sys.exit(1)
    # Determine mode: API unless --csv-only was passed
    use_api = not args.csv_only
    # Show banner
    print("=" * 50)
    print("Seed Tracker")
    print("=" * 50)
    print(f"Mode: {'NocoDB API' if use_api else 'CSV output'}")
    if use_api:
        if not all([args.nocodb_url, args.table_id, args.api_token, args.id_field, args.seeding_field]):
            print("Error: API mode requires --nocodb-url, --table-id, --api-token, --id-field, and --seeding-field", file=sys.stderr)
            print(" Use --csv-only to skip API and just output CSV", file=sys.stderr)
            sys.exit(1)
        noco = NocoDBClient(
            args.nocodb_url, args.table_id, args.api_token,
            args.id_field, args.seeding_field, args.debug
        )
        # Test connection up front so a bad token/URL fails fast
        print(f"\nTesting NocoDB connection...")
        try:
            test_result = noco._request("GET", params={"limit": "1"})
            records = test_result.get("list", [])
            if records:
                print(f" ✓ Connected!")
            else:
                print(" ✓ Connected (table empty)")
        except Exception as e:
            print(f" ✗ Connection failed: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        noco = None
    # Prompt for the name to record against matched torrents
    username = input("\nEnter your username: ").strip()
    if not username:
        print("Error: Username cannot be empty.", file=sys.stderr)
        sys.exit(1)
    print(f"\nRunning as: {username}")
    # Get hashes + name/size index from qBittorrent session
    print(f"\nScanning BT_backup: {args.bt_backup}")
    session_hashes, name_size_lookup = get_bt_backup_data(args.bt_backup)
    print(f"Found {len(session_hashes)} torrents in qBittorrent session")
    print(f"Built name+size index for {len(name_size_lookup)} torrents")
    # Scan id-folder and match each {id}.torrent against the session
    print(f"\nScanning ID folder: {args.id_folder}")
    print()
    matches = []
    stats = {'total': 0, 'hash_match': 0, 'name_size_match': 0, 'not_found': 0, 'hash_error': 0}
    for torrent_file in sorted(args.id_folder.glob('*.torrent')):
        stats['total'] += 1
        # Extract the database record ID from the filename ({id}.torrent)
        torrent_id = torrent_file.stem
        # Get info hash, name, and size
        info_hash, name, size = get_torrent_info(torrent_file)
        display_name = name or "(unknown)"
        if info_hash is None:
            stats['hash_error'] += 1
            print(f"{torrent_id}: {display_name} [read error]")
            continue
        # Check if in session - try exact hash first, then name+size fallback
        if info_hash in session_hashes:
            stats['hash_match'] += 1
            matches.append(torrent_id)
            print(f"{torrent_id}: {display_name}")
        elif name and size and (name, size) in name_size_lookup:
            stats['name_size_match'] += 1
            matches.append(torrent_id)
            print(f"{torrent_id}: {display_name} [name+size match]")
        else:
            stats['not_found'] += 1
            print(f" - {torrent_id}: {display_name}")
    # Update NocoDB or write CSV
    if use_api:
        print(f"\nUpdating {len(matches)} records in NocoDB...")
        api_stats = {'updated': 0, 'already_seeding': 0, 'failed': 0, 'not_found': 0}
        for i, torrent_id in enumerate(matches, 1):
            # Get current record so existing seeders are preserved
            record = noco.get_record(torrent_id)
            if record is None:
                print(f" ? {torrent_id}: record not found in NocoDB")
                api_stats['not_found'] += 1
                continue
            # Parse current seeders
            current_seeders = parse_multiselect(record.get(noco.seeding_field))
            if username in current_seeders:
                print(f" = {torrent_id}: already listed")
                api_stats['already_seeding'] += 1
                continue
            # Add username and update
            current_seeders.add(username)
            new_value = format_multiselect(current_seeders)
            if noco.update_record(torrent_id, new_value):
                print(f" + {torrent_id}: added {username}")
                api_stats['updated'] += 1
            else:
                print(f" ! {torrent_id}: update failed")
                api_stats['failed'] += 1
            # Rate limit: NocoDB allows 5 req/sec, we'll do ~3/sec to be safe
            if i % 3 == 0:
                time.sleep(1)
        # Summary
        print("\n" + "=" * 50)
        print("Summary")
        print("=" * 50)
        print(f" Total .torrent files scanned: {stats['total']}")
        print(f" Matched by hash: {stats['hash_match']}")
        print(f" Matched by name+size: {stats['name_size_match']}")
        print(f" Total matched: {stats['hash_match'] + stats['name_size_match']}")
        print(f" Not in session: {stats['not_found']}")
        print(f" Hash extraction errors: {stats['hash_error']}")
        print()
        print(f" API: Records updated: {api_stats['updated']}")
        print(f" API: Already seeding: {api_stats['already_seeding']}")
        print(f" API: Not found in NocoDB: {api_stats['not_found']}")
        print(f" API: Update failed: {api_stats['failed']}")
        print(f"\nDone!")
    else:
        # CSV-only mode: write matches for manual import into NocoDB
        print(f"\nWriting {len(matches)} matches to: {args.output}")
        with open(args.output, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Id', 'seeding_users'])
            for torrent_id in matches:
                writer.writerow([torrent_id, username])
        # Summary
        print("\n" + "=" * 50)
        print("Summary")
        print("=" * 50)
        print(f" Total .torrent files scanned: {stats['total']}")
        print(f" Matched by hash: {stats['hash_match']}")
        print(f" Matched by name+size: {stats['name_size_match']}")
        print(f" Total matched: {stats['hash_match'] + stats['name_size_match']}")
        print(f" Not in session: {stats['not_found']}")
        print(f" Hash extraction errors: {stats['hash_error']}")
        print(f"\nDone! Import {args.output} into NocoDB.")


if __name__ == '__main__':
    main()