From 7c3bfc80801d9edbbc8ce016d6ea8fd26807a431 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 5 Jul 2023 09:47:36 +0100 Subject: [PATCH] support/db: Add batch insert builder which uses COPY to insert rows (#4916) * Add batch insert builder which uses COPY to insert rows * Update support/db/fast_batch_insert_builder.go Co-authored-by: George * Update fast_batch_insert_builder_test.go --------- Co-authored-by: George --- support/db/fast_batch_insert_builder.go | 150 +++++++++++++++++++ support/db/fast_batch_insert_builder_test.go | 127 ++++++++++++++++ 2 files changed, 277 insertions(+) create mode 100644 support/db/fast_batch_insert_builder.go create mode 100644 support/db/fast_batch_insert_builder_test.go diff --git a/support/db/fast_batch_insert_builder.go b/support/db/fast_batch_insert_builder.go new file mode 100644 index 0000000000..ec235ee31d --- /dev/null +++ b/support/db/fast_batch_insert_builder.go @@ -0,0 +1,150 @@ +package db + +import ( + "context" + "reflect" + "sort" + + "github.com/lib/pq" + + "github.com/stellar/go/support/errors" +) + +// ErrSealed is returned when trying to add rows to the FastBatchInsertBuilder after Exec() is called. +// Once Exec() is called no more rows can be added to the FastBatchInsertBuilder unless you call Reset() +// which clears out the old rows from the FastBatchInsertBuilder. +var ErrSealed = errors.New("cannot add more rows after Exec() without calling Reset() first") + +// ErrNoTx is returned when Exec() is called outside of a transaction. +var ErrNoTx = errors.New("cannot call Exec() outside of a transaction") + +// FastBatchInsertBuilder works like sq.InsertBuilder but has a better support for batching +// large number of rows. +// It is NOT safe for concurrent use. +// It does NOT support updating existing rows. +type FastBatchInsertBuilder struct { + columns []string + rows [][]interface{} + rowStructType reflect.Type + sealed bool +} + +// Row adds a new row to the batch. All rows must have exactly the same columns +// (map keys). Otherwise, error will be returned. Please note that rows are not +// added one by one but in batches when `Exec` is called. +func (b *FastBatchInsertBuilder) Row(row map[string]interface{}) error { + if b.sealed { + return ErrSealed + } + + if b.columns == nil { + b.columns = make([]string, 0, len(row)) + b.rows = make([][]interface{}, 0) + + for column := range row { + b.columns = append(b.columns, column) + } + + sort.Strings(b.columns) + } + + if len(b.columns) != len(row) { + return errors.Errorf("invalid number of columns (expected=%d, actual=%d)", len(b.columns), len(row)) + } + + rowSlice := make([]interface{}, 0, len(b.columns)) + for _, column := range b.columns { + val, ok := row[column] + if !ok { + return errors.Errorf(`column "%s" does not exist`, column) + } + rowSlice = append(rowSlice, val) + } + b.rows = append(b.rows, rowSlice) + + return nil +} + +// RowStruct adds a new row to the batch. All rows must have exactly the same columns +// (map keys). Otherwise, error will be returned. Please note that rows are not +// added one by one but in batches when `Exec` is called. +func (b *FastBatchInsertBuilder) RowStruct(row interface{}) error { + if b.sealed { + return ErrSealed + } + + if b.columns == nil { + b.columns = ColumnsForStruct(row) + b.rows = make([][]interface{}, 0) + } + + rowType := reflect.TypeOf(row) + if b.rowStructType == nil { + b.rowStructType = rowType + } else if b.rowStructType != rowType { + return errors.Errorf(`expected value of type "%s" but got "%s" value`, b.rowStructType.String(), rowType.String()) + } + + rrow := reflect.ValueOf(row) + rvals := mapper.FieldsByName(rrow, b.columns) + + // convert fields values to interface{} + columnValues := make([]interface{}, len(b.columns)) + for i, rval := range rvals { + columnValues[i] = rval.Interface() + } + + b.rows = append(b.rows, columnValues) + + return nil +} + +// Len returns the number of rows held in memory by the FastBatchInsertBuilder. +func (b *FastBatchInsertBuilder) Len() int { + return len(b.rows) +} + +// Exec inserts rows in a single COPY statement. Once Exec is called no more rows +// can be added to the FastBatchInsertBuilder unless Reset is called. +// Exec must be called within a transaction. +func (b *FastBatchInsertBuilder) Exec(ctx context.Context, session SessionInterface, tableName string) error { + b.sealed = true + if session.GetTx() == nil { + return ErrNoTx + } + + if len(b.rows) == 0 { + return nil + } + + tx := session.GetTx() + stmt, err := tx.PrepareContext(ctx, pq.CopyIn(tableName, b.columns...)) + if err != nil { + return err + } + + for _, row := range b.rows { + if _, err = stmt.ExecContext(ctx, row...); err != nil { + // we need to close the statement otherwise the session + // will always return bad connection errors when executing + // any other sql statements, + // see https://github.com/stellar/go/pull/316#issuecomment-368990324 + stmt.Close() + return err + } + } + + if err = stmt.Close(); err != nil { + return err + } + return nil +} + +// Reset clears out all the rows contained in the FastBatchInsertBuilder. +// After Reset is called new rows can be added to the FastBatchInsertBuilder. +func (b *FastBatchInsertBuilder) Reset() { + b.sealed = false + b.columns = nil + b.rows = nil + b.rowStructType = nil +} diff --git a/support/db/fast_batch_insert_builder_test.go b/support/db/fast_batch_insert_builder_test.go new file mode 100644 index 0000000000..c31f502735 --- /dev/null +++ b/support/db/fast_batch_insert_builder_test.go @@ -0,0 +1,127 @@ +package db + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/support/db/dbtest" +) + +func TestFastBatchInsertBuilder(t *testing.T) { + db := dbtest.Postgres(t).Load(testSchema) + defer db.Close() + sess := &Session{DB: db.Open()} + defer sess.DB.Close() + + insertBuilder := &FastBatchInsertBuilder{} + + assert.NoError(t, + insertBuilder.Row(map[string]interface{}{ + "name": "bubba", + "hunger_level": "1", + }), + ) + + assert.EqualError(t, + insertBuilder.Row(map[string]interface{}{ + "name": "bubba", + }), + "invalid number of columns (expected=2, actual=1)", + ) + + assert.EqualError(t, + insertBuilder.Row(map[string]interface{}{ + "name": "bubba", + "city": "London", + }), + "column \"hunger_level\" does not exist", + ) + + assert.NoError(t, + insertBuilder.RowStruct(hungerRow{ + Name: "bubba2", + HungerLevel: "9", + }), + ) + + assert.EqualError(t, + insertBuilder.RowStruct(invalidHungerRow{ + Name: "bubba", + HungerLevel: "2", + LastName: "b", + }), + "expected value of type \"db.hungerRow\" but got \"db.invalidHungerRow\" value", + ) + assert.Equal(t, 2, insertBuilder.Len()) + assert.Equal(t, false, insertBuilder.sealed) + + assert.EqualError(t, + insertBuilder.Exec(context.Background(), sess, "people"), + "cannot call Exec() outside of a transaction", + ) + assert.Equal(t, true, insertBuilder.sealed) + + assert.NoError(t, sess.Begin()) + assert.NoError(t, insertBuilder.Exec(context.Background(), sess, "people")) + assert.Equal(t, 2, insertBuilder.Len()) + assert.Equal(t, true, insertBuilder.sealed) + + var found []person + assert.NoError(t, sess.SelectRaw(context.Background(), &found, `SELECT * FROM people WHERE name like 'bubba%'`)) + assert.Equal( + t, + found, + []person{ + {Name: "bubba", HungerLevel: "1"}, + {Name: "bubba2", HungerLevel: "9"}, + }, + ) + + assert.EqualError(t, + insertBuilder.Row(map[string]interface{}{ + "name": "bubba3", + "hunger_level": "100", + }), + "cannot add more rows after Exec() without calling Reset() first", + ) + assert.Equal(t, 2, insertBuilder.Len()) + assert.Equal(t, true, insertBuilder.sealed) + + insertBuilder.Reset() + assert.Equal(t, 0, insertBuilder.Len()) + assert.Equal(t, false, insertBuilder.sealed) + + assert.NoError(t, + insertBuilder.Row(map[string]interface{}{ + "name": "bubba3", + "hunger_level": "3", + }), + ) + assert.Equal(t, 1, insertBuilder.Len()) + assert.Equal(t, false, insertBuilder.sealed) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + assert.EqualError(t, + insertBuilder.Exec(ctx, sess, "people"), + "context canceled", + ) + assert.Equal(t, 1, insertBuilder.Len()) + assert.Equal(t, true, insertBuilder.sealed) + + assert.NoError(t, sess.SelectRaw(context.Background(), &found, `SELECT * FROM people WHERE name like 'bubba%'`)) + assert.Equal( + t, + found, + []person{ + {Name: "bubba", HungerLevel: "1"}, + {Name: "bubba2", HungerLevel: "9"}, + }, + ) + assert.NoError(t, sess.Rollback()) + + assert.NoError(t, sess.SelectRaw(context.Background(), &found, `SELECT * FROM people WHERE name like 'bubba%'`)) + assert.Empty(t, found) +}