Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
67560: authors: add [email protected] to authors. r=rharding6373 a=rharding6373

Release note: None

67569: logictest: add a regression test around apply joins r=yuzefovich a=yuzefovich

This is a reduced version of #66923.

Release note: None

67574: geomfn: fix NULL cases for ST_GeneratePoints r=rafiss a=otan

Resolves #67177

Release note (bug fix): Fix a bug where ST_GeneratePoints returns a
garbage value or an error if an empty geometry or negative nPoints is
given.

67576: colserde: inline a function that is called only in one place r=yuzefovich a=yuzefovich

Release note: None

67577: colrpc: add memory accounting for temporary data in inbox/outbox r=yuzefovich a=yuzefovich

**colrpc: fix memory accounting in inbox a bit**

Previously, we forgot to update the allocator once we had deserialized
the batch in the inbox (i.e. we would register only the original
allocation, upon the batch's creation). This could result in
under-accounting for variable-length types, and it is now fixed.

Release note: None

**colrpc: add memory accounting for temporary data in inbox/outbox**

Previously, we were missing the memory accounting for the temporary
usage of serialized/deserialized bytes in the inbox/outbox pair. The
worst offense was that in the outbox we reuse the same scratch Bytes
buffer that is never truncated and can only increase in capacity
throughout the lifetime of the outbox, yet it wasn't accounted for. This
commit fixes those omissions.

Fixes: #67051.

Release note: None

Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
4 people committed Jul 14, 2021
6 parents 8e27cdc + 7936440 + 43293e2 + 7752ddd + bf29aa2 + 32e05a9 commit 774c000
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 87 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ Piotr Zurek <[email protected]>
pocockn <[email protected]>
Pooja Maniar <[email protected]> <[email protected]>
Poornima Malepati <[email protected]> Poornima <[email protected]>
Rachael Harding <[email protected]>
Rachit Srivastava <[email protected]>
Radu Berinde <[email protected]> RaduBerinde <[email protected]>
Rafi Shamim <[email protected]> <[email protected]>
Expand Down
44 changes: 13 additions & 31 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,19 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data,
nulls.Truncate(batch.Length())
}

data, err := c.batchToArrowSpecialType(vec, n, nulls)
if err != nil {
return nil, err
}
if data != nil {
c.scratch.arrowData[vecIdx] = data
// Bools require special handling.
if typ.Family() == types.BoolFamily {
c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */)
c.scratch.arrowData[vecIdx] = c.builders.boolBuilder.NewBooleanArray().Data()
// Overwrite incorrect null bitmap (that has all values as "valid")
// with the actual null bitmap. Note that if we actually don't have
// any nulls, we use a bitmap with zero length for it in order to
// reduce the size of serialized representation.
var arrowBitmap []byte
if nulls != nil {
arrowBitmap = nulls.NullBitmap()
}
c.scratch.arrowData[vecIdx].Buffers()[0] = memory.NewBufferBytes(arrowBitmap)
continue
}

Expand Down Expand Up @@ -283,31 +290,6 @@ func unsafeCastOffsetsArray(offsetsInt32 []int32, offsetsBytes *[]byte) {
bytesHeader.Cap = int32Header.Cap * sizeOfInt32
}

// batchToArrowSpecialType converts vec to the Arrow format when the vector's
// canonical type family needs dedicated handling (only booleans at the
// moment). For every other family, i.e. when the "native" conversion is
// supported, a nil *array.Data is returned.
func (c *ArrowBatchConverter) batchToArrowSpecialType(
	vec coldata.Vec, n int, nulls *coldata.Nulls,
) (*array.Data, error) {
	family := typeconv.TypeFamilyToCanonicalTypeFamily(vec.Type().Family())
	if family != types.BoolFamily {
		// Nothing special about this vector.
		return nil, nil
	}

	c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */)
	boolData := c.builders.boolBuilder.NewBooleanArray().Data()
	// The builder was told that every value is "valid", so its null bitmap
	// is wrong; swap in the real one. When there are no nulls at all, a
	// zero-length bitmap is used to keep the serialized representation
	// small.
	var bitmap []byte
	if nulls != nil {
		bitmap = nulls.NullBitmap()
	}
	boolData.Buffers()[0] = memory.NewBufferBytes(bitmap)
	return boolData, nil
}

