
MQTT Ingestion Worker

Real-Time Telemetry Processing Pipeline

Architecture

The MQTT Ingestion Worker (Ampra.MQTT) is a standalone .NET Worker Service that runs independently of the web API. It connects to the EMQX broker, subscribes to all device telemetry topics, and writes normalised data into MongoDB.


Service Registration

The worker registers the following services in Program.cs:

builder.Services.Configure<MqttSettings>(builder.Configuration.GetSection("Mqtt"));
builder.Services.Configure<IngestionSettings>(builder.Configuration.GetSection("Ingestion"));
builder.Services.AddSingleton<IMongoClient>(...);
builder.Services.AddSingleton<IMongoDatabase>(...);
builder.Services.AddSingleton<SourceValidator>();
builder.Services.AddSingleton<MongoIngestionService>();
builder.Services.AddHostedService<Worker>();

All services are singletons — appropriate for a long-running worker with shared state (throttle timers, validation cache, deduplication hashes).


Connection Lifecycle

The Worker class extends BackgroundService and implements a resilient connection loop.

Connection parameters:

| Parameter | Value |
| --- | --- |
| Protocol | TCP (MQTTv5) |
| Keep-alive | 30 seconds |
| Clean session | Yes |
| Client ID | ampra-ingestion-worker |
| Topic filter | ampra/sources/+/data |
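
A resilient connection loop of this kind can be sketched without committing to a particular MQTT client library. The example below is an illustration only: ReconnectLoop, BackoffDelay and the connectAndListen delegate are hypothetical names, and the capped exponential backoff is an assumption, since the retry policy the Worker actually uses is not stated here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ReconnectLoop
{
    // Calls connectAndListen repeatedly until the host requests shutdown.
    // connectAndListen (hypothetical) should return or throw when the broker connection is lost.
    public static async Task RunAsync(
        Func<CancellationToken, Task> connectAndListen,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        CancellationToken stoppingToken)
    {
        var consecutiveFailures = 0;
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await connectAndListen(stoppingToken);
                consecutiveFailures = 0; // clean disconnect: start again from the initial delay
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break; // host is shutting down
            }
            catch (Exception ex)
            {
                consecutiveFailures++;
                Console.Error.WriteLine($"MQTT connection failed: {ex.Message}");
            }

            try
            {
                await Task.Delay(BackoffDelay(consecutiveFailures, initialDelay, maxDelay), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break; // cancelled while waiting to reconnect
            }
        }
    }

    // Assumed policy: initial, initial, 2x, 4x, ... capped at max.
    public static TimeSpan BackoffDelay(int consecutiveFailures, TimeSpan initial, TimeSpan max)
    {
        var exponent = Math.Min(Math.Max(consecutiveFailures - 1, 0), 20); // cap the shift to avoid overflow
        var ticks = initial.Ticks * (1L << exponent);
        return ticks > max.Ticks ? max : TimeSpan.FromTicks(ticks);
    }
}
```

In a BackgroundService, ExecuteAsync could pass its stoppingToken straight through to RunAsync, so the loop ends as soon as the host shuts down.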

Message Processing Pipeline

Every message received on the subscribed topic passes through a multi-stage pipeline. The stages are described in the sections that follow.
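
Before any stage can run, the source ID has to be read from the topic name. A minimal sketch of that step, assuming topics follow the ampra/sources/{sourceId}/data shape implied by the topic filter and that the ID is a GUID; TopicParser and TryGetSourceId are hypothetical names:

```csharp
using System;

public static class TopicParser
{
    // Extracts the source ID from a topic shaped like "ampra/sources/{sourceId}/data".
    // Returns false when the topic has a different shape or the ID is not a GUID.
    public static bool TryGetSourceId(string topic, out Guid sourceId)
    {
        sourceId = Guid.Empty;
        var parts = topic.Split('/');
        return parts.Length == 4
            && parts[0] == "ampra"
            && parts[1] == "sources"
            && parts[3] == "data"
            && Guid.TryParse(parts[2], out sourceId);
    }
}
```

Rejecting malformed topics at this point keeps invalid IDs from ever reaching the database or the cache.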


Source Validation

The SourceValidator performs a PostgreSQL query to verify that the source ID extracted from the MQTT topic corresponds to an active, MQTT-type sun source.

SELECT COUNT(*)
FROM "SunSources"
WHERE "Id" = @id
AND "ConnectionType" = 0 -- MQTT type
AND "IsActive" = true

Caching: Results are cached in a ConcurrentDictionary with a 5-minute TTL. On database failure, the last cached result is returned (stale-on-error).
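
The caching behaviour can be illustrated with a small wrapper around the database lookup. This is a sketch under stated assumptions, not the SourceValidator itself: CachedSourceValidator, the queryDatabase delegate and the injected clock are hypothetical, and rejecting a source when the database fails and nothing is cached is an assumption the text above does not confirm.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class CachedSourceValidator
{
    private readonly ConcurrentDictionary<Guid, (bool IsValid, DateTime CheckedAtUtc)> _cache = new();
    private readonly Func<Guid, Task<bool>> _queryDatabase; // hypothetical: runs the SQL query above
    private readonly TimeSpan _ttl;
    private readonly Func<DateTime> _utcNow;                // injected clock, for testability

    public CachedSourceValidator(Func<Guid, Task<bool>> queryDatabase, TimeSpan ttl, Func<DateTime> utcNow)
    {
        _queryDatabase = queryDatabase;
        _ttl = ttl;
        _utcNow = utcNow;
    }

    public async Task<bool> IsValidAsync(Guid sourceId)
    {
        var now = _utcNow();
        if (_cache.TryGetValue(sourceId, out var entry) && now - entry.CheckedAtUtc < _ttl)
            return entry.IsValid; // fresh cache hit, no database round trip

        try
        {
            var isValid = await _queryDatabase(sourceId);
            _cache[sourceId] = (isValid, now);
            return isValid;
        }
        catch (Exception)
        {
            // Stale-on-error: reuse the last known answer.
            // Assumption: with no cached answer at all, the source is rejected.
            return _cache.TryGetValue(sourceId, out var stale) && stale.IsValid;
        }
    }
}
```

With a 5-minute TTL, a device publishing every few seconds triggers at most one validation query per TTL period instead of one per message.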


Data Normalisation

The DataNormalizer is the heart of the ingestion pipeline. It accepts arbitrary JSON and maps it to the canonical NormalizedSunSourceData schema using a comprehensive field alias dictionary.

Field Alias Map

Each canonical field accepts multiple alternative names, enabling compatibility with diverse inverter and monitoring device firmware:

| Canonical Field | Accepted Aliases |
| --- | --- |
| stateOfCharge | state_of_charge, soc, battery_soc, batterysoc |
| stateOfHealth | state_of_health, soh, battery_soh, batterysoh |
| batteryVoltage | battery_voltage, bat_voltage, bat_v, batt_voltage |
| batteryCurrent | battery_current, bat_current, bat_i, batt_current |
| batteryTemperature | battery_temperature, bat_temp, battery_temp, batt_temp |
| batteryCapacity | battery_capacity, bat_capacity, batt_capacity |
| remainingCapacity | remaining_capacity, rem_capacity |
| solarVoltage | solar_voltage, pv_voltage, pv_v, panel_voltage, mppt_voltage, voltage_v |
| solarCurrent | solar_current, pv_current, pv_i, panel_current, mppt_current, current_a |
| solarPower | solar_power, pv_power, pv_w, panel_power, mppt_power, power_w, power |
| outputVoltage | output_voltage, out_voltage, ac_voltage, ac_out_voltage |
| outputCurrent | output_current, out_current, ac_current, ac_out_current |
| outputFrequency | output_frequency, out_frequency, ac_frequency, frequency_hz, frequency |
| loadPower | load_power, load_w, consumption, consumption_w |
| loadPercentage | load_percentage, load_pct, load_percent |
| apparentPower | apparent_power, va_power |
| gridVoltage | grid_voltage, mains_voltage, utility_voltage |
| gridCurrent | grid_current, mains_current, utility_current |
| gridFrequency | grid_frequency, mains_frequency, utility_frequency |
| totalEnergyProduced | total_energy_produced, total_energy, total_kwh, energy_total, energy_kwh |
| totalEnergyConsumed | total_energy_consumed, total_consumption |
| dailyEnergyProduced | daily_energy_produced, daily_energy, daily_kwh, energy_today, today_kwh |
| dailyEnergyConsumed | daily_energy_consumed, daily_consumption |
| operatingMode | operating_mode, mode, inverter_mode, work_mode |
| deviceStatus | device_status, status, inverter_status |

Timestamp aliases: timestamp, ts, time, date_time, datetime

Metadata aliases: metadata, meta, extra, custom
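
One way to implement alias resolution is to invert the table into a single alias-to-canonical lookup and walk the payload's top-level properties once. The sketch below uses System.Text.Json and only three of the canonical fields. AliasResolver is a hypothetical name, and both the case-insensitive matching and the first-match-wins rule for payloads that carry several aliases of the same field are assumptions rather than documented DataNormalizer behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class AliasResolver
{
    // Excerpt of the alias table; a full map would cover every canonical field.
    private static readonly Dictionary<string, string[]> Aliases = new()
    {
        ["stateOfCharge"] = new[] { "state_of_charge", "soc", "battery_soc", "batterysoc" },
        ["solarPower"] = new[] { "solar_power", "pv_power", "pv_w", "panel_power", "mppt_power", "power_w", "power" },
        ["loadPower"] = new[] { "load_power", "load_w", "consumption", "consumption_w" },
    };

    // Reverse lookup: alias or canonical name -> canonical name (case-insensitive by assumption).
    private static readonly Dictionary<string, string> Lookup = BuildLookup();

    private static Dictionary<string, string> BuildLookup()
    {
        var lookup = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (var (canonical, aliases) in Aliases)
        {
            lookup[canonical] = canonical;
            foreach (var alias in aliases) lookup[alias] = canonical;
        }
        return lookup;
    }

    // Returns the recognised top-level properties keyed by canonical field name.
    public static Dictionary<string, JsonElement> Resolve(string json)
    {
        var result = new Dictionary<string, JsonElement>();
        using var doc = JsonDocument.Parse(json);
        if (doc.RootElement.ValueKind != JsonValueKind.Object) return result;

        foreach (var property in doc.RootElement.EnumerateObject())
        {
            // First match wins if the payload repeats a field under several aliases (assumption).
            if (Lookup.TryGetValue(property.Name, out var canonical) && !result.ContainsKey(canonical))
                result[canonical] = property.Value.Clone(); // Clone so the value outlives the document
        }
        return result;
    }
}
```

For example, a payload of {"SOC": 87.5, "pv_power": 1200, "unknown": 1} resolves to stateOfCharge and solarPower, and the unrecognised key is ignored.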

Value Range Validation

After alias resolution, numeric values are validated against physical bounds. Out-of-range values are silently dropped (set to null):

| Field | Minimum | Maximum |
| --- | --- | --- |
| stateOfCharge | 0 | 100 |
| stateOfHealth | 0 | 100 |
| batteryVoltage | 0 | 100 V |
| batteryCurrent | −200 | 200 A |
| batteryTemperature | −40 | 80 °C |
| batteryCapacity | 0 | 100,000 Wh |
| remainingCapacity | 0 | 100,000 Wh |
| solarVoltage | 0 | 600 V |
| solarCurrent | 0 | 100 A |
| solarPower | 0 | 100,000 W |
| outputVoltage | 0 | 500 V |
| outputCurrent | 0 | 200 A |
| outputFrequency | 0 | 100 Hz |
| loadPower | 0 | 100,000 W |
| loadPercentage | 0 | 100 |
| apparentPower | 0 | 100,000 VA |
| gridVoltage | 0 | 500 V |
| gridCurrent | 0 | 200 A |
| gridFrequency | 0 | 100 Hz |
| totalEnergyProduced | 0 | 10,000,000 kWh |
| totalEnergyConsumed | 0 | 10,000,000 kWh |
| dailyEnergyProduced | 0 | 10,000 kWh |
| dailyEnergyConsumed | 0 | 10,000 kWh |

NaN and Infinity values are also rejected. String values that can be parsed as valid floats (e.g., "123.45") are automatically coerced.
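
The coercion and bounds rules above can be combined in a single function that returns null whenever a value should be dropped. This is a minimal sketch covering four of the fields. RangeValidator is a hypothetical name, the bounds are treated as inclusive, and parsing numeric strings with the invariant culture is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.Json;

public static class RangeValidator
{
    // Excerpt of the bounds table above; bounds are treated as inclusive (assumption).
    private static readonly Dictionary<string, (double Min, double Max)> Bounds = new()
    {
        ["stateOfCharge"] = (0, 100),
        ["batteryCurrent"] = (-200, 200),
        ["batteryTemperature"] = (-40, 80),
        ["solarPower"] = (0, 100_000),
    };

    // Returns the numeric value, or null if it is non-numeric, not finite, or out of range.
    public static double? Validate(string field, JsonElement value)
    {
        double number;
        if (value.ValueKind == JsonValueKind.Number)
        {
            if (!value.TryGetDouble(out number)) return null;
        }
        else if (value.ValueKind == JsonValueKind.String)
        {
            // Coerce numeric strings such as "123.45"; invariant culture is an assumption.
            if (!double.TryParse(value.GetString(), NumberStyles.Float, CultureInfo.InvariantCulture, out number))
                return null;
        }
        else
        {
            return null;
        }

        if (double.IsNaN(number) || double.IsInfinity(number)) return null;
        if (Bounds.TryGetValue(field, out var bounds) && (number < bounds.Min || number > bounds.Max)) return null;
        return number;
    }
}
```

Returning null rather than throwing matches the "silently dropped" behaviour: one implausible reading does not invalidate the other fields in the same message.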

Meaningful Data Check

A message is considered meaningful if at least one of the following fields is non-null:

  • StateOfCharge, StateOfHealth
  • SolarPower, SolarVoltage, SolarCurrent
  • BatteryVoltage
  • LoadPower
  • GridVoltage
  • OutputVoltage, OutputFrequency
  • TotalEnergyProduced, DailyEnergyProduced
  • OperatingMode, DeviceStatus

Messages with no meaningful fields are discarded as noise.
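
Expressed in code, the check is a single disjunction over the listed fields. The sketch below uses a hypothetical NormalizedReading class that carries only the relevant properties plus BatteryTemperature, which is included to show a field that does not count towards the check.

```csharp
#nullable enable

public sealed class NormalizedReading
{
    public double? StateOfCharge { get; init; }
    public double? StateOfHealth { get; init; }
    public double? SolarPower { get; init; }
    public double? SolarVoltage { get; init; }
    public double? SolarCurrent { get; init; }
    public double? BatteryVoltage { get; init; }
    public double? BatteryTemperature { get; init; } // not part of the meaningful set
    public double? LoadPower { get; init; }
    public double? GridVoltage { get; init; }
    public double? OutputVoltage { get; init; }
    public double? OutputFrequency { get; init; }
    public double? TotalEnergyProduced { get; init; }
    public double? DailyEnergyProduced { get; init; }
    public string? OperatingMode { get; init; }
    public string? DeviceStatus { get; init; }
}

public static class MeaningfulDataCheck
{
    // True when at least one of the fields listed above is non-null.
    public static bool HasMeaningfulData(NormalizedReading r) =>
        r.StateOfCharge is not null || r.StateOfHealth is not null
        || r.SolarPower is not null || r.SolarVoltage is not null || r.SolarCurrent is not null
        || r.BatteryVoltage is not null
        || r.LoadPower is not null
        || r.GridVoltage is not null
        || r.OutputVoltage is not null || r.OutputFrequency is not null
        || r.TotalEnergyProduced is not null || r.DailyEnergyProduced is not null
        || r.OperatingMode is not null || r.DeviceStatus is not null;
}
```

Note that a value of 0 still counts as meaningful: a solar power of 0 W at night is a real reading, whereas a missing field is not.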


Quality Controls

Throttling

The MongoIngestionService enforces a per-source throttle (default: 10 seconds). Messages arriving within the throttle window after the last successful insert are silently dropped.

| Setting | Default | Config Key |
| --- | --- | --- |
| Throttle interval | 10 seconds | Ingestion:ThrottleSeconds |
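
A per-source throttle needs little more than a dictionary of last-insert times. The following sketch is illustrative: SourceThrottle and its method names are hypothetical, and the caller passes the current time explicitly so the behaviour is easy to test.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class SourceThrottle
{
    private readonly ConcurrentDictionary<Guid, DateTime> _lastInsertUtc = new();
    private readonly TimeSpan _interval;

    public SourceThrottle(TimeSpan interval) => _interval = interval;

    // True when the source has no recorded insert or the throttle window has elapsed.
    public bool ShouldAccept(Guid sourceId, DateTime nowUtc) =>
        !_lastInsertUtc.TryGetValue(sourceId, out var last) || nowUtc - last >= _interval;

    // Call only after the insert succeeded, so a failed write does not start a new window.
    public void RecordInsert(Guid sourceId, DateTime nowUtc) => _lastInsertUtc[sourceId] = nowUtc;
}
```

Separating ShouldAccept from RecordInsert mirrors the wording above: the window is measured from the last successful insert, not from the last message received.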

Deduplication

A sliding-window deduplication check prevents identical data points from being stored. The deduplication key is composed of:

{sourceId}:{roundedTimestamp}:{solarPower}:{stateOfCharge}:{batteryVoltage}:{loadPower}

The timestamp is rounded down to the configured window (default: 30 seconds). The in-memory hash cache is pruned when it exceeds 10,000 entries, evicting entries older than 5 minutes.

| Setting | Default | Config Key |
| --- | --- | --- |
| Dedup window | 30 seconds | Ingestion:DeduplicationWindowSeconds |
| Max payload | 65,536 bytes | Ingestion:MaxPayloadBytes |
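
The key construction and cache pruning described above can be sketched as follows. Deduplicator is a hypothetical name, and several details are assumptions: timestamps are rounded as Unix seconds, null values are rendered as empty strings, and pruning runs inline on the next lookup once the cache exceeds 10,000 entries.

```csharp
using System;
using System.Collections.Concurrent;
using System.Globalization;
using System.Linq;

public sealed class Deduplicator
{
    private const int MaxEntries = 10_000;
    private static readonly TimeSpan MaxAge = TimeSpan.FromMinutes(5);

    private readonly ConcurrentDictionary<string, DateTime> _seen = new();
    private readonly long _windowSeconds;

    public Deduplicator(int windowSeconds) => _windowSeconds = windowSeconds;

    // Builds {sourceId}:{roundedTimestamp}:{solarPower}:{stateOfCharge}:{batteryVoltage}:{loadPower}.
    // timestampUtc is expected to have DateTimeKind.Utc.
    public string BuildKey(Guid sourceId, DateTime timestampUtc,
        double? solarPower, double? stateOfCharge, double? batteryVoltage, double? loadPower)
    {
        var unixSeconds = new DateTimeOffset(timestampUtc).ToUnixTimeSeconds();
        var rounded = unixSeconds - unixSeconds % _windowSeconds; // round down to the window start
        return string.Join(":", sourceId, rounded,
            Format(solarPower), Format(stateOfCharge), Format(batteryVoltage), Format(loadPower));
    }

    // Returns true if the key was already seen; otherwise remembers it and returns false.
    public bool IsDuplicate(string key, DateTime nowUtc)
    {
        if (_seen.Count > MaxEntries)
        {
            // Prune entries older than five minutes once the cache grows too large.
            foreach (var old in _seen.Where(e => nowUtc - e.Value > MaxAge).ToList())
                _seen.TryRemove(old.Key, out _);
        }
        return !_seen.TryAdd(key, nowUtc);
    }

    private static string Format(double? value) =>
        value?.ToString("R", CultureInfo.InvariantCulture) ?? "";
}
```

Because timestamps are rounded down to fixed window boundaries, two identical readings one second apart can still land in different buckets if they straddle a boundary; the throttle described earlier limits how often that matters in practice.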

MongoDB Indexing

On startup, MongoIngestionService ensures a compound index exists:

{ sunSourceId: 1, timestamp: -1 }

This index is created with Background = true and named idx_source_timestamp. It optimises the most common query pattern: fetching recent data for a specific source, ordered by time.


Configuration Reference

MqttSettings

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| BrokerHost | string | localhost | EMQX broker hostname |
| BrokerPort | int | 1883 | MQTT TCP port |
| ClientId | string | ampra-ingestion-worker | MQTT client identifier |
| Username | string | ampra-ingestion | Broker authentication username |
| Password | string | changeme | Broker authentication password |
| TopicFilter | string | ampra/sources/+/data | Subscription topic with wildcard |

IngestionSettings

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ThrottleSeconds | int | 10 | Minimum interval between inserts per source |
| DeduplicationWindowSeconds | int | 30 | Time window for duplicate detection |
| MaxPayloadBytes | int | 65536 | Maximum accepted MQTT payload size (64 KB) |

NormalizedSunSourceData Schema

The MongoDB document model stored in the normalized_sun_source_data collection:

| Field | BSON Type | Nullable | Description |
| --- | --- | --- | --- |
| _id | ObjectId | No | MongoDB auto-generated |
| sunSourceId | String (GUID) | No | Foreign key to PostgreSQL |
| timestamp | DateTime | No | UTC measurement time |
| stateOfCharge | Double | Yes | Battery SoC (0–100%) |
| stateOfHealth | Double | Yes | Battery SoH (0–100%) |
| batteryVoltage | Double | Yes | Battery voltage (V) |
| batteryCurrent | Double | Yes | Battery current (A) |
| batteryTemperature | Double | Yes | Battery temperature (°C) |
| batteryCapacity | Double | Yes | Total battery capacity (Wh) |
| remainingCapacity | Double | Yes | Remaining battery capacity (Wh) |
| solarVoltage | Double | Yes | PV array voltage (V) |
| solarCurrent | Double | Yes | PV array current (A) |
| solarPower | Double | Yes | PV generation power (W) |
| outputVoltage | Double | Yes | Inverter output voltage (V) |
| outputCurrent | Double | Yes | Inverter output current (A) |
| outputFrequency | Double | Yes | Output frequency (Hz) |
| loadPower | Double | Yes | Load consumption (W) |
| loadPercentage | Double | Yes | Load percentage (0–100%) |
| apparentPower | Double | Yes | Apparent power (VA) |
| gridVoltage | Double | Yes | Grid voltage (V) |
| gridCurrent | Double | Yes | Grid current (A) |
| gridFrequency | Double | Yes | Grid frequency (Hz) |
| totalEnergyProduced | Double | Yes | Lifetime energy production (kWh) |
| totalEnergyConsumed | Double | Yes | Lifetime energy consumption (kWh) |
| dailyEnergyProduced | Double | Yes | Today's energy production (kWh) |
| dailyEnergyConsumed | Double | Yes | Today's energy consumption (kWh) |
| operatingMode | String | Yes | Inverter operating mode |
| deviceStatus | String | Yes | Device health status |
| faultCodes | Array<String> | Yes | Active fault codes |
| warningCodes | Array<String> | Yes | Active warning codes |
| rawData | BsonDocument | Yes | Original unparsed payload |
| metadata | Document | Yes | Custom key-value pairs |