Fix concurrent stat insertion duplicate key errors

This commit resolves duplicate key errors that occur when multiple monitors
attempt to insert statistics simultaneously into stat_* tables.

The fix addresses three interconnected issues:

1. **Circular Dependency Resolution**: database.js imports UptimeCalculator at
   module level, but UptimeCalculator needs Database.dbConfig. Fixed by using
   local imports in UptimeCalculator methods to ensure Database.dbConfig is
   properly initialized when accessed.

2. **Database Configuration Initialization**: Database.dbConfig was not set
   in the catch block when db-config.json is missing, causing undefined access
   errors. Fixed by ensuring Database.dbConfig is always set.

3. **Schema Column Naming Mismatch**: RedBean ORM uses camelCase (pingMin/pingMax)
   but Knex migrations create snake_case columns (ping_min/ping_max). Fixed by
   using correct snake_case column names in SQL queries.

4. **Atomic Upsert Operations**: Implemented database-specific upsert logic:
   - SQLite: INSERT ... ON CONFLICT DO UPDATE
   - MariaDB: INSERT ... ON DUPLICATE KEY UPDATE

The solution maintains backward compatibility by falling back to R.store()
when upsert fails, ensuring no data loss while eliminating race conditions
for users with many monitors (200+).

Fixes #5357
This commit is contained in:
undaunt 2025-06-17 22:41:41 -07:00
parent 19889a57b2
commit 2dc3e1ab1f
2 changed files with 33 additions and 33 deletions

View file

