Skip to main content

Telemetry & Metrics

The telemetry subsystem handles real-time data ingestion, normalization, storage, aggregation, and retrieval — forming the backbone of Ampra's monitoring capabilities.


Ingestion Pipeline

MQTT Path

Webhook Path

Webhook Processing Implementation

public async Task<(bool Success, string Message)> ProcessWebhookPayloadAsync(
Guid sunSourceId, string payload)
{
// 1. Validate source exists, is Webhook type, and is active
var sunSource = await _context.SunSources
.FirstOrDefaultAsync(ss => ss.Id == sunSourceId
&& ss.ConnectionType == SunSourceConnectionType.Webhook
&& ss.IsActive);
if (sunSource == null) return (false, "Source not found or inactive");

// 2. Parse JSON
var jsonDocument = JsonDocument.Parse(payload);
var root = jsonDocument.RootElement;

// 3. Constant-time secret comparison (prevents timing attacks)
var providedKey = root.GetProperty("key").GetString();
if (!CryptographicOperations.FixedTimeEquals(
Encoding.UTF8.GetBytes(providedKey),
Encoding.UTF8.GetBytes(sunSource.WebhookSecret ?? string.Empty)))
return (false, "Invalid webhook secret");

// 4. Throttle (10s minimum between messages per source)
if (_lastWebhookTimes.TryGetValue(sunSourceId, out var lastTime))
if (DateTime.UtcNow - lastTime < _messageThrottle)
return (true, "Throttled");
_lastWebhookTimes[sunSourceId] = DateTime.UtcNow;

// 5. Strip "key" field from stored data (never persist secrets)
// 6. Normalize and store
var normalizedData = NormalizeJsonPayload(sunSourceId, root, sanitizedPayload);
await StoreNormalizedDataAsync(normalizedData);
return (true, "OK");
}

MQTT Worker Service

The Ampra.MQTT project runs as a .NET BackgroundService:

Worker Implementation

public class Worker : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var mqttClient = _mqttFactory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer(_brokerHost, _brokerPort)
.WithClientId(_clientId)
.WithCredentials(_username, _password)
.WithCleanSession(true)
.Build();

mqttClient.ApplicationMessageReceivedAsync += async e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

// Extract sourceId from topic: "ampra/sources/{sourceId}/data"
var sourceId = ExtractSourceId(topic);
if (sourceId == null) return;

// Validate source exists (5-min cache)
if (!await _sourceValidator.ValidateAsync(sourceId.Value)) return;

// Normalize and store
var normalized = DataNormalizer.Normalize(sourceId.Value, payload);
if (normalized == null || !DataNormalizer.HasMeaningfulData(normalized)) return;

await _ingestionService.InsertAsync(normalized);
};

await mqttClient.ConnectAsync(options, stoppingToken);
await mqttClient.SubscribeAsync(_topicFilter, stoppingToken);
}
}

Connection Configuration

SettingDefaultDescription
Mqtt:BrokerHostampra-emqxEMQX broker hostname
Mqtt:BrokerPort1883MQTT TCP port
Mqtt:ClientIdampra-ingestion-workerWorker's client ID
Mqtt:Usernameampra-ingestionSuper-user credentials
Mqtt:TopicFilterampra/sources/+/dataWildcard subscription

Source Validation Caching

To avoid excessive PostgreSQL queries, the worker caches source existence checks:

public class SourceValidator
{
private readonly ConcurrentDictionary<Guid, (bool IsValid, DateTime CachedAt)> _cache = new();
private static readonly TimeSpan CacheTtl = TimeSpan.FromMinutes(5);

public async Task<bool> ValidateAsync(Guid sourceId)
{
if (_cache.TryGetValue(sourceId, out var cached) &&
DateTime.UtcNow - cached.CachedAt < CacheTtl)
return cached.IsValid;

// Raw Npgsql query (bypasses EF for performance)
var isValid = await CheckSourceExistsAsync(sourceId);
_cache[sourceId] = (isValid, DateTime.UtcNow);
return isValid;
}
}

Deduplication & Throttling (MongoIngestionService)

public class MongoIngestionService
{
private readonly ConcurrentDictionary<Guid, DateTime> _lastInsertTimes = new();
private static readonly TimeSpan ThrottleInterval = TimeSpan.FromSeconds(10);

public bool IsThrottled(Guid sourceId)
{
if (_lastInsertTimes.TryGetValue(sourceId, out var lastTime))
return DateTime.UtcNow - lastTime < ThrottleInterval;
return false;
}

public bool IsDuplicate(NormalizedSunSourceData data)
{
// Check for duplicate timestamp + data hash within 30s window
}

public async Task InsertAsync(NormalizedSunSourceData data)
{
if (IsThrottled(data.SunSourceId)) return;
if (IsDuplicate(data)) return;

await _collection.InsertOneAsync(data);
_lastInsertTimes[data.SunSourceId] = DateTime.UtcNow;
}
}

