Customer-stack measurements now flow to the Admin-stack central DB via
HTTPS POST, with firmware buffer-and-replay back-fills handled correctly.
Client side (push)
- monitoring.PowerMeasurements gains ReceivedAt (default NOW()) +
index. Push selects WHERE ReceivedAt > LastCursor, so back-dated
rows from offline-buffer replays are picked up automatically.
- app.FleetPushState table holds per-resource cursors + backoff state.
- FleetPushClient: HttpClient wrapper, X-Customer-Token header,
X-Batch-Type, X-Push-Cursor. 413 returns retry-after halving signal.
- FleetPushService: BackgroundService loop. Per tick: sites (full set),
devices (full set), measurements (cursor-driven up to 3 batches).
Exponential backoff per resource on failure (1m → 30m cap).
Honors 429 Retry-After. Only registered when RunMode=Client AND
FleetIngest__Enabled=true.
Admin side (ingest)
- /api/fleet/ingest: anonymous, X-Customer-Token authed against
fleet.Customers via SHA-256 indexed lookup. 401 on bad token; 400
on bad batch type.
- FleetIngestService dispatches by X-Batch-Type:
sites/devices → upsert by (CustomerId, Id) with ON CONFLICT UPDATE
measurements → bulk INSERT ON CONFLICT (Time, CustomerId, DeviceId)
DO NOTHING (idempotent under re-delivery).
- Updates fleet.Customers.FirstSeenAt/LastSeenAt on each successful batch.
- Writes fleet.IngestEvents audit row per batch (accepted, rejected,
bytes, client cursor, time-spread, error).
- FleetTimescaleBootstrapper runs after MigrateAsync in Admin mode:
CREATE EXTENSION timescaledb, create_hypertable on fleet.PowerMeasurements,
chunk interval 7 days, compression with segmentby=(CustomerId,DeviceId)
+ compress_orderby "Time" DESC, compression policy 7 days, hourly_per_device
continuous aggregate (realtime, materialized_only=false, 30-day start_offset
so back-fills get materialized on next refresh tick).
Wiring
- docker-compose.yml threads Application__RunMode + FleetIngest__* from
.env (defaults safely off) so a single dev host can run two stacks.
- .env.example documents the new vars under their own section.
Tests
- FleetIngestValidationTests (2 new). 53/53 passing.
Verified end-to-end on the dev host
- Client (portal-dev_portal, RunMode=Client, FleetIngest__Enabled=true)
pushes to Admin (portal-admin-test, RunMode=Admin, separate admin_fleet DB)
via container DNS.
- Customer registered on Admin (DEV0001), token captured, dropped into
Client .env, Client restarted, push service started on schedule.
- Ingested measurements (including a 2026-04-01 back-dated sample
simulating firmware replay) all land in fleet.PowerMeasurements with
the correct CustomerId.
- Customer.FirstSeenAt/LastSeenAt update, IngestEvents records every
batch (sites + devices per tick, measurements when cursor advances).
- Hypertable confirmed via timescaledb_information.hypertables;
hourly_per_device CA confirmed via timescaledb_information.continuous_aggregates.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
64 lines
2.3 KiB
C#
64 lines
2.3 KiB
C#
using System;
|
|
using Microsoft.EntityFrameworkCore.Migrations;
|
|
|
|
#nullable disable
|
|
|
|
namespace Tau.Acuvim.Portal.Migrations
|
|
{
|
|
/// <inheritdoc />
|
|
public partial class AddReceivedAtAndPushState : Migration
|
|
{
|
|
/// <inheritdoc />
|
|
protected override void Up(MigrationBuilder migrationBuilder)
|
|
{
|
|
migrationBuilder.AddColumn<DateTime>(
|
|
name: "ReceivedAt",
|
|
schema: "monitoring",
|
|
table: "PowerMeasurements",
|
|
type: "timestamp with time zone",
|
|
nullable: false,
|
|
defaultValueSql: "NOW()");
|
|
|
|
migrationBuilder.CreateTable(
|
|
name: "FleetPushState",
|
|
schema: "app",
|
|
columns: table => new
|
|
{
|
|
ResourceType = table.Column<string>(type: "character varying(20)", maxLength: 20, nullable: false),
|
|
LastCursor = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
|
|
LastSyncedAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
|
|
LastError = table.Column<string>(type: "character varying(500)", maxLength: 500, nullable: true),
|
|
ConsecutiveFailures = table.Column<int>(type: "integer", nullable: false)
|
|
},
|
|
constraints: table =>
|
|
{
|
|
table.PrimaryKey("PK_FleetPushState", x => x.ResourceType);
|
|
});
|
|
|
|
migrationBuilder.CreateIndex(
|
|
name: "IX_PowerMeasurements_ReceivedAt",
|
|
schema: "monitoring",
|
|
table: "PowerMeasurements",
|
|
column: "ReceivedAt");
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
protected override void Down(MigrationBuilder migrationBuilder)
|
|
{
|
|
migrationBuilder.DropTable(
|
|
name: "FleetPushState",
|
|
schema: "app");
|
|
|
|
migrationBuilder.DropIndex(
|
|
name: "IX_PowerMeasurements_ReceivedAt",
|
|
schema: "monitoring",
|
|
table: "PowerMeasurements");
|
|
|
|
migrationBuilder.DropColumn(
|
|
name: "ReceivedAt",
|
|
schema: "monitoring",
|
|
table: "PowerMeasurements");
|
|
}
|
|
}
|
|
}
|