MQTT Ingestion Worker
Architecture
The MQTT Ingestion Worker (Ampra.MQTT) is a standalone .NET Worker Service that runs independently of the web API. It connects to the EMQX broker, subscribes to all device telemetry topics, and writes normalised data into MongoDB.
Service Registration
The worker registers the following services in Program.cs:
builder.Services.Configure<MqttSettings>(builder.Configuration.GetSection("Mqtt"));
builder.Services.Configure<IngestionSettings>(builder.Configuration.GetSection("Ingestion"));
builder.Services.AddSingleton<IMongoClient>(...);
builder.Services.AddSingleton<IMongoDatabase>(...);
builder.Services.AddSingleton<SourceValidator>();
builder.Services.AddSingleton<MongoIngestionService>();
builder.Services.AddHostedService<Worker>();
All services are registered as singletons, which suits a long-running worker that holds shared state (throttle timers, validation cache, deduplication hashes).
Connection Lifecycle
The Worker class extends BackgroundService and implements a resilient connection loop.
Connection parameters:
| Parameter | Value |
|---|---|
| Protocol | TCP (MQTTv5) |
| Keep-alive | 30 seconds |
| Clean session | Yes |
| Client ID | ampra-ingestion-worker |
| Topic filter | ampra/sources/+/data |
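
A resilient connection loop of this kind is commonly built as retry-with-backoff. The following is a minimal sketch under stated assumptions, not the worker's actual code: the source does not give retry delays, so the exponential backoff, the `ReconnectLoop` class and the `connectAsync` delegate (standing in for whatever MQTT client call connects and subscribes) are all hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: retries a connect delegate with capped exponential backoff.
public static class ReconnectLoop
{
    // Doubles the delay after each failed attempt, never exceeding maxDelay.
    public static TimeSpan NextDelay(TimeSpan current, TimeSpan maxDelay)
    {
        var doubled = TimeSpan.FromTicks(current.Ticks * 2);
        return doubled < maxDelay ? doubled : maxDelay;
    }

    // Calls connectAsync until it succeeds or the token is cancelled.
    // Returns the number of attempts that were needed.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> connectAsync,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        CancellationToken stoppingToken)
    {
        var delay = initialDelay;
        var attempts = 0;
        while (true)
        {
            stoppingToken.ThrowIfCancellationRequested();
            attempts++;
            try
            {
                await connectAsync(stoppingToken);
                return attempts;
            }
            catch (Exception) when (!stoppingToken.IsCancellationRequested)
            {
                // Wait before the next attempt, then lengthen the wait.
                await Task.Delay(delay, stoppingToken);
                delay = NextDelay(delay, maxDelay);
            }
        }
    }
}
```

In a BackgroundService, `RunAsync` would typically be called from `ExecuteAsync` with the host's stopping token, and called again whenever the client reports a disconnect.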
Message Processing Pipeline
Every message received on the subscribed topic passes through a multi-stage pipeline, described stage by stage in the sections below.
Source Validation
The SourceValidator performs a PostgreSQL query to verify that the source ID extracted from the MQTT topic corresponds to an active, MQTT-type sun source.
SELECT COUNT(*)
FROM "SunSources"
WHERE "Id" = @id
AND "ConnectionType" = 0 -- MQTT type
AND "IsActive" = true
Caching: Results are cached in a ConcurrentDictionary with a 5-minute TTL. On database failure, the last cached result is returned (stale-on-error).
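
The caching behaviour described above (5-minute TTL, stale-on-error) can be sketched as follows. This is an illustration, not the actual SourceValidator: the `CachedSourceValidator` class, the `queryDatabase` delegate (standing in for the PostgreSQL query) and the injected clock are hypothetical, and rejecting a source when the database fails and nothing is cached is an assumption the source does not confirm.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical sketch of a TTL cache that falls back to stale entries on error.
public sealed class CachedSourceValidator
{
    private readonly ConcurrentDictionary<Guid, (bool IsValid, DateTimeOffset CheckedAt)> _cache = new();
    private readonly Func<Guid, Task<bool>> _queryDatabase; // stand-in for the COUNT(*) query
    private readonly Func<DateTimeOffset> _clock;           // injectable so the TTL can be tested
    private readonly TimeSpan _ttl;

    public CachedSourceValidator(Func<Guid, Task<bool>> queryDatabase, TimeSpan ttl, Func<DateTimeOffset> clock)
    {
        _queryDatabase = queryDatabase;
        _ttl = ttl;
        _clock = clock;
    }

    public async Task<bool> IsValidAsync(Guid sourceId)
    {
        var now = _clock();
        var hasEntry = _cache.TryGetValue(sourceId, out var entry);

        // Fresh entry: answer from the cache without touching the database.
        if (hasEntry && now - entry.CheckedAt < _ttl)
            return entry.IsValid;

        try
        {
            var isValid = await _queryDatabase(sourceId);
            _cache[sourceId] = (isValid, now);
            return isValid;
        }
        catch (Exception)
        {
            // Stale-on-error: reuse the last known result if there is one.
            // Assumption: with no cached result the source is rejected.
            return hasEntry && entry.IsValid;
        }
    }
}
```

A production instance would be constructed with `TimeSpan.FromMinutes(5)` and `() => DateTimeOffset.UtcNow`.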
Data Normalisation
The DataNormalizer is the heart of the ingestion pipeline. It accepts arbitrary JSON and maps it to the canonical NormalizedSunSourceData schema using a comprehensive field alias dictionary.
Field Alias Map
Each canonical field accepts multiple alternative names, enabling compatibility with diverse inverter and monitoring device firmware:
| Canonical Field | Accepted Aliases |
|---|---|
| stateOfCharge | state_of_charge, soc, battery_soc, batterysoc |
| stateOfHealth | state_of_health, soh, battery_soh, batterysoh |
| batteryVoltage | battery_voltage, bat_voltage, bat_v, batt_voltage |
| batteryCurrent | battery_current, bat_current, bat_i, batt_current |
| batteryTemperature | battery_temperature, bat_temp, battery_temp, batt_temp |
| batteryCapacity | battery_capacity, bat_capacity, batt_capacity |
| remainingCapacity | remaining_capacity, rem_capacity |
| solarVoltage | solar_voltage, pv_voltage, pv_v, panel_voltage, mppt_voltage, voltage_v |
| solarCurrent | solar_current, pv_current, pv_i, panel_current, mppt_current, current_a |
| solarPower | solar_power, pv_power, pv_w, panel_power, mppt_power, power_w, power |
| outputVoltage | output_voltage, out_voltage, ac_voltage, ac_out_voltage |
| outputCurrent | output_current, out_current, ac_current, ac_out_current |
| outputFrequency | output_frequency, out_frequency, ac_frequency, frequency_hz, frequency |
| loadPower | load_power, load_w, consumption, consumption_w |
| loadPercentage | load_percentage, load_pct, load_percent |
| apparentPower | apparent_power, va_power |
| gridVoltage | grid_voltage, mains_voltage, utility_voltage |
| gridCurrent | grid_current, mains_current, utility_current |
| gridFrequency | grid_frequency, mains_frequency, utility_frequency |
| totalEnergyProduced | total_energy_produced, total_energy, total_kwh, energy_total, energy_kwh |
| totalEnergyConsumed | total_energy_consumed, total_consumption |
| dailyEnergyProduced | daily_energy_produced, daily_energy, daily_kwh, energy_today, today_kwh |
| dailyEnergyConsumed | daily_energy_consumed, daily_consumption |
| operatingMode | operating_mode, mode, inverter_mode, work_mode |
| deviceStatus | device_status, status, inverter_status |
Timestamp aliases: timestamp, ts, time, date_time, datetime
Metadata aliases: metadata, meta, extra, custom
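
Alias resolution of this kind usually reduces to a lookup from every accepted name to its canonical field. The sketch below illustrates the idea with a small excerpt of the table; it is not the actual DataNormalizer. The `AliasResolver` class is hypothetical, and three behaviours are assumptions the source does not state: matching is case-insensitive, the canonical name itself is accepted, and the first alias encountered wins when a payload contains several aliases for one field.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical sketch: maps payload keys to canonical field names.
public static class AliasResolver
{
    // Excerpt only; a full map would list every alias from the table.
    private static readonly Dictionary<string, string> AliasToCanonical =
        new(StringComparer.OrdinalIgnoreCase) // assumption: case-insensitive matching
        {
            ["stateOfCharge"] = "stateOfCharge",
            ["state_of_charge"] = "stateOfCharge",
            ["soc"] = "stateOfCharge",
            ["battery_soc"] = "stateOfCharge",
            ["solarPower"] = "solarPower",
            ["solar_power"] = "solarPower",
            ["pv_power"] = "solarPower",
            ["power_w"] = "solarPower",
            ["ts"] = "timestamp",
            ["timestamp"] = "timestamp",
        };

    // Returns the recognised fields keyed by canonical name; unknown keys are ignored.
    public static Dictionary<string, JsonElement> Resolve(string json)
    {
        var result = new Dictionary<string, JsonElement>();
        using var document = JsonDocument.Parse(json);
        if (document.RootElement.ValueKind != JsonValueKind.Object)
            return result;

        foreach (var property in document.RootElement.EnumerateObject())
        {
            if (AliasToCanonical.TryGetValue(property.Name, out var canonical))
            {
                // Clone so the value outlives the disposed JsonDocument.
                // TryAdd keeps the first alias seen for a canonical field.
                result.TryAdd(canonical, property.Value.Clone());
            }
        }
        return result;
    }
}
```

For example, `AliasResolver.Resolve("{\"SOC\": 87.5, \"pv_power\": 1200}")` yields entries for `stateOfCharge` and `solarPower`.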
Value Range Validation
After alias resolution, numeric values are validated against physical bounds. Out-of-range values are silently dropped (set to null):
| Field | Minimum | Maximum | Unit |
|---|---|---|---|
| stateOfCharge | 0 | 100 | % |
| stateOfHealth | 0 | 100 | % |
| batteryVoltage | 0 | 100 | V |
| batteryCurrent | −200 | 200 | A |
| batteryTemperature | −40 | 80 | °C |
| batteryCapacity | 0 | 100,000 | Wh |
| remainingCapacity | 0 | 100,000 | Wh |
| solarVoltage | 0 | 600 | V |
| solarCurrent | 0 | 100 | A |
| solarPower | 0 | 100,000 | W |
| outputVoltage | 0 | 500 | V |
| outputCurrent | 0 | 200 | A |
| outputFrequency | 0 | 100 | Hz |
| loadPower | 0 | 100,000 | W |
| loadPercentage | 0 | 100 | % |
| apparentPower | 0 | 100,000 | VA |
| gridVoltage | 0 | 500 | V |
| gridCurrent | 0 | 200 | A |
| gridFrequency | 0 | 100 | Hz |
| totalEnergyProduced | 0 | 10,000,000 | kWh |
| totalEnergyConsumed | 0 | 10,000,000 | kWh |
| dailyEnergyProduced | 0 | 10,000 | kWh |
| dailyEnergyConsumed | 0 | 10,000 | kWh |
NaN and Infinity values are also rejected. String values that can be parsed as valid floats (e.g., "123.45") are automatically coerced.
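
The coercion and bounds check can be sketched as two small functions. This is an illustration under assumptions rather than the real implementation: the `RangeValidator` class is hypothetical, only a few bounds from the table are included, and the bounds are treated as inclusive, which the source does not specify.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.Json;

// Hypothetical sketch: coerce JSON values to doubles and null out implausible readings.
public static class RangeValidator
{
    // Excerpt of the bounds table; assumed inclusive on both ends.
    private static readonly Dictionary<string, (double Min, double Max)> Bounds = new()
    {
        ["stateOfCharge"] = (0, 100),
        ["batteryCurrent"] = (-200, 200),
        ["batteryTemperature"] = (-40, 80),
        ["solarPower"] = (0, 100_000),
    };

    // Accepts JSON numbers and numeric strings such as "123.45"; anything else becomes null.
    public static double? Coerce(JsonElement value)
    {
        if (value.ValueKind == JsonValueKind.Number && value.TryGetDouble(out var number))
            return number;

        if (value.ValueKind == JsonValueKind.String &&
            double.TryParse(value.GetString(), NumberStyles.Float, CultureInfo.InvariantCulture, out var parsed))
            return parsed;

        return null;
    }

    // Returns the value unchanged when it is finite and within bounds, otherwise null.
    public static double? Validate(string field, double? value)
    {
        if (value is null || double.IsNaN(value.Value) || double.IsInfinity(value.Value))
            return null;

        if (Bounds.TryGetValue(field, out var bounds) &&
            (value.Value < bounds.Min || value.Value > bounds.Max))
            return null;

        return value;
    }
}
```

Parsing with `CultureInfo.InvariantCulture` keeps the decimal point handling independent of the host's locale, which matters when devices always send `.` as the separator.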
Meaningful Data Check
A message is considered meaningful if at least one of the following fields is non-null:
- StateOfCharge, StateOfHealth
- SolarPower, SolarVoltage, SolarCurrent
- BatteryVoltage
- LoadPower
- GridVoltage
- OutputVoltage, OutputFrequency
- TotalEnergyProduced, DailyEnergyProduced
- OperatingMode, DeviceStatus
Messages with no meaningful fields are discarded as noise.
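
The check amounts to "any of these fields is non-null". A minimal sketch follows, assuming for illustration that the normalised fields are held in a dictionary keyed by property name; the `MeaningfulData` class and that dictionary representation are hypothetical, and the real model is presumably a typed class.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of the meaningful-data check.
public static class MeaningfulData
{
    // The fields listed above; a message needs at least one of them to be kept.
    private static readonly string[] KeyFields =
    {
        "StateOfCharge", "StateOfHealth",
        "SolarPower", "SolarVoltage", "SolarCurrent",
        "BatteryVoltage", "LoadPower", "GridVoltage",
        "OutputVoltage", "OutputFrequency",
        "TotalEnergyProduced", "DailyEnergyProduced",
        "OperatingMode", "DeviceStatus",
    };

    // True when at least one key field is present with a non-null value.
    public static bool IsMeaningful(IReadOnlyDictionary<string, object?> fields) =>
        KeyFields.Any(name => fields.TryGetValue(name, out var value) && value is not null);
}
```

Under this rule a payload that carries only `BatteryCurrent`, for instance, is discarded, because that field is not in the list.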
Quality Controls
Throttling
The MongoIngestionService enforces a per-source throttle (default: 10 seconds). Messages arriving within the throttle window after the last successful insert are silently dropped.
| Setting | Default | Config Key |
|---|---|---|
| Throttle interval | 10 seconds | Ingestion:ThrottleSeconds |
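
A per-source throttle of this kind needs only the time of the last successful insert for each source. The sketch below is an assumption-laden illustration, not the MongoIngestionService code: the `SourceThrottle` class and its method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch of a per-source insert throttle.
public sealed class SourceThrottle
{
    private readonly ConcurrentDictionary<Guid, DateTimeOffset> _lastInsert = new();
    private readonly TimeSpan _interval;

    public SourceThrottle(TimeSpan interval) => _interval = interval;

    // True when a message arriving at 'now' falls inside the throttle window.
    public bool IsThrottled(Guid sourceId, DateTimeOffset now) =>
        _lastInsert.TryGetValue(sourceId, out var last) && now - last < _interval;

    // Record only after a successful insert, so a failed write does not start a window.
    public void RecordInsert(Guid sourceId, DateTimeOffset now) => _lastInsert[sourceId] = now;
}
```

With the default of 10 seconds, a source publishing once per second would have roughly one message in ten stored.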
Deduplication
A sliding-window deduplication check prevents identical data points from being stored. The deduplication key is composed of:
{sourceId}:{roundedTimestamp}:{solarPower}:{stateOfCharge}:{batteryVoltage}:{loadPower}
The timestamp is rounded down to the configured window (default: 30 seconds). The in-memory hash cache is pruned when it exceeds 10,000 entries, evicting entries older than 5 minutes.
| Setting | Default | Config Key |
|---|---|---|
| Dedup window | 30 seconds | Ingestion:DeduplicationWindowSeconds |
| Max payload | 65,536 bytes | Ingestion:MaxPayloadBytes |
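
The key construction and pruning rule can be sketched as follows. This is a hedged illustration rather than the actual service code: the `Deduplicator` class, the `"null"` placeholder for missing values and the use of a lock are assumptions; the key layout, the rounding, the 10,000-entry threshold and the 5-minute eviction age come from the description above.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical sketch of the deduplication cache.
public sealed class Deduplicator
{
    private const int MaxEntries = 10_000;
    private static readonly TimeSpan MaxAge = TimeSpan.FromMinutes(5);

    private readonly Dictionary<string, DateTimeOffset> _seen = new();
    private readonly object _gate = new();
    private readonly TimeSpan _window;

    public Deduplicator(TimeSpan window) => _window = window;

    public int Count { get { lock (_gate) { return _seen.Count; } } }

    // {sourceId}:{roundedTimestamp}:{solarPower}:{stateOfCharge}:{batteryVoltage}:{loadPower}
    public string BuildKey(Guid sourceId, DateTimeOffset timestamp,
        double? solarPower, double? stateOfCharge, double? batteryVoltage, double? loadPower)
    {
        // Round the UTC timestamp down to the start of its window.
        var ticks = timestamp.UtcTicks - (timestamp.UtcTicks % _window.Ticks);
        var rounded = new DateTimeOffset(ticks, TimeSpan.Zero);

        // Assumption: missing values are written as the literal "null".
        string Format(double? v) => v?.ToString("R", CultureInfo.InvariantCulture) ?? "null";

        return $"{sourceId}:{rounded:O}:{Format(solarPower)}:{Format(stateOfCharge)}:{Format(batteryVoltage)}:{Format(loadPower)}";
    }

    // Returns true if the key was seen before; otherwise records it and returns false.
    public bool IsDuplicate(string key, DateTimeOffset now)
    {
        lock (_gate)
        {
            if (_seen.ContainsKey(key))
                return true;

            // Prune once the cache exceeds the limit, evicting entries older than MaxAge.
            if (_seen.Count > MaxEntries)
            {
                var cutoff = now - MaxAge;
                foreach (var stale in _seen.Where(e => e.Value < cutoff).Select(e => e.Key).ToList())
                    _seen.Remove(stale);
            }

            _seen[key] = now;
            return false;
        }
    }
}
```

Two readings from the same source with identical values and timestamps of 12:00:05 and 12:00:25 share the rounded timestamp 12:00:00 under a 30-second window, so the second is treated as a duplicate.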
MongoDB Indexing
On startup, MongoIngestionService ensures a compound index exists:
{ sunSourceId: 1, timestamp: -1 }
This index is created with Background = true and named idx_source_timestamp. It optimises the most common query pattern: fetching recent data for a specific source, ordered by time.
Configuration Reference
MqttSettings
| Property | Type | Default | Description |
|---|---|---|---|
| BrokerHost | string | localhost | EMQX broker hostname |
| BrokerPort | int | 1883 | MQTT TCP port |
| ClientId | string | ampra-ingestion-worker | MQTT client identifier |
| Username | string | ampra-ingestion | Broker authentication username |
| Password | string | changeme | Broker authentication password |
| TopicFilter | string | ampra/sources/+/data | Subscription topic with wildcard |
IngestionSettings
| Property | Type | Default | Description |
|---|---|---|---|
| ThrottleSeconds | int | 10 | Minimum interval between inserts per source |
| DeduplicationWindowSeconds | int | 30 | Time window for duplicate detection |
| MaxPayloadBytes | int | 65536 | Maximum accepted MQTT payload size (64 KB) |
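
Options classes bound with `Configure<T>` are typically plain classes whose property initialisers supply the defaults. The sketch below shows one plausible shape for the two settings types. The property names and default values are taken from the tables above, but the class bodies themselves are an assumption, since the source does not show them.

```csharp
// Hypothetical sketch of the options classes bound to the "Mqtt" and "Ingestion" sections.
public sealed class MqttSettings
{
    public string BrokerHost { get; set; } = "localhost";
    public int BrokerPort { get; set; } = 1883;
    public string ClientId { get; set; } = "ampra-ingestion-worker";
    public string Username { get; set; } = "ampra-ingestion";

    // Placeholder default; expected to be overridden by configuration in any real deployment.
    public string Password { get; set; } = "changeme";

    public string TopicFilter { get; set; } = "ampra/sources/+/data";
}

public sealed class IngestionSettings
{
    public int ThrottleSeconds { get; set; } = 10;
    public int DeduplicationWindowSeconds { get; set; } = 30;
    public int MaxPayloadBytes { get; set; } = 65536; // 64 KB
}
```

Any value present in the corresponding configuration section replaces the initialiser's default when the options are bound.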
NormalizedSunSourceData Schema
The MongoDB document model stored in the normalized_sun_source_data collection:
| Field | BSON Type | Nullable | Description |
|---|---|---|---|
| _id | ObjectId | No | MongoDB auto-generated |
| sunSourceId | String (GUID) | No | Foreign key to PostgreSQL |
| timestamp | DateTime | No | UTC measurement time |
| stateOfCharge | Double | Yes | Battery SoC (0–100%) |
| stateOfHealth | Double | Yes | Battery SoH (0–100%) |
| batteryVoltage | Double | Yes | Battery voltage (V) |
| batteryCurrent | Double | Yes | Battery current (A) |
| batteryTemperature | Double | Yes | Battery temperature (°C) |
| batteryCapacity | Double | Yes | Total battery capacity (Wh) |
| remainingCapacity | Double | Yes | Remaining battery capacity (Wh) |
| solarVoltage | Double | Yes | PV array voltage (V) |
| solarCurrent | Double | Yes | PV array current (A) |
| solarPower | Double | Yes | PV generation power (W) |
| outputVoltage | Double | Yes | Inverter output voltage (V) |
| outputCurrent | Double | Yes | Inverter output current (A) |
| outputFrequency | Double | Yes | Output frequency (Hz) |
| loadPower | Double | Yes | Load consumption (W) |
| loadPercentage | Double | Yes | Load percentage (0–100%) |
| apparentPower | Double | Yes | Apparent power (VA) |
| gridVoltage | Double | Yes | Grid voltage (V) |
| gridCurrent | Double | Yes | Grid current (A) |
| gridFrequency | Double | Yes | Grid frequency (Hz) |
| totalEnergyProduced | Double | Yes | Lifetime energy production (kWh) |
| totalEnergyConsumed | Double | Yes | Lifetime energy consumption (kWh) |
| dailyEnergyProduced | Double | Yes | Today's energy production (kWh) |
| dailyEnergyConsumed | Double | Yes | Today's energy consumption (kWh) |
| operatingMode | String | Yes | Inverter operating mode |
| deviceStatus | String | Yes | Device health status |
| faultCodes | Array<String> | Yes | Active fault codes |
| warningCodes | Array<String> | Yes | Active warning codes |
| rawData | BsonDocument | Yes | Original unparsed payload |
| metadata | Document | Yes | Custom key-value pairs |