// Aggregates runtime stats for the admin dashboard. // // Stat sources: // - In-memory (free): IMDb ratings Map size, IMDb mapping Map sizes, search workers count // - File reads (cheap, cached 30s): cron.txt, lastcron.txt tail // - Streaming line counts (cached 5min): tmdbintegral/movie.json, tv.json // - Disk usage via du (lazy, cached 10min): movie/, tv/, justwatchmovie/, justwatchtv/ // - Prometheus counters: HTTP totals, search cache hits/misses import { execFile as execFileCb } from 'node:child_process'; import { createReadStream, existsSync, statSync } from 'node:fs'; import { readFile } from 'node:fs/promises'; import { join } from 'node:path'; import { createInterface } from 'node:readline'; import { promisify } from 'node:util'; import { CRON_TXT, IMDB_RATINGS, JUSTWATCH_MOVIE_DIR, JUSTWATCH_TV_DIR, LASTCRON_TXT, MOVIE_DIR, TMDBINTEGRAL_DIR, TV_DIR, } from '../config.js'; import { preloadMappings } from './imdbMapping.js'; import { getRatings } from './imdbRatings.js'; import { registry } from './metrics.js'; const execFile = promisify(execFileCb); // ---------- TTL cache helpers ---------- const ttlCache = new Map(); async function cached(key, ttlMs, fn) { const now = Date.now(); const hit = ttlCache.get(key); if (hit && now - hit.t < ttlMs) return hit.v; const v = await fn(); ttlCache.set(key, { t: now, v }); return v; } // ---------- Cheap helpers ---------- async function countLines(file) { if (!existsSync(file)) return 0; let n = 0; const rl = createInterface({ input: createReadStream(file), crlfDelay: Infinity }); for await (const _ of rl) n++; return n; } async function tailLines(file, n) { if (!existsSync(file)) return []; // Read whole file — log files are < 1 MB usually. If they grow, switch to a // backwards-stream chunk reader. const text = await readFile(file, 'utf8'); const lines = text.split('\n'); return lines.slice(-n - 1, -1); } async function diskUsage(dir) { if (!existsSync(dir)) return 0; try { const { stdout } = await execFile('du', ['-sb', dir], { maxBuffer: 1024 * 1024 }); const bytes = parseInt(stdout.split('\t')[0], 10); return Number.isFinite(bytes) ? bytes : 0; } catch { return 0; } } // ---------- Cron parsing ---------- function parseCronTxt(text) { if (!text) return { started: null, finished: null, duration_s: null, status: 'unknown' }; const startMatch = text.match(/Started At (.+)/); const finishMatch = text.match(/Finished At (.+)/); const started = startMatch ? new Date(startMatch[1]).getTime() || null : null; const finished = finishMatch ? new Date(finishMatch[1]).getTime() || null : null; const duration_s = started && finished ? Math.round((finished - started) / 1000) : null; const status = finished ? 'ok' : started ? 'running_or_failed' : 'unknown'; return { started, finished, duration_s, status }; } function summarizeLogTail(lines) { const counts = { downloading: 0, updating: 0, removing: 0, failed: 0, writing: 0 }; for (const l of lines) { if (l.startsWith('Downloading:')) counts.downloading++; else if (l.startsWith('Updating:')) counts.updating++; else if (l.startsWith('Removing:')) counts.removing++; else if (l.startsWith('Writing ')) counts.writing++; else if (l.startsWith('Failed')) counts.failed++; } return counts; } // ---------- Aggregator ---------- async function getCounterValue(name) { const arr = await registry.getSingleMetric(name)?.get(); if (!arr?.values) return 0; return arr.values.reduce((sum, v) => sum + (v.value || 0), 0); } export async function getStats() { const [ movieTotal, tvTotal, cronText, cronLogTail, duMovie, duTv, duJwMovie, duJwTv, duRatings, ratings, mappings, ] = await Promise.all([ cached('movie_total', 5 * 60_000, () => countLines(join(TMDBINTEGRAL_DIR, 'movie.json'))), cached('tv_total', 5 * 60_000, () => countLines(join(TMDBINTEGRAL_DIR, 'tv.json'))), cached('cron_txt', 30_000, async () => (existsSync(CRON_TXT) ? await readFile(CRON_TXT, 'utf8') : '')), cached('cron_tail', 30_000, () => tailLines(LASTCRON_TXT, 200)), cached('du_movie', 10 * 60_000, () => diskUsage(MOVIE_DIR)), cached('du_tv', 10 * 60_000, () => diskUsage(TV_DIR)), cached('du_jwmovie', 10 * 60_000, () => diskUsage(JUSTWATCH_MOVIE_DIR)), cached('du_jwtv', 10 * 60_000, () => diskUsage(JUSTWATCH_TV_DIR)), cached('du_ratings', 10 * 60_000, async () => existsSync(IMDB_RATINGS) ? statSync(IMDB_RATINGS).size : 0, ), getRatings().catch(() => null), preloadMappings().catch(() => ({ movie: 0, tv: 0 })), ]); const cron = parseCronTxt(cronText); const cronLogSummary = summarizeLogTail(cronLogTail); const cacheHits = await getCounterValue('search_cache_hits_total'); const cacheMisses = await getCounterValue('search_cache_misses_total'); const httpRequests = await getCounterValue('http_requests_total'); return { timestamp: Date.now(), data: { movies: { master: movieTotal, with_imdb: mappings.movie }, tv: { master: tvTotal, with_imdb: mappings.tv }, imdb_ratings: ratings ? ratings.size : 0, disk: { movie_b: duMovie, tv_b: duTv, justwatch_movie_b: duJwMovie, justwatch_tv_b: duJwTv, ratings_b: duRatings, total_b: duMovie + duTv + duJwMovie + duJwTv + duRatings, }, }, cron: { ...cron, log_summary: cronLogSummary, log_tail: cronLogTail.slice(-50), }, search_cache: { hits: cacheHits, misses: cacheMisses, hit_rate: cacheHits + cacheMisses > 0 ? cacheHits / (cacheHits + cacheMisses) : null, }, http: { total_requests: httpRequests, }, process: { uptime_s: Math.round(process.uptime()), memory_mb: Math.round(process.memoryUsage().rss / 1024 / 1024), heap_mb: Math.round(process.memoryUsage().heapUsed / 1024 / 1024), node: process.version, pid: process.pid, }, }; } // Returns the parsed Prometheus metrics in a structured form for the admin UI. // Groups by metric name + labels, returns counters as scalars and histograms // with their bucket counts. export async function getMetricsForUI() { const text = await registry.metrics(); const groups = {}; let currentName = null; let currentHelp = ''; let currentType = ''; for (const line of text.split('\n')) { if (!line) continue; if (line.startsWith('# HELP ')) { const m = line.match(/^# HELP (\S+) (.+)$/); if (m) { currentName = m[1]; currentHelp = m[2]; } continue; } if (line.startsWith('# TYPE ')) { const m = line.match(/^# TYPE (\S+) (\S+)$/); if (m) { currentName = m[1]; currentType = m[2]; groups[currentName] = { name: currentName, help: currentHelp, type: currentType, samples: [] }; } continue; } if (line.startsWith('#')) continue; const m = line.match(/^(\S+?)(?:\{([^}]*)\})?\s+(\S+)/); if (!m) continue; const baseName = m[1].replace(/_(bucket|count|sum)$/, ''); const labelsStr = m[2] || ''; const value = parseFloat(m[3]); const labels = {}; if (labelsStr) { for (const pair of labelsStr.split(',')) { const [k, v] = pair.split('='); if (k && v) labels[k.trim()] = v.replace(/^"|"$/g, ''); } } const grp = groups[baseName] || groups[m[1]]; if (grp) grp.samples.push({ name: m[1], labels, value }); } return Object.values(groups); }