-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfunctions.go
410 lines (381 loc) · 16.2 KB
/
functions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
/**
* This file contains functions for managing partitions.
* Creating, removing, and displaying.
*/
package main
import (
"github.com/imdario/mergo"
"gopkg.in/guregu/null.v2"
"regexp"
"strconv"
)
// Creates a parent from a given table and creatse partitions based on the given settings.
func (db DB) CreateParent(p *Partition) {
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
if count > 0 {
l.Info("Partition already exists for " + p.Table + " you must first run `undo` on it.")
return
}
// SELECT partman.create_parent('test.part_test', 'col3', 'time-static', 'daily');
_, err = db.NamedExec(`SELECT partman.create_parent(:table, :column, :type, :interval);`, p)
if err != nil {
l.Error(err)
}
// If a retention period was set, the record in partman.part_config table must be updated to include it. It does not get set with create_parent()
db.SetRetention(p)
}
// Creates parents from all configured partitions for a database.
func (db DB) CreateParents() {
if len(db.Partitions) == 0 {
l.Info("There are no configured partitions to be created.")
} else {
for _, p := range db.Partitions {
db.CreateParent(&p)
}
}
}
// Calls the `run_maintenance()` function and adds new partition tables and drops old partitions if a retention period was set. If a partition name is passed, it will run maintenance for that partition table ONLY. "NULL" will run maintenance on all tables.
func (db DB) RunMaintenance(p *Partition, opts ...map[string]interface{}) {
// Pull basic arguments
m := map[string]interface{}{"table": p.Table}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Pull custom function arguments if set in configuration
if err := mergo.Merge(&m, p.Options.Functions.RunMaintenance); err != nil {
l.Error(err)
}
// Defaults
if err := mergo.Merge(&m, map[string]interface{}{"analyze": true, "jobmon": true}); err != nil {
l.Error(err)
}
_, err := db.NamedExec(`SELECT partman.run_maintenance(:table, :analyze, :jobmon);`, m)
if err != nil {
l.Error(err)
}
}
// Undo any partition by copying data from the child partition tables to the parent. Note: Batches can not be smaller than the partition interval because this copies entire tables.
func (db DB) UndoPartition(p *Partition, opts ...map[string]interface{}) {
// Pull basic arguments
m := map[string]interface{}{"table": p.Table}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Pull custom function arguments if set in configuration
if err := mergo.Merge(&m, p.Options.Functions.UndoPartition); err != nil {
l.Error(err)
}
// Defaults (https://github.com/keithf4/pg_partman/blob/master/sql/functions/undo_partition.sql#L5)
if err := mergo.Merge(&m, map[string]interface{}{"batchCount": 1, "keepTable": true, "jobmon": true, "lockWait": 0}); err != nil {
l.Error(err)
}
_, err := db.NamedExec(`SELECT partman.undo_partition(:table, :batchCount, :keepTable, :jobmon, :lockWait);`, m)
if err != nil {
l.Error(err)
}
// undo_partition() doesn't seem to remove the part_config record. It seems as if it should be removed too because a new partition on the same table can't be made until it is.
_, err = db.NamedExec(`DELETE FROM partman.part_config WHERE parent_table = :table;`, m)
if err != nil {
l.Error(err)
}
}
// Gets information about a partition.
func (db DB) PartitionInfo(p *Partition) PartConfig {
pc := PartConfig{}
err := db.Get(&pc, "SELECT parent_table,control,type,part_interval,premake FROM partman.part_config WHERE parent_table = $1 LIMIT 1", p.Table)
if err != nil {
l.Error(err)
}
return pc
}
// Shows child partitions for a partition table.
func (db DB) GetChildPartitions(p *Partition) []ChildInfo {
c := []ChildInfo{}
err := db.Select(&c, "SELECT partman.show_partitions($1) AS table", p.Table)
if err != nil {
l.Error(err)
} else {
// Also get the record count and size on disk for each partition
for i, child := range c {
err := db.Get(&c[i].Records, "SELECT COUNT(*) FROM "+child.Table)
if err != nil {
l.Error(err)
}
// pg_size_pretty() will say "bytes" or "kB" etc.
//err = db.Get(&bytesStr, "SELECT pg_size_pretty(pg_total_relation_size('"+child.Table+"'));")
err = db.Get(&c[i].BytesOnDisk, "SELECT pg_total_relation_size('"+child.Table+"');")
if err != nil {
l.Error(err)
}
}
}
return c
}
// Checks parent partition tables to see if any records were inserted there instead of the proper child partition tables. Can be fixed with PartitionDataTime() or PartitionDataId().
func (db DB) CheckParent() []ParentInfo {
ps := []ParentInfo{}
// check_parent() returns a string: (parentTable,4) ... meaning a "parentTable" has 4 records. This needs to be parsed.
res := []struct {
value string `db:"value"`
}{}
// Make the query and get the row(s)
err := db.Select(&res, "SELECT partman.check_parent() AS value")
if err != nil {
l.Error(err)
return ps
}
// Parse each row with regex
r, _ := regexp.Compile(`\((.*)\,([0-9]*)\)`)
for _, record := range res {
pInfo := r.FindStringSubmatch(record.value)
if len(pInfo) == 3 {
recordCount, err := strconv.Atoi(pInfo[2])
if err == nil {
ps = append(ps, ParentInfo{Table: pInfo[1], Records: recordCount})
}
}
}
return ps
}
// This function is used to reapply ownership & grants on all child tables based on what the parent table has set (for large partition sets, this can be a very long running operation).
func (db DB) ReapplyPrivileges(p *Partition) {
m := map[string]interface{}{"table": p.Table}
_, err := db.NamedExec(`SELECT partman.reapply_privileges(:table);`, m)
if err != nil {
l.Error(err)
}
}
// Applies any foreign keys that exist on a parent table in a partition set to all the child tables. This function is automatically called whenever a new child table is created, so there is no need to manually run it unless you need to fix an existing child table.
func (db DB) ApplyForeignKeys(p *Partition, opts ...map[string]interface{}) {
// Pull basic arguments
m := map[string]interface{}{"table": p.Table}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Defaults (https://github.com/keithf4/pg_partman/blob/master/sql/functions/apply_foreign_keys.sql#L4)
if err := mergo.Merge(&m, map[string]interface{}{"childTable": null.String{}, "debug": false}); err != nil {
l.Error(err)
}
_, err := db.NamedExec(`SELECT partman.apply_foreign_keys(:table, :childTable, :debug);`, m)
if err != nil {
l.Error(err)
}
}
// Sets a retention period on a partition
func (db DB) SetRetention(p *Partition, opts ...map[string]interface{}) {
if p.Retention == "" {
l.Info("No retention period configured.")
return
}
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
// Make sure it exists.
if count > 0 {
// Pull basic arguments (TODO: Maybe allow more to be set)
m := map[string]interface{}{"table": p.Table, "retention": p.Retention, "retentionSchema": p.Options.RetentionSchema, "retentionKeepTable": p.Options.RetentionKeepTable}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Pull custom function arguments if set in configuration
if err := mergo.Merge(&m, p.Options.Functions.SetRetention); err != nil {
l.Error(err)
}
// Defaults are actually going to come from the existing record in this case
pc := PartConfig{}
err := db.Select(&pc, "SELECT * FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
if err := mergo.Merge(&m, map[string]interface{}{"retention_schema": pc.RetentionSchema, "retention_keep_table": pc.RetentionKeepTable}); err != nil {
l.Error(err)
}
_, err = db.NamedExec(`UPDATE partman.part_config SET retention = :retention, retention_schema = :retentionSchema, retention_keep_table = :retentionKeepTable WHERE parent_table = :table;`, m)
if err != nil {
l.Error(err)
} else {
l.Info("A retention period has been set for " + p.Table + ". Maintenance will remove old child partition tables.")
}
}
}
// Removes retention on a partition. Maintenance will no longer remove old child partition tables.
func (db DB) RemoveRetention(p *Partition) {
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
// Make sure it exists.
if count > 0 {
m := map[string]interface{}{"table": p.Table, "retention": null.String{}, "retentionSchema": null.String{}, "retentionKeepTable": true}
_, err = db.NamedExec(`UPDATE partman.part_config SET retention = :retention, retention_schema = :retentionSchema, retention_keep_table = :retentionKeepTable WHERE parent_table = :table;`, m)
if err != nil {
l.Error(err)
} else {
l.Info("The retention period has been removed for " + p.Table + ".")
}
} else {
l.Info("There was no retention period set for " + p.Table + ".")
}
}
// For time based partitions, this fixes/cleans up partitions which may have accidentally had data written to the parent table. Or, maybe it was data before the partition was created.
func (db DB) PartitionDataTime(p *Partition, opts ...map[string]interface{}) {
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
// Make sure it exists.
if count > 0 {
// Pull basic arguments
m := map[string]interface{}{"table": p.Table}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Pull custom function arguments if set in configuration
if err := mergo.Merge(&m, p.Options.Functions.PartitionDataTime); err != nil {
l.Error(err)
}
// Defaults (https://github.com/keithf4/pg_partman/blob/master/sql/functions/partition_data_time.sql#L4)
if err := mergo.Merge(&m, map[string]interface{}{"batchCount": 1, "batchInterval": null.String{}, "lockWait": 0, "order": "ASC"}); err != nil {
l.Error(err)
}
_, err = db.NamedExec(`SELECT partman.partition_data_time(:table, :batchCount, :batchInterval, :lockWait, :order);`, m)
if err != nil {
l.Error(err)
} else {
l.Info("The partition on " + p.Table + " has been cleaned up. Any data written to the parent has now been moved to child partition tables (if they were available).")
}
} else {
l.Info("There appears to be no partition set for " + p.Table + ".")
}
}
// For id based partitions, this fixes/cleans up partitions which may have accidentally had data written to the parent table. Or, maybe it was data before the partition was created.
func (db DB) PartitionDataId(p *Partition, opts ...map[string]interface{}) {
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
// Make sure it exists.
if count > 0 {
// Pull basic arguments
m := map[string]interface{}{"table": p.Table}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Pull custom function arguments if set in configuration
if err := mergo.Merge(&m, p.Options.Functions.PartitionDataId); err != nil {
l.Error(err)
}
// Defaults (https://github.com/keithf4/pg_partman/blob/master/sql/functions/partition_data_id.sql#L4)
if err := mergo.Merge(&m, map[string]interface{}{"batchCount": 1, "batchInterval": null.String{}, "lockWait": 0, "order": "ASC"}); err != nil {
l.Error(err)
}
_, err = db.NamedExec(`SELECT partman.partition_data_id(:table, :batchCount, :batchInterval, :lockWait, :order);`, m)
if err != nil {
l.Error(err)
} else {
l.Info("The partition on " + p.Table + " has been cleaned up. Any data written to the parent has now been moved to child partition tables (if they were available).")
}
} else {
l.Info("There appears to be no partition set for " + p.Table + ".")
}
}
// Manually uninherits (and optionally drops) child partition tables from a time based partition set.
func (db DB) DropPartitionTime(p *Partition, opts ...map[string]interface{}) {
//drop_partition_time(p_parent_table text, p_retention interval DEFAULT NULL, p_keep_table boolean DEFAULT NULL, p_keep_index boolean DEFAULT NULL, p_retention_schema text DEFAULT NULL) RETURNS int
//This function is used to drop child tables from a time-based partition set. By default, the table is just uninherited and not actually dropped. For automatically dropping old tables, it is recommended to use the run_maintenance() function with retention configured instead of calling this directly.
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
// Make sure it exists.
if count > 0 {
// Pull basic arguments
m := map[string]interface{}{"table": p.Table}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Pull custom function arguments if set in configuration
if err := mergo.Merge(&m, p.Options.Functions.DropPartitionTime); err != nil {
l.Error(err)
}
// Defaults (https://github.com/keithf4/pg_partman/blob/master/sql/functions/drop_partition_time.sql#L5)
if err := mergo.Merge(&m, map[string]interface{}{"retention": null.String{}, "keepTable": null.String{}, "keepIndex": null.String{}, "retentionSchema": null.String{}}); err != nil {
l.Error(err)
}
_, err = db.NamedExec(`SELECT partman.drop_partition_time(:table, :retention, :keepTable, :keepIndex, :retentionSchema);`, m)
if err != nil {
l.Error(err)
} else {
l.Info("The partition on " + p.Table + " has been dropped.")
}
} else {
l.Info("There appears to be no partition set for " + p.Table + ".")
}
}
// Manually uninherits (and optionally drops) a child partition table from an id based partition set.
func (db DB) DropPartitionId(p *Partition, opts ...map[string]interface{}) {
//drop_partition_id(p_parent_table text, p_retention bigint DEFAULT NULL, p_keep_table boolean DEFAULT NULL, p_keep_index boolean DEFAULT NULL, p_retention_schema text DEFAULT NULL) RETURNS int
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM partman.part_config WHERE parent_table = $1", p.Table)
if err != nil {
l.Error(err)
}
// Make sure it exists.
if count > 0 {
// Pull basic arguments
m := map[string]interface{}{"table": p.Table}
// Pull overrides passed to this function (won't come from standalone gopartman, but could from any other package which may use it)
if len(opts) > 0 {
if err := mergo.Merge(&m, opts[0]); err != nil {
l.Error(err)
}
}
// Pull custom function arguments if set in configuration
if err := mergo.Merge(&m, p.Options.Functions.DropPartitionTime); err != nil {
l.Error(err)
}
// Defaults (https://github.com/keithf4/pg_partman/blob/master/sql/functions/drop_partition_id.sql#L5)
if err := mergo.Merge(&m, map[string]interface{}{"retention": null.String{}, "keepTable": null.String{}, "keepIndex": null.String{}, "retentionSchema": null.String{}}); err != nil {
l.Error(err)
}
_, err = db.NamedExec(`SELECT partman.drop_partition_time(:table, :retention, :keepTable, :keepIndex, :retentionSchema);`, m)
if err != nil {
l.Error(err)
} else {
l.Info("The partition on " + p.Table + " has been dropped.")
}
} else {
l.Info("There appears to be no partition set for " + p.Table + ".")
}
}