
ML Predictions & Forecasting

The ML subsystem provides 7-day energy production, consumption, and battery state-of-charge forecasts using a physics-aware hybrid approach that blends gradient-boosted tree models with solar physics calculations.


Architecture

ML Service (Flask)

| Endpoint | Method | Purpose |
|---|---|---|
| `/train` | POST | Train an XGBoost model on historical data |
| `/predict` | POST | Generate a 7-day forecast using the trained model |

The ML service runs as a standalone Python microservice with Gunicorn (2 workers, 600s timeout).
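The Gunicorn settings above could live in a `gunicorn.conf.py` file, which Gunicorn loads with the `-c` option. This is a sketch. The worker count and timeout come from this page. The bind address is an assumption.

```python
# gunicorn.conf.py (sketch): Gunicorn reads these module-level settings at startup.

# Two worker processes, as stated for the ML service.
workers = 2

# A long /train request keeps a sync worker busy for minutes. Without a higher
# timeout, Gunicorn's 30-second default would kill and restart that worker.
timeout = 600

# Assumption: this page does not state the bind address or port.
bind = "0.0.0.0:5000"
```

A typical launch would be `gunicorn -c gunicorn.conf.py app:app`. Here `app:app` is a placeholder for the Flask module and application object.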


Training Pipeline

Trigger

```
POST /api/predictions/train
Body: { "sunSourceId": "..." }
```

StartTrainingAsync Implementation

The training flow uses Redis as a concurrency guard and fires the ML call asynchronously:

```csharp
public async Task<PredictionResultDto> StartTrainingAsync(Guid sunSourceId, string userId)
{
    var sunSource = await _context.SunSources
        .FirstOrDefaultAsync(s => s.Id == sunSourceId && s.UserId == userId);

    if (sunSource == null)
        throw new NotFoundException("Sun source not found");

    // Redis concurrency guard: only one training job per source at a time
    var db = _redis.GetDatabase();
    var activeKey = RedisKeys.MlActive(sunSourceId, "train");
    // Key format: "ampra:ml:active:{sunSourceId}:train"
    if (await db.KeyExistsAsync(activeKey))
    {
        return new PredictionResultDto
        {
            JobId = (await db.StringGetAsync(activeKey)).ToString(),
            Status = "already_running"
        };
    }

    var jobId = Guid.NewGuid().ToString();

    // Reserve the active slot (30-min TTL as a safety net)
    await db.StringSetAsync(activeKey, jobId, TimeSpan.FromMinutes(30));

    // Store the initial job status in Redis (24h TTL)
    var initialStatus = JsonSerializer.Serialize(new
    {
        jobId, status = "queued", progress = 0,
        sunSourceId = sunSourceId.ToString(), type = "train",
        updatedAt = DateTime.UtcNow.ToString("o"),
        message = "Starting training..."
    });
    await db.StringSetAsync(RedisKeys.MlJob(jobId), initialStatus, JobStatusTtl);

    // Pre-fetch weather data for the ML service
    var weatherHistory = await BuildWeatherDataAsync(sunSourceId, sunSource);

    // Fire-and-forget: POST to the ML /train endpoint
    _ = Task.Run(async () =>
    {
        try
        {
            var payload = JsonSerializer.Serialize(new
            {
                sunSourceId = sunSourceId.ToString(), jobId,
                latitude = sunSource.Latitude, longitude = sunSource.Longitude,
                installedCapacityWatts = sunSource.InstalledCapacityWatts,
                weatherHistory
            });

            var content = new StringContent(payload, Encoding.UTF8, HttpContentTypes.Json);
            // Dispose the response once the ML call returns
            using var response = await _httpClient.PostAsync(MlEndpoints.Train, content);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ML training request failed for job {JobId}", jobId);

            // On failure, update the Redis job status so the client sees the error
            var errorStatus = JsonSerializer.Serialize(new
            {
                jobId, status = "failed", progress = 0,
                sunSourceId = sunSourceId.ToString(), type = "train",
                updatedAt = DateTime.UtcNow.ToString("o"),
                error = "ML service is temporarily unavailable"
            });
            await db.StringSetAsync(RedisKeys.MlJob(jobId), errorStatus, JobStatusTtl);
        }
        finally
        {
            // Always release the active slot
            await db.KeyDeleteAsync(activeKey);
        }
    });

    return new PredictionResultDto { JobId = jobId, Status = "queued" };
}
```

Key design decisions:

| Decision | Rationale |
|---|---|
| Redis active key with 30-min TTL | Prevents duplicate concurrent jobs; the TTL acts as a safety net if the process crashes |
| Fire-and-forget via `Task.Run` | The client gets an immediate response; ML training can take minutes |
| Weather data pre-fetched | Ensures fresh weather data is available before the ML call |
| Error status written to Redis | Client polling sees the failure instead of hanging indefinitely |
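The C# guard above calls `KeyExistsAsync` and then `StringSetAsync` in two separate round trips. Two simultaneous requests can therefore both pass the check. A single conditional write closes that window. In Redis this is `SET key value NX EX ttl`. In StackExchange.Redis it is `StringSetAsync` with `When.NotExists`.

The Python sketch below models those semantics with a hypothetical in-memory store, so the logic runs without a Redis server. `InMemoryStore`, `set_nx`, and `try_start` are illustrative names, not part of the codebase.

```python
import time


class InMemoryStore:
    """Stand-in for Redis string keys with TTLs; single-threaded illustration only."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._data = {}  # key -> (value, expires_at)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: behave as if the key is gone
            return None
        return value

    def set_nx(self, key, value, ttl_seconds):
        """Like Redis SET key value NX EX ttl: write only if the key is absent."""
        if self.get(key) is not None:
            return False
        self._data[key] = (value, self._clock() + ttl_seconds)
        return True

    def delete(self, key):
        self._data.pop(key, None)


def ml_active_key(sun_source_id, job_type):
    # Same format as RedisKeys.MlActive: "ampra:ml:active:{sunSourceId}:{type}"
    return f"ampra:ml:active:{sun_source_id}:{job_type}"


def try_start(store, sun_source_id, job_type, job_id, ttl_seconds):
    """Reserve the active slot in one conditional write, or report the running job."""
    key = ml_active_key(sun_source_id, job_type)
    if store.set_nx(key, job_id, ttl_seconds):
        return job_id, "queued"
    return store.get(key), "already_running"
```

The C# `finally` block's `KeyDeleteAsync` corresponds to `delete`. The TTL passed to `set_nx` is the crash safety net described in the table.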

ML Training Process (Python)

Feature Engineering Details

| Category | Features | Description |
|---|---|---|
| Time Cyclicals | `hour_sin`, `hour_cos`, `month_sin`, `month_cos`, `day_of_year_sin`, `day_of_year_cos` | Circular (sin/cos) encoding of temporal features |
| Solar Physics | `solar_elevation`, `solar_azimuth`, `clear_sky_ghi`, `extraterrestrial_radiation` | Calculated from latitude/longitude and timestamp |
| Weather | `temperature`, `shortwave_radiation`, `uv_index`, `precipitation`, `weather_code` | From Open-Meteo forecast data |
| Historical Profiles | `avg_solar_by_hour`, `avg_load_by_hour`, `avg_soc_by_hour` | Average value for each hour of the day, from historical data |
| Rolling Statistics | `solar_power_1h`, `solar_power_3h`, `load_power_1h`, `soc_1h` | Rolling-mean windows |
| Lag Features | `solar_power_lag_1`, `load_power_lag_1`, `soc_lag_1` | Previous-timestep values |
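The time-cyclical, rolling, and lag columns follow standard feature-engineering patterns. Below is a stdlib-only sketch of how such columns can be computed. The column names come from the table. The helper names, the 365-day year period, and the hourly sample resolution are assumptions, not the ML service's actual code.

```python
import math
from datetime import datetime


def cyclical(value, period):
    """Map a periodic quantity onto the unit circle as (sin, cos).

    Hour 23 and hour 0 end up close together, which a raw 0..23 integer cannot express.
    """
    angle = 2 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


def time_features(ts):
    """Compute the six "Time Cyclicals" columns for one timestamp."""
    hour_sin, hour_cos = cyclical(ts.hour, 24)
    month_sin, month_cos = cyclical(ts.month - 1, 12)  # months 1..12 -> 0..11
    # Assumption: 365-day period (in leap years, 31 Dec lands on the same point as 1 Jan).
    doy_sin, doy_cos = cyclical(ts.timetuple().tm_yday - 1, 365)
    return {
        "hour_sin": hour_sin, "hour_cos": hour_cos,
        "month_sin": month_sin, "month_cos": month_cos,
        "day_of_year_sin": doy_sin, "day_of_year_cos": doy_cos,
    }


def lag(values, steps=1):
    """Shift a series back by `steps`; leading positions with no history are None."""
    return ([None] * steps + list(values))[:len(values)]


def rolling_mean(values, window):
    """Trailing mean over the last `window` samples (fewer at the start of the series)."""
    means = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1):i + 1]
        means.append(sum(chunk) / len(chunk))
    return means


# Example usage with hourly samples (an assumption about the data resolution).
solar = [0.0, 120.0, 480.0, 900.0]
features = time_features(datetime(2026, 3, 10, 13))
solar_power_lag_1 = lag(solar)            # [None, 0.0, 120.0, 480.0]
solar_power_3h = rolling_mean(solar, 3)   # 3 hourly samples = 3 h window
```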

Prediction Pipeline

Trigger

```
POST /api/predictions/predict
Body: { "sunSourceId": "..." }
```

StartPredictionAsync Implementation

The prediction flow follows the same pattern as training — Redis guard, fire-and-forget, status tracking:

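```csharp
public async Task<PredictionResultDto> StartPredictionAsync(Guid sunSourceId, string userId)
{
    var sunSource = await _context.SunSources
        .FirstOrDefaultAsync(s => s.Id == sunSourceId && s.UserId == userId);

    if (sunSource == null)
        throw new NotFoundException("Sun source not found");

    var db = _redis.GetDatabase();
    var activeKey = RedisKeys.MlActive(sunSourceId, "predict");
    // Key format: "ampra:ml:active:{sunSourceId}:predict"
    if (await db.KeyExistsAsync(activeKey))
    {
        return new PredictionResultDto
        {
            JobId = (await db.StringGetAsync(activeKey)).ToString(),
            Status = "already_running"
        };
    }

    var jobId = Guid.NewGuid().ToString();

    // Reserve the active slot (10-min TTL: predictions are faster than training)
    await db.StringSetAsync(activeKey, jobId, TimeSpan.FromMinutes(10));

    // Initial job status (24h TTL); assumed to mirror the training status with type = "predict"
    var initialStatus = JsonSerializer.Serialize(new
    {
        jobId, status = "queued", progress = 0,
        sunSourceId = sunSourceId.ToString(), type = "predict",
        updatedAt = DateTime.UtcNow.ToString("o")
    });
    await db.StringSetAsync(RedisKeys.MlJob(jobId), initialStatus, JobStatusTtl);

    var weatherForecast = await BuildWeatherDataAsync(sunSourceId, sunSource);

    // Fire-and-forget: POST to the ML /predict endpoint
    _ = Task.Run(async () =>
    {
        try
        {
            var payload = JsonSerializer.Serialize(new
            {
                sunSourceId = sunSourceId.ToString(), jobId,
                weatherForecast
            });
            var content = new StringContent(payload, Encoding.UTF8, HttpContentTypes.Json);
            using var response = await _httpClient.PostAsync(MlEndpoints.Predict, content);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ML prediction request failed for job {JobId}", jobId);

            // Write a failed status to Redis (assumed to mirror the training error status)
            var errorStatus = JsonSerializer.Serialize(new
            {
                jobId, status = "failed", progress = 0,
                sunSourceId = sunSourceId.ToString(), type = "predict",
                updatedAt = DateTime.UtcNow.ToString("o"),
                error = "ML service is temporarily unavailable"
            });
            await db.StringSetAsync(RedisKeys.MlJob(jobId), errorStatus, JobStatusTtl);
        }
        finally
        {
            // Always release the active slot
            await db.KeyDeleteAsync(activeKey);
        }
    });

    return new PredictionResultDto { JobId = jobId, Status = "queued" };
}
```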

Note: The prediction active key uses a 10-minute TTL (vs 30 minutes for training) because predictions are significantly faster.

Physics-Aware Hybrid Approach

The prediction engine does not rely solely on ML output. Instead, it uses a physics-aware blending strategy that combines the ML predictions with solar physics calculations.
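This page does not spell out the blending rule. The Python sketch below only illustrates the idea under stated assumptions:

- A crude clear-sky estimate that drops to zero below the horizon.
- A hypothetical cloud factor derived from the weather code.
- A hypothetical fixed weight between the ML and physics estimates.

`clear_sky_ghi`, `blend_solar`, `cloud_factor`, and `ml_weight` are illustrative names, not the service's API.

```python
import math


def clear_sky_ghi(solar_elevation_deg):
    """Crude clear-sky GHI in W/m^2; zero when the sun is at or below the horizon.

    Assumption: 1000 W/m^2 * sin(elevation) is a stand-in, not the service's clear-sky model.
    """
    if solar_elevation_deg <= 0:
        return 0.0
    return 1000.0 * math.sin(math.radians(solar_elevation_deg))


def blend_solar(ml_watts, solar_elevation_deg, cloud_factor, capacity_watts, ml_weight=0.7):
    """Blend an ML solar estimate with a physics estimate, then apply physical bounds.

    cloud_factor: 0..1 attenuation from the weather code (hypothetical mapping).
    ml_weight: hypothetical fixed weight; the real blending rule is not documented here.
    """
    ghi = clear_sky_ghi(solar_elevation_deg)
    if ghi == 0.0:
        return 0.0  # No production at night, whatever the model says.
    # Scale installed capacity by irradiance relative to the 1000 W/m^2 STC reference.
    clear_sky_watts = capacity_watts * (ghi / 1000.0)
    physics_watts = clear_sky_watts * cloud_factor
    blended = ml_weight * ml_watts + (1 - ml_weight) * physics_watts
    # Never negative, never above what clear-sky physics allows.
    return min(max(blended, 0.0), clear_sky_watts)
```

With `ml_weight=0` the function returns the physics estimate alone. That matches the role the physics model plays for new installations that have no training data.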

Why Physics-Aware?

| Problem | ML-Only Issue | Physics-Aware Solution |
|---|---|---|
| SOC drift | Cumulative prediction errors compound over 7 days | Energy-balance simulation respects conservation of energy |
| Night production | ML may predict non-zero solar output at night | Clear-sky GHI is zero after sunset |
| Extreme weather | Limited training data for rare events | Cloud attenuation derived from weather codes provides physical bounds |
| New installations | Insufficient historical data | Physics model provides a baseline even with no training data |
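The SOC-drift row rests on a simple idea. Instead of predicting each hour's SOC independently, step it forward from the forecast solar and load power. Every change is then accounted for by energy in minus energy out, and the result cannot leave the 0 to 100 % range.

The sketch below makes several assumptions that this page does not document:

- The battery is lossless, with a known usable capacity.
- Steps are hourly.
- Any surplus beyond full, or deficit beyond empty, goes to or comes from the grid.

```python
def simulate_soc(initial_soc_pct, solar_watts, load_watts, capacity_wh,
                 step_hours=1.0, min_soc_pct=0.0, max_soc_pct=100.0):
    """Step battery state of charge forward with a simple energy balance.

    Each step: net energy = (solar - load) * step_hours. The surplus or deficit
    moves the battery, and SOC is clamped to [min_soc_pct, max_soc_pct].
    Assumptions: lossless battery; energy beyond the clamp is exchanged with the grid.
    """
    soc = initial_soc_pct
    trajectory = []
    for solar, load in zip(solar_watts, load_watts):
        net_wh = (solar - load) * step_hours
        soc += 100.0 * net_wh / capacity_wh
        soc = min(max(soc, min_soc_pct), max_soc_pct)
        trajectory.append(soc)
    return trajectory
```

Each step is derived from the previous one and clamped, so a bad hour shifts the curve but cannot push SOC outside its physical range.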

Job Tracking

All ML jobs are tracked in Redis with a 24-hour TTL:

Redis Key Structure

```csharp
// Ampra.Core.Constants.RedisKeys
public static class RedisKeys
{
    public const string MlJobPrefix = "ampra:ml:job:";
    public const string MlActivePrefix = "ampra:ml:active:";

    public static string MlJob(string jobId) => $"{MlJobPrefix}{jobId}";
    public static string MlActive(Guid sunSourceId, string type)
        => $"{MlActivePrefix}{sunSourceId}:{type}";
}
```

| Key Pattern | Purpose | TTL |
|---|---|---|
| `ampra:ml:job:{jobId}` | Full job status JSON | 24 hours |
| `ampra:ml:active:{sourceId}:train` | Prevents duplicate training jobs | 30 minutes |
| `ampra:ml:active:{sourceId}:predict` | Prevents duplicate prediction jobs | 10 minutes |

Job Status Lifecycle

Polling

```
GET /api/predictions/status/{jobId}
```
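A client can poll this endpoint until the job reaches a final state. The sketch below treats `completed` and `failed`, the two final states that appear on this page, as terminal.

`fetch_status` is a hypothetical callable that performs the GET and returns the parsed JSON. It returns `None` when no job comes back: the key expired after 24 hours, the ID is unknown, or the caller does not own the source. The 2-second interval and 600-second limit are arbitrary choices.

```python
import time

# Final states that appear in this document; others may exist.
TERMINAL_STATUSES = {"completed", "failed"}


def poll_job(fetch_status, job_id, interval_seconds=2.0, timeout_seconds=600.0,
             sleep=time.sleep, clock=time.monotonic):
    """Poll the job status until it is terminal, the job disappears, or time runs out."""
    deadline = clock() + timeout_seconds
    while True:
        job = fetch_status(job_id)
        if job is None:
            raise LookupError(f"job {job_id} not found or not accessible")
        if job.get("status") in TERMINAL_STATUSES:
            return job
        if clock() >= deadline:
            raise TimeoutError(f"job {job_id} still {job.get('status')!r} after {timeout_seconds}s")
        sleep(interval_seconds)
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting.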

GetJobStatusAsync Implementation

```csharp
public async Task<PredictionJobDto?> GetJobStatusAsync(string jobId, string userId)
{
    var db = _redis.GetDatabase();
    var data = await db.StringGetAsync(RedisKeys.MlJob(jobId));

    if (data.IsNullOrEmpty)
        return null;

    var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
    var job = JsonSerializer.Deserialize<PredictionJobDto>(data.ToString(), options);

    if (job == null) return null;

    // Ownership check: user must own the source this job belongs to
    var ownsSource = await _context.SunSources
        .AnyAsync(s => s.Id == job.SunSourceId && s.UserId == userId);

    if (!ownsSource) return null;

    return job;
}
```

Returns the current job state:

```json
{
  "jobId": "abc-123",
  "status": "completed",
  "progress": 100,
  "message": "Prediction complete",
  "type": "predict",
  "sunSourceId": "...",
  "updatedAt": "2026-03-09T15:30:00Z",
  "predictions": [...],
  "metrics": {
    "solar_mae": 150.2,
    "solar_rmse": 210.5,
    "solar_r2": 0.92,
    "load_mae": 85.3,
    "load_rmse": 120.1,
    "load_r2": 0.88
  },
  "dataPointsUsed": 8640
}
```
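The `metrics` values use the standard regression definitions: mean absolute error, root-mean-square error, and the coefficient of determination R². A stdlib sketch of those formulas follows.

This page does not describe how the ML service splits training and evaluation data. MAE and RMSE are in the units of the target series, presumably watts for solar and load power.

```python
import math


def regression_metrics(actual, predicted):
    """MAE, RMSE and R^2 over two equal-length series."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty series of equal length")
    n = len(actual)
    errors = [p - a for a, p in zip(actual, predicted)]
    mae = sum(abs(e) for e in errors) / n
    ss_res = sum(e * e for e in errors)           # residual sum of squares
    rmse = math.sqrt(ss_res / n)
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)  # total sum of squares
    # R^2 = 1 - SS_res / SS_tot; undefined for a constant actual series.
    r2 = 1 - ss_res / ss_tot if ss_tot else float("nan")
    return {"mae": mae, "rmse": rmse, "r2": r2}
```

An R² near 1 means the model explains most of the variance around the mean. RMSE is always at least as large as MAE and penalises occasional large misses more heavily.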

Prediction Results

Latest Prediction

```
GET /api/predictions/latest/{sunSourceId}
```

GetLatestPredictionAsync Implementation

```csharp
public async Task<object?> GetLatestPredictionAsync(Guid sunSourceId, string userId)
{
    var sunSource = await _context.SunSources
        .FirstOrDefaultAsync(s => s.Id == sunSourceId && s.UserId == userId);

    if (sunSource == null)
        throw new NotFoundException("Sun source not found");

    var collection = _mongoDatabase.GetCollection<BsonDocument>(MongoCollections.Predictions);
    var filter = Builders<BsonDocument>.Filter.Eq("sunSourceId", sunSourceId.ToString());

    // Always get the most recent prediction
    var doc = await collection.Find(filter)
        .SortByDescending(d => d["createdAt"])
        .FirstOrDefaultAsync();

    if (doc == null) return null;

    doc.Remove("_id"); // Strip MongoDB internal ID
    return BsonTypeMapper.MapToDotNetValue(doc);
}
```

Returns stored predictions from MongoDB (predictions collection):

```json
{
  "exists": true,
  "data": {
    "predictions": [
      {
        "timestamp": "2026-03-10T00:00:00Z",
        "solarPower": 0.0,
        "loadPower": 450.0,
        "stateOfCharge": 65.2,
        "energyProduced": 0.0,
        "energyConsumed": 0.45
      },
      {
        "timestamp": "2026-03-10T01:00:00Z",
        "solarPower": 0.0,
        "loadPower": 420.0,
        "stateOfCharge": 63.8,
        "energyProduced": 0.0,
        "energyConsumed": 0.42
      }
      // ... hourly for 7 days (168 data points)
    ]
  }
}
```
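In the sample above, `energyConsumed` equals `loadPower` over one hour, expressed in kWh (450 W for 1 h = 0.45 kWh). This suggests each hourly point carries the energy for its interval. If that reading holds, daily totals for the 7-day (168-point) forecast can be derived as below.

`watts_to_kwh` and `daily_totals` are illustrative helpers. Grouping by the date prefix of the timestamp assumes UTC days.

```python
from collections import defaultdict


def watts_to_kwh(power_watts, hours=1.0):
    """Energy over an interval of constant average power: W * h / 1000 = kWh."""
    return power_watts * hours / 1000.0


def daily_totals(points):
    """Sum hourly energy per calendar day, keyed by the timestamp's date prefix."""
    totals = defaultdict(lambda: {"produced_kwh": 0.0, "consumed_kwh": 0.0})
    for p in points:
        day = p["timestamp"][:10]  # "2026-03-10T00:00:00Z" -> "2026-03-10"
        totals[day]["produced_kwh"] += watts_to_kwh(p["solarPower"])
        totals[day]["consumed_kwh"] += watts_to_kwh(p["loadPower"])
    return dict(totals)
```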

Model Info

```
GET /api/predictions/model-info/{sunSourceId}
```

Returns metadata about the trained model from MongoDB (model_metadata collection), including training date, data points used, and accuracy metrics.


Scheduled Predictions

The DailyPredictionJob runs every 6 hours via Quartz:

DailyPredictionJob Implementation

```csharp
[DisallowConcurrentExecution]
public class DailyPredictionJob : IJob
{
    private readonly ApplicationDbContext _dbContext;
    private readonly IPredictionService _predictionService;
    private readonly ILogger<DailyPredictionJob> _logger;

    // Injected dependencies
    public DailyPredictionJob(
        ApplicationDbContext dbContext,
        IPredictionService predictionService,
        ILogger<DailyPredictionJob> logger)
    {
        _dbContext = dbContext;
        _predictionService = predictionService;
        _logger = logger;
    }

    public async Task Execute(IJobExecutionContext context)
    {
        // 1. Find users who opted into auto-predictions
        var optedInUserIds = await _dbContext.UserSettings
            .Where(s => s.SettingType == (int)SettingType.AutoRunPredictions
                && s.Value == "1")
            .Select(s => s.UserId)
            .ToListAsync();

        // 2. Get all active sources for those users
        var sunSources = await _dbContext.SunSources
            .Where(s => optedInUserIds.Contains(s.UserId) && s.IsActive)
            .ToListAsync();

        // 3. Run predictions for each source; one failure does not stop the rest
        foreach (var sunSource in sunSources)
        {
            try
            {
                await _predictionService.RunScheduledPredictionAsync(sunSource.Id);
                _logger.LogInformation("Completed prediction for source {SourceId}", sunSource.Id);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed prediction for source {SourceId}", sunSource.Id);
            }
        }
    }
}
```

RunScheduledPredictionAsync Implementation

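Unlike the user-triggered StartPredictionAsync, scheduled predictions await the ML call inline instead of firing and forgetting it. They also skip the Redis concurrency guard and job-status tracking, and they look up the source without a user ownership check. A non-success response from the ML service is raised as an exception, which DailyPredictionJob logs before moving on to the next source: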

```csharp
public async Task RunScheduledPredictionAsync(Guid sunSourceId)
{
    var sunSource = await _context.SunSources
        .FirstOrDefaultAsync(s => s.Id == sunSourceId);

    if (sunSource == null)
        throw new NotFoundException("Sun source not found");

    var weatherForecast = await BuildWeatherDataAsync(sunSourceId, sunSource);

    var jobId = $"scheduled-{Guid.NewGuid()}";
    var payload = JsonSerializer.Serialize(new
    {
        sunSourceId = sunSourceId.ToString(), jobId, weatherForecast
    });

    var content = new StringContent(payload, Encoding.UTF8, HttpContentTypes.Json);
    using var response = await _httpClient.PostAsync(MlEndpoints.Predict, content);

    if (!response.IsSuccessStatusCode)
        throw new InvalidOperationException(
            "Prediction service encountered an error. Please ensure the model has been trained.");
}
```

BuildWeatherDataAsync — Weather Data Preparation

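Training, user-triggered predictions, and scheduled predictions all use this method to prepare weather data for the ML service. If no weather data is stored and the source has coordinates, the method refreshes the data first. A failed refresh is logged, and the method returns whatever data is available, possibly an empty list: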

```csharp
private async Task<List<object>> BuildWeatherDataAsync(Guid sunSourceId, SunSource sunSource)
{
    var weatherData = await _weatherService.GetWeatherDataAsync(sunSourceId);

    // Auto-refresh weather if none exists and source has coordinates
    if (weatherData.Count == 0 && sunSource.Latitude.HasValue && sunSource.Longitude.HasValue)
    {
        try
        {
            await _weatherService.RefreshWeatherDataAsync(sunSource);
            weatherData = await _weatherService.GetWeatherDataAsync(sunSourceId);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to refresh weather data for {SourceId}", sunSourceId);
        }
    }

    return weatherData.Select(w => (object)new
    {
        date = w.Date.ToString("yyyy-MM-dd"),
        temperatureMax = w.TemperatureMax,
        temperatureMin = w.TemperatureMin,
        weatherCode = w.WeatherCode,
        shortwaveRadiationSum = w.ShortwaveRadiationSum,
        precipitationSum = w.PrecipitationSum,
        uvIndexMax = w.UvIndexMax,
        sunrise = w.Sunrise?.ToString("o"),
        sunset = w.Sunset?.ToString("o"),
    }).ToList();
}
```
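On the Python side, the ML service receives each day as a JSON object with the keys projected above. A hypothetical validation helper might look like the following. It relies only on facts visible in the C# code:

- `date` uses the `yyyy-MM-dd` format.
- `sunrise` and `sunset` can be `null` because of the `?.ToString("o")` projection.
- The list can be empty when no weather data could be fetched or refreshed.

The function name, the key check, and the error handling are assumptions.

```python
import json
from datetime import date

# Keys produced by BuildWeatherDataAsync for each day.
WEATHER_KEYS = (
    "date", "temperatureMax", "temperatureMin", "weatherCode",
    "shortwaveRadiationSum", "precipitationSum", "uvIndexMax",
    "sunrise", "sunset",
)


def parse_weather_days(raw_json):
    """Parse the weather list sent by the API into {date: entry} (hypothetical helper).

    - `date` arrives as "yyyy-MM-dd" and is parsed with date.fromisoformat.
    - `sunrise`/`sunset` may be None; they are passed through untouched.
    - An empty list is valid and yields an empty dict.
    """
    days = {}
    for entry in json.loads(raw_json):
        missing = [k for k in WEATHER_KEYS if k not in entry]
        if missing:
            raise ValueError(f"weather entry missing keys: {missing}")
        days[date.fromisoformat(entry["date"])] = entry
    return days
```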

Weather Data Dependency

Predictions require weather data. The UpdateWeatherDataJob also runs every 6 hours and is triggered before the prediction job in the Quartz schedule, ensuring fresh weather data is available for the ML models.