// ArrowToBatch converts []*array.Data to a coldata.Batch. There must not be
// more than coldata.BatchSize() elements in data. It's safe to call ArrowToBatch
// concurrently.
Expand Down
41 changes: 14 additions & 27 deletions pkg/geo/geomfn/generate_points.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,12 @@ import (
// maxAllowedGridSize is the upper bound limit for a generated grid size.
const maxAllowedGridSize = 100 * geo.MaxAllowedSplitPoints

// ErrGenerateRandomPointsInvalidPoints is returned if the requested number of
// points is not positive (zero or negative) or the geometry is empty.
var ErrGenerateRandomPointsInvalidPoints = errors.New("points must be positive and geometry must not be empty")

// GenerateRandomPoints generates provided number of pseudo-random points for the input area.
func GenerateRandomPoints(g geo.Geometry, nPoints int, rng *rand.Rand) (geo.Geometry, error) {
if nPoints < 0 {
return geo.Geometry{}, nil
}
if nPoints > geo.MaxAllowedSplitPoints {
return geo.Geometry{}, errors.Newf(
"failed to generate random points, too many points to generate: requires %d points, max %d",
nPoints,
geo.MaxAllowedSplitPoints,
)
}
pointsAsGeometry, err := generateRandomPoints(g, nPoints, rng)
if err != nil {
return geo.Geometry{}, errors.Wrap(err, "generating random points error")
}
return pointsAsGeometry, nil
}

// generateRandomPoints returns a MultiPoint geometry consisting of randomly generated points
// that are covered by the geometry provided.
// nPoints is the number of points to return.
// rng is the random numbers generator.
func generateRandomPoints(g geo.Geometry, nPoints int, rng *rand.Rand) (geo.Geometry, error) {
var generateRandomPointsFunction func(g geo.Geometry, nPoints int, rng *rand.Rand) (*geom.MultiPoint, error)
switch g.ShapeType() {
case geopb.ShapeType_Polygon:
Expand All @@ -57,17 +39,22 @@ func generateRandomPoints(g geo.Geometry, nPoints int, rng *rand.Rand) (geo.Geom
default:
return geo.Geometry{}, errors.Newf("unsupported type: %v", g.ShapeType().String())
}
// This is to be checked once we know Geometry type is supported,
// so that we can keep consistency with PostGIS implementation.
if nPoints == 0 {
return geo.Geometry{}, nil
if nPoints <= 0 {
return geo.Geometry{}, ErrGenerateRandomPointsInvalidPoints
}
if nPoints > geo.MaxAllowedSplitPoints {
return geo.Geometry{}, errors.Newf(
"failed to generate random points, too many points to generate: requires %d points, max %d",
nPoints,
geo.MaxAllowedSplitPoints,
)
}
empty, err := IsEmpty(g)
if err != nil {
return geo.Geometry{}, errors.Wrap(err, "could not check if geometry is empty")
}
if empty {
return geo.Geometry{}, nil
return geo.Geometry{}, ErrGenerateRandomPointsInvalidPoints
}
mpt, err := generateRandomPointsFunction(g, nPoints, rng)
if err != nil {
Expand Down
32 changes: 16 additions & 16 deletions pkg/geo/geomfn/generate_points_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,6 @@ func TestGenerateRandomPoints(t *testing.T) {
args args
want geo.Geometry
}{
{
"number of points to generate less than minimum",
args{geo.MustParseGeometry("POLYGON((0 0, 1 0, 1 1.1, 0 0))"), -1, 1},
geo.Geometry{},
},
{
"supported geometry, zero points to generate",
args{geo.MustParseGeometry("POLYGON((0 0, 1 0, 1 1.1, 0 0))"), 0, 1},
geo.Geometry{},
},
{
"empty geometry",
args{geo.MustParseGeometry("POLYGON EMPTY"), 4, 1},
geo.Geometry{},
},
{
"Polygon - square",
args{geo.MustParseGeometry("POLYGON((1 1,1 2,2 2,2 1,1 1))"), 4, 2},
Expand Down Expand Up @@ -118,7 +103,22 @@ func TestGenerateRandomPoints(t *testing.T) {
{
"generated area is too large",
args{geo.MustParseGeometry("POLYGON((0 0,0 100,0.00001 0.00000001,0.99999 0.00000001,1 100,1 0,0 0))"), 100, 1996},
"generating random points error: generated area is too large: 10001406, max 6533600",
"generated area is too large: 10001406, max 6533600",
},
{
"number of points to generate less than minimum",
args{geo.MustParseGeometry("POLYGON((0 0, 1 0, 1 1.1, 0 0))"), -1, 1},
"points must be positive and geometry must not be empty",
},
{
"supported geometry, zero points to generate",
args{geo.MustParseGeometry("POLYGON((0 0, 1 0, 1 1.1, 0 0))"), 0, 1},
"points must be positive and geometry must not be empty",
},
{
"empty geometry",
args{geo.MustParseGeometry("POLYGON EMPTY"), 4, 1},
"points must be positive and geometry must not be empty",
},
}
for _, tt := range errorTestCases {
Expand Down
18 changes: 14 additions & 4 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ func (i *Inbox) Next() coldata.Batch {
// Protect against Deserialization panics by skipping empty messages.
continue
}
atomic.AddInt64(&i.statsAtomics.bytesRead, int64(len(m.Data.RawBytes)))
numSerializedBytes := int64(len(m.Data.RawBytes))
atomic.AddInt64(&i.statsAtomics.bytesRead, numSerializedBytes)
// Update the allocator since we're holding onto the serialized bytes
// for now.
i.allocator.AdjustMemoryUsage(numSerializedBytes)
i.scratch.data = i.scratch.data[:0]
batchLength, err := i.serializer.Deserialize(&i.scratch.data, m.Data.RawBytes)
// Eagerly throw away the RawBytes memory.
Expand All @@ -340,9 +344,15 @@ func (i *Inbox) Next() coldata.Batch {
// TODO(yuzefovich): refactor this.
const maxBatchMemSize = math.MaxInt64
i.scratch.b, _ = i.allocator.ResetMaybeReallocate(i.typs, i.scratch.b, batchLength, maxBatchMemSize)
if err := i.converter.ArrowToBatch(i.scratch.data, batchLength, i.scratch.b); err != nil {
colexecerror.InternalError(err)
}
i.allocator.PerformOperation(i.scratch.b.ColVecs(), func() {
if err := i.converter.ArrowToBatch(i.scratch.data, batchLength, i.scratch.b); err != nil {
colexecerror.InternalError(err)
}
})
// At this point, we have lost all references to the serialized bytes
// (because ArrowToBatch nils out elements in i.scratch.data once
// processed), so we update the allocator accordingly.
i.allocator.AdjustMemoryUsage(-numSerializedBytes)
atomic.AddInt64(&i.statsAtomics.rowsRead, int64(i.scratch.b.Length()))
return i.scratch.b
}
Expand Down
24 changes: 23 additions & 1 deletion pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Outbox struct {

typs []*types.T

allocator *colmem.Allocator
converter *colserde.ArrowBatchConverter
serializer *colserde.RecordBatchSerializer

Expand Down Expand Up @@ -99,6 +100,7 @@ func NewOutbox(
// be).
OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(allocator, input, typs)),
typs: typs,
allocator: allocator,
converter: c,
serializer: s,
getStats: getStats,
Expand All @@ -111,6 +113,14 @@ func NewOutbox(
}

// close drops the outbox's references to its scratch buffer, scratch message
// and input, releases everything currently registered with the allocator, and
// then calls CloseAndLogOnErr on the outbox's closers.
func (o *Outbox) close(ctx context.Context) {
// Drop the scratch buffer and message so that nothing keeps them reachable
// after the accounted memory is released below.
o.scratch.buf = nil
o.scratch.msg = nil
// Unset the input (which is a deselector operator) so that its output batch
// could be garbage collected. This allows us to release all memory
// registered with the allocator (the allocator is shared by the outbox and
// the deselector).
o.Input = nil
o.allocator.ReleaseMemory(o.allocator.Used())
o.closers.CloseAndLogOnErr(ctx, "outbox")
}

Expand Down Expand Up @@ -276,14 +286,26 @@ func (o *Outbox) sendBatches(
return
}

o.scratch.buf.Reset()
// Note that for certain types (like Decimals, Intervals,
// datum-backed types) BatchToArrow allocates some memory in order
// to perform the conversion, and we consciously choose to ignore it
// for the purposes of the memory accounting because the references
// to those slices are lost in Serialize call below.
d, err := o.converter.BatchToArrow(batch)
if err != nil {
colexecerror.InternalError(errors.Wrap(err, "Outbox BatchToArrow data serialization error"))
}

oldBufCap := o.scratch.buf.Cap()
o.scratch.buf.Reset()
if _, _, err := o.serializer.Serialize(o.scratch.buf, d, n); err != nil {
colexecerror.InternalError(errors.Wrap(err, "Outbox Serialize data error"))
}
// Account for the increase in the capacity of the scratch buffer.
// Note that because we never truncate the buffer, we are only
// adjusting the memory usage whenever the buffer's capacity
// increases (if it didn't increase, this call becomes a noop).
o.allocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap))
o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes()

// o.scratch.msg can be reused as soon as Send returns since it returns as
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (a *Allocator) AdjustMemoryUsage(delta int64) {
if err := a.acc.Grow(a.ctx, delta); err != nil {
colexecerror.InternalError(err)
}
} else {
} else if delta < 0 {
a.ReleaseMemory(-delta)
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/apply_join
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,23 @@ FROM
t39433 AS tab_57069;
----
NULL

# Regression test for mixing subqueries in "inner" and "outer" contexts
# (#66923).
query error unimplemented: apply joins with subqueries in the \"inner\" and \"outer\" contexts are not supported
VALUES
(
(
SELECT
(
SELECT
NULL
FROM
(VALUES (tab_54747.col_95055)) AS tab_54752 (col_95061)
WHERE
(SELECT 0) < tab_54752.col_95061
)
FROM
(VALUES (0:::OID), (3790322641:::OID)) AS tab_54747 (col_95055)
)
);
27 changes: 20 additions & 7 deletions pkg/sql/logictest/testdata/logic_test/geospatial
Original file line number Diff line number Diff line change
Expand Up @@ -5716,30 +5716,43 @@ LINESTRING (5 7.5, 10 15)
LINESTRING (10 15, 10 7.5)

statement error pq: max_vertices number cannot be less than 5
SELECT ST_AsText(ST_SubDivide(ST_GeomFromText('POLYGON((1 1, 1 3, 3 3, 3 1, 1 1))'), 4));
SELECT ST_AsText(ST_SubDivide(ST_GeomFromText('POLYGON((1 1, 1 3, 3 3, 3 1, 1 1))'), 4))

query T
SELECT ST_AsText(ST_VoronoiPolygons(ST_GeomFromText('MULTIPOINT(50 30, 60 30, 100 100,10 150, 110 120)')));
SELECT ST_AsText(ST_VoronoiPolygons(ST_GeomFromText('MULTIPOINT(50 30, 60 30, 100 100,10 150, 110 120)')))
----
GEOMETRYCOLLECTION (POLYGON ((-110 43.333333333333321, -110 270, 100.5 270, 59.347826086956523 132.826086956521749, 36.81818181818182 92.272727272727266, -110 43.333333333333321)), POLYGON ((55 -90, -110 -90, -110 43.333333333333321, 36.81818181818182 92.272727272727266, 55 79.285714285714278, 55 -90)), POLYGON ((230 47.5, 230 -20.714285714285733, 55 79.285714285714278, 36.81818181818182 92.272727272727266, 59.347826086956523 132.826086956521749, 230 47.5)), POLYGON ((230 -20.714285714285733, 230 -90, 55 -90, 55 79.285714285714278, 230 -20.714285714285733)), POLYGON ((100.5 270, 230 270, 230 47.5, 59.347826086956523 132.826086956521749, 100.5 270)))

subtest st_voronoilines

query T
SELECT ST_AsText(ST_VoronoiLines(ST_GeomFromText('MULTIPOINT(50 30, 60 30, 100 100,10 150, 110 120)')));
SELECT ST_AsText(ST_VoronoiLines(ST_GeomFromText('MULTIPOINT(50 30, 60 30, 100 100,10 150, 110 120)')))
----
MULTILINESTRING ((100.5 270, 59.347826086956523 132.826086956521749), (59.347826086956523 132.826086956521749, 36.81818181818182 92.272727272727266), (36.81818181818182 92.272727272727266, -110 43.333333333333321), (36.81818181818182 92.272727272727266, 55 79.285714285714278), (55 79.285714285714278, 55 -90), (59.347826086956523 132.826086956521749, 230 47.5), (230 -20.714285714285733, 55 79.285714285714278))

query T
SELECT ST_AsText(ST_GeneratePoints('POLYGON((0 0,2 5,2.5 4,3 5,3 1,0 0))'::geometry, 5, 1996));
SELECT ST_AsText(ST_GeneratePoints('POLYGON((0 0,2 5,2.5 4,3 5,3 1,0 0))'::geometry, 5, 1996))
----
MULTIPOINT (0.694657794722715 0.920013195454463, 2.441759392181104 3.716423716858722, 2.797878906884249 3.842501313516636, 1.057760326599197 1.771731314822431, 1.7969577019942 2.428531642176798)

statement error pq: st_generatepoints\(\): generating random points error: zero area input Polygon
SELECT ST_AsText(ST_GeneratePoints('POLYGON((0 0, 1 1, 1 1, 0 0))'::geometry, 4, 1));
statement error pq: st_generatepoints\(\): zero area input Polygon
SELECT ST_AsText(ST_GeneratePoints('POLYGON((0 0, 1 1, 1 1, 0 0))'::geometry, 4, 1))

statement error pq: st_generatepoints\(\): seed must be greater than zero
SELECT ST_AsText(ST_GeneratePoints('POLYGON((0 0,2 5,2.5 4,3 5,3 1,0 0))'::geometry, 5, 0));
SELECT ST_AsText(ST_GeneratePoints('POLYGON((0 0,2 5,2.5 4,3 5,3 1,0 0))'::geometry, 5, 0))

query T
SELECT t AS should_be_null FROM ( VALUES
(ST_GeneratePoints('POLYGON ((0 0, 1 0, 1 1, 0 0))', -1)),
(ST_GeneratePoints('POLYGON ((0 0, 1 0, 1 1, 0 0))', -2, 3)),
(ST_GeneratePoints('POLYGON EMPTY', 2)),
(ST_GeneratePoints('POLYGON EMPTY', 2))
) t(t)
----
NULL
NULL
NULL
NULL

subtest st_orientedenvelope

Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/sem/builtins/geo_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -2132,6 +2132,9 @@ Flags shown square brackets after the geometry type have the following meaning:
seed := timeutil.Now().Unix()
generatedPoints, err := geomfn.GenerateRandomPoints(geometry, npoints, rand.New(rand.NewSource(seed)))
if err != nil {
if errors.Is(err, geomfn.ErrGenerateRandomPointsInvalidPoints) {
return tree.DNull, nil
}
return nil, err
}
return tree.NewDGeometry(generatedPoints), nil
Expand All @@ -2154,6 +2157,9 @@ The requested number of points must be not larger than 65336.`,
}
generatedPoints, err := geomfn.GenerateRandomPoints(geometry, npoints, rand.New(rand.NewSource(seed)))
if err != nil {
if errors.Is(err, geomfn.ErrGenerateRandomPointsInvalidPoints) {
return tree.DNull, nil
}
return nil, err
}
return tree.NewDGeometry(generatedPoints), nil
Expand Down

0 comments on commit 774c000

Please sign in to comment.