Files
gob-alert/apps/api/src/ingest/ingest.service.ts
alexandrump 82f3464565 first commit
2026-02-09 01:02:53 +01:00

106 lines
4.2 KiB
TypeScript

import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import axios from 'axios';
import { CatalogService } from '../catalog/catalog.service';
import { CreateCatalogItemDto } from '../catalog/dto/create-catalog-item.dto';
import { NormalizerService } from '../normalizer/normalizer.service';
import { ClassifierService } from '../classifier/classifier.service';
import { IngestRunEntity } from '../entities/ingest-run.entity';
/**
 * Fetches one page of datasets from the configured search endpoint, maps each
 * row to a catalog item, upserts and classifies it, and records the outcome of
 * the whole pass in an `IngestRunEntity`.
 */
@Injectable()
export class IngestService {
  /**
   * Upper bound for the upstream HTTP request. Without it axios waits
   * indefinitely, and a stalled connection would leave the run record in
   * `running` forever.
   */
  private static readonly REQUEST_TIMEOUT_MS = 30000;

  /** Maximum number of error messages persisted on the run record. */
  private static readonly MAX_STORED_ERRORS = 50;

  private readonly logger = new Logger(IngestService.name);

  constructor(
    private readonly catalogService: CatalogService,
    private readonly normalizer: NormalizerService,
    private readonly classifier: ClassifierService,
    @InjectRepository(IngestRunEntity)
    private readonly runs: Repository<IngestRunEntity>,
  ) {}

  /**
   * Executes a single ingest pass.
   *
   * A run record is created with status `running`, then finalised as
   * `success` (no row errors), `partial` (at least one row failed) or
   * `failed` (the request or the finalisation itself threw).
   *
   * @returns the run id, the number of created and updated items, and every
   *   error message collected during the pass (the persisted copy is capped).
   * @throws if the initial run record cannot be saved, or if persisting the
   *   `failed` state also fails.
   */
  async runOnce(): Promise<{ runId: string; imported: number; updated: number; errors: string[] }> {
    const apiBase = process.env.DATOS_API_BASE || 'https://datos.gob.es/apidata/3/action/package_search';
    this.logger.log(`Running ingest against ${apiBase}`);
    const runStart = Date.now();
    const run = await this.runs.save(this.runs.create({ status: 'running' }));

    // Declared outside the try so that the failure path reports what was
    // actually written instead of resetting the counters to zero.
    let imported = 0;
    let updated = 0;
    const errors: string[] = [];

    try {
      const res = await axios.get(apiBase, {
        params: { q: '', rows: 20 },
        timeout: IngestService.REQUEST_TIMEOUT_MS,
      });
      const rows = IngestService.extractRows(res.data);

      for (const row of rows) {
        try {
          const dto = IngestService.toCatalogDto(row);
          const normalized = await this.normalizer.normalize(dto);
          const result = await this.catalogService.upsertWithVersion(normalized);
          if (result.created) imported++;
          if (result.updated) updated++;
          if (result.item) {
            const classification = await this.classifier.classify(result.item);
            await this.classifier.applyTags(result.item, classification);
          }
        } catch (err: unknown) {
          // One bad row must not abort the whole pass; record it and move on.
          const message = IngestService.describeError(err);
          this.logger.error('Error ingesting row', message);
          errors.push(message);
        }
      }

      await this.finishRun(run, errors.length ? 'partial' : 'success', imported, updated, errors, runStart);
      return { runId: run.id, imported, updated, errors };
    } catch (err: unknown) {
      // Reached when the HTTP request fails or when saving the final state
      // above throws. In the latter case rows may already have been written,
      // so the counters and row errors gathered so far are kept.
      const message = IngestService.describeError(err);
      this.logger.error('Ingest request failed', message);
      errors.push(message);
      await this.finishRun(run, 'failed', imported, updated, errors, runStart);
      return { runId: run.id, imported, updated, errors };
    }
  }

  /** Writes the final status, counters, capped error list and timing onto the run and saves it. */
  private async finishRun(
    run: IngestRunEntity,
    status: 'success' | 'partial' | 'failed',
    imported: number,
    updated: number,
    errors: string[],
    runStart: number,
  ): Promise<void> {
    run.status = status;
    run.imported = imported;
    run.updated = updated;
    run.errorCount = errors.length;
    run.errors = errors.slice(0, IngestService.MAX_STORED_ERRORS);
    run.finishedAt = new Date();
    run.durationMs = Date.now() - runStart;
    await this.runs.save(run);
  }

  /**
   * Locates the row array inside the response body. Accepted shapes, in order:
   * a bare array, `result.results`, `result`, `items`. Only real arrays are
   * accepted so that a string or object in one of those slots is never
   * iterated as if it were a list of datasets. Returns `[]` when none match.
   */
  // The payload is untyped external JSON, hence `any`.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private static extractRows(data: any): any[] {
    const candidates = [data, data?.result?.results, data?.result, data?.items];
    return candidates.find((candidate) => Array.isArray(candidate)) ?? [];
  }

  /**
   * Maps one raw dataset row to a `CreateCatalogItemDto`.
   *
   * The source URL is taken from the first resource, then from the row's own
   * `url`/`link`, and finally synthesised as `urn:dataset:<id>`. The format is
   * the comma-joined set of distinct upper-cased resource formats.
   *
   * @throws if the row is not an object, so the caller records it as a row error.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private static toCatalogDto(r: any): CreateCatalogItemDto {
    if (r === null || typeof r !== 'object') {
      throw new Error(`Unexpected dataset row of type ${r === null ? 'null' : typeof r}`);
    }

    const dto: CreateCatalogItemDto = {
      title: r.title || r.name || r.id || 'sin-titulo',
      description: r.notes || r.description || r.title || '',
      sourceUrl: undefined,
      format: undefined,
      publisher: (r.organization && (r.organization.title || r.organization.name)) || r.owner_org || r.author || undefined,
      updatedAt: r.metadata_modified || r.last_modified || r.updated || undefined,
    };

    if (Array.isArray(r.resources) && r.resources.length > 0) {
      dto.sourceUrl = r.resources[0].url || r.resources[0].access_url || dto.sourceUrl;
      const formats = Array.from(new Set(
        r.resources
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          .map((x: any) => (x.format || x.format_description || '').toString().toUpperCase())
          .filter(Boolean)
      ));
      if (formats.length) dto.format = formats.join(',');
    }

    if (!dto.sourceUrl && (r.url || r.link)) dto.sourceUrl = r.url || r.link;
    if (!dto.sourceUrl && r.id) dto.sourceUrl = `urn:dataset:${r.id}`;
    return dto;
  }

  /** Returns the thrown value's non-empty string `message`, or its string form otherwise. */
  private static describeError(err: unknown): string {
    const message = (err as { message?: unknown } | null | undefined)?.message;
    return typeof message === 'string' && message ? message : String(err);
  }
}