Skip to content

Commit

Permalink
Fix storelogs (#222)
Browse files Browse the repository at this point in the history
* Fix StoreWorkerLogs

The function has been storing into worker_metrics with duplicates and
wrong timestamps for some time. The fix changes the worker_lastlogs
DB table definition. DBs must be recreated.

Signed-off-by: IWAMOTO Toshihiro <[email protected]>

* Add foreign key constraints to worker log DB tables and tidy up formatting

This patch make sure worker_* rows have matching row in the worker table.
Also changes multi-line string formatting for readability.

Signed-off-by: IWAMOTO Toshihiro <[email protected]>
  • Loading branch information
toshiiw authored and k8s-ci-robot committed Oct 24, 2018
1 parent 4dc1aed commit 106235b
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 104 deletions.
111 changes: 56 additions & 55 deletions pkg/db/db_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,89 +6,90 @@ import (

func (d *dbConn) DBInit() {
db := d.db
_, err := db.Exec("CREATE TABLE IF NOT EXISTS studies" +
"(id CHAR(16) PRIMARY KEY, " +
"name VARCHAR(255), " +
"owner VARCHAR(255), " +
"optimization_type TINYINT, " +
"optimization_goal DOUBLE, " +
"parameter_configs TEXT, " +
"tags TEXT, " +
"objective_value_name VARCHAR(255), " +
"metrics TEXT, " +
"job_id TEXT)")
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS studies
(id CHAR(16) PRIMARY KEY,
name VARCHAR(255),
owner VARCHAR(255),
optimization_type TINYINT,
optimization_goal DOUBLE,
parameter_configs TEXT,
tags TEXT,
objective_value_name VARCHAR(255),
metrics TEXT,
job_id TEXT)`)
if err != nil {
log.Fatalf("Error creating studies table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS study_permissions" +
"(study_id CHAR(16) NOT NULL, " +
"access_permission VARCHAR(255), " +
"PRIMARY KEY (study_id, access_permission))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS study_permissions
(study_id CHAR(16) NOT NULL,
access_permission VARCHAR(255),
PRIMARY KEY (study_id, access_permission))`)
if err != nil {
log.Fatalf("Error creating study_permissions table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS trials" +
"(id CHAR(16) PRIMARY KEY, " +
"study_id CHAR(16), " +
"parameters TEXT, " +
"objective_value VARCHAR(255), " +
"tags TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS trials
(id CHAR(16) PRIMARY KEY,
study_id CHAR(16),
parameters TEXT,
objective_value VARCHAR(255),
tags TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id))`)
if err != nil {
log.Fatalf("Error creating trials table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS workers" +
"(id CHAR(16) PRIMARY KEY, " +
"study_id CHAR(16), " +
"trial_id CHAR(16), " +
"type VARCHAR(255), " +
"status TINYINT, " +
"template_path TEXT, " +
"tags TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id), " +
"FOREIGN KEY(trial_id) REFERENCES trials(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS workers
(id CHAR(16) PRIMARY KEY,
study_id CHAR(16),
trial_id CHAR(16),
type VARCHAR(255),
status TINYINT,
template_path TEXT,
tags TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id),
FOREIGN KEY(trial_id) REFERENCES trials(id))`)
if err != nil {
log.Fatalf("Error creating workers table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS worker_metrics" +
"(worker_id CHAR(16) NOT NULL, " +
"id INT AUTO_INCREMENT PRIMARY KEY, " +
"time DATETIME(6), " +
"name VARCHAR(255), " +
"value TEXT, " +
"is_objective TINYINT)")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS worker_metrics
(worker_id CHAR(16) NOT NULL,
id INT AUTO_INCREMENT PRIMARY KEY,
time DATETIME(6),
name VARCHAR(255),
value TEXT,
is_objective TINYINT,
FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE)`)
if err != nil {
log.Fatalf("Error creating worker_metrics table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS worker_lastlogs" +
"(worker_id CHAR(16) PRIMARY KEY, " +
"time DATETIME(6), " +
"value TEXT)")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS worker_lastlogs
(worker_id CHAR(16) PRIMARY KEY,
time DATETIME(6),
FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE)`)
if err != nil {
log.Fatalf("Error creating worker_lastlogs table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS suggestion_param" +
"(id CHAR(16) PRIMARY KEY," +
"suggestion_algo TEXT, " +
"study_id CHAR(16), " +
"parameters TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS suggestion_param
(id CHAR(16) PRIMARY KEY,
suggestion_algo TEXT,
study_id CHAR(16),
parameters TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id))`)
if err != nil {
log.Fatalf("Error creating suggestion_param table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS earlystop_param" +
"(id CHAR(16) PRIMARY KEY, " +
"earlystop_argo TEXT, " +
"study_id CHAR(16), " +
"parameters TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS earlystop_param
(id CHAR(16) PRIMARY KEY,
earlystop_argo TEXT,
study_id CHAR(16),
parameters TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id))`)
if err != nil {
log.Fatalf("Error creating earlystop_param table: %v", err)
}
Expand Down
133 changes: 84 additions & 49 deletions pkg/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,61 @@ func (d *dbConn) GetWorkerLogs(id string, opts *GetWorkerLogOpts) ([]*WorkerLog,
return result, nil
}

func (d *dbConn) getWorkerLastlog(id string, value *string) (*time.Time, error) {
var lastTimestamp string
func (d *dbConn) getWorkerLastlogs(id string) (time.Time, []*WorkerLog, error) {
var timeStr string
var timeVal time.Time
var err error

if value != nil {
row := d.db.QueryRow("SELECT time, value FROM worker_lastlogs WHERE worker_id = ?", id)
err = row.Scan(&lastTimestamp, value)
} else {
row := d.db.QueryRow("SELECT time FROM worker_lastlogs WHERE worker_id = ?", id)
err = row.Scan(&lastTimestamp)
// Use LEFT JOIN to ensure a result even if there's no matching
// in worker_metrics.
rows, err := d.db.Query(
`SELECT worker_lastlogs.time, name, value FROM worker_lastlogs
LEFT JOIN worker_metrics
ON (worker_lastlogs.worker_id = worker_metrics.worker_id AND worker_lastlogs.time = worker_metrics.time)
WHERE worker_lastlogs.worker_id = ?`, id)
if err != nil {
return timeVal, nil, err
}

var result []*WorkerLog
for rows.Next() {
log1 := new(WorkerLog)
var thisTime string
var name, value sql.NullString

err := rows.Scan(&thisTime, &name, &value)
if err != nil {
log.Printf("Error scanning log: %v", err)
continue
}
if timeStr == "" {
timeStr = thisTime
timeVal, err = time.Parse(mysqlTimeFmt, timeStr)
if err != nil {
log.Printf("Error parsing time %s: %v", timeStr, err)
return timeVal, nil, err
}
} else if timeStr != thisTime {
log.Printf("Unexpected query result %s != %s",
timeStr, thisTime)
}
log1.Time = timeVal
if !name.Valid {
continue
}
(*log1).Name = name.String
(*log1).Value = value.String
result = append(result, log1)
}
return timeVal, result, nil
}

func (d *dbConn) GetWorkerTimestamp(id string) (*time.Time, error) {
var lastTimestamp string

row := d.db.QueryRow("SELECT time FROM worker_lastlogs WHERE worker_id = ?", id)
err := row.Scan(&lastTimestamp)

switch {
case err == sql.ErrNoRows:
return nil, nil
Expand All @@ -483,10 +526,6 @@ func (d *dbConn) getWorkerLastlog(id string, value *string) (*time.Time, error)
}
}

func (d *dbConn) GetWorkerTimestamp(id string) (*time.Time, error) {
return d.getWorkerLastlog(id, nil)
}

func (d *dbConn) storeWorkerLog(workerID string, time string, metricsName string, metricsValue string, objectiveValueName string) error {
isObjective := 0
if metricsName == objectiveValueName {
Expand All @@ -502,9 +541,8 @@ func (d *dbConn) storeWorkerLog(workerID string, time string, metricsName string

func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error {
var lasterr error
var lastValue string

dbT, err := d.getWorkerLastlog(workerID, &lastValue)
dbT, lastLogs, err := d.getWorkerLastlogs(workerID)
if err != nil {
log.Printf("Error getting last log timestamp: %v", err)
}
Expand All @@ -519,69 +557,66 @@ func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error
return err
}

// Store logs when
// 1. a log is newer than dbT, or,
// 2. a log is not yet in the DB when the timestamps are equal
var formattedTime string
var ls []string
var lastTime time.Time
for _, mlog := range logs {
metricsName := mlog.Name
logLoop:
for _, mv := range mlog.Values {
t, err := time.Parse(time.RFC3339Nano, mv.Time)
if err != nil {
log.Printf("Error parsing time %s: %v", mv.Time, err)
lasterr = err
continue
}
if dbT != nil && !t.After(*dbT) {
if t.Before(dbT) {
// dbT is from mysql and has microsec precision.
// This code assumes nanosec fractions are rounded down.
continue
}
// use UTC as mysql DATETIME lacks timezone
formattedTime = t.UTC().Format(mysqlTimeFmt)
if dbT != nil {
// Parse again to get rounding effect
//reparsed_time, err := time.Parse(mysqlTimeFmt, formattedTime)
//if reparsed_time == *dbT {
// if mv.Value == lastValue {
// stored_logs are already in DB
// This assignment ensures the remaining
// logs will be stored in DB.
// dbT = nil
// continue
// }
// // We don't know this is necessary or not yet.
// stored_logs = append(stored_logs, &mv.Value)
// continue
//}
// (reparsed_time > *dbT) can be assumed
err = d.storeWorkerLog(workerID,
dbT.UTC().Format(mysqlTimeFmt),
metricsName, mv.Value,
objectiveValueName)
if !dbT.IsZero() {
// Parse again to get rounding effect, otherwise
// the next comparison will be almost always false.
reparsed_time, err := time.Parse(mysqlTimeFmt, formattedTime)
if err != nil {
log.Printf("Error storing log %s: %v", mv.Value, err)
log.Printf("Error parsing time %s: %v", formattedTime, err)
lasterr = err
continue
}
dbT = nil
} else {
err = d.storeWorkerLog(workerID,
formattedTime,
metricsName, mv.Value,
objectiveValueName)
if err != nil {
log.Printf("Error storing log %s: %v", mv.Value, err)
lasterr = err
if reparsed_time == dbT {
for _, l := range lastLogs {
if l.Name == metricsName && l.Value == mv.Value {
continue logLoop
}
}
}
}
err = d.storeWorkerLog(workerID,
formattedTime,
metricsName, mv.Value,
objectiveValueName)
if err != nil {
log.Printf("Error storing log %s: %v", mv.Value, err)
lasterr = err
} else if t.After(lastTime) {
lastTime = t
}
}
}
if lasterr != nil {
// If lastlog were updated, logs that couldn't be saved
// would be lost.
return lasterr
}
if len(ls) == 2 {
_, err = d.db.Exec("REPLACE INTO worker_lastlogs VALUES (?, ?, ?)",
workerID, formattedTime, ls[1])
if !lastTime.IsZero() {
formattedTime = lastTime.UTC().Format(mysqlTimeFmt)
_, err = d.db.Exec("REPLACE INTO worker_lastlogs VALUES (?, ?)",
workerID, formattedTime)
}
return err
}
Expand Down

0 comments on commit 106235b

Please sign in to comment.