diff --git a/portal/.env.example b/portal/.env.example index 0ab3e97..7d82ed0 100644 --- a/portal/.env.example +++ b/portal/.env.example @@ -26,3 +26,17 @@ Authentication__DefaultAdminPassword=ChangeMe123! GRAFANA_ADMIN_PASSWORD=admin # Path prefix Grafana is mounted at behind Traefik. Same-origin embed in the SPA. Grafana__EmbedPathPrefix=/grafana + +# ── RunMode (Client | Admin) ───────────────────────────────────────────── +# Client (default) = per-customer stack. Admin = fleet aggregator. +# Application__RunMode=Client + +# ── Fleet ingest (Client mode → push to Admin) ─────────────────────────── +# When Enabled=true, URL and Token are REQUIRED (RunModeGuards refuse otherwise). +# Get the token from the Admin Customers page; it is shown once at creation/rotation. +# FleetIngest__Enabled=false +# FleetIngest__Url=https://admin.portal.example.com/api/fleet/ingest +# FleetIngest__Token= +# FleetIngest__IntervalSeconds=60 +# FleetIngest__BatchSize=5000 +# FleetIngest__BatchMaxBytes=1048576 diff --git a/portal/README.md b/portal/README.md index 6c246fb..c8d528e 100644 --- a/portal/README.md +++ b/portal/README.md @@ -386,7 +386,7 @@ For per-customer provisioning, secret rotation, backups, and health monitoring s A second deployment of the same image — `RunMode=Admin`, separate DB — aggregates data from all customer stacks for a fleet-wide operator view. See [docs/FLEET-DESIGN.md](./docs/FLEET-DESIGN.md) for the full design. -**Phase 13 (this release):** the Admin stack runs, the Customers page registers customers and issues push tokens (shown once). Customer stacks pick up `RunMode=Client` + `FleetIngest__*` config but don't yet push — that lands in Phase 14. +**Phase 14 (this release):** the full push pipeline is live. Customer stacks with `FleetIngest__Enabled=true` run a `FleetPushService` background loop that batches sites, devices, and measurements (cursor by `ReceivedAt` — firmware buffer-and-replay back-fills get picked up automatically) and POSTs them to `FleetIngest__Url` with `X-Customer-Token`. Admin's `/api/fleet/ingest` upserts and writes an `IngestEvents` audit row per batch. Admin's `FleetTimescaleBootstrapper` makes `fleet.PowerMeasurements` a hypertable with compression-after-7-days and a realtime `fleet.hourly_per_device` continuous aggregate. **Spin up an Admin stack:** ```powershell @@ -400,4 +400,19 @@ docker run -d --name admin-portal --network ` portal-dev-portal ``` -Then sign in at `http://localhost:8090` → **Customers** → register the customer → token shown once. Drop the token into the customer stack's `.env` as `FleetIngest__Token` (Phase 14 uses it). +Then sign in at `http://localhost:8090` → **Customers** → register the customer → token shown once. + +**Enable push on the customer stack:** add to that customer's `.env`: +```ini +Application__RunMode=Client +FleetIngest__Enabled=true +FleetIngest__Url=http://admin-portal:8080/api/fleet/ingest # container DNS in-network, or https://admin-host +FleetIngest__Token= +FleetIngest__IntervalSeconds=60 # default +FleetIngest__BatchSize=5000 # default +``` +Restart the customer's portal container — the `FleetPushService` starts on its own. Verify the audit trail on the Admin side: +```powershell +docker exec psql -U power_user -d admin_fleet -c ` + 'SELECT \"BatchType\",\"RowsAccepted\",\"ReceivedAt\" FROM fleet.\"IngestEvents\" ORDER BY \"ReceivedAt\" DESC LIMIT 10;' +``` diff --git a/portal/docker-compose.yml b/portal/docker-compose.yml index c47dcfb..1afa2d7 100644 --- a/portal/docker-compose.yml +++ b/portal/docker-compose.yml @@ -17,6 +17,14 @@ services: - Authentication__DefaultAdminPassword=${Authentication__DefaultAdminPassword:-ChangeMe123!} - Grafana__BaseUrl=http://localhost:3001 - Grafana__InternalUrl=http://grafana:3000 + # RunMode: Client (default) or Admin. Override in .env to test fleet aggregation locally. + - Application__RunMode=${Application__RunMode:-Client} + # Fleet ingest (Client mode): set Enabled=true + Url + Token to enable the push background service. + - FleetIngest__Enabled=${FleetIngest__Enabled:-false} + - FleetIngest__Url=${FleetIngest__Url:-} + - FleetIngest__Token=${FleetIngest__Token:-} + - FleetIngest__IntervalSeconds=${FleetIngest__IntervalSeconds:-60} + - FleetIngest__BatchSize=${FleetIngest__BatchSize:-5000} depends_on: timescaledb: condition: service_healthy diff --git a/portal/src/Tau.Acuvim.Portal/DTOs/FleetIngestDtos.cs b/portal/src/Tau.Acuvim.Portal/DTOs/FleetIngestDtos.cs new file mode 100644 index 0000000..556e17e --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/DTOs/FleetIngestDtos.cs @@ -0,0 +1,35 @@ +namespace Tau.Acuvim.Portal.DTOs; + +// Wire types for the /api/fleet/ingest endpoint. +// Same shapes used on both push (Client) and ingest (Admin) sides. + +public sealed record FleetSiteDto( + Guid Id, + string Name, + string? Address, + int? LocalMunicipalityId, + bool IsActive); + +public sealed record FleetDeviceDto( + Guid Id, + Guid SiteId, + string Name, + string ExternalId, + string? Description, + bool IsActive); + +public sealed record FleetMeasurementDto( + DateTime Time, + Guid DeviceId, + double ActivePowerKw, + double? ReactivePowerKvar, + double? ApparentPowerKva, + double? PowerFactor, + double? VoltageV, + double? FrequencyHz, + double? EnergyImportedKwh, + double? EnergyExportedKwh, + string? Source, + DateTime ReceivedAt); + +public sealed record FleetIngestResult(int Accepted, int Rejected, string[] Errors); diff --git a/portal/src/Tau.Acuvim.Portal/Data/AppDbContext.cs b/portal/src/Tau.Acuvim.Portal/Data/AppDbContext.cs index 7bb4cf4..307179e 100644 --- a/portal/src/Tau.Acuvim.Portal/Data/AppDbContext.cs +++ b/portal/src/Tau.Acuvim.Portal/Data/AppDbContext.cs @@ -2,6 +2,7 @@ using Microsoft.AspNetCore.Identity; using Microsoft.AspNetCore.Identity.EntityFrameworkCore; using Microsoft.EntityFrameworkCore; using Tau.Acuvim.Portal.Domain.Branding; +using Tau.Acuvim.Portal.Domain.Fleet; using Tau.Acuvim.Portal.Domain.Identity; using Tau.Acuvim.Portal.Domain.Monitoring; using Tau.Acuvim.Portal.Domain.Rates; @@ -21,6 +22,7 @@ public class AppDbContext : IdentityDbContext Sites => Set(); public DbSet Devices => Set(); public DbSet PowerMeasurements => Set(); + public DbSet FleetPushState => Set(); protected override void OnModelCreating(ModelBuilder builder) { @@ -84,10 +86,18 @@ public class AppDbContext : IdentityDbContext new { x.Time, x.DeviceId }); entity.HasIndex(x => new { x.DeviceId, x.Time }).IsDescending(false, true); + entity.Property(x => x.ReceivedAt).HasDefaultValueSql("NOW()"); + entity.HasIndex(x => x.ReceivedAt); entity.HasOne(x => x.Device) .WithMany() .HasForeignKey(x => x.DeviceId) .OnDelete(DeleteBehavior.Cascade); }); + + builder.Entity(entity => + { + entity.ToTable("FleetPushState", schema: "app"); + entity.HasKey(x => x.ResourceType); + }); } } diff --git a/portal/src/Tau.Acuvim.Portal/Domain/Fleet/FleetPushState.cs b/portal/src/Tau.Acuvim.Portal/Domain/Fleet/FleetPushState.cs new file mode 100644 index 0000000..103fc89 --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Domain/Fleet/FleetPushState.cs @@ -0,0 +1,22 @@ +using System.ComponentModel.DataAnnotations; + +namespace Tau.Acuvim.Portal.Domain.Fleet; + +// Client-side state for the fleet push pipeline. One row per resource type. +public class FleetPushState +{ + public const string ResourceSites = "sites"; + public const string ResourceDevices = "devices"; + public const string ResourceMeasurements = "measurements"; + + [MaxLength(20)] + public string ResourceType { get; set; } = string.Empty; + + public DateTime? LastCursor { get; set; } + public DateTime? LastSyncedAt { get; set; } + + [MaxLength(500)] + public string? LastError { get; set; } + + public int ConsecutiveFailures { get; set; } +} diff --git a/portal/src/Tau.Acuvim.Portal/Domain/Monitoring/PowerMeasurement.cs b/portal/src/Tau.Acuvim.Portal/Domain/Monitoring/PowerMeasurement.cs index 4afdc29..7a968d3 100644 --- a/portal/src/Tau.Acuvim.Portal/Domain/Monitoring/PowerMeasurement.cs +++ b/portal/src/Tau.Acuvim.Portal/Domain/Monitoring/PowerMeasurement.cs @@ -21,4 +21,9 @@ public class PowerMeasurement [MaxLength(50)] public string? Source { get; set; } + + // Server-assigned timestamp on insert. Drives the fleet-push cursor — pushes find + // rows with ReceivedAt > LastCursor, which picks up firmware buffer-and-replay + // back-fills correctly (their Time is in the past but ReceivedAt is now). + public DateTime ReceivedAt { get; set; } } diff --git a/portal/src/Tau.Acuvim.Portal/Endpoints/FleetIngestEndpoints.cs b/portal/src/Tau.Acuvim.Portal/Endpoints/FleetIngestEndpoints.cs new file mode 100644 index 0000000..87f0bcd --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Endpoints/FleetIngestEndpoints.cs @@ -0,0 +1,35 @@ +using Tau.Acuvim.Portal.Services; + +namespace Tau.Acuvim.Portal.Endpoints; + +public static class FleetIngestEndpoints +{ + public static IEndpointRouteBuilder MapFleetIngestEndpoints(this IEndpointRouteBuilder app) + { + app.MapPost("/api/fleet/ingest", async ( + HttpContext ctx, + FleetIngestService svc, + CancellationToken ct) => + { + var token = ctx.Request.Headers["X-Customer-Token"].ToString(); + var batchType = ctx.Request.Headers["X-Batch-Type"].ToString(); + var cursor = ctx.Request.Headers["X-Push-Cursor"].ToString(); + + using var reader = new StreamReader(ctx.Request.Body); + var body = await reader.ReadToEndAsync(ct); + + try + { + var result = await svc.IngestAsync(token, batchType, cursor, body, ct); + return Results.Ok(result); + } + catch (UnauthorizedFleetIngestException) { return Results.Unauthorized(); } + catch (InvalidFleetBatchException ex) { return Results.BadRequest(new { error = ex.Message }); } + }) + .AllowAnonymous() + .DisableAntiforgery() + .WithTags("Fleet"); + + return app; + } +} diff --git a/portal/src/Tau.Acuvim.Portal/Migrations/20260518081228_AddReceivedAtAndPushState.Designer.cs b/portal/src/Tau.Acuvim.Portal/Migrations/20260518081228_AddReceivedAtAndPushState.Designer.cs new file mode 100644 index 0000000..15460fd --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Migrations/20260518081228_AddReceivedAtAndPushState.Designer.cs @@ -0,0 +1,662 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using Tau.Acuvim.Portal.Data; + +#nullable disable + +namespace Tau.Acuvim.Portal.Migrations +{ + [DbContext(typeof(AppDbContext))] + [Migration("20260518081228_AddReceivedAtAndPushState")] + partial class AddReceivedAtAndPushState + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("app") + .HasAnnotation("ProductVersion", "10.0.8") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityRole", b => + { + b.Property("Id") + .HasColumnType("text"); + + b.Property("ConcurrencyStamp") + .IsConcurrencyToken() + .HasColumnType("text"); + + b.Property("Name") + .HasMaxLength(256) + .HasColumnType("character varying(256)"); + + b.Property("NormalizedName") + .HasMaxLength(256) + .HasColumnType("character varying(256)"); + + b.HasKey("Id"); + + b.HasIndex("NormalizedName") + .IsUnique() + .HasDatabaseName("RoleNameIndex"); + + b.ToTable("AspNetRoles", "identity"); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityRoleClaim", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("ClaimType") + .HasColumnType("text"); + + b.Property("ClaimValue") + .HasColumnType("text"); + + b.Property("RoleId") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.HasIndex("RoleId"); + + b.ToTable("AspNetRoleClaims", "identity"); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserClaim", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("ClaimType") + .HasColumnType("text"); + + b.Property("ClaimValue") + .HasColumnType("text"); + + b.Property("UserId") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.HasIndex("UserId"); + + b.ToTable("AspNetUserClaims", "identity"); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserLogin", b => + { + b.Property("LoginProvider") + .HasColumnType("text"); + + b.Property("ProviderKey") + .HasColumnType("text"); + + b.Property("ProviderDisplayName") + .HasColumnType("text"); + + b.Property("UserId") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("LoginProvider", "ProviderKey"); + + b.HasIndex("UserId"); + + b.ToTable("AspNetUserLogins", "identity"); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserRole", b => + { + b.Property("UserId") + .HasColumnType("text"); + + b.Property("RoleId") + .HasColumnType("text"); + + b.HasKey("UserId", "RoleId"); + + b.HasIndex("RoleId"); + + b.ToTable("AspNetUserRoles", "identity"); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserToken", b => + { + b.Property("UserId") + .HasColumnType("text"); + + b.Property("LoginProvider") + .HasColumnType("text"); + + b.Property("Name") + .HasColumnType("text"); + + b.Property("Value") + .HasColumnType("text"); + + b.HasKey("UserId", "LoginProvider", "Name"); + + b.ToTable("AspNetUserTokens", "identity"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Branding.WhiteLabelSettings", b => + { + b.Property("Id") + .HasColumnType("integer"); + + b.Property("AccentColor") + .IsRequired() + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("ApplicationName") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("FooterText") + .IsRequired() + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("LogoUrl") + .IsRequired() + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("PrimaryColor") + .IsRequired() + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("SecondaryColor") + .IsRequired() + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("UpdatedAt") + .HasColumnType("timestamp with time zone"); + + b.HasKey("Id"); + + b.ToTable("WhiteLabelSettings", "app"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Fleet.FleetPushState", b => + { + b.Property("ResourceType") + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("ConsecutiveFailures") + .HasColumnType("integer"); + + b.Property("LastCursor") + .HasColumnType("timestamp with time zone"); + + b.Property("LastError") + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("LastSyncedAt") + .HasColumnType("timestamp with time zone"); + + b.HasKey("ResourceType"); + + b.ToTable("FleetPushState", "app"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", b => + { + b.Property("Id") + .HasColumnType("text"); + + b.Property("AccessFailedCount") + .HasColumnType("integer"); + + b.Property("ConcurrencyStamp") + .IsConcurrencyToken() + .HasColumnType("text"); + + b.Property("CreatedAt") + .HasColumnType("timestamp with time zone"); + + b.Property("DisplayName") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("Email") + .HasMaxLength(256) + .HasColumnType("character varying(256)"); + + b.Property("EmailConfirmed") + .HasColumnType("boolean"); + + b.Property("IsActive") + .HasColumnType("boolean"); + + b.Property("LockoutEnabled") + .HasColumnType("boolean"); + + b.Property("LockoutEnd") + .HasColumnType("timestamp with time zone"); + + b.Property("NormalizedEmail") + .HasMaxLength(256) + .HasColumnType("character varying(256)"); + + b.Property("NormalizedUserName") + .HasMaxLength(256) + .HasColumnType("character varying(256)"); + + b.Property("PasswordHash") + .HasColumnType("text"); + + b.Property("PhoneNumber") + .HasColumnType("text"); + + b.Property("PhoneNumberConfirmed") + .HasColumnType("boolean"); + + b.Property("SecurityStamp") + .HasColumnType("text"); + + b.Property("TwoFactorEnabled") + .HasColumnType("boolean"); + + b.Property("UserName") + .HasMaxLength(256) + .HasColumnType("character varying(256)"); + + b.HasKey("Id"); + + b.HasIndex("NormalizedEmail") + .HasDatabaseName("EmailIndex"); + + b.HasIndex("NormalizedUserName") + .IsUnique() + .HasDatabaseName("UserNameIndex"); + + b.ToTable("AspNetUsers", "identity"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Device", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("CreatedAt") + .HasColumnType("timestamp with time zone"); + + b.Property("Description") + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("ExternalId") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("IsActive") + .HasColumnType("boolean"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("SiteId") + .HasColumnType("uuid"); + + b.HasKey("Id"); + + b.HasIndex("ExternalId"); + + b.HasIndex("SiteId", "ExternalId") + .IsUnique(); + + b.ToTable("Devices", "monitoring"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.PowerMeasurement", b => + { + b.Property("Time") + .HasColumnType("timestamp with time zone"); + + b.Property("DeviceId") + .HasColumnType("uuid"); + + b.Property("ActivePowerKw") + .HasColumnType("double precision"); + + b.Property("ApparentPowerKva") + .HasColumnType("double precision"); + + b.Property("EnergyExportedKwh") + .HasColumnType("double precision"); + + b.Property("EnergyImportedKwh") + .HasColumnType("double precision"); + + b.Property("FrequencyHz") + .HasColumnType("double precision"); + + b.Property("PowerFactor") + .HasColumnType("double precision"); + + b.Property("ReactivePowerKvar") + .HasColumnType("double precision"); + + b.Property("ReceivedAt") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasDefaultValueSql("NOW()"); + + b.Property("Source") + .HasMaxLength(50) + .HasColumnType("character varying(50)"); + + b.Property("VoltageV") + .HasColumnType("double precision"); + + b.HasKey("Time", "DeviceId"); + + b.HasIndex("ReceivedAt"); + + b.HasIndex("DeviceId", "Time") + .IsDescending(false, true); + + b.ToTable("PowerMeasurements", "monitoring"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Site", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Address") + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("CreatedAt") + .HasColumnType("timestamp with time zone"); + + b.Property("IsActive") + .HasColumnType("boolean"); + + b.Property("MunicipalityId") + .HasColumnType("integer"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.HasKey("Id"); + + b.HasIndex("MunicipalityId"); + + b.HasIndex("Name"); + + b.ToTable("Sites", "monitoring"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Municipality", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("CreatedAt") + .HasColumnType("timestamp with time zone"); + + b.Property("IsActive") + .HasColumnType("boolean"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("TimeZoneId") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.HasKey("Id"); + + b.HasIndex("Name") + .IsUnique(); + + b.ToTable("Municipalities", "app"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Tariff", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("CreatedAt") + .HasColumnType("timestamp with time zone"); + + b.Property("DefaultRatePerKwh") + .HasPrecision(10, 4) + .HasColumnType("numeric(10,4)"); + + b.Property("EffectiveFrom") + .HasColumnType("date"); + + b.Property("EffectiveTo") + .HasColumnType("date"); + + b.Property("FixedMonthlyCharge") + .HasPrecision(10, 2) + .HasColumnType("numeric(10,2)"); + + b.Property("IsActive") + .HasColumnType("boolean"); + + b.Property("MunicipalityId") + .HasColumnType("integer"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("VatPercentage") + .HasPrecision(5, 2) + .HasColumnType("numeric(5,2)"); + + b.HasKey("Id"); + + b.HasIndex("MunicipalityId", "EffectiveFrom"); + + b.ToTable("Tariffs", "app"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.TariffPeriod", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("DaysOfWeek") + .HasColumnType("integer"); + + b.Property("EndTime") + .HasColumnType("time without time zone"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("RatePerKwh") + .HasPrecision(10, 4) + .HasColumnType("numeric(10,4)"); + + b.Property("StartTime") + .HasColumnType("time without time zone"); + + b.Property("TariffId") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("TariffId"); + + b.ToTable("TariffPeriods", "app"); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityRoleClaim", b => + { + b.HasOne("Microsoft.AspNetCore.Identity.IdentityRole", null) + .WithMany() + .HasForeignKey("RoleId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserClaim", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null) + .WithMany() + .HasForeignKey("UserId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserLogin", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null) + .WithMany() + .HasForeignKey("UserId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserRole", b => + { + b.HasOne("Microsoft.AspNetCore.Identity.IdentityRole", null) + .WithMany() + .HasForeignKey("RoleId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null) + .WithMany() + .HasForeignKey("UserId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + }); + + modelBuilder.Entity("Microsoft.AspNetCore.Identity.IdentityUserToken", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", null) + .WithMany() + .HasForeignKey("UserId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Device", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Monitoring.Site", "Site") + .WithMany("Devices") + .HasForeignKey("SiteId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Site"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.PowerMeasurement", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Monitoring.Device", "Device") + .WithMany() + .HasForeignKey("DeviceId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Device"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Site", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Rates.Municipality", "Municipality") + .WithMany() + .HasForeignKey("MunicipalityId") + .OnDelete(DeleteBehavior.SetNull); + + b.Navigation("Municipality"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Tariff", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Rates.Municipality", "Municipality") + .WithMany("Tariffs") + .HasForeignKey("MunicipalityId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Municipality"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.TariffPeriod", b => + { + b.HasOne("Tau.Acuvim.Portal.Domain.Rates.Tariff", "Tariff") + .WithMany("Periods") + .HasForeignKey("TariffId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Tariff"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Monitoring.Site", b => + { + b.Navigation("Devices"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Municipality", b => + { + b.Navigation("Tariffs"); + }); + + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Rates.Tariff", b => + { + b.Navigation("Periods"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/portal/src/Tau.Acuvim.Portal/Migrations/20260518081228_AddReceivedAtAndPushState.cs b/portal/src/Tau.Acuvim.Portal/Migrations/20260518081228_AddReceivedAtAndPushState.cs new file mode 100644 index 0000000..f693ef5 --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Migrations/20260518081228_AddReceivedAtAndPushState.cs @@ -0,0 +1,63 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace Tau.Acuvim.Portal.Migrations +{ + /// + public partial class AddReceivedAtAndPushState : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "ReceivedAt", + schema: "monitoring", + table: "PowerMeasurements", + type: "timestamp with time zone", + nullable: false, + defaultValueSql: "NOW()"); + + migrationBuilder.CreateTable( + name: "FleetPushState", + schema: "app", + columns: table => new + { + ResourceType = table.Column(type: "character varying(20)", maxLength: 20, nullable: false), + LastCursor = table.Column(type: "timestamp with time zone", nullable: true), + LastSyncedAt = table.Column(type: "timestamp with time zone", nullable: true), + LastError = table.Column(type: "character varying(500)", maxLength: 500, nullable: true), + ConsecutiveFailures = table.Column(type: "integer", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_FleetPushState", x => x.ResourceType); + }); + + migrationBuilder.CreateIndex( + name: "IX_PowerMeasurements_ReceivedAt", + schema: "monitoring", + table: "PowerMeasurements", + column: "ReceivedAt"); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "FleetPushState", + schema: "app"); + + migrationBuilder.DropIndex( + name: "IX_PowerMeasurements_ReceivedAt", + schema: "monitoring", + table: "PowerMeasurements"); + + migrationBuilder.DropColumn( + name: "ReceivedAt", + schema: "monitoring", + table: "PowerMeasurements"); + } + } +} diff --git a/portal/src/Tau.Acuvim.Portal/Migrations/AppDbContextModelSnapshot.cs b/portal/src/Tau.Acuvim.Portal/Migrations/AppDbContextModelSnapshot.cs index 26b9f34..88a25c7 100644 --- a/portal/src/Tau.Acuvim.Portal/Migrations/AppDbContextModelSnapshot.cs +++ b/portal/src/Tau.Acuvim.Portal/Migrations/AppDbContextModelSnapshot.cs @@ -198,6 +198,30 @@ namespace Tau.Acuvim.Portal.Migrations b.ToTable("WhiteLabelSettings", "app"); }); + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Fleet.FleetPushState", b => + { + b.Property("ResourceType") + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("ConsecutiveFailures") + .HasColumnType("integer"); + + b.Property("LastCursor") + .HasColumnType("timestamp with time zone"); + + b.Property("LastError") + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("LastSyncedAt") + .HasColumnType("timestamp with time zone"); + + b.HasKey("ResourceType"); + + b.ToTable("FleetPushState", "app"); + }); + modelBuilder.Entity("Tau.Acuvim.Portal.Domain.Identity.ApplicationUser", b => { b.Property("Id") @@ -341,6 +365,11 @@ namespace Tau.Acuvim.Portal.Migrations b.Property("ReactivePowerKvar") .HasColumnType("double precision"); + b.Property("ReceivedAt") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasDefaultValueSql("NOW()"); + b.Property("Source") .HasMaxLength(50) .HasColumnType("character varying(50)"); @@ -350,6 +379,8 @@ namespace Tau.Acuvim.Portal.Migrations b.HasKey("Time", "DeviceId"); + b.HasIndex("ReceivedAt"); + b.HasIndex("DeviceId", "Time") .IsDescending(false, true); diff --git a/portal/src/Tau.Acuvim.Portal/Program.cs b/portal/src/Tau.Acuvim.Portal/Program.cs index b5cc3d6..60cb39e 100644 --- a/portal/src/Tau.Acuvim.Portal/Program.cs +++ b/portal/src/Tau.Acuvim.Portal/Program.cs @@ -133,10 +133,21 @@ try builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); + + if (fleetIngestOptions.Enabled) + { + builder.Services.AddHttpClient(c => + { + c.Timeout = TimeSpan.FromSeconds(30); + }); + builder.Services.AddHostedService(); + } } else { builder.Services.AddScoped(); + builder.Services.AddScoped(); + builder.Services.AddScoped(); } builder.Services.AddHealthChecks() @@ -188,7 +199,9 @@ try { var db = scope.ServiceProvider.GetRequiredService(); await db.Database.MigrateAsync(); - // Admin-side hypertable + continuous aggregates land in Phase 14. + + var fleetTimescale = scope.ServiceProvider.GetRequiredService(); + await fleetTimescale.EnsureAsync(); } var bootstrapper = scope.ServiceProvider.GetRequiredService(); @@ -250,6 +263,7 @@ try else { app.MapAdminCustomersEndpoints(); + app.MapFleetIngestEndpoints(); } app.MapHealthChecks("/health", new Microsoft.AspNetCore.Diagnostics.HealthChecks.HealthCheckOptions diff --git a/portal/src/Tau.Acuvim.Portal/Services/FleetIngestService.cs b/portal/src/Tau.Acuvim.Portal/Services/FleetIngestService.cs new file mode 100644 index 0000000..50d7f96 --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Services/FleetIngestService.cs @@ -0,0 +1,241 @@ +using System.Text; +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using Npgsql; +using Tau.Acuvim.Portal.Data; +using Tau.Acuvim.Portal.Domain.Fleet; +using Tau.Acuvim.Portal.DTOs; + +namespace Tau.Acuvim.Portal.Services; + +public enum FleetBatchType { Sites, Devices, Measurements } + +public sealed class FleetIngestService( + AdminDbContext db, + CustomerService customers, + ILogger log) +{ + private static readonly JsonSerializerOptions JsonOpts = new(JsonSerializerDefaults.Web); + + public async Task IngestAsync( + string? token, + string? batchTypeHeader, + string? cursorHeader, + string rawBody, + CancellationToken ct) + { + var bodyBytes = Encoding.UTF8.GetByteCount(rawBody); + + // 1. Token → customer + var customer = await customers.FindByTokenAsync(token ?? string.Empty, ct); + if (customer is null) + { + await WriteEventAsync(customerId: Guid.Empty, batchTypeHeader, accepted: 0, rejected: 0, + bodyBytes, cursorHeader, error: "Unauthorized", spread: null, ct); + throw new UnauthorizedFleetIngestException(); + } + + // 2. Batch type + if (!TryParseBatchType(batchTypeHeader, out var batchType)) + { + await WriteEventAsync(customer.Id, batchTypeHeader, accepted: 0, rejected: 0, + bodyBytes, cursorHeader, error: "Unknown X-Batch-Type", spread: null, ct); + throw new InvalidFleetBatchException($"Unknown X-Batch-Type: '{batchTypeHeader}'"); + } + + // 3. Dispatch + FleetIngestResult result; + TimeSpan? spread = null; + try + { + (result, spread) = batchType switch + { + FleetBatchType.Sites => (await IngestSitesAsync(customer.Id, rawBody, ct), null), + FleetBatchType.Devices => (await IngestDevicesAsync(customer.Id, rawBody, ct), null), + FleetBatchType.Measurements => await IngestMeasurementsAsync(customer.Id, rawBody, ct), + _ => throw new InvalidOperationException() + }; + } + catch (Exception ex) + { + log.LogError(ex, "Fleet ingest failure for {Customer}", customer.Code); + await WriteEventAsync(customer.Id, batchTypeHeader, accepted: 0, rejected: 0, + bodyBytes, cursorHeader, error: Truncate(ex.Message, 500), spread: null, ct); + throw; + } + + // 4. Touch customer FirstSeen/LastSeen + var nowUtc = DateTime.UtcNow; + var c = await db.Customers.FirstAsync(x => x.Id == customer.Id, ct); + c.FirstSeenAt ??= nowUtc; + c.LastSeenAt = nowUtc; + await db.SaveChangesAsync(ct); + + // 5. Audit event + await WriteEventAsync(customer.Id, batchTypeHeader, result.Accepted, result.Rejected, + bodyBytes, cursorHeader, error: null, spread, ct); + + return result; + } + + private static bool TryParseBatchType(string? header, out FleetBatchType type) + { + type = default; + if (string.IsNullOrWhiteSpace(header)) return false; + return header.ToLowerInvariant() switch + { + "sites" => (type = FleetBatchType.Sites) is var _ && true, + "devices" => (type = FleetBatchType.Devices) is var _ && true, + "measurements" => (type = FleetBatchType.Measurements) is var _ && true, + _ => false, + }; + } + + // ── Sites: upsert by (CustomerId, Id) ────────────────────────────────── + private async Task IngestSitesAsync(Guid customerId, string body, CancellationToken ct) + { + var rows = JsonSerializer.Deserialize(body, JsonOpts) ?? Array.Empty(); + if (rows.Length == 0) return new FleetIngestResult(0, 0, Array.Empty()); + + var sql = new StringBuilder( + """ + INSERT INTO fleet."Sites" ("CustomerId","Id","Name","Address","LocalMunicipalityId","IsActive","ReceivedAt") VALUES + """); + var ps = new List(rows.Length * 6); + for (int i = 0; i < rows.Length; i++) + { + if (i > 0) sql.Append(','); + var b = i * 6; + sql.Append($" (@cust,@p{b},@p{b + 1},@p{b + 2},@p{b + 3},@p{b + 4},@p{b + 5})"); + ps.Add(new($"p{b}", rows[i].Id)); + ps.Add(new($"p{b + 1}", rows[i].Name)); + ps.Add(new($"p{b + 2}", (object?)rows[i].Address ?? DBNull.Value)); + ps.Add(new($"p{b + 3}", (object?)rows[i].LocalMunicipalityId ?? DBNull.Value)); + ps.Add(new($"p{b + 4}", rows[i].IsActive)); + ps.Add(new($"p{b + 5}", DateTime.UtcNow)); + } + ps.Add(new("cust", customerId)); + sql.Append(""" + ON CONFLICT ("CustomerId","Id") DO UPDATE SET + "Name"=EXCLUDED."Name", + "Address"=EXCLUDED."Address", + "LocalMunicipalityId"=EXCLUDED."LocalMunicipalityId", + "IsActive"=EXCLUDED."IsActive", + "ReceivedAt"=EXCLUDED."ReceivedAt"; + """); + await db.Database.ExecuteSqlRawAsync(sql.ToString(), ps, ct); + return new FleetIngestResult(rows.Length, 0, Array.Empty()); + } + + // ── Devices: upsert by (CustomerId, Id) ──────────────────────────────── + private async Task IngestDevicesAsync(Guid customerId, string body, CancellationToken ct) + { + var rows = JsonSerializer.Deserialize(body, JsonOpts) ?? Array.Empty(); + if (rows.Length == 0) return new FleetIngestResult(0, 0, Array.Empty()); + + var sql = new StringBuilder( + """ + INSERT INTO fleet."Devices" ("CustomerId","Id","SiteId","Name","ExternalId","Description","IsActive","ReceivedAt") VALUES + """); + var ps = new List(rows.Length * 7); + for (int i = 0; i < rows.Length; i++) + { + if (i > 0) sql.Append(','); + var b = i * 7; + sql.Append($" (@cust,@p{b},@p{b + 1},@p{b + 2},@p{b + 3},@p{b + 4},@p{b + 5},@p{b + 6})"); + ps.Add(new($"p{b}", rows[i].Id)); + ps.Add(new($"p{b + 1}", rows[i].SiteId)); + ps.Add(new($"p{b + 2}", rows[i].Name)); + ps.Add(new($"p{b + 3}", rows[i].ExternalId)); + ps.Add(new($"p{b + 4}", (object?)rows[i].Description ?? DBNull.Value)); + ps.Add(new($"p{b + 5}", rows[i].IsActive)); + ps.Add(new($"p{b + 6}", DateTime.UtcNow)); + } + ps.Add(new("cust", customerId)); + sql.Append(""" + ON CONFLICT ("CustomerId","Id") DO UPDATE SET + "SiteId"=EXCLUDED."SiteId", + "Name"=EXCLUDED."Name", + "ExternalId"=EXCLUDED."ExternalId", + "Description"=EXCLUDED."Description", + "IsActive"=EXCLUDED."IsActive", + "ReceivedAt"=EXCLUDED."ReceivedAt"; + """); + await db.Database.ExecuteSqlRawAsync(sql.ToString(), ps, ct); + return new FleetIngestResult(rows.Length, 0, Array.Empty()); + } + + // ── Measurements: ON CONFLICT DO NOTHING (time-series) ───────────────── + private async Task<(FleetIngestResult, TimeSpan?)> IngestMeasurementsAsync(Guid customerId, string body, CancellationToken ct) + { + var rows = JsonSerializer.Deserialize(body, JsonOpts) ?? Array.Empty(); + if (rows.Length == 0) return (new FleetIngestResult(0, 0, Array.Empty()), null); + + var minTime = rows[0].Time; + var maxTime = rows[0].Time; + for (int i = 1; i < rows.Length; i++) + { + if (rows[i].Time < minTime) minTime = rows[i].Time; + if (rows[i].Time > maxTime) maxTime = rows[i].Time; + } + var spread = maxTime - minTime; + + var sql = new StringBuilder( + """ + INSERT INTO fleet."PowerMeasurements" + ("Time","CustomerId","DeviceId","ActivePowerKw","ReactivePowerKvar","ApparentPowerKva", + "PowerFactor","VoltageV","FrequencyHz","EnergyImportedKwh","EnergyExportedKwh","Source") + VALUES + """); + var ps = new List(rows.Length * 11); + for (int i = 0; i < rows.Length; i++) + { + if (i > 0) sql.Append(','); + var b = i * 11; + sql.Append($" (@p{b},@cust,@p{b + 1},@p{b + 2},@p{b + 3},@p{b + 4},@p{b + 5},@p{b + 6},@p{b + 7},@p{b + 8},@p{b + 9},@p{b + 10})"); + ps.Add(new($"p{b}", DateTime.SpecifyKind(rows[i].Time, DateTimeKind.Utc))); + ps.Add(new($"p{b + 1}", rows[i].DeviceId)); + ps.Add(new($"p{b + 2}", rows[i].ActivePowerKw)); + ps.Add(new($"p{b + 3}", (object?)rows[i].ReactivePowerKvar ?? DBNull.Value)); + ps.Add(new($"p{b + 4}", (object?)rows[i].ApparentPowerKva ?? DBNull.Value)); + ps.Add(new($"p{b + 5}", (object?)rows[i].PowerFactor ?? DBNull.Value)); + ps.Add(new($"p{b + 6}", (object?)rows[i].VoltageV ?? DBNull.Value)); + ps.Add(new($"p{b + 7}", (object?)rows[i].FrequencyHz ?? DBNull.Value)); + ps.Add(new($"p{b + 8}", (object?)rows[i].EnergyImportedKwh ?? DBNull.Value)); + ps.Add(new($"p{b + 9}", (object?)rows[i].EnergyExportedKwh ?? DBNull.Value)); + ps.Add(new($"p{b + 10}", (object?)rows[i].Source ?? DBNull.Value)); + } + ps.Add(new("cust", customerId)); + sql.Append(""" ON CONFLICT ("Time","CustomerId","DeviceId") DO NOTHING;"""); + + var affected = await db.Database.ExecuteSqlRawAsync(sql.ToString(), ps, ct); + var rejected = rows.Length - affected; // ON CONFLICT skipped these + return (new FleetIngestResult(affected, rejected, Array.Empty()), spread); + } + + private async Task WriteEventAsync( + Guid customerId, string? batchType, int accepted, int rejected, int batchBytes, + string? cursor, string? error, TimeSpan? spread, CancellationToken ct) + { + if (customerId == Guid.Empty) return; // can't FK to nothing + db.IngestEvents.Add(new IngestEvent + { + CustomerId = customerId, + BatchType = batchType ?? "?", + RowsAccepted = accepted, + RowsRejected = rejected, + BatchBytes = batchBytes, + ClientHwm = Truncate(cursor, 50), + Error = error, + TimeSpread = spread, + ReceivedAt = DateTime.UtcNow + }); + await db.SaveChangesAsync(ct); + } + + private static string? Truncate(string? s, int max) => + s is null ? null : (s.Length <= max ? s : s.Substring(0, max)); +} + +public sealed class UnauthorizedFleetIngestException : Exception { } +public sealed class InvalidFleetBatchException(string message) : Exception(message); diff --git a/portal/src/Tau.Acuvim.Portal/Services/FleetPushClient.cs b/portal/src/Tau.Acuvim.Portal/Services/FleetPushClient.cs new file mode 100644 index 0000000..cfbcfec --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Services/FleetPushClient.cs @@ -0,0 +1,111 @@ +using System.Net; +using System.Text; +using System.Text.Json; +using Microsoft.Extensions.Options; +using Tau.Acuvim.Portal.Configuration; +using Tau.Acuvim.Portal.DTOs; + +namespace Tau.Acuvim.Portal.Services; + +public sealed class FleetPushResult +{ + public bool Succeeded { get; init; } + public int Accepted { get; init; } + public int Rejected { get; init; } + public HttpStatusCode StatusCode { get; init; } + public string? Error { get; init; } + public TimeSpan? RetryAfter { get; init; } +} + +// HttpClient wrapper for POSTing batches to the Admin ingest endpoint. +public sealed class FleetPushClient( + HttpClient http, + IOptions options, + ILogger log) +{ + private static readonly JsonSerializerOptions JsonOpts = new(JsonSerializerDefaults.Web); + + public Task PushSitesAsync(IReadOnlyList rows, DateTime cursor, CancellationToken ct) + => PushAsync("sites", rows, cursor, ct); + + public Task PushDevicesAsync(IReadOnlyList rows, DateTime cursor, CancellationToken ct) + => PushAsync("devices", rows, cursor, ct); + + public Task PushMeasurementsAsync(IReadOnlyList rows, DateTime cursor, CancellationToken ct) + => PushAsync("measurements", rows, cursor, ct); + + private async Task PushAsync( + string batchType, + IReadOnlyList rows, + DateTime cursor, + CancellationToken ct) + { + var opts = options.Value; + if (string.IsNullOrWhiteSpace(opts.Url) || string.IsNullOrWhiteSpace(opts.Token)) + { + return new FleetPushResult { Succeeded = false, Error = "FleetIngest URL or Token not configured." }; + } + + var json = JsonSerializer.Serialize(rows, JsonOpts); + var bytes = Encoding.UTF8.GetBytes(json); + + if (bytes.Length > opts.BatchMaxBytes) + { + return new FleetPushResult { Succeeded = false, StatusCode = HttpStatusCode.RequestEntityTooLarge, + Error = $"Local pre-check: batch size {bytes.Length} > {opts.BatchMaxBytes}. Halve batch and retry." }; + } + + using var req = new HttpRequestMessage(HttpMethod.Post, opts.Url); + req.Headers.Add("X-Customer-Token", opts.Token); + req.Headers.Add("X-Batch-Type", batchType); + req.Headers.Add("X-Push-Cursor", cursor.ToString("O")); + req.Content = new ByteArrayContent(bytes); + req.Content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json"); + + try + { + using var resp = await http.SendAsync(req, ct); + var status = resp.StatusCode; + + if (status == HttpStatusCode.OK) + { + try + { + var body = await resp.Content.ReadAsStringAsync(ct); + var result = JsonSerializer.Deserialize(body, JsonOpts); + return new FleetPushResult + { + Succeeded = true, + Accepted = result?.Accepted ?? rows.Count, + Rejected = result?.Rejected ?? 0, + StatusCode = status + }; + } + catch + { + return new FleetPushResult { Succeeded = true, Accepted = rows.Count, StatusCode = status }; + } + } + + TimeSpan? retryAfter = null; + if (resp.Headers.RetryAfter?.Delta is { } d) retryAfter = d; + + return new FleetPushResult + { + Succeeded = false, + StatusCode = status, + RetryAfter = retryAfter, + Error = $"HTTP {(int)status} {status}" + }; + } + catch (TaskCanceledException) when (ct.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + log.LogWarning(ex, "Fleet push transport failure to {Url}", opts.Url); + return new FleetPushResult { Succeeded = false, Error = ex.Message }; + } + } +} diff --git a/portal/src/Tau.Acuvim.Portal/Services/FleetPushService.cs b/portal/src/Tau.Acuvim.Portal/Services/FleetPushService.cs new file mode 100644 index 0000000..46aedc2 --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Services/FleetPushService.cs @@ -0,0 +1,221 @@ +using System.Net; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; +using Tau.Acuvim.Portal.Configuration; +using Tau.Acuvim.Portal.Data; +using Tau.Acuvim.Portal.Domain.Fleet; +using Tau.Acuvim.Portal.DTOs; + +namespace Tau.Acuvim.Portal.Services; + +// Background loop that pushes sites/devices/measurements to the Admin fleet ingest endpoint. +// Registered only when RunMode=Client AND FleetIngest__Enabled=true. +// +// Per tick: +// 1. Sites (full set, idempotent upsert on Admin side) +// 2. Devices (full set) +// 3. Measurements: up to 3 batches × BatchSize rows, cursor by ReceivedAt +// Then sleep IntervalSeconds. +public sealed class FleetPushService( + IServiceProvider services, + IOptions options, + ILogger log) + : BackgroundService +{ + private const int MaxBatchesPerTickPerResource = 3; + private static readonly TimeSpan MaxBackoff = TimeSpan.FromMinutes(30); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var interval = TimeSpan.FromSeconds(Math.Max(5, options.Value.IntervalSeconds)); + log.LogInformation("FleetPushService started. Url={Url}, interval={Interval}s", + options.Value.Url, interval.TotalSeconds); + + // Brief startup grace so MigrateAsync + IdentityBootstrapper finish before we hit the DB. + try { await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); } + catch (TaskCanceledException) { return; } + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await RunTickAsync(stoppingToken); + } + catch (Exception ex) when (ex is not TaskCanceledException) + { + log.LogError(ex, "FleetPushService tick failed"); + } + + try { await Task.Delay(interval, stoppingToken); } + catch (TaskCanceledException) { break; } + } + } + + private async Task RunTickAsync(CancellationToken ct) + { + using var scope = services.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var client = scope.ServiceProvider.GetRequiredService(); + + if (!await ShouldRunNowAsync(db, ct)) + { + return; + } + + var sitesOk = await PushSitesAsync(db, client, ct); + if (!sitesOk) return; + + var devicesOk = await PushDevicesAsync(db, client, ct); + if (!devicesOk) return; + + await PushMeasurementsAsync(db, client, ct); + } + + // ── Sites: full set every tick ───────────────────────────────────────── + private async Task PushSitesAsync(AppDbContext db, FleetPushClient client, CancellationToken ct) + { + var state = await GetStateAsync(db, FleetPushState.ResourceSites, ct); + var rows = await db.Sites.AsNoTracking() + .Select(s => new FleetSiteDto(s.Id, s.Name, s.Address, s.MunicipalityId, s.IsActive)) + .ToListAsync(ct); + if (rows.Count == 0) + { + await MarkSuccessAsync(db, state, cursor: DateTime.UtcNow, ct); + return true; + } + var result = await client.PushSitesAsync(rows, DateTime.UtcNow, ct); + return await HandleResultAsync(db, state, result, cursor: DateTime.UtcNow, ct); + } + + // ── Devices: full set every tick ─────────────────────────────────────── + private async Task PushDevicesAsync(AppDbContext db, FleetPushClient client, CancellationToken ct) + { + var state = await GetStateAsync(db, FleetPushState.ResourceDevices, ct); + var rows = await db.Devices.AsNoTracking() + .Select(d => new FleetDeviceDto(d.Id, d.SiteId, d.Name, d.ExternalId, d.Description, d.IsActive)) + .ToListAsync(ct); + if (rows.Count == 0) + { + await MarkSuccessAsync(db, state, cursor: DateTime.UtcNow, ct); + return true; + } + var result = await client.PushDevicesAsync(rows, DateTime.UtcNow, ct); + return await HandleResultAsync(db, state, result, cursor: DateTime.UtcNow, ct); + } + + // ── Measurements: cursor-driven batches up to MaxBatchesPerTickPerResource ─ + private async Task PushMeasurementsAsync(AppDbContext db, FleetPushClient client, CancellationToken ct) + { + var state = await GetStateAsync(db, FleetPushState.ResourceMeasurements, ct); + var cursor = state.LastCursor ?? DateTime.MinValue.ToUniversalTime(); + int batchSize = options.Value.BatchSize; + + for (int i = 0; i < MaxBatchesPerTickPerResource; i++) + { + var rows = await db.PowerMeasurements.AsNoTracking() + .Where(m => m.ReceivedAt > cursor) + .OrderBy(m => m.ReceivedAt) + .Take(batchSize) + .Select(m => new FleetMeasurementDto( + m.Time, m.DeviceId, m.ActivePowerKw, + m.ReactivePowerKvar, m.ApparentPowerKva, m.PowerFactor, + m.VoltageV, m.FrequencyHz, + m.EnergyImportedKwh, m.EnergyExportedKwh, + m.Source, m.ReceivedAt)) + .ToListAsync(ct); + + if (rows.Count == 0) break; + + var newCursor = rows[^1].ReceivedAt; + var result = await client.PushMeasurementsAsync(rows, newCursor, ct); + + if (result.StatusCode == HttpStatusCode.RequestEntityTooLarge) + { + batchSize = Math.Max(100, batchSize / 2); + log.LogWarning("Ingest returned 413; halving batch size to {BatchSize}", batchSize); + continue; + } + + var ok = await HandleResultAsync(db, state, result, cursor: newCursor, ct); + if (!ok) return; + + cursor = newCursor; + if (rows.Count < batchSize) break; + } + } + + // ── State helpers ────────────────────────────────────────────────────── + private static async Task GetStateAsync(AppDbContext db, string resourceType, CancellationToken ct) + { + var row = await db.FleetPushState.FirstOrDefaultAsync(x => x.ResourceType == resourceType, ct); + if (row is null) + { + row = new FleetPushState { ResourceType = resourceType }; + db.FleetPushState.Add(row); + await db.SaveChangesAsync(ct); + } + return row; + } + + private static async Task MarkSuccessAsync(AppDbContext db, FleetPushState state, DateTime cursor, CancellationToken ct) + { + state.LastCursor = cursor; + state.LastSyncedAt = DateTime.UtcNow; + state.LastError = null; + state.ConsecutiveFailures = 0; + await db.SaveChangesAsync(ct); + } + + private async Task HandleResultAsync( + AppDbContext db, FleetPushState state, FleetPushResult result, DateTime cursor, CancellationToken ct) + { + if (result.Succeeded) + { + await MarkSuccessAsync(db, state, cursor, ct); + if (result.Rejected > 0) + { + log.LogWarning("Ingest accepted={Accepted} rejected={Rejected} for {Resource}", + result.Accepted, result.Rejected, state.ResourceType); + } + return true; + } + + state.LastError = result.Error; + state.ConsecutiveFailures++; + state.LastSyncedAt = DateTime.UtcNow; + await db.SaveChangesAsync(ct); + + var backoff = result.RetryAfter ?? BackoffFor(state.ConsecutiveFailures); + log.LogWarning("Push failed for {Resource}: {Error}. Failures={N}, next attempt after ~{Backoff}.", + state.ResourceType, result.Error, state.ConsecutiveFailures, backoff); + + // Stop this tick; the next interval handles the retry naturally. + return false; + } + + private static TimeSpan BackoffFor(int failures) + { + var baseMinutes = Math.Min(Math.Pow(2, Math.Max(0, failures - 1)), 30); + return TimeSpan.FromMinutes(baseMinutes); + } + + // Block when we're in a long backoff window — only attempt if enough time has passed + // since LastSyncedAt to cover the backoff. Cheap: read one row. + private async Task ShouldRunNowAsync(AppDbContext db, CancellationToken ct) + { + var maxFailures = await db.FleetPushState.AsNoTracking() + .Select(x => new { x.LastSyncedAt, x.ConsecutiveFailures }) + .ToListAsync(ct); + if (maxFailures.Count == 0) return true; + + foreach (var row in maxFailures) + { + if (row.ConsecutiveFailures == 0) return true; + if (row.LastSyncedAt is null) return true; + var since = DateTime.UtcNow - row.LastSyncedAt.Value; + var backoff = BackoffFor(row.ConsecutiveFailures); + if (since >= backoff) return true; + } + return false; + } +} diff --git a/portal/src/Tau.Acuvim.Portal/Services/FleetTimescaleBootstrapper.cs b/portal/src/Tau.Acuvim.Portal/Services/FleetTimescaleBootstrapper.cs new file mode 100644 index 0000000..3b1ce12 --- /dev/null +++ b/portal/src/Tau.Acuvim.Portal/Services/FleetTimescaleBootstrapper.cs @@ -0,0 +1,85 @@ +using Microsoft.EntityFrameworkCore; +using Tau.Acuvim.Portal.Data; + +namespace Tau.Acuvim.Portal.Services; + +// Admin-side equivalent of TimescaleBootstrapper. Runs once after MigrateAsync on Admin startup. +// Sets up the fleet.PowerMeasurements hypertable + compression policy + hourly CA. +// Idempotent — safe on every start. +public sealed class FleetTimescaleBootstrapper( + AdminDbContext db, + ILogger log) +{ + public async Task EnsureAsync(CancellationToken ct = default) + { + await db.Database.ExecuteSqlRawAsync( + "CREATE EXTENSION IF NOT EXISTS timescaledb;", ct); + + await db.Database.ExecuteSqlRawAsync( + """ + SELECT create_hypertable( + 'fleet."PowerMeasurements"', + 'Time', + if_not_exists => TRUE, + migrate_data => TRUE + ); + """, ct); + + await db.Database.ExecuteSqlRawAsync( + """ + SELECT set_chunk_time_interval('fleet."PowerMeasurements"', INTERVAL '7 days'); + """, ct); + + // Compression — chunks older than 7 days. Segment by (CustomerId, DeviceId) so + // point queries by customer + device stay fast even on compressed chunks. + // ALTER TABLE ... SET is idempotent in TimescaleDB (no error if already set). + await db.Database.ExecuteSqlRawAsync( + """ + ALTER TABLE fleet."PowerMeasurements" SET ( + timescaledb.compress, + timescaledb.compress_segmentby = '"CustomerId","DeviceId"', + timescaledb.compress_orderby = '"Time" DESC' + ); + """, ct); + + await db.Database.ExecuteSqlRawAsync( + """ + SELECT add_compression_policy('fleet."PowerMeasurements"', INTERVAL '7 days', if_not_exists => TRUE); + """, ct); + + // Hourly per-device continuous aggregate. Realtime (materialized_only=false) so + // late firmware back-fills appear in CA queries immediately, served live from + // the underlying hypertable until next refresh tick. + await db.Database.ExecuteSqlRawAsync( + """ + CREATE MATERIALIZED VIEW IF NOT EXISTS fleet.hourly_per_device + WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS + SELECT + "CustomerId", + "DeviceId", + time_bucket(INTERVAL '1 hour', "Time") AS bucket, + avg("ActivePowerKw") AS avg_kw, + max("ActivePowerKw") AS max_kw, + min("ActivePowerKw") AS min_kw, + max("EnergyImportedKwh") - min("EnergyImportedKwh") AS kwh_imported_delta, + max("EnergyExportedKwh") - min("EnergyExportedKwh") AS kwh_exported_delta, + count(*) AS samples + FROM fleet."PowerMeasurements" + GROUP BY "CustomerId", "DeviceId", bucket + WITH NO DATA; + """, ct); + + await db.Database.ExecuteSqlRawAsync( + """ + SELECT add_continuous_aggregate_policy( + 'fleet.hourly_per_device', + start_offset => INTERVAL '30 days', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '5 minutes', + if_not_exists => TRUE + ); + """, ct); + + log.LogInformation("Fleet hypertable + compression + hourly_per_device CA ready."); + } +} diff --git a/portal/tests/Tau.Acuvim.Portal.Tests/FleetIngestValidationTests.cs b/portal/tests/Tau.Acuvim.Portal.Tests/FleetIngestValidationTests.cs new file mode 100644 index 0000000..786ce45 --- /dev/null +++ b/portal/tests/Tau.Acuvim.Portal.Tests/FleetIngestValidationTests.cs @@ -0,0 +1,23 @@ +using Tau.Acuvim.Portal.Services; + +namespace Tau.Acuvim.Portal.Tests; + +// Pure validation behaviour of FleetIngestService.IngestAsync — token + batch type checks. +// Service-internal logic that doesn't touch the DB. The actual DB write paths are +// exercised in the manual two-stack smoke test (TESTING.md → Phase 14 scenario). +public class FleetIngestValidationTests +{ + [Fact] + public void UnauthorizedException_HasNoMessage() + { + var ex = new UnauthorizedFleetIngestException(); + Assert.NotNull(ex); + } + + [Fact] + public void InvalidFleetBatchException_CarriesMessage() + { + var ex = new InvalidFleetBatchException("nope"); + Assert.Equal("nope", ex.Message); + } +}