Skip to content

Commit

Permalink
Refactor collector to use ticker for periodic data recording and cleanup
Browse files Browse the repository at this point in the history
- Replaced the sleep-based loop with a ticker in the collector function to record metrics data at regular intervals.
- Added error handling and recovery mechanism to prevent panics in the collector function.
- Updated the debug log message in the main function to include the current timestamp when debug mode is enabled.
  • Loading branch information
andrasbacsai committed Oct 15, 2024
1 parent 6bc31ee commit 7e682b2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 61 deletions.
122 changes: 64 additions & 58 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,65 +14,71 @@ func collector() {
fmt.Printf("[%s] Starting metrics recorder with refresh rate of %d seconds and retention period of %d days.\n", time.Now().Format("2006-01-02 15:04:05"), refreshRateSeconds, collectorRetentionPeriodDays)

go func() {
for {
time.Sleep(time.Duration(refreshRateSeconds) * time.Second)
// fmt.Printf("[%s] Recording metrics data.\n", time.Now().Format("2006-01-02 15:04:05"))

queryTimeInUnixString := getUnixTimeInMilliUTC()
overallPercentage, err := cpu.Percent(0, false)
if err != nil {
log.Printf("Error getting CPU percentage: %v", err)
continue
}

_, err = db.Exec(`INSERT INTO cpu_usage (time, percent) VALUES (?, ?)`, queryTimeInUnixString, fmt.Sprintf("%.2f", overallPercentage[0]))
if err != nil {
log.Printf("Error inserting into database: %v", err)
}

memory, err := mem.VirtualMemory()
if err != nil {
log.Printf("Error getting memory usage: %v", err)
continue
}

_, err = db.Exec(`INSERT INTO memory_usage (time, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?)`, queryTimeInUnixString, memory.Total, memory.Available, memory.Used, math.Round(memory.UsedPercent*100)/100, memory.Free)
if err != nil {
log.Printf("Error inserting into database: %v", err)
}
ticker := time.NewTicker(time.Duration(refreshRateSeconds) * time.Second)
defer ticker.Stop()

totalRowsToKeep := 10
currentTime := time.Now().UTC().UnixMilli()
cutoffTime := currentTime - int64(collectorRetentionPeriodDays*24*60*60*1000)

// Count the total number of rows
var totalRows int
err = db.QueryRow("SELECT COUNT(*) FROM cpu_usage").Scan(&totalRows)
if err != nil {
log.Printf("Error counting rows: %v", err)
continue
}

if totalRows > totalRowsToKeep {
// Delete old data while keeping at least 10 rows
_, err = db.Exec(`DELETE FROM cpu_usage WHERE CAST(time AS BIGINT) < ? AND time NOT IN (SELECT time FROM cpu_usage ORDER BY time DESC LIMIT ?)`, cutoffTime, totalRowsToKeep)
if err != nil {
log.Printf("Error deleting old data: %v", err)
}
}

err = db.QueryRow("SELECT COUNT(*) FROM memory_usage").Scan(&totalRows)
if err != nil {
log.Printf("Error counting rows: %v", err)
continue
}

if totalRows > totalRowsToKeep {
// Delete old data while keeping at least 10 rows
_, err = db.Exec(`DELETE FROM memory_usage WHERE CAST(time AS BIGINT) < ? AND time NOT IN (SELECT time FROM memory_usage ORDER BY time DESC LIMIT ?)`, cutoffTime, totalRowsToKeep)
if err != nil {
log.Printf("Error deleting old data: %v", err)
}
for {
select {
case <-ticker.C:
func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered from panic in collector: %v", r)
}
}()

queryTimeInUnixString := getUnixTimeInMilliUTC()

// CPU usage
overallPercentage, err := cpu.Percent(0, false)
if err != nil {
log.Printf("Error getting CPU percentage: %v", err)
return
}

_, err = db.Exec(`INSERT INTO cpu_usage (time, percent) VALUES (?, ?)`, queryTimeInUnixString, fmt.Sprintf("%.2f", overallPercentage[0]))
if err != nil {
log.Printf("Error inserting CPU usage into database: %v", err)
}

// Memory usage
memory, err := mem.VirtualMemory()
if err != nil {
log.Printf("Error getting memory usage: %v", err)
return
}

_, err = db.Exec(`INSERT INTO memory_usage (time, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?)`,
queryTimeInUnixString, memory.Total, memory.Available, memory.Used, math.Round(memory.UsedPercent*100)/100, memory.Free)
if err != nil {
log.Printf("Error inserting memory usage into database: %v", err)
}

// Cleanup old data
totalRowsToKeep := 10
currentTime := time.Now().UTC().UnixMilli()
cutoffTime := currentTime - int64(collectorRetentionPeriodDays*24*60*60*1000)

cleanupTable := func(tableName string) {
var totalRows int
err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)).Scan(&totalRows)
if err != nil {
log.Printf("Error counting rows in %s: %v", tableName, err)
return
}

if totalRows > totalRowsToKeep {
_, err = db.Exec(fmt.Sprintf(`DELETE FROM %s WHERE CAST(time AS BIGINT) < ? AND time NOT IN (SELECT time FROM %s ORDER BY time DESC LIMIT ?)`, tableName, tableName),
cutoffTime, totalRowsToKeep)
if err != nil {
log.Printf("Error deleting old data from %s: %v", tableName, err)
}
}
}

cleanupTable("cpu_usage")
cleanupTable("memory_usage")
}()
}
}
}()
Expand Down
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strconv"
"time"

"github.com/gin-gonic/gin"
_ "github.com/marcboeker/go-duckdb"
Expand Down Expand Up @@ -59,9 +60,7 @@ func main() {
}
}
if debug {
log.Printf("Debug is enabled")
} else {
log.Printf("Debug is disabled")
log.Printf("[%s] Debug is enabled.", time.Now().Format("2006-01-02 15:04:05"))
}

tokenFromEnv := os.Getenv("TOKEN")
Expand Down

0 comments on commit 7e682b2

Please sign in to comment.