From fc94f0935cb0e2a2e6e221c4614b7ff13ab5b945 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 21 Aug 2018 16:30:09 +0200 Subject: [PATCH 1/3] Use the updated ledger state in ingest --- services/horizon/internal/ingest/system.go | 32 ++++++++++++++++------ 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/services/horizon/internal/ingest/system.go b/services/horizon/internal/ingest/system.go index 5e9c591b42..32f14cf266 100644 --- a/services/horizon/internal/ingest/system.go +++ b/services/horizon/internal/ingest/system.go @@ -230,8 +230,6 @@ func (i *System) runOnce() { } }() - ls := ledger.CurrentState() - // 1. stash a copy of the current ingestion session (assigned from the tick) // 2. decide what to import // 3. import until none available @@ -241,6 +239,24 @@ func (i *System) runOnce() { is := i.current i.lock.Unlock() + // Warning: do not check the current ledger state using ledger.CurrentState()! It is updated + // in another go routine and can return the same data for two different ingesiton sessions. + var coreLatest, historyLatest int32 + + coreQ := core.Q{Session: i.CoreDB} + err := coreQ.LatestLedger(&coreLatest) + if err != nil { + log.WithFields(ilog.F{"err": err}).Error("Error getting core latest ledger") + return + } + + historyQ := history.Q{Session: i.HorizonDB} + err = historyQ.LatestLedger(&historyLatest) + if err != nil { + log.WithFields(ilog.F{"err": err}).Error("Error getting history latest ledger") + return + } + defer func() { i.lock.Lock() i.current = nil @@ -252,25 +268,25 @@ func (i *System) runOnce() { return } - if ls.CoreLatest == 1 { + if coreLatest == 1 { log.Warn("ingest: waiting for stellar-core sync") return } - if ls.HistoryLatest == ls.CoreLatest { + if historyLatest == coreLatest { log.Debug("ingest: no new ledgers") return } // 2. - if ls.HistoryLatest == 0 { + if historyLatest == 0 { log.Infof( "history db is empty, establishing base at ledger %d", - ls.CoreLatest, + coreLatest, ) - is.Cursor = NewCursor(ls.CoreLatest, ls.CoreLatest, i) + is.Cursor = NewCursor(coreLatest, coreLatest, i) } else { - is.Cursor = NewCursor(ls.HistoryLatest+1, ls.CoreLatest, i) + is.Cursor = NewCursor(historyLatest+1, coreLatest, i) } // 3. From ae50482aa7c06147b96843e5074efe119f3e3b32 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Fri, 24 Aug 2018 17:54:29 +0200 Subject: [PATCH 2/3] Typo fix --- services/horizon/internal/ingest/system.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/system.go b/services/horizon/internal/ingest/system.go index 32f14cf266..5c73ea5b52 100644 --- a/services/horizon/internal/ingest/system.go +++ b/services/horizon/internal/ingest/system.go @@ -240,7 +240,7 @@ func (i *System) runOnce() { i.lock.Unlock() // Warning: do not check the current ledger state using ledger.CurrentState()! It is updated - // in another go routine and can return the same data for two different ingesiton sessions. + // in another go routine and can return the same data for two different ingestion sessions. var coreLatest, historyLatest int32 coreQ := core.Q{Session: i.CoreDB} From 79e453a2993221a19c5b59d0f2957f60925ba448 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Fri, 24 Aug 2018 18:15:30 +0200 Subject: [PATCH 3/3] Fix possible deadlock --- services/horizon/internal/ingest/system.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/services/horizon/internal/ingest/system.go b/services/horizon/internal/ingest/system.go index 5c73ea5b52..ac96172445 100644 --- a/services/horizon/internal/ingest/system.go +++ b/services/horizon/internal/ingest/system.go @@ -239,6 +239,12 @@ func (i *System) runOnce() { is := i.current i.lock.Unlock() + defer func() { + i.lock.Lock() + i.current = nil + i.lock.Unlock() + }() + // Warning: do not check the current ledger state using ledger.CurrentState()! It is updated // in another go routine and can return the same data for two different ingestion sessions. var coreLatest, historyLatest int32 @@ -257,12 +263,6 @@ func (i *System) runOnce() { return } - defer func() { - i.lock.Lock() - i.current = nil - i.lock.Unlock() - }() - if is == nil { log.Warn("ingest: runOnce ran with a nil current session") return