#!/usr/bin/env python3 """ CSV Uploader - Upload seed_tracker CSV output to NocoDB. For use when others generate CSVs with --csv-only and send them to you for upload. """ import argparse import csv import json import sys import time import urllib.request import urllib.error from pathlib import Path class NocoDBClient: """Simple NocoDB API client.""" ID_FIELD = "Id" SEEDING_FIELD = "Seeding Users" def __init__(self, base_url: str, table_id: str, api_token: str): self.base_url = base_url.rstrip('/') self.table_id = table_id self.api_token = api_token self.endpoint = f"{self.base_url}/api/v2/tables/{table_id}/records" def _request(self, method: str, data: dict | None = None, params: dict | None = None) -> dict: url = self.endpoint if params: query = "&".join(f"{k}={urllib.request.quote(str(v))}" for k, v in params.items()) url = f"{url}?{query}" headers = { "xc-token": self.api_token, "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "application/json", } body = json.dumps(data).encode('utf-8') if data else None req = urllib.request.Request(url, data=body, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=30) as response: return json.loads(response.read().decode('utf-8')) except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') if e.fp else "" raise Exception(f"HTTP {e.code}: {error_body}") def get_record(self, record_id: int | str) -> dict | None: try: params = {"where": f"({self.ID_FIELD},eq,{record_id})", "limit": "1"} result = self._request("GET", params=params) records = result.get("list", []) return records[0] if records else None except Exception: return None def update_record(self, row_id: int, value: str) -> bool: try: data = {"Id": row_id, self.SEEDING_FIELD: value} self._request("PATCH", data=data) return True except Exception as e: print(f" API error: {e}", file=sys.stderr) return False def parse_multiselect(value) -> set[str]: if value is None: return set() if isinstance(value, list): return set(value) if isinstance(value, str): if not value.strip(): return set() return set(v.strip() for v in value.split(',')) return set() def format_multiselect(values: set[str]) -> str: if not values: return "" return ",".join(sorted(values)) def main(): parser = argparse.ArgumentParser( description='Upload seed_tracker CSV to NocoDB.', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Example: %(prog)s --csv seeds.csv --nocodb-url https://noco.example.com \\ --table-id xxxxxxxxxxxxx --api-token xc-xxx """ ) parser.add_argument('--csv', required=True, type=Path, help='CSV file to upload (Id,seeding_users columns)') parser.add_argument('--nocodb-url', required=True, type=str, help='NocoDB base URL') parser.add_argument('--table-id', required=True, type=str, help='Table ID') parser.add_argument('--api-token', required=True, type=str, help='API token') args = parser.parse_args() if not args.csv.exists(): print(f"Error: CSV file not found: {args.csv}", file=sys.stderr) sys.exit(1) # Read CSV with open(args.csv, 'r', newline='', encoding='utf-8') as f: reader = csv.DictReader(f) rows = list(reader) print(f"Loaded {len(rows)} rows from {args.csv}") # Connect noco = NocoDBClient(args.nocodb_url, args.table_id, args.api_token) print("Testing connection...") try: noco._request("GET", params={"limit": "1"}) print(" ✓ Connected!") except Exception as e: print(f" ✗ Connection failed: {e}", file=sys.stderr) sys.exit(1) # Process print(f"\nUpdating records...") stats = {'updated': 0, 'already': 0, 'not_found': 0, 'failed': 0} for i, row in enumerate(rows, 1): record_id = row.get('Id') or row.get('id') username = row.get('seeding_users') or row.get('Seeding_Users') if not record_id or not username: continue record = noco.get_record(record_id) if record is None: print(f" ? {record_id}: not found") stats['not_found'] += 1 continue current = parse_multiselect(record.get(noco.SEEDING_FIELD)) if username in current: print(f" = {record_id}: already listed") stats['already'] += 1 continue current.add(username) if noco.update_record(record["Id"], format_multiselect(current)): print(f" + {record_id}: added {username}") stats['updated'] += 1 else: print(f" ! {record_id}: failed") stats['failed'] += 1 if i % 3 == 0: time.sleep(1) # Summary print(f"\nDone!") print(f" Updated: {stats['updated']}") print(f" Already: {stats['already']}") print(f" Not found: {stats['not_found']}") print(f" Failed: {stats['failed']}") if __name__ == '__main__': main()