diff --git a/pkg/bbgo/config.go b/pkg/bbgo/config.go index ff85a5a98f..2d9812912d 100644 --- a/pkg/bbgo/config.go +++ b/pkg/bbgo/config.go @@ -346,6 +346,8 @@ type EnvironmentConfig struct { DisableMarketDataStore bool `json:"disableMarketDataStore"` MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"` + + SyncBufferPeriod *types.Duration `json:"syncBufferPeriod"` } type Config struct { diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index ab86153132..f819f3f280 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -37,6 +37,8 @@ func init() { rand.Seed(time.Now().UnixNano()) } +var defaultSyncBufferPeriod = 30 * time.Minute + // IsBackTesting is a global variable that indicates the current environment is back-test or not. var IsBackTesting = false @@ -645,7 +647,17 @@ func (environ *Environment) syncSession( log.Infof("syncing symbols %v from session %s", symbols, session.Name) - return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, syncStartTime, symbols...) + syncBufferPeriod := -defaultSyncBufferPeriod + if environ.environmentConfig.SyncBufferPeriod != nil { + syncBufferPeriod = -environ.environmentConfig.SyncBufferPeriod.Duration() + } + + if syncBufferPeriod > 0 { + log.Warnf("syncBufferPeriod should be a negative number, given: %d", syncBufferPeriod) + } + + syncEndTime := time.Now().Add(syncBufferPeriod) + return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, syncStartTime, syncEndTime, symbols...) } func (environ *Environment) ConfigureNotificationSystem(ctx context.Context, userConfig *Config) error { diff --git a/pkg/service/order.go b/pkg/service/order.go index 8cdff47ec1..90e4730d00 100644 --- a/pkg/service/order.go +++ b/pkg/service/order.go @@ -19,7 +19,10 @@ type OrderService struct { DB *sqlx.DB } -func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { +func (s *OrderService) Sync( + ctx context.Context, exchange types.Exchange, symbol string, + startTime, endTime time.Time, +) error { isMargin, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange) // override symbol if isolatedSymbol is not empty if isIsolated && len(isolatedSymbol) > 0 { @@ -77,7 +80,7 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol } for _, sel := range tasks { - if err := sel.execute(ctx, s.DB, startTime); err != nil { + if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil { return err } } diff --git a/pkg/service/sync.go b/pkg/service/sync.go index 718c4dc8fa..71ac0efc2c 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -27,7 +27,9 @@ type SyncService struct { // SyncSessionSymbols syncs the trades from the given exchange session func (s *SyncService) SyncSessionSymbols( - ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string, + ctx context.Context, exchange types.Exchange, + startTime, endTime time.Time, + symbols ...string, ) error { markets, err := cache.LoadExchangeMarketsWithCache(ctx, exchange) if err != nil { @@ -41,12 +43,12 @@ func (s *SyncService) SyncSessionSymbols( } log.Infof("syncing %s %s trades from %s...", exchange.Name(), symbol, startTime) - if err := s.TradeService.Sync(ctx, exchange, symbol, startTime); err != nil { + if err := s.TradeService.Sync(ctx, exchange, symbol, startTime, endTime); err != nil { return err } log.Infof("syncing %s %s orders from %s...", exchange.Name(), symbol, startTime) - if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil { + if err := s.OrderService.Sync(ctx, exchange, symbol, startTime, endTime); err != nil { return err } } diff --git a/pkg/service/sync_task.go b/pkg/service/sync_task.go index 8ccfc99804..1e4ce765be 100644 --- a/pkg/service/sync_task.go +++ b/pkg/service/sync_task.go @@ -54,7 +54,10 @@ type SyncTask struct { LogInsert bool } -func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time, args ...time.Time) error { +func (sel SyncTask) execute( + ctx context.Context, + db *sqlx.DB, startTime time.Time, endTimeArgs ...time.Time, +) error { batchBufferRefVal := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer) // query from db @@ -84,8 +87,8 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim startTime = lastRecordTime(sel, recordSliceRef, startTime) endTime := time.Now() - if len(args) > 0 { - endTime = args[0] + if len(endTimeArgs) > 0 { + endTime = endTimeArgs[0] } // asset "" means all assets diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 9020290a77..01a347bca9 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -58,7 +58,11 @@ func NewTradeService(db *sqlx.DB) *TradeService { return &TradeService{db} } -func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { +func (s *TradeService) Sync( + ctx context.Context, + exchange types.Exchange, symbol string, + startTime, endTime time.Time, +) error { isMargin, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange) // override symbol if isolatedSymbol is not empty if isIsolated && len(isolatedSymbol) > 0 { @@ -106,7 +110,7 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol } for _, sel := range tasks { - if err := sel.execute(ctx, s.DB, startTime); err != nil { + if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil { return err } }