Merge #46942 #47086
46942: hlc: use PTP user space API as HLC clock r=darinpp a=darinpp

Adds support on Linux for using a PTP user-space API clock device as
the HLC clock source. This is needed when the host clock is prone to
large jumps (as is the case when using vMotion).
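
The diff wires this up through a new hlc.MakeClockSource helper (see
pkg/server/server.go below), whose internals are not shown in this commit
view. As a rough sketch only, assuming the standard Linux "dynamic POSIX
clock" mechanism (an open /dev/ptpN descriptor converted to a clock id via
the kernel's FD_TO_CLOCKID convention and queried through
golang.org/x/sys/unix), a PTP-backed time source might look like this; the
device path, helper names, and error handling are illustrative, not
CockroachDB's implementation:

```go
package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

// fdToClockID mirrors the kernel's FD_TO_CLOCKID macro:
// clockid = (^fd << 3) | CLOCKFD, where CLOCKFD == 3.
func fdToClockID(fd uintptr) int32 {
	return int32((^fd)<<3 | 3)
}

// ptpUnixNano returns a closure that reads the given dynamic clock id and
// reports its current time as nanoseconds since the Unix epoch.
func ptpUnixNano(clockID int32) func() int64 {
	return func() int64 {
		var ts unix.Timespec
		if err := unix.ClockGettime(clockID, &ts); err != nil {
			// A real clock source would surface this error rather than panic.
			panic(err)
		}
		return ts.Nano()
	}
}

func main() {
	f, err := os.Open("/dev/ptp0") // hypothetical device path
	if err != nil {
		fmt.Fprintln(os.Stderr, "opening clock device:", err)
		os.Exit(1)
	}
	defer f.Close()

	nowNanos := ptpUnixNano(fdToClockID(f.Fd()))
	fmt.Println("PTP time (ns):", nowNanos())
}
```

As the server.go hunk below suggests, hlc.NewClock only needs a
nanosecond-returning function (hlc.UnixNano or clockSrc.UnixNano), so a
closure of the shape above is all a clock source has to provide.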

Release note (cli change): added a new start option, --clock-device,
that allows the HLC to use a PTP user-space API clock device.

47086: colexec: remove lossful typeconv.ToColumnTypes conversion r=yuzefovich a=yuzefovich

Currently, our type conversion is not round-trippable (i.e. `types.T ->
coltypes.T -> types.T` might return a different SQL type than what we
started with), which is why this commit removes the
`typeconv.ToColumnType` and `typeconv.ToColumnTypes` methods. Their only
callers were the joiner tests (which have been slightly adjusted), but
the main benefit is cleaning up the API so that we don't shoot ourselves
in the foot later.
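
To make the round-trip problem concrete, here is a toy sketch with
hypothetical stand-in types (not the real types, coltypes, or typeconv
packages): when several SQL types share one physical representation, the
reverse mapping has to pick a single canonical SQL type, so converting
there and back can return a different type than it started with.

```go
package main

import "fmt"

// Hypothetical stand-ins for types.T (SQL types) and coltypes.T (physical types).
type sqlType string
type physType string

const (
	sqlInt  sqlType = "INT8"
	sqlDate sqlType = "DATE"

	physInt64 physType = "Int64"
)

// toPhys maps SQL types to a physical (columnar) representation. Both INT8
// and DATE are represented as 64-bit integers here, so the mapping is many-to-one.
func toPhys(sqlType) physType { return physInt64 }

// fromPhys has to pick one canonical SQL type per physical type; the original
// SQL type cannot be recovered, which is what makes the round trip lossy.
func fromPhys(physType) sqlType { return sqlInt }

func main() {
	orig := sqlDate
	back := fromPhys(toPhys(orig))
	fmt.Printf("started with %s, got back %s (lossy: %v)\n", orig, back, orig != back)
	// Prints: started with DATE, got back INT8 (lossy: true)
}
```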

Release note: None

Co-authored-by: Darin <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Apr 7, 2020
3 parents 1269d76 + c861a12 + be0629c commit 2032daf
Showing 12 changed files with 1,285 additions and 1,118 deletions.
4 changes: 3 additions & 1 deletion build/builder/mkrelease.sh
@@ -39,7 +39,9 @@ case "${1-}" in
XGOARCH=amd64
XCMAKE_SYSTEM_NAME=Linux
TARGET_TRIPLE=x86_64-unknown-linux-gnu
LDFLAGS="-static-libgcc -static-libstdc++"
# -lrt is needed because clock_gettime isn't part of glibc prior to 2.17;
# once we update glibc, -lrt can be removed
LDFLAGS="-static-libgcc -static-libstdc++ -lrt"
SUFFIX=-linux-2.6.32-gnu-amd64
) ;;

5 changes: 5 additions & 0 deletions pkg/base/config.go
@@ -213,6 +213,10 @@ type Config struct {
// connections to determine connection health and update the local view
// of remote clocks.
RPCHeartbeatInterval time.Duration

// ClockDevicePath enables the use of a PTP hardware clock user-space API for HLC
// current time. It contains the path to the device to be used (e.g. /dev/ptp0).
ClockDevicePath string
}

func wrapError(err error) error {
@@ -243,6 +247,7 @@ func (cfg *Config) InitDefaults() {
cfg.RPCHeartbeatInterval = defaultRPCHeartbeatInterval
cfg.ClusterName = ""
cfg.DisableClusterNameVerification = false
cfg.ClockDevicePath = ""
}

// HTTPRequestScheme returns "http" or "https" based on the value of
14 changes: 14 additions & 0 deletions pkg/cli/cliflags/flags.go
@@ -673,6 +673,20 @@ fields.
Description: `Path to the CA key.`,
}

ClockDevice = FlagInfo{
Name: "clock-device",
Description: `
Override the HLC to use a PTP hardware clock user-space API when querying for the
current time. The value is the path of the clock device to be used. This is currently
only tested and supported on Linux.
<PRE>
--clock-device=/dev/ptp0
</PRE>
`,
}

MaxOffset = FlagInfo{
Name: "max-offset",
Description: `
1 change: 1 addition & 0 deletions pkg/cli/flags.go
@@ -358,6 +358,7 @@ func init() {
VarFlag(f, &serverCfg.Stores, cliflags.Store)
VarFlag(f, &serverCfg.StorageEngine, cliflags.StorageEngine)
VarFlag(f, &serverCfg.MaxOffset, cliflags.MaxOffset)
StringFlag(f, &serverCfg.ClockDevicePath, cliflags.ClockDevice, "")

StringFlag(f, &startCtx.listeningURLFile, cliflags.ListeningURLFile, startCtx.listeningURLFile)

11 changes: 10 additions & 1 deletion pkg/server/server.go
@@ -230,7 +230,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
panic(errors.New("no tracer set in AmbientCtx"))
}

clock := hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset))
var clock *hlc.Clock
if cfg.ClockDevicePath != "" {
clockSrc, err := hlc.MakeClockSource(context.Background(), cfg.ClockDevicePath)
if err != nil {
return nil, errors.Wrap(err, "instantiating clock source")
}
clock = hlc.NewClock(clockSrc.UnixNano, time.Duration(cfg.MaxOffset))
} else {
clock = hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset))
}
s := &Server{
st: st,
clock: clock,
42 changes: 23 additions & 19 deletions pkg/sql/colexec/external_hash_joiner_test.go
@@ -58,7 +58,7 @@ func TestExternalHashJoiner(t *testing.T) {
// which the joiner spills to disk.
for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
for _, tcs := range [][]joinTestCase{hjTestCases, mjTestCases} {
for _, tcs := range [][]*joinTestCase{hjTestCases, mjTestCases} {
for _, tc := range tcs {
delegateFDAcquisitions := rng.Float64() < 0.5
t.Run(fmt.Sprintf("spillForced=%t/%s/delegateFDAcquisitions=%t", spillForced, tc.description, delegateFDAcquisitions), func(t *testing.T) {
@@ -139,15 +139,17 @@ func TestExternalHashJoinerFallbackToSortMergeJoin(t *testing.T) {
nBatches := 2
leftSource := newFiniteBatchSource(batch, nBatches)
rightSource := newFiniteBatchSource(batch, nBatches)
spec := createSpecForHashJoiner(joinTestCase{
joinType: sqlbase.JoinType_INNER,
leftTypes: sourceTypes,
leftOutCols: []uint32{0},
leftEqCols: []uint32{0},
rightTypes: sourceTypes,
rightOutCols: []uint32{0},
rightEqCols: []uint32{0},
})
tc := &joinTestCase{
joinType: sqlbase.JoinType_INNER,
leftPhysTypes: sourceTypes,
leftOutCols: []uint32{0},
leftEqCols: []uint32{0},
rightPhysTypes: sourceTypes,
rightOutCols: []uint32{0},
rightEqCols: []uint32{0},
}
tc.init()
spec := createSpecForHashJoiner(tc)
var spilled bool
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(t, true /* inMem */)
defer cleanup()
@@ -241,15 +243,17 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
if fullOuter {
joinType = sqlbase.JoinType_FULL_OUTER
}
spec := createSpecForHashJoiner(joinTestCase{
joinType: joinType,
leftTypes: sourceTypes,
leftOutCols: []uint32{0, 1},
leftEqCols: []uint32{0, 2},
rightTypes: sourceTypes,
rightOutCols: []uint32{2, 3},
rightEqCols: []uint32{0, 1},
})
tc := &joinTestCase{
joinType: joinType,
leftPhysTypes: sourceTypes,
leftOutCols: []uint32{0, 1},
leftEqCols: []uint32{0, 2},
rightPhysTypes: sourceTypes,
rightOutCols: []uint32{2, 3},
rightEqCols: []uint32{0, 1},
}
tc.init()
spec := createSpecForHashJoiner(tc)
b.Run(name, func(b *testing.B) {
// 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows /
// batch) * nCols (number of columns / row) * 2 (number of sources).