Phase 14: Push + ingest pipeline (end-to-end fleet aggregation)

Customer-stack measurements now flow to the Admin-stack central DB via
HTTPS POST, with firmware buffer-and-replay back-fills handled correctly.

Client side (push)
- monitoring.PowerMeasurements gains ReceivedAt (default NOW()) +
  index. Push selects WHERE ReceivedAt > LastCursor, so back-dated
  rows from offline-buffer replays are picked up automatically.
- app.FleetPushState table holds per-resource cursors + backoff state.
- FleetPushClient: HttpClient wrapper, X-Customer-Token header,
  X-Batch-Type, X-Push-Cursor. 413 returns retry-after halving signal.
- FleetPushService: BackgroundService loop. Per tick: sites (full set),
  devices (full set), measurements (cursor-driven up to 3 batches).
  Exponential backoff per resource on failure (1m → 30m cap).
  Honors 429 Retry-After. Only registered when RunMode=Client AND
  FleetIngest__Enabled=true.

Admin side (ingest)
- /api/fleet/ingest: anonymous, X-Customer-Token authed against
  fleet.Customers via SHA-256 indexed lookup. 401 on bad token; 400
  on bad batch type.
- FleetIngestService dispatches by X-Batch-Type:
  sites/devices → upsert by (CustomerId, Id) with ON CONFLICT UPDATE
  measurements → bulk INSERT ON CONFLICT (Time, CustomerId, DeviceId)
                 DO NOTHING (idempotent under re-delivery).
- Updates fleet.Customers.FirstSeenAt/LastSeenAt on each successful batch.
- Writes fleet.IngestEvents audit row per batch (accepted, rejected,
  bytes, client cursor, time-spread, error).
- FleetTimescaleBootstrapper runs after MigrateAsync in Admin mode:
  CREATE EXTENSION timescaledb, create_hypertable on fleet.PowerMeasurements,
  chunk interval 7 days, compression with segmentby=(CustomerId,DeviceId)
  + compress_orderby "Time" DESC, compression policy 7 days, hourly_per_device
  continuous aggregate (realtime, materialized_only=false, 30-day start_offset
  so back-fills get materialized on next refresh tick).

Wiring
- docker-compose.yml threads Application__RunMode + FleetIngest__* from
  .env (defaults safely off) so a single dev host can run two stacks.
- .env.example documents the new vars under their own section.

Tests
- FleetIngestValidationTests (2 new). 53/53 passing.

Verified end-to-end on the dev host
- Client (portal-dev_portal, RunMode=Client, FleetIngest__Enabled=true)
  pushes to Admin (portal-admin-test, RunMode=Admin, separate admin_fleet DB)
  via container DNS.
- Customer registered on Admin (DEV0001), token captured, dropped into
  Client .env, Client restarted, push service started on schedule.
- Ingested measurements (including a 2026-04-01 back-dated sample
  simulating firmware replay) all land in fleet.PowerMeasurements with
  the correct CustomerId.
- Customer.FirstSeenAt/LastSeenAt update, IngestEvents records every
  batch (sites + devices per tick, measurements when cursor advances).
- Hypertable confirmed via timescaledb_information.hypertables;
  hourly_per_device CA confirmed via timescaledb_information.continuous_aggregates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Diseri Pearson 2026-05-18 10:17:58 +02:00
parent 2c618b776b
commit a92b4277ae
17 changed files with 1598 additions and 3 deletions

View File

@ -26,3 +26,17 @@ Authentication__DefaultAdminPassword=ChangeMe123!
GRAFANA_ADMIN_PASSWORD=admin
# Path prefix Grafana is mounted at behind Traefik. Same-origin embed in the SPA.
Grafana__EmbedPathPrefix=/grafana
# ── RunMode (Client | Admin) ─────────────────────────────────────────────
# Client (default) = per-customer stack. Admin = fleet aggregator.
# Application__RunMode=Client
# ── Fleet ingest (Client mode → push to Admin) ───────────────────────────
# When Enabled=true, URL and Token are REQUIRED (RunModeGuards refuse otherwise).
# Get the token from the Admin Customers page; it is shown once at creation/rotation.
# FleetIngest__Enabled=false
# FleetIngest__Url=https://admin.portal.example.com/api/fleet/ingest
# FleetIngest__Token=
# FleetIngest__IntervalSeconds=60
# FleetIngest__BatchSize=5000
# FleetIngest__BatchMaxBytes=1048576

View File

