Initial commit: Vuelato - buscador de vuelos
Some checks failed
ci / ci (22, ubuntu-latest) (push) Has been cancelled
Some checks failed
ci / ci (22, ubuntu-latest) (push) Has been cancelled
Nuxt 4 + Supabase + Flightics API. Incluye búsqueda de vuelos, inspiraciones, watchlist, tracking de precios y mapa interactivo. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
258
server/plugins/search-worker.ts
Normal file
258
server/plugins/search-worker.ts
Normal file
@@ -0,0 +1,258 @@
|
||||
import type { SupabaseClient } from '@supabase/supabase-js'
|
||||
import type { SearchParams, Trip } from '../utils/flightics'
|
||||
|
||||
const POLL_INTERVAL = 60_000 // Revisar cola cada 60 segundos
|
||||
const DELAY_BETWEEN_JOBS = 10_000 // 10s entre busquedas (rate limit)
|
||||
const MAX_JOBS_PER_CYCLE = 5 // Maximo jobs por ciclo
|
||||
const SEARCH_TIMEOUT = 30_000 // Timeout por busqueda individual
|
||||
|
||||
interface TrackedJob {
|
||||
id: string
|
||||
search_params: SearchParams
|
||||
interval_hours: number
|
||||
run_count: number
|
||||
expires_at: string | null
|
||||
}
|
||||
|
||||
export default defineNitroPlugin((nitro) => {
|
||||
let running = false
|
||||
|
||||
const interval = setInterval(processQueue, POLL_INTERVAL)
|
||||
nitro.hooks.hook('close', () => clearInterval(interval))
|
||||
|
||||
// Primera ejecucion tras 10s de arranque
|
||||
setTimeout(processQueue, 10_000)
|
||||
|
||||
async function processQueue() {
|
||||
if (running) return
|
||||
running = true
|
||||
|
||||
try {
|
||||
const supabase = getSupabaseAdmin()
|
||||
|
||||
// Buscar tracked_searches que necesitan ejecutarse
|
||||
const { data: pendingJobs, error } = await supabase
|
||||
.from('tracked_searches')
|
||||
.select('*')
|
||||
.eq('is_active', true)
|
||||
.lte('next_run_at', new Date().toISOString())
|
||||
.order('next_run_at', { ascending: true })
|
||||
.limit(MAX_JOBS_PER_CYCLE)
|
||||
|
||||
if (error || !pendingJobs || pendingJobs.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
console.log(`[search-worker] Procesando ${pendingJobs.length} busquedas pendientes`)
|
||||
|
||||
for (const job of pendingJobs as unknown as TrackedJob[]) {
|
||||
try {
|
||||
await processJob(supabase, job)
|
||||
} catch (e: unknown) {
|
||||
const msg = e instanceof Error ? e.message : String(e)
|
||||
console.error(`[search-worker] Error en job ${job.id}:`, msg)
|
||||
await markJobFailed(supabase, job, msg)
|
||||
}
|
||||
|
||||
// Esperar entre jobs para no saturar Flightics
|
||||
if ((pendingJobs as unknown as TrackedJob[]).indexOf(job) < pendingJobs.length - 1) {
|
||||
await new Promise(r => setTimeout(r, DELAY_BETWEEN_JOBS))
|
||||
}
|
||||
}
|
||||
|
||||
// Limpiar cache viejo (>2 horas)
|
||||
await supabase
|
||||
.from('search_cache')
|
||||
.delete()
|
||||
.lt('fetched_at', new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString())
|
||||
|
||||
} catch (e: unknown) {
|
||||
const msg = e instanceof Error ? e.message : String(e)
|
||||
console.error('[search-worker] Error general:', msg)
|
||||
} finally {
|
||||
running = false
|
||||
}
|
||||
}
|
||||
|
||||
async function processJob(supabase: SupabaseClient, job: TrackedJob) {
|
||||
const searchParams = job.search_params
|
||||
|
||||
// Validar que las fechas no esten en el pasado
|
||||
const departureDateEnd = searchParams.departureDateInterval?.end
|
||||
if (departureDateEnd) {
|
||||
const endDate = new Date(departureDateEnd)
|
||||
if (endDate < new Date()) {
|
||||
console.log(`[search-worker] Job ${job.id}: fechas en el pasado, desactivando`)
|
||||
await supabase
|
||||
.from('tracked_searches')
|
||||
.update({ is_active: false, last_error: 'Fechas de busqueda en el pasado' } as Record<string, unknown>)
|
||||
.eq('id', job.id)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Verificar expiracion
|
||||
if (job.expires_at && new Date(job.expires_at) < new Date()) {
|
||||
console.log(`[search-worker] Job ${job.id}: expirado, desactivando`)
|
||||
await supabase
|
||||
.from('tracked_searches')
|
||||
.update({ is_active: false } as Record<string, unknown>)
|
||||
.eq('id', job.id)
|
||||
return
|
||||
}
|
||||
|
||||
// Crear search_run
|
||||
const { data: run } = await supabase
|
||||
.from('search_runs')
|
||||
.insert({
|
||||
tracked_search_id: job.id,
|
||||
status: 'running',
|
||||
started_at: new Date().toISOString()
|
||||
} as Record<string, unknown>)
|
||||
.select('id')
|
||||
.single()
|
||||
|
||||
if (!run) throw new Error('No se pudo crear search_run')
|
||||
|
||||
// Buscar en cache primero
|
||||
const paramsHash = computeSearchHash(searchParams as unknown as Record<string, unknown>)
|
||||
let trips: Trip[] = []
|
||||
let fromCache = false
|
||||
|
||||
const { data: cached } = await supabase
|
||||
.from('search_cache')
|
||||
.select('trips')
|
||||
.eq('params_hash', paramsHash)
|
||||
.gte('fetched_at', new Date(Date.now() - 60 * 60 * 1000).toISOString())
|
||||
.single()
|
||||
|
||||
if (cached?.trips) {
|
||||
trips = cached.trips as unknown as Trip[]
|
||||
fromCache = true
|
||||
console.log(`[search-worker] Job ${job.id}: usando cache (${trips.length} trips)`)
|
||||
} else {
|
||||
// Llamar a Flightics con timeout
|
||||
console.log(`[search-worker] Job ${job.id}: buscando en Flightics...`)
|
||||
const result = await Promise.race([
|
||||
searchTripsComplete(searchParams, 3),
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Timeout de busqueda')), SEARCH_TIMEOUT)
|
||||
)
|
||||
])
|
||||
trips = result.trips || []
|
||||
|
||||
// Guardar en cache
|
||||
if (trips.length > 0) {
|
||||
const prices = trips.map(t => t.totalCost).filter(p => p > 0)
|
||||
await supabase
|
||||
.from('search_cache')
|
||||
.upsert({
|
||||
params_hash: paramsHash,
|
||||
search_params: searchParams,
|
||||
trips,
|
||||
cheapest_price: prices.length > 0 ? Math.min(...prices) : null,
|
||||
total_results: trips.length,
|
||||
fetched_at: new Date().toISOString()
|
||||
} as Record<string, unknown>, { onConflict: 'params_hash' })
|
||||
}
|
||||
}
|
||||
|
||||
// Calcular estadisticas de precio
|
||||
const prices = trips.map(t => t.totalCost).filter(p => p > 0).sort((a, b) => a - b)
|
||||
const cheapest = prices.length > 0 ? prices[0] : null
|
||||
const avg = prices.length > 0 ? Math.round(prices.reduce((s, p) => s + p, 0) / prices.length * 100) / 100 : null
|
||||
const median = prices.length > 0 ? prices[Math.floor(prices.length / 2)] : null
|
||||
|
||||
// Top 5 trips mas baratos (resumen compacto)
|
||||
const topTrips = trips
|
||||
.filter(t => t.totalCost > 0)
|
||||
.sort((a, b) => a.totalCost - b.totalCost)
|
||||
.slice(0, 5)
|
||||
.map(t => ({
|
||||
price: t.totalCost,
|
||||
currency: t.currency,
|
||||
bookingToken: t.bookingToken,
|
||||
legs: t.legs.map(l => ({
|
||||
from: l.segments[0]?.departureCode,
|
||||
to: l.segments[l.segments.length - 1]?.arrivalCode,
|
||||
departure: l.segments[0]?.departureDate,
|
||||
airlines: [...new Set(l.segments.map(s => s.company?.code).filter(Boolean))]
|
||||
}))
|
||||
}))
|
||||
|
||||
// Actualizar search_run
|
||||
const runId = (run as Record<string, unknown>).id as string
|
||||
await supabase
|
||||
.from('search_runs')
|
||||
.update({
|
||||
status: 'completed',
|
||||
cheapest_price: cheapest,
|
||||
total_trips_found: trips.length,
|
||||
top_trips: topTrips,
|
||||
from_cache: fromCache,
|
||||
completed_at: new Date().toISOString()
|
||||
} as Record<string, unknown>)
|
||||
.eq('id', runId)
|
||||
|
||||
// Insertar price_snapshot
|
||||
if (cheapest !== null) {
|
||||
await supabase
|
||||
.from('price_snapshots')
|
||||
.insert({
|
||||
tracked_search_id: job.id,
|
||||
search_run_id: runId,
|
||||
cheapest_price: cheapest,
|
||||
avg_price: avg,
|
||||
median_price: median,
|
||||
total_results: trips.length,
|
||||
recorded_at: new Date().toISOString()
|
||||
} as Record<string, unknown>)
|
||||
}
|
||||
|
||||
// Actualizar tracked_search: next_run_at, last_run_at, run_count
|
||||
const nextRun = new Date(Date.now() + job.interval_hours * 60 * 60 * 1000)
|
||||
await supabase
|
||||
.from('tracked_searches')
|
||||
.update({
|
||||
next_run_at: nextRun.toISOString(),
|
||||
last_run_at: new Date().toISOString(),
|
||||
run_count: (job.run_count || 0) + 1,
|
||||
last_error: null
|
||||
} as Record<string, unknown>)
|
||||
.eq('id', job.id)
|
||||
|
||||
console.log(`[search-worker] Job ${job.id} completado: ${trips.length} trips, mejor precio: ${cheapest}`)
|
||||
}
|
||||
|
||||
async function markJobFailed(supabase: SupabaseClient, job: TrackedJob, errorMessage: string) {
|
||||
// Buscar el run mas reciente en estado running
|
||||
const { data: runs } = await supabase
|
||||
.from('search_runs')
|
||||
.select('id')
|
||||
.eq('tracked_search_id', job.id)
|
||||
.eq('status', 'running')
|
||||
.order('created_at', { ascending: false })
|
||||
.limit(1)
|
||||
|
||||
if (runs && runs.length > 0) {
|
||||
await supabase
|
||||
.from('search_runs')
|
||||
.update({
|
||||
status: 'failed',
|
||||
error_message: errorMessage,
|
||||
completed_at: new Date().toISOString()
|
||||
} as Record<string, unknown>)
|
||||
.eq('id', (runs[0] as Record<string, unknown>).id as string)
|
||||
}
|
||||
|
||||
// Programar reintento (avanzar next_run_at)
|
||||
const nextRun = new Date(Date.now() + job.interval_hours * 60 * 60 * 1000)
|
||||
await supabase
|
||||
.from('tracked_searches')
|
||||
.update({
|
||||
last_error: errorMessage,
|
||||
next_run_at: nextRun.toISOString()
|
||||
} as Record<string, unknown>)
|
||||
.eq('id', job.id)
|
||||
}
|
||||
})
|
||||
Reference in New Issue
Block a user