@ -7,7 +7,6 @@ const path = require("path");
const { EmbeddedMariaDB } = require("./embedded-mariadb");
const mysql = require("mysql2/promise");
const { Settings } = require("./settings");
const { UptimeCalculator } = require("./uptime-calculator");
const dayjs = require("dayjs");
const { SimpleMigrationServer } = require("./utils/simple-migration-server");
const KumaColumnCompiler = require("./utils/knex/lib/dialects/mysql2/schema/mysql2-columncompiler");
@ -217,6 +216,7 @@ class Database {
dbConfig = {
type: "sqlite",
};
Database.dbConfig = dbConfig; // Fix: Also set Database.dbConfig in catch block
}
let config = {};
@ -823,7 +823,8 @@ class Database {
]);
for (let date of dates) {
// New Uptime Calculator
// New Uptime Calculator - import locally to avoid circular dependency
const { UptimeCalculator } = require("./uptime-calculator");
let calculator = new UptimeCalculator();
calculator.monitorID = monitor.monitor_id;
calculator.setMigrationMode(true);

View file

@ -3,7 +3,6 @@ const { UP, MAINTENANCE, DOWN, PENDING } = require("../src/util");
const { LimitQueue } = require("./utils/limit-queue");
const { log } = require("../src/util");
const { R } = require("redbean-node");
const { Database } = require("./database");
/**
* Calculates the uptime of a monitor.
@ -300,19 +299,17 @@ class UptimeCalculator {
dailyStatBean.ping = dailyData.avgPing;
dailyStatBean.pingMin = dailyData.minPing;
dailyStatBean.pingMax = dailyData.maxPing;
let dailyExtras = null;
{
// eslint-disable-next-line no-unused-vars
const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = dailyData;
if (Object.keys(extras).length > 0) {
dailyStatBean.extras = JSON.stringify(extras);
dailyExtras = JSON.stringify(extras);
}
}
try {
await this.upsertStat("stat_daily", this.monitorID, dailyKey,
dailyData.up, dailyData.down, dailyData.avgPing,
dailyData.minPing, dailyData.maxPing, dailyExtras);
dailyData.minPing, dailyData.maxPing);
} catch (error) {
log.warn("uptime-calc", `Upsert failed for daily stat, falling back to R.store(): ${error.message}`);
await R.store(dailyStatBean);
@ -329,19 +326,17 @@ class UptimeCalculator {
hourlyStatBean.ping = hourlyData.avgPing;
hourlyStatBean.pingMin = hourlyData.minPing;
hourlyStatBean.pingMax = hourlyData.maxPing;
let hourlyExtras = null;
{
// eslint-disable-next-line no-unused-vars
const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = hourlyData;
if (Object.keys(extras).length > 0) {
hourlyStatBean.extras = JSON.stringify(extras);
hourlyExtras = JSON.stringify(extras);
}
}
try {
await this.upsertStat("stat_hourly", this.monitorID, hourlyKey,
hourlyData.up, hourlyData.down, hourlyData.avgPing,
hourlyData.minPing, hourlyData.maxPing, hourlyExtras);
hourlyData.minPing, hourlyData.maxPing);
} catch (error) {
log.warn("uptime-calc", `Upsert failed for hourly stat, falling back to R.store(): ${error.message}`);
await R.store(hourlyStatBean);
@ -357,19 +352,17 @@ class UptimeCalculator {
minutelyStatBean.ping = minutelyData.avgPing;
minutelyStatBean.pingMin = minutelyData.minPing;
minutelyStatBean.pingMax = minutelyData.maxPing;
let minutelyExtras = null;
{
// eslint-disable-next-line no-unused-vars
const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = minutelyData;
if (Object.keys(extras).length > 0) {
minutelyStatBean.extras = JSON.stringify(extras);
minutelyExtras = JSON.stringify(extras);
}
}
try {
await this.upsertStat("stat_minutely", this.monitorID, divisionKey,
minutelyData.up, minutelyData.down, minutelyData.avgPing,
minutelyData.minPing, minutelyData.maxPing, minutelyExtras);
minutelyData.minPing, minutelyData.maxPing);
} catch (error) {
log.warn("uptime-calc", `Upsert failed for minutely stat, falling back to R.store(): ${error.message}`);
await R.store(minutelyStatBean);
@ -569,40 +562,46 @@ class UptimeCalculator {
* @param {number} ping Average ping
* @param {number} pingMin Minimum ping
* @param {number} pingMax Maximum ping
* @param {string|null} extras JSON string of extra data
* @returns {Promise<void>}
*/
async upsertStat(table, monitorId, timestamp, up, down, ping, pingMin, pingMax, extras = null) {
async upsertStat(table, monitorId, timestamp, up, down, ping, pingMin, pingMax) {
// Import Database locally to avoid circular dependency
const Database = require("./database");
// Check if database is initialized - dbConfig.type must exist and not be empty
if (!Database.dbConfig || !Database.dbConfig.type) {
log.warn("uptime-calc", `Database not initialized yet for ${table}, falling back to R.store()`);
throw new Error("Database not initialized");
}
const dbType = Database.dbConfig.type;
try {
if (dbType === "sqlite") {
await R.exec(`
INSERT INTO ${table} (monitor_id, timestamp, up, down, ping, pingMin, pingMax, extras)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO ${table} (monitor_id, timestamp, up, down, ping, ping_min, ping_max)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(monitor_id, timestamp) DO UPDATE SET
up = ?,
down = ?,
ping = ?,
pingMin = ?,
pingMax = ?,
extras = ?
ping_min = ?,
ping_max = ?
`, [
monitorId, timestamp, up, down, ping, pingMin, pingMax, extras,
up, down, ping, pingMin, pingMax, extras
monitorId, timestamp, up, down, ping, pingMin, pingMax,
up, down, ping, pingMin, pingMax
]);
} else if (dbType.endsWith("mariadb")) {
await R.exec(`
INSERT INTO ${table} (monitor_id, timestamp, up, down, ping, pingMin, pingMax, extras)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO ${table} (monitor_id, timestamp, up, down, ping, ping_min, ping_max)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
up = VALUES(up),
down = VALUES(down),
ping = VALUES(ping),
pingMin = VALUES(pingMin),
pingMax = VALUES(pingMax),
extras = VALUES(extras)
`, [monitorId, timestamp, up, down, ping, pingMin, pingMax, extras]);
ping_min = VALUES(ping_min),
ping_max = VALUES(ping_max)
`, [ monitorId, timestamp, up, down, ping, pingMin, pingMax ]);
} else {
throw new Error(`Unsupported database type: ${dbType}`);
}