Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize gzip compression #1293

Merged
merged 78 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from 73 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
0b089ef
WIP
Mar 2, 2023
e4e513b
remove max size from NewZstdCompressor
Mar 2, 2023
ef84d36
WIP support multiple compression types
Mar 2, 2023
4a659b0
rename compressionType to Type
Mar 2, 2023
c6de927
fix metric
Mar 2, 2023
13eb890
WIP remove CompressionEnabled and add --network-compression-type
Mar 2, 2023
bc55ea4
rename types
Mar 2, 2023
400a918
add zstd compression/decompression metrics
Mar 3, 2023
6f8bd3e
Merge remote-tracking branch 'origin/dev' into add-zstd-compression
Mar 3, 2023
38e7a12
don't allow 2 network compression flags
Mar 3, 2023
3850c79
remove benchmark
Mar 3, 2023
6065be8
cleanup
Mar 3, 2023
d7c5d52
don't use zstd until v1.10
Mar 7, 2023
385ba27
tweak error message
Mar 7, 2023
37bcf39
nit
Mar 7, 2023
81afb71
nits
Mar 7, 2023
4da5f21
nits
Mar 7, 2023
d8a9ccf
Merge remote-tracking branch 'origin/dev' into add-zstd-compression
Mar 8, 2023
d9b778c
flag wording nit
Mar 8, 2023
41bc6c5
add zstd tests; fix bugs
Mar 8, 2023
11c171d
consolidate metrics
Mar 8, 2023
ddd35ee
remove old todo
Mar 8, 2023
6dd3f25
update test
Mar 8, 2023
eab4879
update tests
Mar 8, 2023
1ad19c1
add tests
Mar 8, 2023
6b29cfc
appease linter
Mar 8, 2023
7c150cd
Merge branch 'dev' into add-zstd-compression
Mar 13, 2023
874632a
Merge remote-tracking branch 'origin/dev' into add-zstd-compression
Mar 15, 2023
a9a96f6
Merge branch 'add-zstd-compression' of github.com:ava-labs/avalancheg…
Mar 15, 2023
9526ba9
nits
Mar 15, 2023
e1f0d55
Merge branch 'dev' into add-zstd-compression
Mar 15, 2023
c000a83
Merge branch 'dev' into add-zstd-compression
Mar 21, 2023
483f040
use default compression type in tests
Mar 21, 2023
041c37e
Merge branch 'add-zstd-compression' of github.com:ava-labs/avalancheg…
Mar 21, 2023
6b3f9a3
flag nit
Mar 21, 2023
5325c6b
only allow zstd after cortina
Mar 21, 2023
4345b1e
Merge remote-tracking branch 'upstream/dev' into add-zstd-compression
Mar 31, 2023
51d3cbb
merged
StephenButtolph Apr 2, 2023
9dc7108
Merge remote-tracking branch 'upstream/dev' into add-zstd-compression
Apr 3, 2023
b401ea6
address PR comments
Apr 3, 2023
b0225da
remove switch
Apr 3, 2023
45d9b3f
Merge branch 'add-zstd-compression' of github.com:ava-labs/avalancheg…
Apr 3, 2023
8472760
add max message size test
Apr 3, 2023
0e61411
add max message size to zstd
Apr 3, 2023
32d8d37
add test
Apr 3, 2023
72a0b7c
Merge remote-tracking branch 'upstream/dev' into add-zstd-compression
Apr 3, 2023
db6504a
remove switch
Apr 4, 2023
53df4c6
use stream interface for Decompress to avoid unzip bomb
Apr 4, 2023
adbc82a
fix copy pasta bug
Apr 4, 2023
e510398
move switch case to default
Apr 4, 2023
82d7f68
remove impossible switch case
Apr 4, 2023
bdcf189
appease linter
Apr 4, 2023
b91159d
make reader a local var
Apr 4, 2023
c259199
return nit
Apr 4, 2023
9105741
add invalid max size check to zstd compressor creation
Apr 4, 2023
8ad8fe6
Merge branch 'dev' into add-zstd-compression
StephenButtolph Apr 4, 2023
eaec3ef
Parallelize gzip compression
StephenButtolph Apr 4, 2023
c6c6fdf
merged
StephenButtolph Apr 4, 2023
c6f4800
log warning for unknown op during metrics observation
Apr 5, 2023
8ad1cb6
use sync.Pool of gzip writer
Apr 5, 2023
8c499af
gzipWriter.Flush() --> gzipWriter.Close()
Apr 5, 2023
812a37d
test cleanup
Apr 5, 2023
084be58
Merge remote-tracking branch 'upstream/add-zstd-compression' into sim…
Apr 5, 2023
a0c1dda
appease linter
Apr 5, 2023
c1d7a98
Merge remote-tracking branch 'upstream/add-zstd-compression' into sim…
Apr 5, 2023
26e7235
remove magic number
Apr 5, 2023
f3f7785
Merge remote-tracking branch 'upstream/add-zstd-compression' into sim…
Apr 5, 2023
a2cc314
Merge branch 'dev' into add-zstd-compression
StephenButtolph Apr 5, 2023
436685b
imports nit
StephenButtolph Apr 5, 2023
d30e0a9
Merge remote-tracking branch 'upstream/add-zstd-compression' into sim…
Apr 5, 2023
b40e14b
nit
StephenButtolph Apr 5, 2023
0c0a396
test nits
Apr 5, 2023
422a840
Merge remote-tracking branch 'upstream/add-zstd-compression' into sim…
Apr 5, 2023
0fca08a
nits
StephenButtolph Apr 5, 2023
dfc909d
nit
StephenButtolph Apr 5, 2023
781023e
Merge branch 'add-zstd-compression' into simplify-gzip-compression
StephenButtolph Apr 5, 2023
e550864
merged
StephenButtolph Apr 6, 2023
0f43f37
Merge branch 'dev' into simplify-gzip-compression
StephenButtolph Apr 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/ava-labs/avalanchego/staking"
"github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils/compression"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/dynamicip"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/storage"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/version"
"github.com/ava-labs/avalanchego/vms/platformvm/reward"
"github.com/ava-labs/avalanchego/vms/proposervm"
)
Expand All @@ -62,7 +64,9 @@ const (

var (
// Deprecated key --> deprecation message (i.e. which key replaces it)
deprecatedKeys = map[string]string{}
deprecatedKeys = map[string]string{
NetworkCompressionEnabledKey: fmt.Sprintf("use --%s instead", NetworkCompressionTypeKey),
}

errInvalidStakerWeights = errors.New("staking weights must be positive")
errStakingDisableOnPublicNetwork = errors.New("staking disabled on public network")
Expand All @@ -81,6 +85,7 @@ var (
errMissingStakingSigningKeyFile = errors.New("missing staking signing key file")
errTracingEndpointEmpty = fmt.Errorf("%s cannot be empty", TracingEndpointKey)
errPluginDirNotADirectory = errors.New("plugin dir is not a directory")
errZstdNotSupported = errors.New("zstd compression not supported until v1.10")
)

func getConsensusConfig(v *viper.Viper) avalanche.Parameters {
Expand Down Expand Up @@ -302,13 +307,45 @@ func getGossipConfig(v *viper.Viper) subnets.GossipConfig {
}
}

func getNetworkConfig(v *viper.Viper, stakingEnabled bool, halflife time.Duration) (network.Config, error) {
func getNetworkConfig(
v *viper.Viper,
stakingEnabled bool,
halflife time.Duration,
networkID uint32, // TODO remove after cortina upgrade
) (network.Config, error) {
// Set the max number of recent inbound connections upgraded to be
// equal to the max number of inbound connections per second.
maxInboundConnsPerSec := v.GetFloat64(InboundThrottlerMaxConnsPerSecKey)
upgradeCooldown := v.GetDuration(InboundConnUpgradeThrottlerCooldownKey)
upgradeCooldownInSeconds := upgradeCooldown.Seconds()
maxRecentConnsUpgraded := int(math.Ceil(maxInboundConnsPerSec * upgradeCooldownInSeconds))

var (
compressionType compression.Type
err error
)
if v.IsSet(NetworkCompressionTypeKey) {
if v.IsSet(NetworkCompressionEnabledKey) {
return network.Config{}, fmt.Errorf("cannot set both %q and %q", NetworkCompressionTypeKey, NetworkCompressionEnabledKey)
}

compressionType, err = compression.TypeFromString(v.GetString(NetworkCompressionTypeKey))
if err != nil {
return network.Config{}, err
}
} else {
if v.GetBool(NetworkCompressionEnabledKey) {
compressionType = constants.DefaultNetworkCompressionType
} else {
compressionType = compression.TypeNone
}
}

cortinaTime := version.GetCortinaTime(networkID)
if compressionType == compression.TypeZstd && !time.Now().After(cortinaTime) {
// TODO remove after cortina upgrade
return network.Config{}, errZstdNotSupported
}
config := network.Config{
// Throttling
ThrottlerConfig: network.ThrottlerConfig{
Expand Down Expand Up @@ -383,7 +420,7 @@ func getNetworkConfig(v *viper.Viper, stakingEnabled bool, halflife time.Duratio
},

MaxClockDifference: v.GetDuration(NetworkMaxClockDifferenceKey),
CompressionEnabled: v.GetBool(NetworkCompressionEnabledKey),
CompressionType: compressionType,
PingFrequency: v.GetDuration(NetworkPingFrequencyKey),
AllowPrivateIPs: v.GetBool(NetworkAllowPrivateIPsKey),
UptimeMetricFreq: v.GetDuration(UptimeMetricFreqKey),
Expand Down Expand Up @@ -1345,7 +1382,7 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
}

// Network Config
nodeConfig.NetworkConfig, err = getNetworkConfig(v, nodeConfig.EnableStaking, healthCheckAveragerHalflife)
nodeConfig.NetworkConfig, err = getNetworkConfig(v, nodeConfig.EnableStaking, healthCheckAveragerHalflife, nodeConfig.NetworkID)
if err != nil {
return node.Config{}, err
}
Expand Down
3 changes: 3 additions & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/genesis"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils/compression"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/ulimit"
"github.com/ava-labs/avalanchego/utils/units"
Expand Down Expand Up @@ -152,6 +153,8 @@ func addNodeFlags(fs *pflag.FlagSet) {
fs.Duration(NetworkPingFrequencyKey, constants.DefaultPingFrequency, "Frequency of pinging other peers")

fs.Bool(NetworkCompressionEnabledKey, constants.DefaultNetworkCompressionEnabled, "If true, compress certain outbound messages. This node will be able to parse compressed inbound messages regardless of this flag's value")
fs.String(NetworkCompressionTypeKey, constants.DefaultNetworkCompressionType.String(), fmt.Sprintf("Compression type for outbound messages. Must be one of [%s, %s, %s]", compression.TypeGzip, compression.TypeZstd, compression.TypeNone))

fs.Duration(NetworkMaxClockDifferenceKey, constants.DefaultNetworkMaxClockDifference, "Max allowed clock difference value between this node and peers")
fs.Bool(NetworkAllowPrivateIPsKey, constants.DefaultNetworkAllowPrivateIPs, "Allows the node to initiate outbound connection attempts to peers with private IPs")
fs.Bool(NetworkRequireValidatorToConnectKey, constants.DefaultNetworkRequireValidatorToConnect, "If true, this node will only maintain a connection with another node if this node is a validator, the other node is a validator, or the other node is a beacon")
Expand Down
3 changes: 2 additions & 1 deletion config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ const (
NetworkPingTimeoutKey = "network-ping-timeout"
NetworkPingFrequencyKey = "network-ping-frequency"
NetworkMaxReconnectDelayKey = "network-max-reconnect-delay"
NetworkCompressionEnabledKey = "network-compression-enabled"
NetworkCompressionEnabledKey = "network-compression-enabled" // TODO this is deprecated. Eventually remove it and constants.DefaultNetworkCompressionEnabled
NetworkCompressionTypeKey = "network-compression-type"
NetworkMaxClockDifferenceKey = "network-max-clock-difference"
NetworkAllowPrivateIPsKey = "network-allow-private-ips"
NetworkRequireValidatorToConnectKey = "network-require-validator-to-connect"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module github.com/ava-labs/avalanchego
go 1.19

require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/avalanche-network-runner-sdk v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
Expand Down
9 changes: 7 additions & 2 deletions message/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/utils/compression"
"github.com/ava-labs/avalanchego/utils/logging"
)

var _ Creator = (*creator)(nil)
Expand All @@ -23,13 +26,15 @@ type creator struct {
}

func NewCreator(
log logging.Logger,
metrics prometheus.Registerer,
parentNamespace string,
compressionEnabled bool,
compressionType compression.Type,
maxMessageTimeout time.Duration,
) (Creator, error) {
namespace := fmt.Sprintf("%s_codec", parentNamespace)
builder, err := newMsgBuilder(
log,
namespace,
metrics,
maxMessageTimeout,
Expand All @@ -39,7 +44,7 @@ func NewCreator(
}

return &creator{
OutboundMsgBuilder: newOutboundBuilder(compressionEnabled, builder),
OutboundMsgBuilder: newOutboundBuilder(compressionType, builder),
InboundMsgBuilder: newInboundBuilder(builder),
}, nil
}
2 changes: 2 additions & 0 deletions message/inbound_msg_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
)

Expand All @@ -21,6 +22,7 @@ func Test_newMsgBuilder(t *testing.T) {
require := require.New(t)

mb, err := newMsgBuilder(
logging.NoLog{},
"test",
prometheus.NewRegistry(),
10*time.Second,
Expand Down
Loading