Tau.Acuvim/portal/src/Tau.Acuvim.Portal/Services/FleetPushService.cs
Diseri Pearson a92b4277ae Phase 14: Push + ingest pipeline (end-to-end fleet aggregation)
Customer-stack measurements now flow to the Admin-stack central DB via
HTTPS POST, with firmware buffer-and-replay back-fills handled correctly.

Client side (push)
- monitoring.PowerMeasurements gains ReceivedAt (default NOW()) +
  index. Push selects WHERE ReceivedAt > LastCursor, so back-dated
  rows from offline-buffer replays are picked up automatically.
- app.FleetPushState table holds per-resource cursors + backoff state.
- FleetPushClient: HttpClient wrapper, X-Customer-Token header,
  X-Batch-Type, X-Push-Cursor. 413 returns retry-after halving signal.
- FleetPushService: BackgroundService loop. Per tick: sites (full set),
  devices (full set), measurements (cursor-driven up to 3 batches).
  Exponential backoff per resource on failure (1m → 30m cap).
  Honors 429 Retry-After. Only registered when RunMode=Client AND
  FleetIngest__Enabled=true.

Admin side (ingest)
- /api/fleet/ingest: anonymous, X-Customer-Token authed against
  fleet.Customers via SHA-256 indexed lookup. 401 on bad token; 400
  on bad batch type.
- FleetIngestService dispatches by X-Batch-Type:
  sites/devices → upsert by (CustomerId, Id) with ON CONFLICT UPDATE
  measurements → bulk INSERT ON CONFLICT (Time, CustomerId, DeviceId)
                 DO NOTHING (idempotent under re-delivery).
- Updates fleet.Customers.FirstSeenAt/LastSeenAt on each successful batch.
- Writes fleet.IngestEvents audit row per batch (accepted, rejected,
  bytes, client cursor, time-spread, error).
- FleetTimescaleBootstrapper runs after MigrateAsync in Admin mode:
  CREATE EXTENSION timescaledb, create_hypertable on fleet.PowerMeasurements,
  chunk interval 7 days, compression with segmentby=(CustomerId,DeviceId)
  + compress_orderby "Time" DESC, compression policy 7 days, hourly_per_device
  continuous aggregate (realtime, materialized_only=false, 30-day start_offset
  so back-fills get materialized on next refresh tick).

Wiring
- docker-compose.yml threads Application__RunMode + FleetIngest__* from
  .env (defaults safely off) so a single dev host can run two stacks.
- .env.example documents the new vars under their own section.

Tests
- FleetIngestValidationTests (2 new). 53/53 passing.

Verified end-to-end on the dev host
- Client (portal-dev_portal, RunMode=Client, FleetIngest__Enabled=true)
  pushes to Admin (portal-admin-test, RunMode=Admin, separate admin_fleet DB)
  via container DNS.
- Customer registered on Admin (DEV0001), token captured, dropped into
  Client .env, Client restarted, push service started on schedule.
- Ingested measurements (including a 2026-04-01 back-dated sample
  simulating firmware replay) all land in fleet.PowerMeasurements with
  the correct CustomerId.
- Customer.FirstSeenAt/LastSeenAt update, IngestEvents records every
  batch (sites + devices per tick, measurements when cursor advances).
- Hypertable confirmed via timescaledb_information.hypertables;
  hourly_per_device CA confirmed via timescaledb_information.continuous_aggregates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 10:17:58 +02:00

