import type { SupabaseClient } from '@supabase/supabase-js'
import type { SearchParams, Trip } from '../utils/flightics'

const POLL_INTERVAL = 60_000 // Check the queue every 60 seconds
const STARTUP_DELAY = 10_000 // First run happens 10s after startup
const DELAY_BETWEEN_JOBS = 10_000 // 10s between searches (rate limit)
const MAX_JOBS_PER_CYCLE = 5 // Maximum number of jobs per cycle
const SEARCH_TIMEOUT = 30_000 // Timeout for a single search
const CACHE_FRESH_MS = 60 * 60 * 1000 // Cached results younger than 1 hour are reused
const CACHE_RETENTION_MS = 2 * 60 * 60 * 1000 // Cache rows older than 2 hours are deleted
const TOP_TRIPS_LIMIT = 5 // Number of cheapest trips stored in the run summary

/** Columns of a `tracked_searches` row that this worker reads. */
interface TrackedJob {
  id: string
  search_params: SearchParams
  interval_hours: number
  run_count: number
  expires_at: string | null
}

/** Aggregate price figures for one search result set; all `null` when there are no positive prices. */
interface PriceStats {
  cheapest: number | null
  avg: number | null
  median: number | null
}

/** Rounds a value to two decimal places. */
function roundToCents(value: number): number {
  return Math.round(value * 100) / 100
}

/**
 * Computes cheapest, average and median over the strictly positive entries of `costs`.
 *
 * The median of an even-sized list is the mean of the two middle values
 * (rounded to two decimals), not the upper-middle element.
 *
 * @param costs Raw trip costs; non-positive values are ignored. The input is not mutated.
 * @returns The statistics, or all-`null` when no positive cost exists.
 */
function computePriceStats(costs: readonly number[]): PriceStats {
  // filter() copies, so the in-place sort never touches the caller's array.
  const prices = costs.filter(p => p > 0).sort((a, b) => a - b)
  if (prices.length === 0) {
    return { cheapest: null, avg: null, median: null }
  }

  const total = prices.reduce((sum, p) => sum + p, 0)
  const mid = Math.floor(prices.length / 2)
  const median = prices.length % 2 === 1
    ? prices[mid]
    : roundToCents((prices[mid - 1] + prices[mid]) / 2)

  return {
    cheapest: prices[0],
    avg: roundToCents(total / prices.length),
    median
  }
}

/**
 * Builds a compact summary of the cheapest trips with a positive cost.
 *
 * @param trips Trips to summarize; not mutated.
 * @param limit Maximum number of trips to include.
 * @returns Per trip: price, currency, booking token and, per leg, the first segment's
 *          departure code/date, the last segment's arrival code and the distinct airline codes.
 */
function summarizeTopTrips(trips: Trip[], limit: number = TOP_TRIPS_LIMIT) {
  return trips
    .filter(t => t.totalCost > 0)
    .sort((a, b) => a.totalCost - b.totalCost)
    .slice(0, limit)
    .map(t => ({
      price: t.totalCost,
      currency: t.currency,
      bookingToken: t.bookingToken,
      legs: t.legs.map(l => ({
        from: l.segments[0]?.departureCode,
        to: l.segments[l.segments.length - 1]?.arrivalCode,
        departure: l.segments[0]?.departureDate,
        airlines: [...new Set(l.segments.map(s => s.company?.code).filter(Boolean))]
      }))
    }))
}

/**
 * Races `work` against a timer and always clears the timer once either side settles,
 * so a finished search does not leave a pending timeout behind.
 *
 * @param work Promise to wait for.
 * @param ms Milliseconds before rejecting.
 * @param message Message of the `Error` used for the timeout rejection.
 * @returns The value of `work` if it settles first.
 * @throws Error with `message` when the timer fires first. The underlying work is not cancelled.
 */
function withTimeout<T>(work: Promise<T>, ms: number, message: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms)
  })
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer))
}

/**
 * Nitro plugin that polls `tracked_searches` for due jobs, runs each search
 * (reusing `search_cache` when fresh), and records runs and price snapshots.
 */