Data Normalization

Canonical Field Mapping

The normalizer accepts JSON payloads with varying field names and maps them to canonical names:

// Input from Device A (Chinese manufacturer)
{
"soc": 78.5,
"pvPower": 3200,
"battTemp": 32.1,
"workMode": "Solar"
}

// Input from Device B (European manufacturer)
{
"stateOfCharge": 78.5,
"solar_power": 3200,
"battery_temperature": 32.1,
"operating_mode": "Solar"
}

// Both normalize to:
{
"stateOfCharge": 78.5,
"solarPower": 3200,
"batteryTemperature": 32.1,
"operatingMode": "Solar"
}

Complete Alias Map

Canonical FieldAliases
stateOfChargesoc, state_of_charge, batteryLevel, battery_level, batteryPercent
stateOfHealthsoh, state_of_health, batteryHealth
batteryVoltagebattery_voltage, battVoltage, batt_voltage
batteryCurrentbattery_current, battCurrent, batt_current
batteryTemperaturebattery_temperature, battTemp, batt_temp
batteryCapacitybattery_capacity, battCapacity
remainingCapacityremaining_capacity, remainCapacity
solarVoltagepv_voltage, pvVoltage, solar_voltage, panelVoltage
solarCurrentpv_current, pvCurrent, solar_current, panelCurrent
solarPowerpvPower, pv_power, solar_power, panelPower, pvWatts
outputVoltageoutput_voltage, acOutputVoltage, ac_output_voltage
outputCurrentoutput_current, acOutputCurrent
outputFrequencyoutput_frequency, acFrequency
loadPowerload_power, acOutputPower, ac_output_power, consumption
loadPercentageload_percentage, loadPercent, load_percent
apparentPowerapparent_power, va
gridVoltagegrid_voltage, mainsVoltage, utilityVoltage
gridCurrentgrid_current, mainsCurrent
gridFrequencygrid_frequency, mainsFrequency
totalEnergyProducedtotal_energy_produced, totalGeneration
totalEnergyConsumedtotal_energy_consumed, totalConsumption
dailyEnergyProduceddaily_energy_produced, todayGeneration, dailyGeneration
dailyEnergyConsumeddaily_energy_consumed, todayConsumption, dailyConsumption
operatingModemode, operating_mode, workMode, work_mode, inverterMode
deviceStatusdevice_status, status, inverterStatus

Metrics Retrieval

Latest Metrics

GET /api/sunsourcemetrics/{sunSourceId}/latest

Returns the most recent telemetry data point for a source. Queries MongoDB sorted by timestamp descending, limited to 1 document.

Metric History

GET /api/sunsourcemetrics/{sunSourceId}/history?startTime=...&endTime=...&limit=100
ParameterDefaultConstraints
startTime24 hours ago
endTimeNow
limit100Clamped to [1, 1000]

Returns raw telemetry data points within the time range, sorted by timestamp descending.

Aggregated Historical Data

GET /api/sunsourcemetrics/{sunSourceId}/aggregated?startTime=...&endTime=...

Returns hourly aggregated data using MongoDB's $group pipeline:

$match (sourceId + time range)
→ $group by hour (average all numeric fields)
→ $sort by timestamp ascending

Maximum range: 90 days.

Daily Summaries

GET /api/sunsourcemetrics/{sunSourceId}/daily-summary?startTime=...&endTime=...

Returns daily statistical summaries. For ranges exceeding 60 days, automatically aggregates to weekly summaries using weighted averages based on sample count.

Maximum range: 400 days.

Auto-Weekly Aggregation

public async Task<IEnumerable<DailySummaryDto>> GetDailySummariesAsync(
Guid sunSourceId, DateTime startTime, DateTime endTime)
{
var data = /* query MongoDB for raw data in range */;
var dailySummaries = BuildDailySummaries(data);

// Auto-aggregate to weekly when range > 60 days
if ((endTime - startTime).TotalDays > 60)
return AggregateIntoWeeks(dailySummaries);

return dailySummaries;
}

