diff --git a/.circleci/config.yml b/.circleci/config.yml
index 70a6bee..e977a93 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -6,7 +6,7 @@ jobs:
build:
docker:
# specify the version
- - image: circleci/golang:1.9
+ - image: circleci/golang:1.13
# Specify service dependencies here if necessary
# CircleCI maintains a library of pre-built images
@@ -17,13 +17,13 @@ jobs:
#### expecting it in the form of
#### /go/src/github.com/circleci/go-tool
#### /go/src/bitbucket.org/circleci/go-tool
- working_directory: /go/src/github.com/hellobike/amazonriver
+ working_directory: /go/proj/github.com/hellobike/amazonriver
steps:
- checkout
# specify any bash command here prefixed with `run: `
- - run: go get -u -v -t github.com/Masterminds/glide
- - run: glide install
- - run: go test -v -cover ./...
- run: go get -u github.com/mattn/goveralls
- - run: goveralls -service=circleci -repotoken $COVERALLS_TOKEN
\ No newline at end of file
+ - run: go get -u golang.org/x/tools/cmd/cover
+ - run: GO111MODULE=on go mod vendor
+ - run: go test -v -covermode=count -coverprofile=coverage.out ./...
+ - run: goveralls -coverprofile=coverage.out -service=circleci -repotoken $COVERALLS_TOKEN
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 5497a08..8c148a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -131,4 +131,7 @@ $RECYCLE.BIN/
# Windows shortcuts
*.lnk
-# End of https://www.gitignore.io/api/go,osx,linux,windows,sublimetext,visualstudiocode
\ No newline at end of file
+# End of https://www.gitignore.io/api/go,osx,linux,windows,sublimetext,visualstudiocode
+.idea/inspectionProfiles/
+.idea/workspace.xml
+amazonriver
\ No newline at end of file
diff --git a/.idea/amazonriver.iml b/.idea/amazonriver.iml
new file mode 100644
index 0000000..c956989
--- /dev/null
+++ b/.idea/amazonriver.iml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..c741a3b
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/glide.lock b/glide.lock
deleted file mode 100644
index 2979184..0000000
--- a/glide.lock
+++ /dev/null
@@ -1,113 +0,0 @@
-hash: eeb3688936aa4fc40c3eb7dfca34f8a2af5dd06b796c6b69e85d039033406c24
-updated: 2018-12-26T18:40:36.292070002+08:00
-imports:
-- name: github.com/beorn7/perks
- version: 3a771d992973f24aa725d07868b467d1ddfceafb
- subpackages:
- - quantile
-- name: github.com/DataDog/zstd
- version: c7161f8c63c045cbc7ca051dcc969dd0e4054de2
-- name: github.com/davecgh/go-spew
- version: d8f796af33cc11cb798c1aaeb27a4ebc5099927d
- subpackages:
- - spew
-- name: github.com/eapache/go-resiliency
- version: 487be0453c7b062bff8dcd0ca2570f09e780a9e2
- subpackages:
- - breaker
-- name: github.com/eapache/go-xerial-snappy
- version: 776d5712da21bc4762676d614db1d8a64f4238b0
-- name: github.com/eapache/queue
- version: 093482f3f8ce946c05bcba64badd2c82369e084d
-- name: github.com/golang/protobuf
- version: 1d3f30b51784bec5aad268e59fd3c2fc1c2fe73f
- subpackages:
- - proto
-- name: github.com/golang/snappy
- version: 2e65f85255dbc3072edf28d6b5b8efc472979f5a
-- name: github.com/jackc/pgx
- version: c59c9cac59ab95eceb0c12ff338923c62f411ea2
- subpackages:
- - chunkreader
- - internal/sanitize
- - pgio
- - pgproto3
- - pgtype
-- name: github.com/json-iterator/go
- version: 1624edc4454b8682399def8740d46db5e4362ba4
-- name: github.com/konsorten/go-windows-terminal-sequences
- version: 5c8c8bd35d3832f5d134ae1e1e375b69a4d25242
-- name: github.com/mailru/easyjson
- version: 60711f1a8329503b04e1c88535f419d0bb440bff
- subpackages:
- - buffer
- - jlexer
- - jwriter
-- name: github.com/matttproud/golang_protobuf_extensions
- version: c12348ce28de40eed0136aa2b644d0ee0650e56c
- subpackages:
- - pbutil
-- name: github.com/modern-go/concurrent
- version: bacd9c7ef1dd9b15be4a9909b8ac7a4e313eec94
-- name: github.com/modern-go/reflect2
- version: 94122c33edd36123c84d5368cfb2b69df93a0ec8
-- name: github.com/nickelser/parselogical
- version: b07373e53c91bdd5f918a10504c1329616d4da81
-- name: github.com/olivere/elastic
- version: 1619150b007041b6dba8aa447f0e2d151cc2b4c5
- subpackages:
- - config
- - uritemplates
-- name: github.com/pierrec/lz4
- version: 623b5a2f4d2a41e411730dcdfbfdaeb5c0c4564e
- subpackages:
- - internal/xxh32
-- name: github.com/pkg/errors
- version: 059132a15dd08d6704c67711dae0cf35ab991756
-- name: github.com/prometheus/client_golang
- version: 505eaef017263e299324067d40ca2c48f6a2cf50
- subpackages:
- - prometheus
- - prometheus/internal
- - prometheus/promhttp
-- name: github.com/prometheus/client_model
- version: 5c3871d89910bfb32f5fcab2aa4b9ec68e65a99f
- subpackages:
- - go
-- name: github.com/prometheus/common
- version: 67670fe90761d7ff18ec1d640135e53b9198328f
- subpackages:
- - expfmt
- - internal/bitbucket.org/ww/goautoneg
- - model
-- name: github.com/prometheus/procfs
- version: 1dc9a6cbc91aacc3e8b2d63db4d2e957a5394ac4
- subpackages:
- - internal/util
- - nfs
- - xfs
-- name: github.com/rcrowley/go-metrics
- version: 3113b8401b8a98917cde58f8bbd42a1b1c03b1fd
-- name: github.com/Shopify/sarama
- version: 879f631812a30a580659e8035e7cda9994bb99ac
-- name: github.com/sirupsen/logrus
- version: bcd833dfe83d3cebad139e4a29ed79cb2318bf95
-- name: github.com/xwb1989/sqlparser
- version: 120387863bf27d04bc07db8015110a6e96d0146c
- subpackages:
- - dependency/bytes2
- - dependency/hack
- - dependency/querypb
- - dependency/sqltypes
-- name: golang.org/x/crypto
- version: 505ab145d0a99da450461ae2c1a9f6cd10d1f447
- repo: https://github.com/golang/crypto
- subpackages:
- - ssh/terminal
-- name: golang.org/x/sys
- version: b4a75ba826a64a70990f11a225237acd6ef35c9f
- repo: https://github.com/golang/sys
- subpackages:
- - unix
- - windows
-testImports: []
diff --git a/glide.yaml b/glide.yaml
deleted file mode 100644
index 6bbcd9f..0000000
--- a/glide.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-package: github.com/hellobike/amazonriver
-import:
-- package: github.com/Shopify/sarama
- version: ~1.20.0
-- package: github.com/jackc/pgx
- version: ~3.3.0
-- package: github.com/json-iterator/go
- version: ~1.1.5
-- package: github.com/olivere/elastic
- version: ~6.2.14
-- package: github.com/xwb1989/sqlparser
-- package: github.com/nickelser/parselogical
-- package: github.com/prometheus/client_golang
- version: ~0.9.2
-- package: github.com/prometheus/common
-- package: github.com/sirupsen/logrus
- version: ~1.2.0
-
-- package: golang.org/x/crypto
- repo: https://github.com/golang/crypto
- subpackages:
- - ssh/terminal
-- package: golang.org/x/sys
- repo: https://github.com/golang/sys
- subpackages:
- - unix
- - windows
\ No newline at end of file
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..1dde822
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,22 @@
+module github.com/hellobike/amazonriver
+
+go 1.13
+
+require (
+ github.com/Shopify/sarama v1.23.1
+ github.com/cockroachdb/apd v1.1.0 // indirect
+ github.com/fortytw2/leaktest v1.3.0 // indirect
+ github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
+ github.com/jackc/pgx v3.5.0+incompatible
+ github.com/json-iterator/go v1.1.7
+ github.com/lib/pq v1.2.0 // indirect
+ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect
+ github.com/nickelser/parselogical v0.0.0-20171014195826-b07373e53c91
+ github.com/olivere/elastic v6.2.23+incompatible
+ github.com/prometheus/client_golang v1.1.0
+ github.com/satori/go.uuid v1.2.0 // indirect
+ github.com/shopspring/decimal v0.0.0-20190905144223-a36b5d85f337 // indirect
+ github.com/sirupsen/logrus v1.4.2
+ github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
+ gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..ac8c747
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,137 @@
+github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg=
+github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
+github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
+github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
+github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
+github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
+github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
+github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
+github.com/jackc/pgx v3.5.0+incompatible h1:BRJ4G3UPtvml5R1ey0biqqGuYUGayMYekm3woO75orY=
+github.com/jackc/pgx v3.5.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
+github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
+github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
+github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
+github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e h1:hB2xlXdHp/pmPZq0y3QnmWAArdw9PqbmotexnWx/FU8=
+github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
+github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nickelser/parselogical v0.0.0-20171014195826-b07373e53c91 h1:xeJWBTrDLjgYa3AY4fO0DFDsunzmzkmzLVyZEjcIzdw=
+github.com/nickelser/parselogical v0.0.0-20171014195826-b07373e53c91/go.mod h1:YRjwqwchea1kJ+rhRSOFfAjjLRiN/nG0s+5IJv3YYdw=
+github.com/olivere/elastic v6.2.23+incompatible h1:oRGUA/8fKcnkDcqLuwGb5YCzgbgEBo+Y9gamsWqZ0qU=
+github.com/olivere/elastic v6.2.23+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
+github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
+github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
+github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
+github.com/prometheus/client_golang v1.1.0 h1:BQ53HtBmfOitExawJ6LokA4x8ov/z0SYYb0+HxJfRI8=
+github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
+github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
+github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE=
+github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
+github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
+github.com/shopspring/decimal v0.0.0-20190905144223-a36b5d85f337 h1:Da9XEUfFxgyDOqUfwgoTDcWzmnlOnCGi6i4iPS+8Fbw=
+github.com/shopspring/decimal v0.0.0-20190905144223-a36b5d85f337/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
+github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
+github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
+github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 h1:zzrxE1FKn5ryBNl9eKOeqQ58Y/Qpo3Q9QNxKHX5uzzQ=
+github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2/go.mod h1:hzfGeIUDq/j97IG+FhNqkowIyEcD88LrW6fyU3K3WqY=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE=
+golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
+gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
+gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
+gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/main.go b/main.go
index 4710258..fe241c0 100644
--- a/main.go
+++ b/main.go
@@ -30,7 +30,6 @@ import (
var configfile = flag.String("config", "", "config")
var loglevel = flag.String("level", "debug", "log level")
-
func main() {
flag.Parse()
diff --git a/river/stream.go b/river/stream.go
index c1c542b..872e81d 100644
--- a/river/stream.go
+++ b/river/stream.go
@@ -40,7 +40,8 @@ type stream struct {
// 订阅配置
sub *conf.Subscribe
// 当前 wal 位置
- maxWal uint64
+ receivedWal uint64
+ flushWal uint64
// 复制连接
replicationConn *pgx.ReplicationConn
// 消息处理
@@ -53,17 +54,29 @@ type stream struct {
datas []*model.WalData
}
-func (s *stream) getMaxWal() uint64 {
- return atomic.LoadUint64(&s.maxWal)
+func (s *stream) getReceivedWal() uint64 {
+ return atomic.LoadUint64(&s.receivedWal)
}
-func (s *stream) setMaxWal(val uint64) {
- atomic.StoreUint64(&s.maxWal, val)
+func (s *stream) setReceivedWal(val uint64) {
+ atomic.StoreUint64(&s.receivedWal, val)
+}
+
+func (s *stream) getFlushWal() uint64 {
+ return atomic.LoadUint64(&s.flushWal)
+}
+
+func (s *stream) setFlushWal(val uint64) {
+ atomic.StoreUint64(&s.flushWal, val)
+}
+
+func (s *stream) getStatus() (*pgx.StandbyStatus, error) {
+ return pgx.NewStandbyStatus(s.getReceivedWal(), s.getFlushWal(), s.getFlushWal())
}
func newStream(pgDump string, sub *conf.Subscribe) *stream {
var ret = &stream{pgDump: pgDump, sub: sub, dump: sub.Dump}
- ret.handler = handler.NewHandler(sub, ret.setMaxWal)
+ ret.handler = handler.NewHandler(sub, ret.setFlushWal)
return ret
}
@@ -94,7 +107,7 @@ func (s *stream) start(ctx context.Context, wg *sync.WaitGroup) error {
}
}
- s.sendStatus()
+ _ = s.sendStatus()
// Handle old data from db
if err := s.exportSnapshot(snapshotID); err != nil {
@@ -113,8 +126,7 @@ func (s *stream) start(ctx context.Context, wg *sync.WaitGroup) error {
func (s *stream) stop() error {
s.cancel()
s.handler.Stop()
- s.replicationConn.Close()
- return nil
+ return s.replicationConn.Close()
}
func (s *stream) exportSnapshot(snapshotID string) error {
@@ -134,7 +146,7 @@ func (s *stream) runloop(ctx context.Context) error {
for {
select {
case <-ticker.C:
- s.sendStatus()
+ _ = s.sendStatus()
case <-ctx.Done():
return
}
@@ -190,7 +202,7 @@ func (s *stream) checkAndResetConn() error {
}
if err := conn.StartReplication(s.sub.SlotName, 0, -1); err != nil {
- conn.Close()
+ _ = conn.Close()
return err
}
@@ -201,6 +213,18 @@ func (s *stream) checkAndResetConn() error {
// ReplicationMsgHandle handle replication msg
func (s *stream) replicationMsgHandle(msg *pgx.ReplicationMessage) error {
+
+ // 回复心跳
+ if msg.ServerHeartbeat != nil {
+
+ if msg.ServerHeartbeat.ServerWalEnd > s.getReceivedWal() {
+ s.setReceivedWal(msg.ServerHeartbeat.ServerWalEnd)
+ }
+ if msg.ServerHeartbeat.ReplyRequested == 1 {
+ _ = s.sendStatus()
+ }
+ }
+
if msg.WalMessage != nil {
logmsg, err := model.Parse(msg.WalMessage)
@@ -214,12 +238,6 @@ func (s *stream) replicationMsgHandle(msg *pgx.ReplicationMessage) error {
}
}
- // 回复心跳
- if msg.ServerHeartbeat != nil {
- if msg.ServerHeartbeat.ReplyRequested == 1 {
- s.sendStatus()
- }
- }
return nil
}
@@ -240,7 +258,7 @@ func (s *stream) handleMessage(data *model.WalData) (err error) {
}
if needFlush {
- s.flush()
+ _ = s.flush()
}
return nil
@@ -248,7 +266,7 @@ func (s *stream) handleMessage(data *model.WalData) (err error) {
func (s *stream) flush() error {
if len(s.datas) > 0 {
- s.handler.Handle(s.datas...)
+ _ = s.handler.Handle(s.datas...)
s.datas = nil
}
return nil
@@ -260,7 +278,7 @@ func (s *stream) sendStatus() error {
defer s.sendStatusLock.Unlock()
log.Logger.Debug("send heartbeat")
- status, err := pgx.NewStandbyStatus(s.getMaxWal())
+ status, err := s.getStatus()
if err != nil {
return err
}