222 lines
8.9 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Net;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Tau.Acuvim.Portal.Configuration;
using Tau.Acuvim.Portal.Data;
using Tau.Acuvim.Portal.Domain.Fleet;
using Tau.Acuvim.Portal.DTOs;
namespace Tau.Acuvim.Portal.Services;
// Background loop that pushes sites/devices/measurements to the Admin fleet ingest endpoint.
// Registered only when RunMode=Client AND FleetIngest__Enabled=true.
//
// Per tick:
// 1. Sites (full set, idempotent upsert on Admin side)
// 2. Devices (full set)
// 3. Measurements: up to 3 batches × BatchSize rows, cursor by ReceivedAt
// Then sleep IntervalSeconds.
public sealed class FleetPushService(
IServiceProvider services,
IOptions<FleetIngestOptions> options,
ILogger<FleetPushService> log)
: BackgroundService
{
private const int MaxBatchesPerTickPerResource = 3;
private static readonly TimeSpan MaxBackoff = TimeSpan.FromMinutes(30);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var interval = TimeSpan.FromSeconds(Math.Max(5, options.Value.IntervalSeconds));
log.LogInformation("FleetPushService started. Url={Url}, interval={Interval}s",
options.Value.Url, interval.TotalSeconds);
// Brief startup grace so MigrateAsync + IdentityBootstrapper finish before we hit the DB.
try { await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); }
catch (TaskCanceledException) { return; }
while (!stoppingToken.IsCancellationRequested)
{
try
{
await RunTickAsync(stoppingToken);
}
catch (Exception ex) when (ex is not TaskCanceledException)
{
log.LogError(ex, "FleetPushService tick failed");
}
try { await Task.Delay(interval, stoppingToken); }
catch (TaskCanceledException) { break; }
}
}
private async Task RunTickAsync(CancellationToken ct)
{
using var scope = services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var client = scope.ServiceProvider.GetRequiredService<FleetPushClient>();
if (!await ShouldRunNowAsync(db, ct))
{
return;
}
var sitesOk = await PushSitesAsync(db, client, ct);
if (!sitesOk) return;
var devicesOk = await PushDevicesAsync(db, client, ct);
if (!devicesOk) return;
await PushMeasurementsAsync(db, client, ct);
}
// ── Sites: full set every tick ─────────────────────────────────────────
private async Task<bool> PushSitesAsync(AppDbContext db, FleetPushClient client, CancellationToken ct)
{
var state = await GetStateAsync(db, FleetPushState.ResourceSites, ct);
var rows = await db.Sites.AsNoTracking()
.Select(s => new FleetSiteDto(s.Id, s.Name, s.Address, s.MunicipalityId, s.IsActive))
.ToListAsync(ct);
if (rows.Count == 0)
{
await MarkSuccessAsync(db, state, cursor: DateTime.UtcNow, ct);
return true;
}
var result = await client.PushSitesAsync(rows, DateTime.UtcNow, ct);
return await HandleResultAsync(db, state, result, cursor: DateTime.UtcNow, ct);
}
// ── Devices: full set every tick ───────────────────────────────────────
private async Task<bool> PushDevicesAsync(AppDbContext db, FleetPushClient client, CancellationToken ct)
{
var state = await GetStateAsync(db, FleetPushState.ResourceDevices, ct);
var rows = await db.Devices.AsNoTracking()
.Select(d => new FleetDeviceDto(d.Id, d.SiteId, d.Name, d.ExternalId, d.Description, d.IsActive))
.ToListAsync(ct);
if (rows.Count == 0)
{
await MarkSuccessAsync(db, state, cursor: DateTime.UtcNow, ct);
return true;
}
var result = await client.PushDevicesAsync(rows, DateTime.UtcNow, ct);
return await HandleResultAsync(db, state, result, cursor: DateTime.UtcNow, ct);
}
// ── Measurements: cursor-driven batches up to MaxBatchesPerTickPerResource ─
private async Task PushMeasurementsAsync(AppDbContext db, FleetPushClient client, CancellationToken ct)
{
var state = await GetStateAsync(db, FleetPushState.ResourceMeasurements, ct);
var cursor = state.LastCursor ?? DateTime.MinValue.ToUniversalTime();
int batchSize = options.Value.BatchSize;
for (int i = 0; i < MaxBatchesPerTickPerResource; i++)
{
var rows = await db.PowerMeasurements.AsNoTracking()
.Where(m => m.ReceivedAt > cursor)
.OrderBy(m => m.ReceivedAt)
.Take(batchSize)
.Select(m => new FleetMeasurementDto(
m.Time, m.DeviceId, m.ActivePowerKw,
m.ReactivePowerKvar, m.ApparentPowerKva, m.PowerFactor,
m.VoltageV, m.FrequencyHz,
m.EnergyImportedKwh, m.EnergyExportedKwh,
m.Source, m.ReceivedAt))
.ToListAsync(ct);
if (rows.Count == 0) break;
var newCursor = rows[^1].ReceivedAt;
var result = await client.PushMeasurementsAsync(rows, newCursor, ct);
if (result.StatusCode == HttpStatusCode.RequestEntityTooLarge)
{
batchSize = Math.Max(100, batchSize / 2);
log.LogWarning("Ingest returned 413; halving batch size to {BatchSize}", batchSize);
continue;
}
var ok = await HandleResultAsync(db, state, result, cursor: newCursor, ct);
if (!ok) return;
cursor = newCursor;
if (rows.Count < batchSize) break;
}
}
// ── State helpers ──────────────────────────────────────────────────────
private static async Task<FleetPushState> GetStateAsync(AppDbContext db, string resourceType, CancellationToken ct)
{
var row = await db.FleetPushState.FirstOrDefaultAsync(x => x.ResourceType == resourceType, ct);
if (row is null)
{
row = new FleetPushState { ResourceType = resourceType };
db.FleetPushState.Add(row);
await db.SaveChangesAsync(ct);
}
return row;
}
private static async Task MarkSuccessAsync(AppDbContext db, FleetPushState state, DateTime cursor, CancellationToken ct)
{
state.LastCursor = cursor;
state.LastSyncedAt = DateTime.UtcNow;
state.LastError = null;
state.ConsecutiveFailures = 0;
await db.SaveChangesAsync(ct);
}
private async Task<bool> HandleResultAsync(
AppDbContext db, FleetPushState state, FleetPushResult result, DateTime cursor, CancellationToken ct)
{
if (result.Succeeded)
{
await MarkSuccessAsync(db, state, cursor, ct);
if (result.Rejected > 0)
{
log.LogWarning("Ingest accepted={Accepted} rejected={Rejected} for {Resource}",
result.Accepted, result.Rejected, state.ResourceType);
}
return true;
}
state.LastError = result.Error;
state.ConsecutiveFailures++;
state.LastSyncedAt = DateTime.UtcNow;
await db.SaveChangesAsync(ct);
var backoff = result.RetryAfter ?? BackoffFor(state.ConsecutiveFailures);
log.LogWarning("Push failed for {Resource}: {Error}. Failures={N}, next attempt after ~{Backoff}.",
state.ResourceType, result.Error, state.ConsecutiveFailures, backoff);
// Stop this tick; the next interval handles the retry naturally.
return false;
}
private static TimeSpan BackoffFor(int failures)
{
var baseMinutes = Math.Min(Math.Pow(2, Math.Max(0, failures - 1)), 30);
return TimeSpan.FromMinutes(baseMinutes);
}
// Block when we're in a long backoff window — only attempt if enough time has passed
// since LastSyncedAt to cover the backoff. Cheap: read one row.
private async Task<bool> ShouldRunNowAsync(AppDbContext db, CancellationToken ct)
{
var maxFailures = await db.FleetPushState.AsNoTracking()
.Select(x => new { x.LastSyncedAt, x.ConsecutiveFailures })
.ToListAsync(ct);
if (maxFailures.Count == 0) return true;
foreach (var row in maxFailures)
{
if (row.ConsecutiveFailures == 0) return true;
if (row.LastSyncedAt is null) return true;
var since = DateTime.UtcNow - row.LastSyncedAt.Value;
var backoff = BackoffFor(row.ConsecutiveFailures);
if (since >= backoff) return true;
}
return false;
}
}