private IEnumerable<DailySummaryDto> AggregateIntoWeeks(IEnumerable<DailySummaryDto> dailies)
{
// Group by ISO week
// Weighted average by sample count (preserves statistical accuracy)
// Sum energy totals, preserve min/max extremes
}
Summary FieldCalculation
AverageSolarPowerMean solar power for the period
MaxSolarPowerPeak solar power
AverageStateOfChargeMean battery SOC
MinStateOfChargeLowest battery SOC
AverageLoadPowerMean load consumption
MaxLoadPowerPeak load consumption
TotalEnergyProducedMax dailyEnergyProduced value for the day
TotalEnergyConsumedMax dailyEnergyConsumed value for the day
SampleCountNumber of data points in the period

Batch Metrics

POST /api/sunsourcemetrics/batch/latest
Body: ["guid1", "guid2", ...]

Returns latest metrics for up to 50 sources in a single request. Each source is individually ownership-validated.

Fault Detection

GET /api/sunsourcemetrics/faults

Returns the latest metrics for all sources owned by the user that have active faultCodes or warningCodes.


Data Reports

GET /api/sunsourcedata/report/{sunSourceId}?period=today|yesterday|week|custom&startDateParam=...&endDateParam=...

The report engine generates comprehensive analytics:

Report Structure

{
"source": { "name": "...", "iconUrl": "..." },
"period": "today",
"startDate": "2026-03-09T00:00:00Z",
"endDate": "2026-03-09T23:59:59Z",
"generatedAt": "2026-03-09T15:30:00Z",
"dataPoints": 1440,
"summary": {
"avgSolarPower": 2100.5,
"maxSolarPower": 4800.0,
"avgStateOfCharge": 72.3,
"minStateOfCharge": 35.0,
"avgBatteryVoltage": 51.2,
"avgLoadPower": 1250.0,
"maxLoadPower": 3200.0,
"totalEnergyProduced": 18.5,
"totalEnergyConsumed": 12.2,
"operatingModes": [
{ "mode": "Solar", "count": 720, "percentage": 50.0 },
{ "mode": "Hybrid", "count": 432, "percentage": 30.0 },
{ "mode": "Battery", "count": 288, "percentage": 20.0 }
]
},
"hourlyData": [...]
}

Data Management

Downsampling

POST /api/sunsourcedata/downsample/{sunSourceId}

Compacts historical data to reduce storage:

Data AgeResolution
< 30 daysOriginal (full resolution)
≥ 30 days5-minute buckets (averaged)

The downsampling process:

  1. Queries all data older than 30 days
  2. Groups by 5-minute intervals
  3. Computes averages for all numeric fields
  4. Deletes original documents
  5. Inserts averaged documents

Scheduled automatically every Sunday at 2:00 AM via the DownsampleDataJob Quartz task.

Cleanup

DELETE /api/sunsourcedata/cleanup/{sunSourceId}

Deletes all telemetry data for a source. Returns the count of deleted documents.

Data Statistics

GET /api/sunsourcedata/stats

Returns per-source statistics for all sources owned by the user:

FieldDescription
messageCountTotal telemetry records in MongoDB
lastMessageTimeTimestamp of most recent data point
Latest metrics snapshotSOC, solar power, load power, etc.
GET /api/sunsourcedata/count/{sunSourceId}
GET /api/sunsourcedata/size/{sunSourceId}

Returns the document count and estimated storage size in bytes for a specific source.


Data Export

CSV Export

GET /api/export/{sunSourceId}/csv?startDate=...&endDate=...

Exports telemetry data as a CSV file with all normalized fields as columns. Maximum date range: 33 days. Hard limit: 500,000 records per export.

Excel Export

GET /api/export/{sunSourceId}/excel?startDate=...&endDate=...

Exports telemetry data as an XLSX file with two sheets:

SheetContents
DataAll telemetry records with formatted columns
SummaryAggregated statistics (averages, min/max, totals)

Same limits as CSV export (33 days, 500K records).


Debug Data Generation

Available to Admin and Overseer roles only:

Generate Historical Data

POST /api/sunsourcedata/debug/generate/{sunSourceId}

Creates 30 days of realistic synthetic data (8,640 data points at 5-minute intervals) using:

  • Perlin noise-like functions for smooth power curves
  • Diurnal solar patterns (sunrise to sunset bell curve)
  • Seasonal variation based on current date
  • Realistic battery charge/discharge cycles
  • Random grid outage simulation
  • Operating mode transitions

Debug Live Process

POST /api/sunsourcedata/debug/process/start/{sunSourceId}
POST /api/sunsourcedata/debug/process/stop/{sunSourceId}
GET /api/sunsourcedata/debug/process/status/{sunSourceId}

Starts a background process that generates real-time telemetry data simulating a live solar installation. Uses the same physics model as the historical generator but runs continuously with randomized noise.