-
Notifications
You must be signed in to change notification settings - Fork 523
/
clickhouse_test.go
209 lines (179 loc) · 6.07 KB
/
clickhouse_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package clickhouse_test
import (
"database/sql"
"errors"
"path/filepath"
"testing"
"time"
"github.com/avast/retry-go/v4"
"github.com/pressly/goose/v3"
"github.com/pressly/goose/v3/internal/check"
"github.com/pressly/goose/v3/internal/testdb"
)
func TestClickUpDownAll(t *testing.T) {
t.Parallel()
migrationDir := filepath.Join("testdata", "migrations")
db, cleanup, err := testdb.NewClickHouse()
check.NoError(t, err)
t.Cleanup(cleanup)
check.NoError(t, goose.SetDialect("clickhouse"))
retryCheckTableMutation := func(table string) func() error {
return func() error {
ok := checkTableMutation(t, db, table)
if !ok {
return errors.New("mutation not done for table: " + table)
}
return nil
}
}
/*
This test applies all up migrations, asserts we have all the entries in
the versions table, applies all down migration and asserts we have zero
migrations applied.
ClickHouse performs UPDATES and DELETES asynchronously,
but we can best-effort check mutations and their progress.
This is especially important for down migrations where rows are deleted
from the versions table.
For the sake of testing, there might be a way to modifying the server
(or queries) to perform all operations synchronously?
Ref: https://clickhouse.com/docs/en/operations/system-tables/mutations/
Ref: https://clickhouse.com/docs/en/sql-reference/statements/alter/#mutations
Ref: https://clickhouse.com/blog/how-to-update-data-in-click-house/
*/
// Collect migrations so we don't have to hard-code the currentVersion
// in an assertion later in the test.
migrations, err := goose.CollectMigrations(migrationDir, 0, goose.MaxVersion)
check.NoError(t, err)
currentVersion, err := goose.GetDBVersion(db)
check.NoError(t, err)
check.Number(t, currentVersion, 0)
err = goose.Up(db, migrationDir)
check.NoError(t, err)
currentVersion, err = goose.GetDBVersion(db)
check.NoError(t, err)
check.Number(t, currentVersion, len(migrations))
err = goose.DownTo(db, migrationDir, 0)
check.NoError(t, err)
err = retry.Do(
retryCheckTableMutation(goose.TableName()),
retry.Delay(1*time.Second),
)
check.NoError(t, err)
currentVersion, err = goose.GetDBVersion(db)
check.NoError(t, err)
check.Number(t, currentVersion, 0)
}
func TestClickHouseFirstThree(t *testing.T) {
t.Parallel()
migrationDir := filepath.Join("testdata", "migrations")
db, cleanup, err := testdb.NewClickHouse()
check.NoError(t, err)
t.Cleanup(cleanup)
check.NoError(t, goose.SetDialect("clickhouse"))
err = goose.Up(db, migrationDir)
check.NoError(t, err)
currentVersion, err := goose.GetDBVersion(db)
check.NoError(t, err)
check.Number(t, currentVersion, 3)
type result struct {
customerID string `db:"customer_id"`
timestamp time.Time `db:"time_stamp"`
clickEventType string `db:"click_event_type"`
countryCode string `db:"country_code"`
sourceID int64 `db:"source_id"`
}
rows, err := db.Query(`SELECT * FROM clickstream ORDER BY customer_id`)
check.NoError(t, err)
var results []result
for rows.Next() {
var r result
err = rows.Scan(&r.customerID, &r.timestamp, &r.clickEventType, &r.countryCode, &r.sourceID)
check.NoError(t, err)
results = append(results, r)
}
check.Number(t, len(results), 3)
check.NoError(t, rows.Close())
check.NoError(t, rows.Err())
parseTime := func(t *testing.T, s string) time.Time {
t.Helper()
tm, err := time.Parse("2006-01-02", s)
check.NoError(t, err)
return tm
}
want := []result{
{"customer1", parseTime(t, "2021-10-02"), "add_to_cart", "US", 568239},
{"customer2", parseTime(t, "2021-10-30"), "remove_from_cart", "", 0},
{"customer3", parseTime(t, "2021-11-07"), "checkout", "", 307493},
}
for i, result := range results {
check.Equal(t, result.customerID, want[i].customerID)
check.Equal(t, result.timestamp, want[i].timestamp)
check.Equal(t, result.clickEventType, want[i].clickEventType)
if result.countryCode != "" && want[i].countryCode != "" {
check.Equal(t, result.countryCode, want[i].countryCode)
}
check.Number(t, result.sourceID, want[i].sourceID)
}
}
func TestRemoteImportMigration(t *testing.T) {
t.Parallel()
// TODO(mf): use TestMain and create a proper "long" or "remote" flag.
if !testing.Short() {
t.Skip("skipping test")
}
// This test is using a remote dataset from an s3 bucket:
// https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv
// From this tutorial: https://clickhouse.com/docs/en/tutorial/
// Note, these files are backed up in this repository in:
// tests/clickhouse/testdata/backup-files/taxi_zone_lookup.csv
// We may want to host this ourselves. Or just don't bother with SOURCE(HTTP(URL..
// and craft a long INSERT statement.
migrationDir := filepath.Join("testdata", "migrations-remote")
db, cleanup, err := testdb.NewClickHouse()
check.NoError(t, err)
t.Cleanup(cleanup)
check.NoError(t, goose.SetDialect("clickhouse"))
err = goose.Up(db, migrationDir)
check.NoError(t, err)
_, err = goose.GetDBVersion(db)
check.NoError(t, err)
var count int
err = db.QueryRow(`SELECT count(*) FROM taxi_zone_dictionary`).Scan(&count)
check.NoError(t, err)
check.Number(t, count, 265)
}
func checkTableMutation(t *testing.T, db *sql.DB, tableName string) bool {
t.Helper()
rows, err := db.Query(
`select mutation_id, command, is_done, create_time from system.mutations where table=$1`,
tableName,
)
check.NoError(t, err)
type result struct {
mutationID string `db:"mutation_id"`
command string `db:"command"`
isDone int64 `db:"is_done"`
createTime time.Time `db:"create_time"`
}
var results []result
for rows.Next() {
var r result
err = rows.Scan(&r.mutationID, &r.command, &r.isDone, &r.createTime)
check.NoError(t, err)
results = append(results, r)
}
check.NoError(t, rows.Close())
check.NoError(t, rows.Err())
// No results means there are no mutations. Assume they are all done.
if len(results) == 0 {
return true
}
// Loop through all the mutations, if at least one of them is
// not done, return false.
for _, r := range results {
if r.isDone != 1 {
return false
}
}
return true
}