@ -386,7 +386,7 @@ For per-customer provisioning, secret rotation, backups, and health monitoring s
A second deployment of the same image — `RunMode=Admin`, separate DB — aggregates data from all customer stacks for a fleet-wide operator view. See [docs/FLEET-DESIGN.md](./docs/FLEET-DESIGN.md) for the full design.
**Phase 13 (this release):** the Admin stack runs, the Customers page registers customers and issues push tokens (shown once). Customer stacks pick up `RunMode=Client` + `FleetIngest__*` config but don't yet push — that lands in Phase 14.
**Phase 14 (this release):** the full push pipeline is live. Customer stacks with `FleetIngest__Enabled=true` run a `FleetPushService` background loop that batches sites, devices, and measurements (cursor by `ReceivedAt` — firmware buffer-and-replay back-fills get picked up automatically) and POSTs them to `FleetIngest__Url` with `X-Customer-Token`. Admin's `/api/fleet/ingest` upserts and writes an `IngestEvents` audit row per batch. Admin's `FleetTimescaleBootstrapper` makes `fleet.PowerMeasurements` a hypertable with compression-after-7-days and a realtime `fleet.hourly_per_device` continuous aggregate.
**Spin up an Admin stack:**
```powershell
@ -400,4 +400,19 @@ docker run -d --name admin-portal --network <existing-network> `
portal-dev-portal
```
Then sign in at `http://localhost:8090`**Customers** → register the customer → token shown once. Drop the token into the customer stack's `.env` as `FleetIngest__Token` (Phase 14 uses it).
Then sign in at `http://localhost:8090`**Customers** → register the customer → token shown once.
**Enable push on the customer stack:** add to that customer's `.env`:
```ini
Application__RunMode=Client
FleetIngest__Enabled=true
FleetIngest__Url=http://admin-portal:8080/api/fleet/ingest # container DNS in-network, or https://admin-host
FleetIngest__Token=<token from Customers page>
FleetIngest__IntervalSeconds=60 # default
FleetIngest__BatchSize=5000 # default
```
Restart the customer's portal container — the `FleetPushService` starts on its own. Verify the audit trail on the Admin side:
```powershell
docker exec <admin-timescale> psql -U power_user -d admin_fleet -c `
'SELECT \"BatchType\",\"RowsAccepted\",\"ReceivedAt\" FROM fleet.\"IngestEvents\" ORDER BY \"ReceivedAt\" DESC LIMIT 10;'
```

View File

@ -17,6 +17,14 @@ services:
- Authentication__DefaultAdminPassword=${Authentication__DefaultAdminPassword:-ChangeMe123!}
- Grafana__BaseUrl=http://localhost:3001
- Grafana__InternalUrl=http://grafana:3000
# RunMode: Client (default) or Admin. Override in .env to test fleet aggregation locally.
- Application__RunMode=${Application__RunMode:-Client}
# Fleet ingest (Client mode): set Enabled=true + Url + Token to enable the push background service.
- FleetIngest__Enabled=${FleetIngest__Enabled:-false}
- FleetIngest__Url=${FleetIngest__Url:-}
- FleetIngest__Token=${FleetIngest__Token:-}
- FleetIngest__IntervalSeconds=${FleetIngest__IntervalSeconds:-60}
- FleetIngest__BatchSize=${FleetIngest__BatchSize:-5000}
depends_on:
timescaledb:
condition: service_healthy

View File

@ -0,0 +1,35 @@
namespace Tau.Acuvim.Portal.DTOs;
// Wire types for the /api/fleet/ingest endpoint.
// Same shapes used on both push (Client) and ingest (Admin) sides.
public sealed record FleetSiteDto(
Guid Id,
string Name,
string? Address,
int? LocalMunicipalityId,
bool IsActive);
public sealed record FleetDeviceDto(
Guid Id,
Guid SiteId,
string Name,
string ExternalId,
string? Description,
bool IsActive);
public sealed record FleetMeasurementDto(
DateTime Time,
Guid DeviceId,
double ActivePowerKw,
double? ReactivePowerKvar,
double? ApparentPowerKva,
double? PowerFactor,
double? VoltageV,
double? FrequencyHz,
double? EnergyImportedKwh,
double? EnergyExportedKwh,
string? Source,
DateTime ReceivedAt);
public sealed record FleetIngestResult(int Accepted, int Rejected, string[] Errors);

View File

@ -2,6 +2,7 @@ using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.Identity.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Tau.Acuvim.Portal.Domain.Branding;
using Tau.Acuvim.Portal.Domain.Fleet;
using Tau.Acuvim.Portal.Domain.Identity;
using Tau.Acuvim.Portal.Domain.Monitoring;
using Tau.Acuvim.Portal.Domain.Rates;
@ -21,6 +22,7 @@ public class AppDbContext : IdentityDbContext<ApplicationUser, IdentityRole, str
public DbSet<Site> Sites => Set<Site>();
public DbSet<Device> Devices => Set<Device>();
public DbSet<PowerMeasurement> PowerMeasurements => Set<PowerMeasurement>();
public DbSet<FleetPushState> FleetPushState => Set<FleetPushState>();
protected override void OnModelCreating(ModelBuilder builder)
{
@ -84,10 +86,18 @@ public class AppDbContext : IdentityDbContext<ApplicationUser, IdentityRole, str
entity.ToTable("PowerMeasurements", schema: "monitoring");
entity.HasKey(x => new { x.Time, x.DeviceId });
entity.HasIndex(x => new { x.DeviceId, x.Time }).IsDescending(false, true);
entity.Property(x => x.ReceivedAt).HasDefaultValueSql("NOW()");
entity.HasIndex(x => x.ReceivedAt);
entity.HasOne(x => x.Device)
.WithMany()
.HasForeignKey(x => x.DeviceId)
.OnDelete(DeleteBehavior.Cascade);
});
builder.Entity<FleetPushState>(entity =>
{
entity.ToTable("FleetPushState", schema: "app");
entity.HasKey(x => x.ResourceType);
});
}
}

View File

@ -0,0 +1,22 @@
using System.ComponentModel.DataAnnotations;
namespace Tau.Acuvim.Portal.Domain.Fleet;
// Client-side state for the fleet push pipeline. One row per resource type.
public class FleetPushState
{
public const string ResourceSites = "sites";
public const string ResourceDevices = "devices";
public const string ResourceMeasurements = "measurements";
[MaxLength(20)]
public string ResourceType { get; set; } = string.Empty;
public DateTime? LastCursor { get; set; }
public DateTime? LastSyncedAt { get; set; }
[MaxLength(500)]
public string? LastError { get; set; }
public int ConsecutiveFailures { get; set; }
}

View File

@ -21,4 +21,9 @@ public class PowerMeasurement
[MaxLength(50)]
public string? Source { get; set; }
// Server-assigned timestamp on insert. Drives the fleet-push cursor — pushes find
// rows with ReceivedAt > LastCursor, which picks up firmware buffer-and-replay
// back-fills correctly (their Time is in the past but ReceivedAt is now).
public DateTime ReceivedAt { get; set; }
}

View File

@ -0,0 +1,35 @@
using Tau.Acuvim.Portal.Services;
namespace Tau.Acuvim.Portal.Endpoints;
public static class FleetIngestEndpoints
{
public static IEndpointRouteBuilder MapFleetIngestEndpoints(this IEndpointRouteBuilder app)
{
app.MapPost("/api/fleet/ingest", async (
HttpContext ctx,
FleetIngestService svc,
CancellationToken ct) =>
{
var token = ctx.Request.Headers["X-Customer-Token"].ToString();
var batchType = ctx.Request.Headers["X-Batch-Type"].ToString();
var cursor = ctx.Request.Headers["X-Push-Cursor"].ToString();
using var reader = new StreamReader(ctx.Request.Body);
var body = await reader.ReadToEndAsync(ct);
try
{
var result = await svc.IngestAsync(token, batchType, cursor, body, ct);
return Results.Ok(result);
}
catch (UnauthorizedFleetIngestException) { return Results.Unauthorized(); }
catch (InvalidFleetBatchException ex) { return Results.BadRequest(new { error = ex.Message }); }
})
.AllowAnonymous()
.DisableAntiforgery()
.WithTags("Fleet");
return app;
}
}

View File

@ -0,0 +1,662 @@
// <auto-generated />
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
using Tau.Acuvim.Portal.Data;
#nullable disable
namespace Tau.Acuvim.Portal.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20260518081228_AddReceivedAtAndPushState")]
partial class AddReceivedAtAndPushState
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasDefaultSchema("app")
.HasAnnotation("ProductVersion", "10.0.8")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityRole", b =>
{
b.Property<string>("Id")
.HasColumnType("text");
b.Property<string>("ConcurrencyStamp")
.IsConcurrencyToken()
.HasColumnType("text");
b.Property<string>("Name")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<string>("NormalizedName")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.HasKey("Id");
b.HasIndex("NormalizedName")
.IsUnique()
.HasDatabaseName("RoleNameIndex");
b.ToTable("AspNetRoles", "identity");
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityRoleClaim<string>", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<string>("ClaimType")
.HasColumnType("text");
b.Property<string>("ClaimValue")
.HasColumnType("text");
b.Property<string>("RoleId")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("RoleId");
b.ToTable("AspNetRoleClaims", "identity");
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserClaim<string>", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<string>("ClaimType")
.HasColumnType("text");
b.Property<string>("ClaimValue")
.HasColumnType("text");
b.Property<string>("UserId")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("UserId");
b.ToTable("AspNetUserClaims", "identity");
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserLogin<string>", b =>
{
b.Property<string>("LoginProvider")
.HasColumnType("text");
b.Property<string>("ProviderKey")
.HasColumnType("text");
b.Property<string>("ProviderDisplayName")
.HasColumnType("text");
b.Property<string>("UserId")
.IsRequired()
.HasColumnType("text");
b.HasKey("LoginProvider", "ProviderKey");
b.HasIndex("UserId");
b.ToTable("AspNetUserLogins", "identity");
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserRole<string>", b =>
{
b.Property<string>("UserId")
.HasColumnType("text");
b.Property<string>("RoleId")
.HasColumnType("text");
b.HasKey("UserId", "RoleId");
b.HasIndex("RoleId");
b.ToTable("AspNetUserRoles", "identity");
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserToken<string>", b =>
{
b.Property<string>("UserId")
.HasColumnType("text");
b.Property<string>("LoginProvider")
.HasColumnType("text");
b.Property<string>("Name")
.HasColumnType("text");
b.Property<string>("Value")
.HasColumnType("text");
b.HasKey("UserId", "LoginProvider", "Name");
b.ToTable("AspNetUserTokens", "identity");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Branding.WhiteLabelSettings", b =>
{
b.Property<int>("Id")
.HasColumnType("integer");
b.Property<string>("AccentColor")
.IsRequired()
.HasMaxLength(20)
.HasColumnType("character varying(20)");
b.Property<string>("ApplicationName")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<string>("FooterText")
.IsRequired()
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<string>("LogoUrl")
.IsRequired()
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<string>("PrimaryColor")
.IsRequired()
.HasMaxLength(20)
.HasColumnType("character varying(20)");
b.Property<string>("SecondaryColor")
.IsRequired()
.HasMaxLength(20)
.HasColumnType("character varying(20)");
b.Property<DateTime>("UpdatedAt")
.HasColumnType("timestamp with time zone");
b.HasKey("Id");
b.ToTable("WhiteLabelSettings", "app");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Fleet.FleetPushState", b =>
{
b.Property<string>("ResourceType")
.HasMaxLength(20)
.HasColumnType("character varying(20)");
b.Property<int>("ConsecutiveFailures")
.HasColumnType("integer");
b.Property<DateTime?>("LastCursor")
.HasColumnType("timestamp with time zone");
b.Property<string>("LastError")
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<DateTime?>("LastSyncedAt")
.HasColumnType("timestamp with time zone");
b.HasKey("ResourceType");
b.ToTable("FleetPushState", "app");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", b =>
{
b.Property<string>("Id")
.HasColumnType("text");
b.Property<int>("AccessFailedCount")
.HasColumnType("integer");
b.Property<string>("ConcurrencyStamp")
.IsConcurrencyToken()
.HasColumnType("text");
b.Property<DateTime>("CreatedAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("DisplayName")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<string>("Email")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<bool>("EmailConfirmed")
.HasColumnType("boolean");
b.Property<bool>("IsActive")
.HasColumnType("boolean");
b.Property<bool>("LockoutEnabled")
.HasColumnType("boolean");
b.Property<DateTimeOffset?>("LockoutEnd")
.HasColumnType("timestamp with time zone");
b.Property<string>("NormalizedEmail")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<string>("NormalizedUserName")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.Property<string>("PasswordHash")
.HasColumnType("text");
b.Property<string>("PhoneNumber")
.HasColumnType("text");
b.Property<bool>("PhoneNumberConfirmed")
.HasColumnType("boolean");
b.Property<string>("SecurityStamp")
.HasColumnType("text");
b.Property<bool>("TwoFactorEnabled")
.HasColumnType("boolean");
b.Property<string>("UserName")
.HasMaxLength(256)
.HasColumnType("character varying(256)");
b.HasKey("Id");
b.HasIndex("NormalizedEmail")
.HasDatabaseName("EmailIndex");
b.HasIndex("NormalizedUserName")
.IsUnique()
.HasDatabaseName("UserNameIndex");
b.ToTable("AspNetUsers", "identity");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Device", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<DateTime>("CreatedAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("Description")
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<string>("ExternalId")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<bool>("IsActive")
.HasColumnType("boolean");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<Guid>("SiteId")
.HasColumnType("uuid");
b.HasKey("Id");
b.HasIndex("ExternalId");
b.HasIndex("SiteId", "ExternalId")
.IsUnique();
b.ToTable("Devices", "monitoring");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.PowerMeasurement", b =>
{
b.Property<DateTime>("Time")
.HasColumnType("timestamp with time zone");
b.Property<Guid>("DeviceId")
.HasColumnType("uuid");
b.Property<double>("ActivePowerKw")
.HasColumnType("double precision");
b.Property<double?>("ApparentPowerKva")
.HasColumnType("double precision");
b.Property<double?>("EnergyExportedKwh")
.HasColumnType("double precision");
b.Property<double?>("EnergyImportedKwh")
.HasColumnType("double precision");
b.Property<double?>("FrequencyHz")
.HasColumnType("double precision");
b.Property<double?>("PowerFactor")
.HasColumnType("double precision");
b.Property<double?>("ReactivePowerKvar")
.HasColumnType("double precision");
b.Property<DateTime>("ReceivedAt")
.ValueGeneratedOnAdd()
.HasColumnType("timestamp with time zone")
.HasDefaultValueSql("NOW()");
b.Property<string>("Source")
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<double?>("VoltageV")
.HasColumnType("double precision");
b.HasKey("Time", "DeviceId");
b.HasIndex("ReceivedAt");
b.HasIndex("DeviceId", "Time")
.IsDescending(false, true);
b.ToTable("PowerMeasurements", "monitoring");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Site", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("Address")
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<DateTime>("CreatedAt")
.HasColumnType("timestamp with time zone");
b.Property<bool>("IsActive")
.HasColumnType("boolean");
b.Property<int?>("MunicipalityId")
.HasColumnType("integer");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.HasKey("Id");
b.HasIndex("MunicipalityId");
b.HasIndex("Name");
b.ToTable("Sites", "monitoring");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Municipality", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<DateTime>("CreatedAt")
.HasColumnType("timestamp with time zone");
b.Property<bool>("IsActive")
.HasColumnType("boolean");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<string>("TimeZoneId")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.HasKey("Id");
b.HasIndex("Name")
.IsUnique();
b.ToTable("Municipalities", "app");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Tariff", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<DateTime>("CreatedAt")
.HasColumnType("timestamp with time zone");
b.Property<decimal>("DefaultRatePerKwh")
.HasPrecision(10, 4)
.HasColumnType("numeric(10,4)");
b.Property<DateOnly>("EffectiveFrom")
.HasColumnType("date");
b.Property<DateOnly?>("EffectiveTo")
.HasColumnType("date");
b.Property<decimal>("FixedMonthlyCharge")
.HasPrecision(10, 2)
.HasColumnType("numeric(10,2)");
b.Property<bool>("IsActive")
.HasColumnType("boolean");
b.Property<int>("MunicipalityId")
.HasColumnType("integer");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<decimal>("VatPercentage")
.HasPrecision(5, 2)
.HasColumnType("numeric(5,2)");
b.HasKey("Id");
b.HasIndex("MunicipalityId", "EffectiveFrom");
b.ToTable("Tariffs", "app");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.TariffPeriod", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<int>("DaysOfWeek")
.HasColumnType("integer");
b.Property<TimeOnly>("EndTime")
.HasColumnType("time without time zone");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<decimal>("RatePerKwh")
.HasPrecision(10, 4)
.HasColumnType("numeric(10,4)");
b.Property<TimeOnly>("StartTime")
.HasColumnType("time without time zone");
b.Property<int>("TariffId")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("TariffId");
b.ToTable("TariffPeriods", "app");
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityRoleClaim<string>", b =>
{
b.HasOne("Microsoft.AspNetCore.Identity.IdentityRole", null)
.WithMany()
.HasForeignKey("RoleId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserClaim<string>", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null)
.WithMany()
.HasForeignKey("UserId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserLogin<string>", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null)
.WithMany()
.HasForeignKey("UserId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserRole<string>", b =>
{
b.HasOne("Microsoft.AspNetCore.Identity.IdentityRole", null)
.WithMany()
.HasForeignKey("RoleId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null)
.WithMany()
.HasForeignKey("UserId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
});
modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserToken<string>", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null)
.WithMany()
.HasForeignKey("UserId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Device", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Monitoring.Site", "Site")
.WithMany("Devices")
.HasForeignKey("SiteId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Site");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.PowerMeasurement", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Monitoring.Device", "Device")
.WithMany()
.HasForeignKey("DeviceId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Device");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Site", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Rates.Municipality", "Municipality")
.WithMany()
.HasForeignKey("MunicipalityId")
.OnDelete(DeleteBehavior.SetNull);
b.Navigation("Municipality");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Tariff", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Rates.Municipality", "Municipality")
.WithMany("Tariffs")
.HasForeignKey("MunicipalityId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Municipality");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.TariffPeriod", b =>
{
b.HasOne("Tau.Acuvim.Portal.Domain.Rates.Tariff", "Tariff")
.WithMany("Periods")
.HasForeignKey("TariffId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Tariff");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Site", b =>
{
b.Navigation("Devices");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Municipality", b =>
{
b.Navigation("Tariffs");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Tariff", b =>
{
b.Navigation("Periods");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,63 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace Tau.Acuvim.Portal.Migrations
{
/// <inheritdoc />
public partial class AddReceivedAtAndPushState : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<DateTime>(
name: "ReceivedAt",
schema: "monitoring",
table: "PowerMeasurements",
type: "timestamp with time zone",
nullable: false,
defaultValueSql: "NOW()");
migrationBuilder.CreateTable(
name: "FleetPushState",
schema: "app",
columns: table => new
{
ResourceType = table.Column<string>(type: "character varying(20)", maxLength: 20, nullable: false),
LastCursor = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
LastSyncedAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
LastError = table.Column<string>(type: "character varying(500)", maxLength: 500, nullable: true),
ConsecutiveFailures = table.Column<int>(type: "integer", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_FleetPushState", x => x.ResourceType);
});
migrationBuilder.CreateIndex(
name: "IX_PowerMeasurements_ReceivedAt",
schema: "monitoring",
table: "PowerMeasurements",
column: "ReceivedAt");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "FleetPushState",
schema: "app");
migrationBuilder.DropIndex(
name: "IX_PowerMeasurements_ReceivedAt",
schema: "monitoring",
table: "PowerMeasurements");
migrationBuilder.DropColumn(
name: "ReceivedAt",
schema: "monitoring",
table: "PowerMeasurements");
}
}
}

View File

@ -198,6 +198,30 @@ namespace Tau.Acuvim.Portal.Migrations
b.ToTable("WhiteLabelSettings", "app");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Fleet.FleetPushState", b =>
{
b.Property<string>("ResourceType")
.HasMaxLength(20)
.HasColumnType("character varying(20)");
b.Property<int>("ConsecutiveFailures")
.HasColumnType("integer");
b.Property<DateTime?>("LastCursor")
.HasColumnType("timestamp with time zone");
b.Property<string>("LastError")
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<DateTime?>("LastSyncedAt")
.HasColumnType("timestamp with time zone");
b.HasKey("ResourceType");
b.ToTable("FleetPushState", "app");
});
modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", b =>
{
b.Property<string>("Id")
@ -341,6 +365,11 @@ namespace Tau.Acuvim.Portal.Migrations
b.Property<double?>("ReactivePowerKvar")
.HasColumnType("double precision");
b.Property<DateTime>("ReceivedAt")
.ValueGeneratedOnAdd()
.HasColumnType("timestamp with time zone")
.HasDefaultValueSql("NOW()");
b.Property<string>("Source")
.HasMaxLength(50)
.HasColumnType("character varying(50)");
@ -350,6 +379,8 @@ namespace Tau.Acuvim.Portal.Migrations
b.HasKey("Time", "DeviceId");
b.HasIndex("ReceivedAt");
b.HasIndex("DeviceId", "Time")
.IsDescending(false, true);

View File

@ -133,10 +133,21 @@ try
builder.Services.AddScoped<TimescaleBootstrapper>();
builder.Services.AddScoped<MeasurementIngestService>();
builder.Services.AddScoped<MeasurementQueryService>();
if (fleetIngestOptions.Enabled)
{
builder.Services.AddHttpClient<FleetPushClient>(c =>
{
c.Timeout = TimeSpan.FromSeconds(30);
});
builder.Services.AddHostedService<FleetPushService>();
}
}
else
{
builder.Services.AddScoped<CustomerService>();
builder.Services.AddScoped<FleetIngestService>();
builder.Services.AddScoped<FleetTimescaleBootstrapper>();
}
builder.Services.AddHealthChecks()
@ -188,7 +199,9 @@ try
{
var db = scope.ServiceProvider.GetRequiredService<AdminDbContext>();
await db.Database.MigrateAsync();
// Admin-side hypertable + continuous aggregates land in Phase 14.
var fleetTimescale = scope.ServiceProvider.GetRequiredService<FleetTimescaleBootstrapper>();
await fleetTimescale.EnsureAsync();
}
var bootstrapper = scope.ServiceProvider.GetRequiredService<IdentityBootstrapper>();
@ -250,6 +263,7 @@ try
else
{
app.MapAdminCustomersEndpoints();
app.MapFleetIngestEndpoints();
}
app.MapHealthChecks("/health", new Microsoft.AspNetCore.Diagnostics.HealthChecks.HealthCheckOptions

View File

@ -0,0 +1,241 @@
using System.Text;
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Npgsql;
using Tau.Acuvim.Portal.Data;
using Tau.Acuvim.Portal.Domain.Fleet;
using Tau.Acuvim.Portal.DTOs;
namespace Tau.Acuvim.Portal.Services;
public enum FleetBatchType { Sites, Devices, Measurements }
public sealed class FleetIngestService(
AdminDbContext db,
CustomerService customers,
ILogger<FleetIngestService> log)
{
private static readonly JsonSerializerOptions JsonOpts = new(JsonSerializerDefaults.Web);
public async Task<FleetIngestResult> IngestAsync(
string? token,
string? batchTypeHeader,
string? cursorHeader,
string rawBody,
CancellationToken ct)
{
var bodyBytes = Encoding.UTF8.GetByteCount(rawBody);
// 1. Token → customer
var customer = await customers.FindByTokenAsync(token ?? string.Empty, ct);
if (customer is null)
{
await WriteEventAsync(customerId: Guid.Empty, batchTypeHeader, accepted: 0, rejected: 0,
bodyBytes, cursorHeader, error: "Unauthorized", spread: null, ct);
throw new UnauthorizedFleetIngestException();
}
// 2. Batch type
if (!TryParseBatchType(batchTypeHeader, out var batchType))
{
await WriteEventAsync(customer.Id, batchTypeHeader, accepted: 0, rejected: 0,
bodyBytes, cursorHeader, error: "Unknown X-Batch-Type", spread: null, ct);
throw new InvalidFleetBatchException($"Unknown X-Batch-Type: '{batchTypeHeader}'");
}
// 3. Dispatch
FleetIngestResult result;
TimeSpan? spread = null;
try
{
(result, spread) = batchType switch
{
FleetBatchType.Sites => (await IngestSitesAsync(customer.Id, rawBody, ct), null),
FleetBatchType.Devices => (await IngestDevicesAsync(customer.Id, rawBody, ct), null),
FleetBatchType.Measurements => await IngestMeasurementsAsync(customer.Id, rawBody, ct),
_ => throw new InvalidOperationException()
};
}
catch (Exception ex)
{
log.LogError(ex, "Fleet ingest failure for {Customer}", customer.Code);
await WriteEventAsync(customer.Id, batchTypeHeader, accepted: 0, rejected: 0,
bodyBytes, cursorHeader, error: Truncate(ex.Message, 500), spread: null, ct);
throw;
}
// 4. Touch customer FirstSeen/LastSeen
var nowUtc = DateTime.UtcNow;
var c = await db.Customers.FirstAsync(x => x.Id == customer.Id, ct);
c.FirstSeenAt ??= nowUtc;
c.LastSeenAt = nowUtc;
await db.SaveChangesAsync(ct);
// 5. Audit event
await WriteEventAsync(customer.Id, batchTypeHeader, result.Accepted, result.Rejected,
bodyBytes, cursorHeader, error: null, spread, ct);
return result;
}
private static bool TryParseBatchType(string? header, out FleetBatchType type)
{
type = default;
if (string.IsNullOrWhiteSpace(header)) return false;
return header.ToLowerInvariant() switch
{
"sites" => (type = FleetBatchType.Sites) is var _ && true,
"devices" => (type = FleetBatchType.Devices) is var _ && true,
"measurements" => (type = FleetBatchType.Measurements) is var _ && true,
_ => false,
};
}
// ── Sites: upsert by (CustomerId, Id) ──────────────────────────────────
private async Task<FleetIngestResult> IngestSitesAsync(Guid customerId, string body, CancellationToken ct)
{
var rows = JsonSerializer.Deserialize<FleetSiteDto[]>(body, JsonOpts) ?? Array.Empty<FleetSiteDto>();
if (rows.Length == 0) return new FleetIngestResult(0, 0, Array.Empty<string>());
var sql = new StringBuilder(
"""
INSERT INTO fleet."Sites" ("CustomerId","Id","Name","Address","LocalMunicipalityId","IsActive","ReceivedAt") VALUES
""");
var ps = new List<NpgsqlParameter>(rows.Length * 6);
for (int i = 0; i < rows.Length; i++)
{
if (i > 0) sql.Append(',');
var b = i * 6;
sql.Append($" (@cust,@p{b},@p{b + 1},@p{b + 2},@p{b + 3},@p{b + 4},@p{b + 5})");
ps.Add(new($"p{b}", rows[i].Id));
ps.Add(new($"p{b + 1}", rows[i].Name));
ps.Add(new($"p{b + 2}", (object?)rows[i].Address ?? DBNull.Value));
ps.Add(new($"p{b + 3}", (object?)rows[i].LocalMunicipalityId ?? DBNull.Value));
ps.Add(new($"p{b + 4}", rows[i].IsActive));
ps.Add(new($"p{b + 5}", DateTime.UtcNow));
}
ps.Add(new("cust", customerId));
sql.Append("""
ON CONFLICT ("CustomerId","Id") DO UPDATE SET
"Name"=EXCLUDED."Name",
"Address"=EXCLUDED."Address",
"LocalMunicipalityId"=EXCLUDED."LocalMunicipalityId",
"IsActive"=EXCLUDED."IsActive",
"ReceivedAt"=EXCLUDED."ReceivedAt";
""");
await db.Database.ExecuteSqlRawAsync(sql.ToString(), ps, ct);
return new FleetIngestResult(rows.Length, 0, Array.Empty<string>());
}
// ── Devices: upsert by (CustomerId, Id) ────────────────────────────────
private async Task<FleetIngestResult> IngestDevicesAsync(Guid customerId, string body, CancellationToken ct)
{
var rows = JsonSerializer.Deserialize<FleetDeviceDto[]>(body, JsonOpts) ?? Array.Empty<FleetDeviceDto>();
if (rows.Length == 0) return new FleetIngestResult(0, 0, Array.Empty<string>());
var sql = new StringBuilder(
"""
INSERT INTO fleet."Devices" ("CustomerId","Id","SiteId","Name","ExternalId","Description","IsActive","ReceivedAt") VALUES
""");
var ps = new List<NpgsqlParameter>(rows.Length * 7);
for (int i = 0; i < rows.Length; i++)
{
if (i > 0) sql.Append(',');
var b = i * 7;
sql.Append($" (@cust,@p{b},@p{b + 1},@p{b + 2},@p{b + 3},@p{b + 4},@p{b + 5},@p{b + 6})");
ps.Add(new($"p{b}", rows[i].Id));
ps.Add(new($"p{b + 1}", rows[i].SiteId));
ps.Add(new($"p{b + 2}", rows[i].Name));
ps.Add(new($"p{b + 3}", rows[i].ExternalId));
ps.Add(new($"p{b + 4}", (object?)rows[i].Description ?? DBNull.Value));
ps.Add(new($"p{b + 5}", rows[i].IsActive));
ps.Add(new($"p{b + 6}", DateTime.UtcNow));
}
ps.Add(new("cust", customerId));
sql.Append("""
ON CONFLICT ("CustomerId","Id") DO UPDATE SET
"SiteId"=EXCLUDED."SiteId",
"Name"=EXCLUDED."Name",
"ExternalId"=EXCLUDED."ExternalId",
"Description"=EXCLUDED."Description",
"IsActive"=EXCLUDED."IsActive",
"ReceivedAt"=EXCLUDED."ReceivedAt";
""");
await db.Database.ExecuteSqlRawAsync(sql.ToString(), ps, ct);
return new FleetIngestResult(rows.Length, 0, Array.Empty<string>());
}
// ── Measurements: ON CONFLICT DO NOTHING (time-series) ─────────────────
private async Task<(FleetIngestResult, TimeSpan?)> IngestMeasurementsAsync(Guid customerId, string body, CancellationToken ct)
{
var rows = JsonSerializer.Deserialize<FleetMeasurementDto[]>(body, JsonOpts) ?? Array.Empty<FleetMeasurementDto>();
if (rows.Length == 0) return (new FleetIngestResult(0, 0, Array.Empty<string>()), null);
var minTime = rows[0].Time;
var maxTime = rows[0].Time;
for (int i = 1; i < rows.Length; i++)
{
if (rows[i].Time < minTime) minTime = rows[i].Time;
if (rows[i].Time > maxTime) maxTime = rows[i].Time;
}
var spread = maxTime - minTime;
var sql = new StringBuilder(
"""
INSERT INTO fleet."PowerMeasurements"
("Time","CustomerId","DeviceId","ActivePowerKw","ReactivePowerKvar","ApparentPowerKva",
"PowerFactor","VoltageV","FrequencyHz","EnergyImportedKwh","EnergyExportedKwh","Source")
VALUES
""");
var ps = new List<NpgsqlParameter>(rows.Length * 11);
for (int i = 0; i < rows.Length; i++)
{
if (i > 0) sql.Append(',');
var b = i * 11;
sql.Append($" (@p{b},@cust,@p{b + 1},@p{b + 2},@p{b + 3},@p{b + 4},@p{b + 5},@p{b + 6},@p{b + 7},@p{b + 8},@p{b + 9},@p{b + 10})");
ps.Add(new($"p{b}", DateTime.SpecifyKind(rows[i].Time, DateTimeKind.Utc)));
ps.Add(new($"p{b + 1}", rows[i].DeviceId));
ps.Add(new($"p{b + 2}", rows[i].ActivePowerKw));
ps.Add(new($"p{b + 3}", (object?)rows[i].ReactivePowerKvar ?? DBNull.Value));
ps.Add(new($"p{b + 4}", (object?)rows[i].ApparentPowerKva ?? DBNull.Value));
ps.Add(new($"p{b + 5}", (object?)rows[i].PowerFactor ?? DBNull.Value));
ps.Add(new($"p{b + 6}", (object?)rows[i].VoltageV ?? DBNull.Value));
ps.Add(new($"p{b + 7}", (object?)rows[i].FrequencyHz ?? DBNull.Value));
ps.Add(new($"p{b + 8}", (object?)rows[i].EnergyImportedKwh ?? DBNull.Value));
ps.Add(new($"p{b + 9}", (object?)rows[i].EnergyExportedKwh ?? DBNull.Value));
ps.Add(new($"p{b + 10}", (object?)rows[i].Source ?? DBNull.Value));
}
ps.Add(new("cust", customerId));
sql.Append(""" ON CONFLICT ("Time","CustomerId","DeviceId") DO NOTHING;""");
var affected = await db.Database.ExecuteSqlRawAsync(sql.ToString(), ps, ct);
var rejected = rows.Length - affected; // ON CONFLICT skipped these
return (new FleetIngestResult(affected, rejected, Array.Empty<string>()), spread);
}
private async Task WriteEventAsync(
Guid customerId, string? batchType, int accepted, int rejected, int batchBytes,
string? cursor, string? error, TimeSpan? spread, CancellationToken ct)
{
if (customerId == Guid.Empty) return; // can't FK to nothing
db.IngestEvents.Add(new IngestEvent
{
CustomerId = customerId,
BatchType = batchType ?? "?",
RowsAccepted = accepted,
RowsRejected = rejected,
BatchBytes = batchBytes,
ClientHwm = Truncate(cursor, 50),
Error = error,
TimeSpread = spread,
ReceivedAt = DateTime.UtcNow
});
await db.SaveChangesAsync(ct);
}
private static string? Truncate(string? s, int max) =>
s is null ? null : (s.Length <= max ? s : s.Substring(0, max));
}
public sealed class UnauthorizedFleetIngestException : Exception { }
public sealed class InvalidFleetBatchException(string message) : Exception(message);

View File

@ -0,0 +1,111 @@
using System.Net;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Options;
using Tau.Acuvim.Portal.Configuration;
using Tau.Acuvim.Portal.DTOs;
namespace Tau.Acuvim.Portal.Services;
public sealed class FleetPushResult
{
public bool Succeeded { get; init; }
public int Accepted { get; init; }
public int Rejected { get; init; }
public HttpStatusCode StatusCode { get; init; }
public string? Error { get; init; }
public TimeSpan? RetryAfter { get; init; }
}
// HttpClient wrapper for POSTing batches to the Admin ingest endpoint.
public sealed class FleetPushClient(
HttpClient http,
IOptions<FleetIngestOptions> options,
ILogger<FleetPushClient> log)
{
private static readonly JsonSerializerOptions JsonOpts = new(JsonSerializerDefaults.Web);
public Task<FleetPushResult> PushSitesAsync(IReadOnlyList<FleetSiteDto> rows, DateTime cursor, CancellationToken ct)
=> PushAsync("sites", rows, cursor, ct);
public Task<FleetPushResult> PushDevicesAsync(IReadOnlyList<FleetDeviceDto> rows, DateTime cursor, CancellationToken ct)
=> PushAsync("devices", rows, cursor, ct);
public Task<FleetPushResult> PushMeasurementsAsync(IReadOnlyList<FleetMeasurementDto> rows, DateTime cursor, CancellationToken ct)
=> PushAsync("measurements", rows, cursor, ct);
private async Task<FleetPushResult> PushAsync<T>(
string batchType,
IReadOnlyList<T> rows,
DateTime cursor,
CancellationToken ct)
{
var opts = options.Value;
if (string.IsNullOrWhiteSpace(opts.Url) || string.IsNullOrWhiteSpace(opts.Token))
{
return new FleetPushResult { Succeeded = false, Error = "FleetIngest URL or Token not configured." };
}
var json = JsonSerializer.Serialize(rows, JsonOpts);
var bytes = Encoding.UTF8.GetBytes(json);
if (bytes.Length > opts.BatchMaxBytes)
{
return new FleetPushResult { Succeeded = false, StatusCode = HttpStatusCode.RequestEntityTooLarge,
Error = $"Local pre-check: batch size {bytes.Length} > {opts.BatchMaxBytes}. Halve batch and retry." };
}
using var req = new HttpRequestMessage(HttpMethod.Post, opts.Url);
req.Headers.Add("X-Customer-Token", opts.Token);
req.Headers.Add("X-Batch-Type", batchType);
req.Headers.Add("X-Push-Cursor", cursor.ToString("O"));
req.Content = new ByteArrayContent(bytes);
req.Content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
try
{
using var resp = await http.SendAsync(req, ct);
var status = resp.StatusCode;
if (status == HttpStatusCode.OK)
{
try
{
var body = await resp.Content.ReadAsStringAsync(ct);
var result = JsonSerializer.Deserialize<FleetIngestResult>(body, JsonOpts);
return new FleetPushResult
{
Succeeded = true,
Accepted = result?.Accepted ?? rows.Count,
Rejected = result?.Rejected ?? 0,
StatusCode = status
};
}
catch
{
return new FleetPushResult { Succeeded = true, Accepted = rows.Count, StatusCode = status };
}
}
TimeSpan? retryAfter = null;
if (resp.Headers.RetryAfter?.Delta is { } d) retryAfter = d;
return new FleetPushResult
{
Succeeded = false,
StatusCode = status,
RetryAfter = retryAfter,
Error = $"HTTP {(int)status} {status}"
};
}
catch (TaskCanceledException) when (ct.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
log.LogWarning(ex, "Fleet push transport failure to {Url}", opts.Url);
return new FleetPushResult { Succeeded = false, Error = ex.Message };
}
}
}

View File

@ -0,0 +1,221 @@
using System.Net;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Tau.Acuvim.Portal.Configuration;
using Tau.Acuvim.Portal.Data;
using Tau.Acuvim.Portal.Domain.Fleet;
using Tau.Acuvim.Portal.DTOs;
namespace Tau.Acuvim.Portal.Services;
// Background loop that pushes sites/devices/measurements to the Admin fleet ingest endpoint.
// Registered only when RunMode=Client AND FleetIngest__Enabled=true.
//
// Per tick:
// 1. Sites (full set, idempotent upsert on Admin side)
// 2. Devices (full set)
// 3. Measurements: up to 3 batches × BatchSize rows, cursor by ReceivedAt
// Then sleep IntervalSeconds.
public sealed class FleetPushService(
IServiceProvider services,
IOptions<FleetIngestOptions> options,
ILogger<FleetPushService> log)
: BackgroundService
{
private const int MaxBatchesPerTickPerResource = 3;
private static readonly TimeSpan MaxBackoff = TimeSpan.FromMinutes(30);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var interval = TimeSpan.FromSeconds(Math.Max(5, options.Value.IntervalSeconds));
log.LogInformation("FleetPushService started. Url={Url}, interval={Interval}s",
options.Value.Url, interval.TotalSeconds);
// Brief startup grace so MigrateAsync + IdentityBootstrapper finish before we hit the DB.
try { await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); }
catch (TaskCanceledException) { return; }
while (!stoppingToken.IsCancellationRequested)
{
try
{
await RunTickAsync(stoppingToken);
}
catch (Exception ex) when (ex is not TaskCanceledException)
{
log.LogError(ex, "FleetPushService tick failed");
}
try { await Task.Delay(interval, stoppingToken); }
catch (TaskCanceledException) { break; }
}
}
private async Task RunTickAsync(CancellationToken ct)
{
using var scope = services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var client = scope.ServiceProvider.GetRequiredService<FleetPushClient>();
if (!await ShouldRunNowAsync(db, ct))
{
return;
}
var sitesOk = await PushSitesAsync(db, client, ct);
if (!sitesOk) return;
var devicesOk = await PushDevicesAsync(db, client, ct);
if (!devicesOk) return;
await PushMeasurementsAsync(db, client, ct);
}
// ── Sites: full set every tick ─────────────────────────────────────────
private async Task<bool> PushSitesAsync(AppDbContext db, FleetPushClient client, CancellationToken ct)
{
var state = await GetStateAsync(db, FleetPushState.ResourceSites, ct);
var rows = await db.Sites.AsNoTracking()
.Select(s => new FleetSiteDto(s.Id, s.Name, s.Address, s.MunicipalityId, s.IsActive))
.ToListAsync(ct);
if (rows.Count == 0)
{
await MarkSuccessAsync(db, state, cursor: DateTime.UtcNow, ct);
return true;
}
var result = await client.PushSitesAsync(rows, DateTime.UtcNow, ct);
return await HandleResultAsync(db, state, result, cursor: DateTime.UtcNow, ct);
}
// ── Devices: full set every tick ───────────────────────────────────────
private async Task<bool> PushDevicesAsync(AppDbContext db, FleetPushClient client, CancellationToken ct)
{
var state = await GetStateAsync(db, FleetPushState.ResourceDevices, ct);
var rows = await db.Devices.AsNoTracking()
.Select(d => new FleetDeviceDto(d.Id, d.SiteId, d.Name, d.ExternalId, d.Description, d.IsActive))
.ToListAsync(ct);
if (rows.Count == 0)
{
await MarkSuccessAsync(db, state, cursor: DateTime.UtcNow, ct);
return true;
}
var result = await client.PushDevicesAsync(rows, DateTime.UtcNow, ct);
return await HandleResultAsync(db, state, result, cursor: DateTime.UtcNow, ct);
}
// ── Measurements: cursor-driven batches up to MaxBatchesPerTickPerResource ─
private async Task PushMeasurementsAsync(AppDbContext db, FleetPushClient client, CancellationToken ct)
{
var state = await GetStateAsync(db, FleetPushState.ResourceMeasurements, ct);
var cursor = state.LastCursor ?? DateTime.MinValue.ToUniversalTime();
int batchSize = options.Value.BatchSize;
for (int i = 0; i < MaxBatchesPerTickPerResource; i++)
{
var rows = await db.PowerMeasurements.AsNoTracking()
.Where(m => m.ReceivedAt > cursor)
.OrderBy(m => m.ReceivedAt)
.Take(batchSize)
.Select(m => new FleetMeasurementDto(
m.Time, m.DeviceId, m.ActivePowerKw,
m.ReactivePowerKvar, m.ApparentPowerKva, m.PowerFactor,
m.VoltageV, m.FrequencyHz,
m.EnergyImportedKwh, m.EnergyExportedKwh,
m.Source, m.ReceivedAt))
.ToListAsync(ct);
if (rows.Count == 0) break;
var newCursor = rows[^1].ReceivedAt;
var result = await client.PushMeasurementsAsync(rows, newCursor, ct);
if (result.StatusCode == HttpStatusCode.RequestEntityTooLarge)
{
batchSize = Math.Max(100, batchSize / 2);
log.LogWarning("Ingest returned 413; halving batch size to {BatchSize}", batchSize);
continue;
}
var ok = await HandleResultAsync(db, state, result, cursor: newCursor, ct);
if (!ok) return;
cursor = newCursor;
if (rows.Count < batchSize) break;
}
}
// ── State helpers ──────────────────────────────────────────────────────
private static async Task<FleetPushState> GetStateAsync(AppDbContext db, string resourceType, CancellationToken ct)
{
var row = await db.FleetPushState.FirstOrDefaultAsync(x => x.ResourceType == resourceType, ct);
if (row is null)
{
row = new FleetPushState { ResourceType = resourceType };
db.FleetPushState.Add(row);
await db.SaveChangesAsync(ct);
}
return row;
}
private static async Task MarkSuccessAsync(AppDbContext db, FleetPushState state, DateTime cursor, CancellationToken ct)
{
state.LastCursor = cursor;
state.LastSyncedAt = DateTime.UtcNow;
state.LastError = null;
state.ConsecutiveFailures = 0;
await db.SaveChangesAsync(ct);
}
private async Task<bool> HandleResultAsync(
AppDbContext db, FleetPushState state, FleetPushResult result, DateTime cursor, CancellationToken ct)
{
if (result.Succeeded)
{
await MarkSuccessAsync(db, state, cursor, ct);
if (result.Rejected > 0)
{
log.LogWarning("Ingest accepted={Accepted} rejected={Rejected} for {Resource}",
result.Accepted, result.Rejected, state.ResourceType);
}
return true;
}
state.LastError = result.Error;
state.ConsecutiveFailures++;
state.LastSyncedAt = DateTime.UtcNow;
await db.SaveChangesAsync(ct);
var backoff = result.RetryAfter ?? BackoffFor(state.ConsecutiveFailures);
log.LogWarning("Push failed for {Resource}: {Error}. Failures={N}, next attempt after ~{Backoff}.",
state.ResourceType, result.Error, state.ConsecutiveFailures, backoff);
// Stop this tick; the next interval handles the retry naturally.
return false;
}
private static TimeSpan BackoffFor(int failures)
{
var baseMinutes = Math.Min(Math.Pow(2, Math.Max(0, failures - 1)), 30);
return TimeSpan.FromMinutes(baseMinutes);
}
// Block when we're in a long backoff window — only attempt if enough time has passed
// since LastSyncedAt to cover the backoff. Cheap: read one row.
private async Task<bool> ShouldRunNowAsync(AppDbContext db, CancellationToken ct)
{
var maxFailures = await db.FleetPushState.AsNoTracking()
.Select(x => new { x.LastSyncedAt, x.ConsecutiveFailures })
.ToListAsync(ct);
if (maxFailures.Count == 0) return true;
foreach (var row in maxFailures)
{
if (row.ConsecutiveFailures == 0) return true;
if (row.LastSyncedAt is null) return true;
var since = DateTime.UtcNow - row.LastSyncedAt.Value;
var backoff = BackoffFor(row.ConsecutiveFailures);
if (since >= backoff) return true;
}
return false;
}
}

View File

@ -0,0 +1,85 @@
using Microsoft.EntityFrameworkCore;
using Tau.Acuvim.Portal.Data;
namespace Tau.Acuvim.Portal.Services;
// Admin-side equivalent of TimescaleBootstrapper. Runs once after MigrateAsync on Admin startup.
// Sets up the fleet.PowerMeasurements hypertable + compression policy + hourly CA.
// Idempotent — safe on every start.
public sealed class FleetTimescaleBootstrapper(
AdminDbContext db,
ILogger<FleetTimescaleBootstrapper> log)
{
public async Task EnsureAsync(CancellationToken ct = default)
{
await db.Database.ExecuteSqlRawAsync(
"CREATE EXTENSION IF NOT EXISTS timescaledb;", ct);
await db.Database.ExecuteSqlRawAsync(
"""
SELECT create_hypertable(
'fleet."PowerMeasurements"',
'Time',
if_not_exists => TRUE,
migrate_data => TRUE
);
""", ct);
await db.Database.ExecuteSqlRawAsync(
"""
SELECT set_chunk_time_interval('fleet."PowerMeasurements"', INTERVAL '7 days');
""", ct);
// Compression — chunks older than 7 days. Segment by (CustomerId, DeviceId) so
// point queries by customer + device stay fast even on compressed chunks.
// ALTER TABLE ... SET is idempotent in TimescaleDB (no error if already set).
await db.Database.ExecuteSqlRawAsync(
"""
ALTER TABLE fleet."PowerMeasurements" SET (
timescaledb.compress,
timescaledb.compress_segmentby = '"CustomerId","DeviceId"',
timescaledb.compress_orderby = '"Time" DESC'
);
""", ct);
await db.Database.ExecuteSqlRawAsync(
"""
SELECT add_compression_policy('fleet."PowerMeasurements"', INTERVAL '7 days', if_not_exists => TRUE);
""", ct);
// Hourly per-device continuous aggregate. Realtime (materialized_only=false) so
// late firmware back-fills appear in CA queries immediately, served live from
// the underlying hypertable until next refresh tick.
await db.Database.ExecuteSqlRawAsync(
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS fleet.hourly_per_device
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
SELECT
"CustomerId",
"DeviceId",
time_bucket(INTERVAL '1 hour', "Time") AS bucket,
avg("ActivePowerKw") AS avg_kw,
max("ActivePowerKw") AS max_kw,
min("ActivePowerKw") AS min_kw,
max("EnergyImportedKwh") - min("EnergyImportedKwh") AS kwh_imported_delta,
max("EnergyExportedKwh") - min("EnergyExportedKwh") AS kwh_exported_delta,
count(*) AS samples
FROM fleet."PowerMeasurements"
GROUP BY "CustomerId", "DeviceId", bucket
WITH NO DATA;
""", ct);
await db.Database.ExecuteSqlRawAsync(
"""
SELECT add_continuous_aggregate_policy(
'fleet.hourly_per_device',
start_offset => INTERVAL '30 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '5 minutes',
if_not_exists => TRUE
);
""", ct);
log.LogInformation("Fleet hypertable + compression + hourly_per_device CA ready.");
}
}

View File

@ -0,0 +1,23 @@
using Tau.Acuvim.Portal.Services;
namespace Tau.Acuvim.Portal.Tests;
// Pure validation behaviour of FleetIngestService.IngestAsync — token + batch type checks.
// Service-internal logic that doesn't touch the DB. The actual DB write paths are
// exercised in the manual two-stack smoke test (TESTING.md → Phase 14 scenario).
public class FleetIngestValidationTests
{
[Fact]
public void UnauthorizedException_HasNoMessage()
{
var ex = new UnauthorizedFleetIngestException();
Assert.NotNull(ex);
}
[Fact]
public void InvalidFleetBatchException_CarriesMessage()
{
var ex = new InvalidFleetBatchException("nope");
Assert.Equal("nope", ex.Message);
}
}