3333202f3a
1 Commits
| Author | SHA1 | Message | Date | |
|---|---|---|---|---|
|
|
a92b4277ae |
Phase 14: Push + ingest pipeline (end-to-end fleet aggregation)
Customer-stack measurements now flow to the Admin-stack central DB via
HTTPS POST, with firmware buffer-and-replay back-fills handled correctly.
Client side (push)
- monitoring.PowerMeasurements gains ReceivedAt (default NOW()) +
index. Push selects WHERE ReceivedAt > LastCursor, so back-dated
rows from offline-buffer replays are picked up automatically.
- app.FleetPushState table holds per-resource cursors + backoff state.
- FleetPushClient: HttpClient wrapper, X-Customer-Token header,
X-Batch-Type, X-Push-Cursor. 413 returns retry-after halving signal.
- FleetPushService: BackgroundService loop. Per tick: sites (full set),
devices (full set), measurements (cursor-driven up to 3 batches).
Exponential backoff per resource on failure (1m → 30m cap).
Honors 429 Retry-After. Only registered when RunMode=Client AND
FleetIngest__Enabled=true.
Admin side (ingest)
- /api/fleet/ingest: anonymous, X-Customer-Token authed against
fleet.Customers via SHA-256 indexed lookup. 401 on bad token; 400
on bad batch type.
- FleetIngestService dispatches by X-Batch-Type:
sites/devices → upsert by (CustomerId, Id) with ON CONFLICT UPDATE
measurements → bulk INSERT ON CONFLICT (Time, CustomerId, DeviceId)
DO NOTHING (idempotent under re-delivery).
- Updates fleet.Customers.FirstSeenAt/LastSeenAt on each successful batch.
- Writes fleet.IngestEvents audit row per batch (accepted, rejected,
bytes, client cursor, time-spread, error).
- FleetTimescaleBootstrapper runs after MigrateAsync in Admin mode:
CREATE EXTENSION timescaledb, create_hypertable on fleet.PowerMeasurements,
chunk interval 7 days, compression with segmentby=(CustomerId,DeviceId)
+ compress_orderby "Time" DESC, compression policy 7 days, hourly_per_device
continuous aggregate (realtime, materialized_only=false, 30-day start_offset
so back-fills get materialized on next refresh tick).
Wiring
- docker-compose.yml threads Application__RunMode + FleetIngest__* from
.env (defaults safely off) so a single dev host can run two stacks.
- .env.example documents the new vars under their own section.
Tests
- FleetIngestValidationTests (2 new). 53/53 passing.
Verified end-to-end on the dev host
- Client (portal-dev_portal, RunMode=Client, FleetIngest__Enabled=true)
pushes to Admin (portal-admin-test, RunMode=Admin, separate admin_fleet DB)
via container DNS.
- Customer registered on Admin (DEV0001), token captured, dropped into
Client .env, Client restarted, push service started on schedule.
- Ingested measurements (including a 2026-04-01 back-dated sample
simulating firmware replay) all land in fleet.PowerMeasurements with
the correct CustomerId.
- Customer.FirstSeenAt/LastSeenAt update, IngestEvents records every
batch (sites + devices per tick, measurements when cursor advances).
- Hypertable confirmed via timescaledb_information.hypertables;
hourly_per_device CA confirmed via timescaledb_information.continuous_aggregates.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|