forked from stellar/go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fast_batch_insert_builder.go
111 lines (91 loc) · 2.7 KB
/
fast_batch_insert_builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package db
import (
"context"
"github.com/lib/pq"
"reflect"
"sort"
"github.com/stellar/go/support/errors"
)
// FastBatchInsertBuilder works like sq.InsertBuilder but has better support
// for batching a large number of rows: rows are buffered in memory by Row or
// RowStruct and written to the database together when Exec is called.
// It is NOT safe for concurrent use.
type FastBatchInsertBuilder struct {
// columns holds the column names shared by every buffered row. It is set by
// the first call to Row (sorted map keys) or RowStruct (ColumnsForStruct).
columns []string
// rows holds the buffered rows; each row's values are ordered like columns.
rows [][]interface{}
// rowStructType is the type of the first value passed to RowStruct; later
// RowStruct calls must pass a value of the same type.
rowStructType reflect.Type
}
// Row buffers a new row given as a column-name -> value map. The first row
// passed in fixes the column set (stored sorted by name); every later row must
// have exactly the same keys, otherwise an error is returned and nothing is
// buffered. Rows are only kept in memory here; they are written to the database
// when `Exec` is called.
func (b *FastBatchInsertBuilder) Row(row map[string]interface{}) error {
	if b.columns == nil {
		// First row: derive the column list from its keys. Sorting gives a
		// deterministic order, since map iteration order is random.
		names := make([]string, 0, len(row))
		for name := range row {
			names = append(names, name)
		}
		sort.Strings(names)
		b.columns = names
		b.rows = make([][]interface{}, 0)
	}

	if want, got := len(b.columns), len(row); want != got {
		return errors.Errorf("invalid number of columns (expected=%d, actual=%d)", want, got)
	}

	// Lay the values out in column order. With the lengths already equal, a
	// missing key means the row uses a different column name.
	values := make([]interface{}, len(b.columns))
	for i, name := range b.columns {
		value, found := row[name]
		if !found {
			return errors.Errorf(`column "%s" does not exist`, name)
		}
		values[i] = value
	}

	b.rows = append(b.rows, values)
	return nil
}
// RowStruct buffers a new row taken from the fields of a struct value. If no
// columns have been set yet, they are obtained from ColumnsForStruct(row). The
// type of the first value passed in is remembered, and a later value of a
// different type is rejected with an error. Rows are only kept in memory here;
// they are written to the database when `Exec` is called.
func (b *FastBatchInsertBuilder) RowStruct(row interface{}) error {
	if b.columns == nil {
		b.columns = ColumnsForStruct(row)
		b.rows = make([][]interface{}, 0)
	}

	actual := reflect.TypeOf(row)
	switch {
	case b.rowStructType == nil:
		// First struct row: remember its type for later consistency checks.
		b.rowStructType = actual
	case b.rowStructType != actual:
		return errors.Errorf(`expected value of type "%s" but got "%s" value`, b.rowStructType.String(), actual.String())
	}

	// Look up the field for each column, then unwrap the reflect.Values into
	// plain interface{} values.
	fields := mapper.FieldsByName(reflect.ValueOf(row), b.columns)
	values := make([]interface{}, len(b.columns))
	for i := range fields {
		values[i] = fields[i].Interface()
	}

	b.rows = append(b.rows, values)
	return nil
}
// Len returns the number of rows currently buffered in the builder.
func (b *FastBatchInsertBuilder) Len() int {
	buffered := len(b.rows)
	return buffered
}
// Exec writes all buffered rows to tableName. It prepares a pq.CopyIn
// statement for the builder's columns on the transaction returned by
// session.GetTx(), executes it once per buffered row, and then closes the
// statement. With no buffered rows it returns nil without touching the session.
//
// On success the row buffer is emptied. On any error the buffered rows are
// left in place and the error is returned; some data may already have been
// sent, so this should be run in a DB transaction for easy rollbacks.
//
// NOTE(review): the result of session.GetTx() is used without a nil check, so
// the session presumably must already be inside a transaction. Confirm against
// callers.
func (b *FastBatchInsertBuilder) Exec(ctx context.Context, session SessionInterface, tableName string) error {
	if len(b.rows) == 0 {
		return nil
	}

	tx := session.GetTx()
	stmt, err := tx.Prepare(pq.CopyIn(tableName, b.columns...))
	if err != nil {
		return err
	}

	for _, row := range b.rows {
		if _, err = stmt.ExecContext(ctx, row...); err != nil {
			// Close the statement before bailing out so that it is not leaked
			// and the COPY operation is not left open on the connection. The
			// Close error is deliberately dropped: the ExecContext error is
			// the root cause the caller needs to see.
			_ = stmt.Close()
			return err
		}
	}

	// Closing the statement finishes the COPY, so its error must be checked.
	if err = stmt.Close(); err != nil {
		return err
	}

	b.rows = [][]interface{}{}
	return nil
}