Telemetry & Metrics
The telemetry subsystem handles real-time data ingestion, normalization, storage, aggregation, and retrieval, and forms the backbone of Ampra's monitoring capabilities.
Ingestion Pipeline
MQTT Path
Webhook Path
Webhook Processing Implementation
public async Task<(bool Success, string Message)> ProcessWebhookPayloadAsync(
    Guid sunSourceId, string payload)
{
    // 1. Validate source exists, is Webhook type, and is active
    var sunSource = await _context.SunSources
        .FirstOrDefaultAsync(ss => ss.Id == sunSourceId
            && ss.ConnectionType == SunSourceConnectionType.Webhook
            && ss.IsActive);
    if (sunSource == null) return (false, "Source not found or inactive");

    // 2. Parse JSON (the document is disposed when the method returns)
    using var jsonDocument = JsonDocument.Parse(payload);
    var root = jsonDocument.RootElement;

    // 3. Constant-time secret comparison (prevents timing attacks).
    //    A non-object payload, a missing or non-string "key", or a source
    //    without a configured secret is rejected instead of throwing.
    if (root.ValueKind != JsonValueKind.Object
        || string.IsNullOrEmpty(sunSource.WebhookSecret)
        || !root.TryGetProperty("key", out var keyElement)
        || keyElement.ValueKind != JsonValueKind.String
        || !CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(keyElement.GetString()!),
            Encoding.UTF8.GetBytes(sunSource.WebhookSecret)))
        return (false, "Invalid webhook secret");

    // 4. Throttle (10s minimum between messages per source)
    var now = DateTime.UtcNow;
    if (_lastWebhookTimes.TryGetValue(sunSourceId, out var lastTime)
        && now - lastTime < _messageThrottle)
        return (true, "Throttled");
    _lastWebhookTimes[sunSourceId] = now;

    // 5. Strip "key" field from stored data (never persist secrets)
    //    (the code that produces sanitizedPayload is omitted from this excerpt)

    // 6. Normalize and store
    var normalizedData = NormalizeJsonPayload(sunSourceId, root, sanitizedPayload);
    await StoreNormalizedDataAsync(normalizedData);
    return (true, "OK");
}
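Step 5 is not shown in the excerpt above. A minimal sketch of how the `key` field could be removed before storage, using `System.Text.Json.Nodes`, is given here. The class and method names (`WebhookPayloadSanitizer`, `StripKey`) are hypothetical and are not taken from the Ampra codebase.

```csharp
using System;
using System.Text.Json.Nodes;

// Hypothetical helper, not part of the documented code.
public static class WebhookPayloadSanitizer
{
    // Returns the payload JSON with the top-level "key" property removed,
    // so the webhook secret never reaches storage.
    public static string StripKey(string payload)
    {
        // JsonNode.Parse throws JsonException on malformed JSON and
        // returns null for the JSON literal "null".
        if (JsonNode.Parse(payload) is not JsonObject obj)
            throw new ArgumentException("Payload must be a JSON object.", nameof(payload));

        obj.Remove("key"); // no-op when the property is absent
        return obj.ToJsonString();
    }
}
```

Only the top-level property is removed in this sketch; nested objects are left untouched.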
MQTT Worker Service
The Ampra.MQTT project runs as a .NET BackgroundService.
Worker Implementation
public class Worker : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var mqttClient = _mqttFactory.CreateMqttClient();
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(_brokerHost, _brokerPort)
            .WithClientId(_clientId)
            .WithCredentials(_username, _password)
            .WithCleanSession(true)
            .Build();

        mqttClient.ApplicationMessageReceivedAsync += async e =>
        {
            var topic = e.ApplicationMessage.Topic;
            var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

            // Extract sourceId from topic: "ampra/sources/{sourceId}/data"
            var sourceId = ExtractSourceId(topic);
            if (sourceId == null) return;

            // Validate source exists (5-min cache)
            if (!await _sourceValidator.ValidateAsync(sourceId.Value)) return;

            // Normalize and store
            var normalized = DataNormalizer.Normalize(sourceId.Value, payload);
            if (normalized == null || !DataNormalizer.HasMeaningfulData(normalized)) return;
            await _ingestionService.InsertAsync(normalized);
        };

        await mqttClient.ConnectAsync(options, stoppingToken);
        await mqttClient.SubscribeAsync(_topicFilter, stoppingToken);
    }
}
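The worker relies on `ExtractSourceId` to pull the source ID out of the topic, but its body is not shown. One way it could be written, assuming the topic always has the four-segment form `ampra/sources/{sourceId}/data` and that source IDs are GUIDs, is sketched below. The `TopicParser` class name is hypothetical.

```csharp
using System;

// Hypothetical stand-in for the worker's ExtractSourceId helper.
public static class TopicParser
{
    // Returns the source ID from "ampra/sources/{sourceId}/data",
    // or null when the topic has a different shape or an invalid GUID.
    public static Guid? ExtractSourceId(string topic)
    {
        var parts = topic.Split('/');
        if (parts.Length != 4) return null;
        if (parts[0] != "ampra" || parts[1] != "sources" || parts[3] != "data") return null;

        return Guid.TryParse(parts[2], out var id) ? id : (Guid?)null;
    }
}
```

Returning `null` rather than throwing matches the way the worker silently drops messages it cannot attribute to a source.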
Connection Configuration
| Setting | Default | Description |
|---|---|---|
| Mqtt:BrokerHost | ampra-emqx | EMQX broker hostname |
| Mqtt:BrokerPort | 1883 | MQTT TCP port |
| Mqtt:ClientId | ampra-ingestion-worker | Worker's client ID |
| Mqtt:Username | ampra-ingestion | Super-user credentials |
| Mqtt:TopicFilter | ampra/sources/+/data | Wildcard subscription |
Source Validation Caching
To avoid excessive PostgreSQL queries, the worker caches source existence checks:
public class SourceValidator
{
    private readonly ConcurrentDictionary<Guid, (bool IsValid, DateTime CachedAt)> _cache = new();
    private static readonly TimeSpan CacheTtl = TimeSpan.FromMinutes(5);

    public async Task<bool> ValidateAsync(Guid sourceId)
    {
        if (_cache.TryGetValue(sourceId, out var cached) &&
            DateTime.UtcNow - cached.CachedAt < CacheTtl)
            return cached.IsValid;

        // Raw Npgsql query (bypasses EF for performance)
        var isValid = await CheckSourceExistsAsync(sourceId);
        _cache[sourceId] = (isValid, DateTime.UtcNow);
        return isValid;
    }
}
Deduplication & Throttling (MongoIngestionService)
public class MongoIngestionService
{
    private readonly ConcurrentDictionary<Guid, DateTime> _lastInsertTimes = new();
    private static readonly TimeSpan ThrottleInterval = TimeSpan.FromSeconds(10);

    // True when the last insert for this source was less than 10s ago
    public bool IsThrottled(Guid sourceId)
    {
        if (_lastInsertTimes.TryGetValue(sourceId, out var lastTime))
            return DateTime.UtcNow - lastTime < ThrottleInterval;
        return false;
    }

    public bool IsDuplicate(NormalizedSunSourceData data)
    {
        // Checks for a duplicate timestamp + data hash within a 30s window
        // (body omitted from this excerpt)
    }

    public async Task InsertAsync(NormalizedSunSourceData data)
    {
        if (IsThrottled(data.SunSourceId)) return;
        if (IsDuplicate(data)) return;
        await _collection.InsertOneAsync(data);
        _lastInsertTimes[data.SunSourceId] = DateTime.UtcNow;
    }
}
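The body of `IsDuplicate` is not shown. The sketch below illustrates one possible reading of "duplicate timestamp + data hash within 30s window". It is an assumption-laden illustration: the `DuplicateDetector` class, the SHA-256 hash over the timestamp and serialized payload, and the choice to remember only the most recent entry per source are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;

// Hypothetical sketch; the real IsDuplicate may work differently.
public class DuplicateDetector
{
    private static readonly TimeSpan Window = TimeSpan.FromSeconds(30);

    // Most recent (hash, time seen) per source.
    private readonly ConcurrentDictionary<Guid, (string Hash, DateTime SeenAt)> _last = new();

    // Returns true when the same timestamp + payload was already seen for
    // this source within the window; otherwise records it and returns false.
    public bool IsDuplicate(Guid sourceId, DateTime timestamp, string payloadJson, DateTime nowUtc)
    {
        var hash = ComputeHash(timestamp, payloadJson);
        if (_last.TryGetValue(sourceId, out var prev)
            && prev.Hash == hash
            && nowUtc - prev.SeenAt < Window)
            return true;

        _last[sourceId] = (hash, nowUtc);
        return false;
    }

    private static string ComputeHash(DateTime timestamp, string payloadJson)
    {
        // Round-trip ("O") format keeps the timestamp unambiguous.
        var bytes = Encoding.UTF8.GetBytes($"{timestamp:O}|{payloadJson}");
        return Convert.ToHexString(SHA256.HashData(bytes));
    }
}
```

Passing `nowUtc` as a parameter instead of reading `DateTime.UtcNow` inside the method keeps the sketch deterministic and easy to test.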
Data Normalization
Canonical Field Mapping
The normalizer accepts JSON payloads with varying field names and maps them to canonical names:
// Input from Device A (Chinese manufacturer)
{
  "soc": 78.5,
  "pvPower": 3200,
  "battTemp": 32.1,
  "workMode": "Solar"
}

// Input from Device B (European manufacturer)
{
  "stateOfCharge": 78.5,
  "solar_power": 3200,
  "battery_temperature": 32.1,
  "operating_mode": "Solar"
}

// Both normalize to:
{
  "stateOfCharge": 78.5,
  "solarPower": 3200,
  "batteryTemperature": 32.1,
  "operatingMode": "Solar"
}
Complete Alias Map
| Canonical Field | Aliases |
|---|---|
| stateOfCharge | soc, state_of_charge, batteryLevel, battery_level, batteryPercent |
| stateOfHealth | soh, state_of_health, batteryHealth |
| batteryVoltage | battery_voltage, battVoltage, batt_voltage |
| batteryCurrent | battery_current, battCurrent, batt_current |
| batteryTemperature | battery_temperature, battTemp, batt_temp |
| batteryCapacity | battery_capacity, battCapacity |
| remainingCapacity | remaining_capacity, remainCapacity |
| solarVoltage | pv_voltage, pvVoltage, solar_voltage, panelVoltage |
| solarCurrent | pv_current, pvCurrent, solar_current, panelCurrent |
| solarPower | pvPower, pv_power, solar_power, panelPower, pvWatts |
| outputVoltage | output_voltage, acOutputVoltage, ac_output_voltage |
| outputCurrent | output_current, acOutputCurrent |
| outputFrequency | output_frequency, acFrequency |
| loadPower | load_power, acOutputPower, ac_output_power, consumption |
| loadPercentage | load_percentage, loadPercent, load_percent |
| apparentPower | apparent_power, va |
| gridVoltage | grid_voltage, mainsVoltage, utilityVoltage |
| gridCurrent | grid_current, mainsCurrent |
| gridFrequency | grid_frequency, mainsFrequency |
| totalEnergyProduced | total_energy_produced, totalGeneration |
| totalEnergyConsumed | total_energy_consumed, totalConsumption |
| dailyEnergyProduced | daily_energy_produced, todayGeneration, dailyGeneration |
| dailyEnergyConsumed | daily_energy_consumed, todayConsumption, dailyConsumption |
| operatingMode | mode, operating_mode, workMode, work_mode, inverterMode |
| deviceStatus | device_status, status, inverterStatus |
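To illustrate how an alias map like this can be applied, here is a minimal sketch of a dictionary lookup. It covers only a handful of the aliases listed above. The `AliasNormalizer` class is hypothetical, and two behaviors are assumptions rather than documented facts: matching is case-sensitive, and fields with no alias entry are passed through unchanged.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical sketch of alias-to-canonical mapping.
public static class AliasNormalizer
{
    // Small subset of the alias map, for illustration only.
    private static readonly Dictionary<string, string> AliasToCanonical =
        new Dictionary<string, string>
        {
            ["soc"] = "stateOfCharge",
            ["state_of_charge"] = "stateOfCharge",
            ["pvPower"] = "solarPower",
            ["solar_power"] = "solarPower",
            ["battTemp"] = "batteryTemperature",
            ["battery_temperature"] = "batteryTemperature",
            ["workMode"] = "operatingMode",
            ["operating_mode"] = "operatingMode",
        };

    public static Dictionary<string, JsonElement> Normalize(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var result = new Dictionary<string, JsonElement>();
        foreach (var prop in doc.RootElement.EnumerateObject())
        {
            // Unknown names (including canonical ones) are kept as-is.
            var name = AliasToCanonical.TryGetValue(prop.Name, out var canonical)
                ? canonical
                : prop.Name;
            // Clone so the value stays valid after the document is disposed.
            result[name] = prop.Value.Clone();
        }
        return result;
    }
}
```

With this sketch, the Device A and Device B payloads shown earlier would both produce the keys `stateOfCharge`, `solarPower`, `batteryTemperature`, and `operatingMode`.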
Metrics Retrieval
Latest Metrics
GET /api/sunsourcemetrics/{sunSourceId}/latest
Returns the most recent telemetry data point for a source. Queries MongoDB sorted by timestamp descending, limited to 1 document.
Metric History
GET /api/sunsourcemetrics/{sunSourceId}/history?startTime=...&endTime=...&limit=100
| Parameter | Default | Constraints |
|---|---|---|
| startTime | 24 hours ago | — |
| endTime | Now | — |
| limit | 100 | Clamped to [1, 1000] |
Returns raw telemetry data points within the time range, sorted by timestamp descending.
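The defaults and clamping in the table can be expressed in a few lines. The following is a sketch only; the `HistoryQuery` class and `Resolve` method are hypothetical names, and the current time is passed in as a parameter so the logic is easy to test.

```csharp
using System;

// Hypothetical helper showing the documented defaults and limit clamping.
public static class HistoryQuery
{
    public static (DateTime Start, DateTime End, int Limit) Resolve(
        DateTime? startTime, DateTime? endTime, int? limit, DateTime nowUtc)
    {
        var start = startTime ?? nowUtc.AddHours(-24); // default: 24 hours ago
        var end = endTime ?? nowUtc;                   // default: now
        var clamped = Math.Clamp(limit ?? 100, 1, 1000); // default 100, range [1, 1000]
        return (start, end, clamped);
    }
}
```

A request with `limit=5000` would therefore be served as if `limit=1000` had been sent, and `limit=0` as `limit=1`.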
Aggregated Historical Data
GET /api/sunsourcemetrics/{sunSourceId}/aggregated?startTime=...&endTime=...
Returns hourly aggregated data using MongoDB's $group pipeline:
$match (sourceId + time range)
→ $group by hour (average all numeric fields)
→ $sort by timestamp ascending
Maximum range: 90 days.
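The three pipeline stages can be mirrored in memory with LINQ, which may help when reasoning about what the endpoint returns. This is not the MongoDB pipeline itself: it is an equivalent sketch over an in-memory list, it averages a single field rather than all numeric fields, and the `Sample`, `HourlyAverage`, and `HourlyAggregator` names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration.
public record Sample(DateTime Timestamp, double SolarPower);
public record HourlyAverage(DateTime Hour, double AverageSolarPower);

public static class HourlyAggregator
{
    public static List<HourlyAverage> Aggregate(
        IEnumerable<Sample> samples, DateTime start, DateTime end)
    {
        return samples
            // $match: keep samples inside the requested time range
            .Where(s => s.Timestamp >= start && s.Timestamp <= end)
            // $group: truncate each timestamp to the start of its hour
            .GroupBy(s => new DateTime(
                s.Timestamp.Year, s.Timestamp.Month, s.Timestamp.Day,
                s.Timestamp.Hour, 0, 0, s.Timestamp.Kind))
            .Select(g => new HourlyAverage(g.Key, g.Average(s => s.SolarPower)))
            // $sort: ascending by hour
            .OrderBy(h => h.Hour)
            .ToList();
    }
}
```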
Daily Summaries
GET /api/sunsourcemetrics/{sunSourceId}/daily-summary?startTime=...&endTime=...
Returns daily statistical summaries. For ranges exceeding 60 days, automatically aggregates to weekly summaries using weighted averages based on sample count.
Maximum range: 400 days.
Auto-Weekly Aggregation
public async Task<IEnumerable<DailySummaryDto>> GetDailySummariesAsync(
    Guid sunSourceId, DateTime startTime, DateTime endTime)
{
    var data = /* query MongoDB for raw data in range */;
    var dailySummaries = BuildDailySummaries(data);

    // Auto-aggregate to weekly when range > 60 days
    if ((endTime - startTime).TotalDays > 60)
        return AggregateIntoWeeks(dailySummaries);

    return dailySummaries;
}

private IEnumerable<DailySummaryDto> AggregateIntoWeeks(IEnumerable<DailySummaryDto> dailies)
{
    // Group by ISO week
    // Weighted average by sample count (preserves statistical accuracy)
    // Sum energy totals, preserve min/max extremes
}
| Summary Field | Calculation |
|---|---|
| AverageSolarPower | Mean solar power for the period |
| MaxSolarPower | Peak solar power |
| AverageStateOfCharge | Mean battery SOC |
| MinStateOfCharge | Lowest battery SOC |
| AverageLoadPower | Mean load consumption |
| MaxLoadPower | Peak load consumption |
| TotalEnergyProduced | Max dailyEnergyProduced value for the day |
| TotalEnergyConsumed | Max dailyEnergyConsumed value for the day |
| SampleCount | Number of data points in the period |
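The weekly roll-up described by the comments in `AggregateIntoWeeks` can be sketched as follows. The `DailySummary` record here is a reduced, hypothetical stand-in for `DailySummaryDto` with only a few of the fields from the table, and the choice to label each week by its Monday is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Reduced, hypothetical stand-in for DailySummaryDto.
public record DailySummary(
    DateTime Date, double AverageSolarPower, double MaxSolarPower,
    double TotalEnergyProduced, int SampleCount);

public static class WeeklyAggregator
{
    public static List<DailySummary> AggregateIntoWeeks(IEnumerable<DailySummary> dailies)
    {
        return dailies
            // Group by ISO week (ISO year + week number)
            .GroupBy(d => (Year: ISOWeek.GetYear(d.Date), Week: ISOWeek.GetWeekOfYear(d.Date)))
            .Select(g =>
            {
                var samples = g.Sum(d => d.SampleCount);
                // Weight each daily mean by its sample count so days with
                // more data points contribute proportionally more.
                var weightedAvg = samples == 0
                    ? 0.0
                    : g.Sum(d => d.AverageSolarPower * d.SampleCount) / samples;
                return new DailySummary(
                    ISOWeek.ToDateTime(g.Key.Year, g.Key.Week, DayOfWeek.Monday),
                    weightedAvg,
                    g.Max(d => d.MaxSolarPower),       // preserve the extreme
                    g.Sum(d => d.TotalEnergyProduced), // sum energy totals
                    samples);
            })
            .OrderBy(w => w.Date)
            .ToList();
    }
}
```

For example, a day averaging 100 W over 10 samples and a day averaging 200 W over 30 samples combine to (100·10 + 200·30) / 40 = 175 W, whereas an unweighted mean would give 150 W.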
Batch Metrics
POST /api/sunsourcemetrics/batch/latest
Body: ["guid1", "guid2", ...]
Returns the latest metrics for up to 50 sources in a single request. Ownership is validated for each source individually.
Fault Detection
GET /api/sunsourcemetrics/faults
Returns the latest metrics for all sources owned by the user that have active faultCodes or warningCodes.
Data Reports
GET /api/sunsourcedata/report/{sunSourceId}?period=today|yesterday|week|custom&startDateParam=...&endDateParam=...
The report engine generates comprehensive analytics:
Report Structure
{
  "source": { "name": "...", "iconUrl": "..." },
  "period": "today",
  "startDate": "2026-03-09T00:00:00Z",
  "endDate": "2026-03-09T23:59:59Z",
  "generatedAt": "2026-03-09T15:30:00Z",
  "dataPoints": 1440,
  "summary": {
    "avgSolarPower": 2100.5,
    "maxSolarPower": 4800.0,
    "avgStateOfCharge": 72.3,
    "minStateOfCharge": 35.0,
    "avgBatteryVoltage": 51.2,
    "avgLoadPower": 1250.0,
    "maxLoadPower": 3200.0,
    "totalEnergyProduced": 18.5,
    "totalEnergyConsumed": 12.2,
    "operatingModes": [
      { "mode": "Solar", "count": 720, "percentage": 50.0 },
      { "mode": "Hybrid", "count": 432, "percentage": 30.0 },
      { "mode": "Battery", "count": 288, "percentage": 20.0 }
    ]
  },
  "hourlyData": [...]
}
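The `operatingModes` entries in the sample report are consistent with a simple count-and-share calculation over the data points in the period. A sketch of that calculation follows. The `ModeShare` and `OperatingModeStats` names are hypothetical, and rounding to one decimal place and ordering by count are assumptions based on the sample output.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration.
public record ModeShare(string Mode, int Count, double Percentage);

public static class OperatingModeStats
{
    // Counts how often each operating mode occurs and its share of all points.
    public static List<ModeShare> Compute(IReadOnlyCollection<string> modes)
    {
        if (modes.Count == 0) return new List<ModeShare>();

        return modes
            .GroupBy(m => m)
            .Select(g => new ModeShare(
                g.Key,
                g.Count(),
                Math.Round(100.0 * g.Count() / modes.Count, 1)))
            .OrderByDescending(s => s.Count)
            .ToList();
    }
}
```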
Data Management
Downsampling
POST /api/sunsourcedata/downsample/{sunSourceId}
Compacts historical data to reduce storage:
| Data Age | Resolution |
|---|---|
| < 30 days | Original (full resolution) |
| ≥ 30 days | 5-minute buckets (averaged) |
The downsampling process:
- Queries all data older than 30 days
- Groups by 5-minute intervals
- Computes averages for all numeric fields
- Deletes original documents
- Inserts averaged documents
Scheduled automatically every Sunday at 2:00 AM via the DownsampleDataJob Quartz task.
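The eligibility check and the 5-minute grouping key from the steps above can be sketched as two small functions. The `Downsampling` class is a hypothetical name, and flooring each timestamp to the start of its bucket is an assumption about how the intervals are aligned.

```csharp
using System;

// Hypothetical helpers for the downsampling steps.
public static class Downsampling
{
    private static readonly TimeSpan Bucket = TimeSpan.FromMinutes(5);

    // A document is eligible once it is at least 30 days old.
    public static bool IsEligible(DateTime timestamp, DateTime nowUtc) =>
        timestamp <= nowUtc.AddDays(-30);

    // Floors a timestamp to the start of its 5-minute bucket, e.g.
    // 15:33:42 falls into the bucket starting at 15:30:00.
    public static DateTime BucketStart(DateTime timestamp) =>
        new DateTime(timestamp.Ticks - timestamp.Ticks % Bucket.Ticks, timestamp.Kind);
}
```

Documents sharing the same `BucketStart` value would then be averaged into a single document.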
Cleanup
DELETE /api/sunsourcedata/cleanup/{sunSourceId}
Deletes all telemetry data for a source. Returns the count of deleted documents.
Data Statistics
GET /api/sunsourcedata/stats
Returns per-source statistics for all sources owned by the user:
| Field | Description |
|---|---|
| messageCount | Total telemetry records in MongoDB |
| lastMessageTime | Timestamp of most recent data point |
| Latest metrics snapshot | SOC, solar power, load power, etc. |
GET /api/sunsourcedata/count/{sunSourceId}
GET /api/sunsourcedata/size/{sunSourceId}
These endpoints return, respectively, the document count and the estimated storage size in bytes for a specific source.
Data Export
CSV Export
GET /api/export/{sunSourceId}/csv?startDate=...&endDate=...
Exports telemetry data as a CSV file with all normalized fields as columns. Maximum date range: 33 days. Hard limit: 500,000 records per export.
Excel Export
GET /api/export/{sunSourceId}/excel?startDate=...&endDate=...
Exports telemetry data as an XLSX file with two sheets:
| Sheet | Contents |
|---|---|
| Data | All telemetry records with formatted columns |
| Summary | Aggregated statistics (averages, min/max, totals) |
Same limits as CSV export (33 days, 500K records).
Debug Data Generation
Available to Admin and Overseer roles only:
Generate Historical Data
POST /api/sunsourcedata/debug/generate/{sunSourceId}
Creates 30 days of realistic synthetic data (8,640 data points at 5-minute intervals) using:
- Perlin noise-like functions for smooth power curves
- Diurnal solar patterns (sunrise to sunset bell curve)
- Seasonal variation based on current date
- Realistic battery charge/discharge cycles
- Random grid outage simulation
- Operating mode transitions
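As an illustration of the diurnal pattern in the list above, the sketch below produces a bell-shaped power curve between sunrise and sunset. It is not the generator's actual model: the sine-squared shape, the fixed 06:00 and 18:00 defaults, and the `SolarCurve` class name are all assumptions, and noise, seasonal variation, and battery behavior are left out.

```csharp
using System;

// Hypothetical sketch of a diurnal solar curve.
public static class SolarCurve
{
    // Returns solar power in watts for a given hour of day (0-24).
    // Output is zero at night and peaks midway between sunrise and sunset.
    public static double PowerAt(
        double hourOfDay, double peakWatts, double sunrise = 6.0, double sunset = 18.0)
    {
        if (hourOfDay <= sunrise || hourOfDay >= sunset) return 0.0;

        // Position within the daylight window, from 0 (sunrise) to 1 (sunset).
        var phase = (hourOfDay - sunrise) / (sunset - sunrise);

        // sin^2 rises smoothly from 0, peaks at phase 0.5, and falls back to 0.
        return peakWatts * Math.Pow(Math.Sin(Math.PI * phase), 2);
    }
}
```

Sampling this function every five minutes over a day yields 288 points, which matches the per-day density of the generated data set.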
Debug Live Process
POST /api/sunsourcedata/debug/process/start/{sunSourceId}
POST /api/sunsourcedata/debug/process/stop/{sunSourceId}
GET /api/sunsourcedata/debug/process/status/{sunSourceId}
Starts a background process that generates real-time telemetry data simulating a live solar installation. Uses the same physics model as the historical generator but runs continuously with randomized noise.