export default defineNitroPlugin((nitro) => {
  // Guards against overlapping cycles when one cycle outlasts the poll interval.
  let running = false

  const interval = setInterval(processQueue, POLL_INTERVAL)
  const startupTimer = setTimeout(processQueue, STARTUP_DELAY)

  // Cancel both timers on shutdown so no cycle starts after the server closes.
  nitro.hooks.hook('close', () => {
    clearInterval(interval)
    clearTimeout(startupTimer)
  })

  /**
   * Runs one polling cycle: loads up to MAX_JOBS_PER_CYCLE due jobs, processes them
   * sequentially with a pause between them, then purges old cache rows.
   * Errors are logged and never propagate to the timer that invoked the cycle.
   */
  async function processQueue(): Promise<void> {
    if (running) return
    running = true

    try {
      const supabase = getSupabaseAdmin()

      // Active tracked searches whose next_run_at is due, oldest first.
      const { data, error } = await supabase
        .from('tracked_searches')
        .select('*')
        .eq('is_active', true)
        .lte('next_run_at', new Date().toISOString())
        .order('next_run_at', { ascending: true })
        .limit(MAX_JOBS_PER_CYCLE)

      if (error) {
        console.error('[search-worker] Error consultando tracked_searches:', error.message)
        return
      }

      // Rows come back untyped from the client; TrackedJob names the columns used here.
      const pendingJobs = (data ?? []) as unknown as TrackedJob[]
      if (pendingJobs.length === 0) return

      console.log(`[search-worker] Procesando ${pendingJobs.length} busquedas pendientes`)

      for (const [index, job] of pendingJobs.entries()) {
        try {
          await processJob(supabase, job)
        } catch (e: unknown) {
          const msg = e instanceof Error ? e.message : String(e)
          console.error(`[search-worker] Error en job ${job.id}:`, msg)
          await markJobFailed(supabase, job, msg)
        }

        // Pause between jobs (not after the last one) to avoid saturating Flightics.
        if (index < pendingJobs.length - 1) {
          await new Promise(resolve => setTimeout(resolve, DELAY_BETWEEN_JOBS))
        }
      }

      // Purge cache rows older than the retention window.
      const { error: cleanupError } = await supabase
        .from('search_cache')
        .delete()
        .lt('fetched_at', new Date(Date.now() - CACHE_RETENTION_MS).toISOString())

      if (cleanupError) {
        console.error('[search-worker] Error limpiando search_cache:', cleanupError.message)
      }
    } catch (e: unknown) {
      const msg = e instanceof Error ? e.message : String(e)
      console.error('[search-worker] Error general:', msg)
    } finally {
      running = false
    }
  }

  /**
   * Executes one tracked search: deactivates it if its dates are past or it has expired;
   * otherwise creates a search_run, obtains trips (cache or live search), stores statistics
   * and schedules the next run.
   *
   * @throws Error when the search_run row cannot be created or the live search
   *         fails or times out; the caller marks the job as failed.
   */
  async function processJob(supabase: SupabaseClient, job: TrackedJob): Promise<void> {
    const searchParams = job.search_params

    // Deactivate searches whose departure window has already ended.
    const departureDateEnd = searchParams.departureDateInterval?.end
    if (departureDateEnd && new Date(departureDateEnd) < new Date()) {
      console.log(`[search-worker] Job ${job.id}: fechas en el pasado, desactivando`)
      await supabase
        .from('tracked_searches')
        .update({
          is_active: false,
          last_error: 'Fechas de busqueda en el pasado'
        } as Record<string, unknown>)
        .eq('id', job.id)
      return
    }

    // Deactivate searches past their expiry timestamp.
    if (job.expires_at && new Date(job.expires_at) < new Date()) {
      console.log(`[search-worker] Job ${job.id}: expirado, desactivando`)
      await supabase
        .from('tracked_searches')
        .update({ is_active: false } as Record<string, unknown>)
        .eq('id', job.id)
      return
    }

    // Create the search_run row that tracks this execution.
    const { data: run, error: runError } = await supabase
      .from('search_runs')
      .insert({
        tracked_search_id: job.id,
        status: 'running',
        started_at: new Date().toISOString()
      } as Record<string, unknown>)
      .select('id')
      .single()

    if (runError || !run) {
      const detail = runError ? `: ${runError.message}` : ''
      throw new Error(`No se pudo crear search_run${detail}`)
    }
    const runId = (run as Record<string, unknown>).id as string

    // Look for a fresh cache entry first.
    const paramsHash = computeSearchHash(searchParams as unknown as Record<string, unknown>)
    let trips: Trip[] = []
    let fromCache = false

    // maybeSingle(): a cache miss (zero rows) is an expected outcome, not an error.
    const { data: cached } = await supabase
      .from('search_cache')
      .select('trips')
      .eq('params_hash', paramsHash)
      .gte('fetched_at', new Date(Date.now() - CACHE_FRESH_MS).toISOString())
      .maybeSingle()

    if (cached?.trips) {
      trips = cached.trips as unknown as Trip[]
      fromCache = true
      console.log(`[search-worker] Job ${job.id}: usando cache (${trips.length} trips)`)
    } else {
      // Live search, bounded by SEARCH_TIMEOUT.
      // NOTE(review): the meaning of the second argument (3) is not visible here — confirm.
      console.log(`[search-worker] Job ${job.id}: buscando en Flightics...`)
      const result = await withTimeout(
        searchTripsComplete(searchParams, 3),
        SEARCH_TIMEOUT,
        'Timeout de busqueda'
      )
      trips = result.trips ?? []
    }

    // Price statistics over trips with a positive cost.
    const { cheapest, avg, median } = computePriceStats(trips.map(t => t.totalCost))

    // Store freshly fetched, non-empty results in the cache.
    if (!fromCache && trips.length > 0) {
      await supabase
        .from('search_cache')
        .upsert({
          params_hash: paramsHash,
          search_params: searchParams,
          trips,
          cheapest_price: cheapest,
          total_results: trips.length,
          fetched_at: new Date().toISOString()
        } as Record<string, unknown>, { onConflict: 'params_hash' })
    }

    // Mark the run as completed with a compact summary of the cheapest trips.
    await supabase
      .from('search_runs')
      .update({
        status: 'completed',
        cheapest_price: cheapest,
        total_trips_found: trips.length,
        top_trips: summarizeTopTrips(trips),
        from_cache: fromCache,
        completed_at: new Date().toISOString()
      } as Record<string, unknown>)
      .eq('id', runId)

    // Record a price snapshot only when at least one priced trip exists.
    if (cheapest !== null) {
      await supabase
        .from('price_snapshots')
        .insert({
          tracked_search_id: job.id,
          search_run_id: runId,
          cheapest_price: cheapest,
          avg_price: avg,
          median_price: median,
          total_results: trips.length,
          recorded_at: new Date().toISOString()
        } as Record<string, unknown>)
    }

    // Schedule the next run and clear any previous error.
    const nextRun = new Date(Date.now() + job.interval_hours * 60 * 60 * 1000)
    await supabase
      .from('tracked_searches')
      .update({
        next_run_at: nextRun.toISOString(),
        last_run_at: new Date().toISOString(),
        run_count: (job.run_count ?? 0) + 1,
        last_error: null
      } as Record<string, unknown>)
      .eq('id', job.id)

    console.log(`[search-worker] Job ${job.id} completado: ${trips.length} trips, mejor precio: ${cheapest}`)
  }

  /**
   * Marks the job's most recent `running` search_run (if any) as failed and pushes
   * the job's next_run_at forward by its interval so it is retried later.
   */
  async function markJobFailed(
    supabase: SupabaseClient,
    job: TrackedJob,
    errorMessage: string
  ): Promise<void> {
    // Most recent run for this job still in the running state.
    const { data: runs } = await supabase
      .from('search_runs')
      .select('id')
      .eq('tracked_search_id', job.id)
      .eq('status', 'running')
      .order('created_at', { ascending: false })
      .limit(1)

    if (runs && runs.length > 0) {
      await supabase
        .from('search_runs')
        .update({
          status: 'failed',
          error_message: errorMessage,
          completed_at: new Date().toISOString()
        } as Record<string, unknown>)
        .eq('id', (runs[0] as Record<string, unknown>).id as string)
    }

    // Schedule the retry by advancing next_run_at.
    const nextRun = new Date(Date.now() + job.interval_hours * 60 * 60 * 1000)
    await supabase
      .from('tracked_searches')
      .update({
        last_error: errorMessage,
        next_run_at: nextRun.toISOString()
      } as Record<string, unknown>)
      .eq('id', job.id)
  }
})