Skip to content

Commit

Permalink
apacheGH-36954: [Python] Add more FlightInfo / FlightEndpoint attribu…
Browse files Browse the repository at this point in the history
…tes (apache#43537)

### Rationale for this change
The C++ classes `FlightInfo` and `FlightEndpoint` have attributes that are not available via the Python API.

### What changes are included in this PR?
Make the following attributes available in Python:
- `FlightInfo.ordered`
- `FlightInfo.app_metadata`
- `FlightEndpoint.expiration_time`
- `FlightEndpoint.app_metadata`

Also make the following existing constructor arguments optional:
- `FlightInfo.total_records`
- `FlightInfo.total_bytes`

### Are these changes tested?
Yes. The existing tests that cover the existing attributes are extended.

### Are there any user-facing changes?
Yes. The changes are backward compatible.
* GitHub Issue: apache#36954

Lead-authored-by: Enrico Minack <[email protected]>
Co-authored-by: Adam Reeve <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: David Li <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
4 people authored Nov 7, 2024
1 parent 32de498 commit b193c4f
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 42 deletions.
3 changes: 3 additions & 0 deletions python/examples/flight/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def list_flights(args, client, connection_args={}):
else:
print("Unknown")

print(f"Data are {'ordered' if flight.ordered else 'not ordered'}")
print("App metadata:", flight.app_metadata)

print("Number of endpoints:", len(flight.endpoints))
print("Schema:")
print(flight.schema)
Expand Down
108 changes: 95 additions & 13 deletions python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ from libcpp cimport bool as c_bool
from pyarrow.lib cimport *
from pyarrow.lib import (ArrowCancelled, ArrowException, ArrowInvalid,
SignalStopHandler)
from pyarrow.lib import as_buffer, frombytes, tobytes
from pyarrow.lib import as_buffer, frombytes, timestamp, tobytes
from pyarrow.includes.libarrow_flight cimport *
from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin
import pyarrow.lib as lib
Expand Down Expand Up @@ -704,7 +704,7 @@ cdef class FlightEndpoint(_Weakrefable):
cdef:
CFlightEndpoint endpoint

def __init__(self, ticket, locations):
def __init__(self, ticket, locations, expiration_time=None, app_metadata=""):
"""Create a FlightEndpoint from a ticket and list of locations.
Parameters
Expand All @@ -713,6 +713,12 @@ cdef class FlightEndpoint(_Weakrefable):
the ticket needed to access this flight
locations : list of string URIs
locations where this flight is available
expiration_time : TimestampScalar, default None
Expiration time of this stream. If present, clients may assume
they can retry DoGet requests. Otherwise, clients should avoid
retrying DoGet requests.
app_metadata : bytes or str, default ""
Application-defined opaque metadata.
Raises
------
Expand All @@ -724,28 +730,75 @@ cdef class FlightEndpoint(_Weakrefable):

if isinstance(ticket, Ticket):
self.endpoint.ticket.ticket = tobytes(ticket.ticket)
else:
elif isinstance(ticket, (str, bytes)):
self.endpoint.ticket.ticket = tobytes(ticket)
else:
raise TypeError("Argument ticket must be a Ticket instance, string or bytes, "
"not '{}'".format(type(ticket)))

for location in locations:
if isinstance(location, Location):
c_location = (<Location> location).location
else:
elif isinstance(location, (str, bytes)):
c_location = CLocation()
check_flight_status(
CLocation.Parse(tobytes(location)).Value(&c_location))
else:
raise TypeError("Argument locations must contain Location instances, strings or bytes, "
"not '{}'".format(type(location)))
self.endpoint.locations.push_back(c_location)

if expiration_time is not None:
if isinstance(expiration_time, lib.TimestampScalar):
# Convert into OS-dependent std::chrono::system_clock::time_point from
# std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds>
# See Timestamp in cpp/src/arrow/flight/types.h
self.endpoint.expiration_time = TimePoint_to_system_time(TimePoint_from_ns(
expiration_time.cast(timestamp("ns")).value))
else:
raise TypeError("Argument expiration_time must be a TimestampScalar, "
"not '{}'".format(type(expiration_time)))

if not isinstance(app_metadata, (str, bytes)):
raise TypeError("Argument app_metadata must be a string or bytes, "
"not '{}'".format(type(app_metadata)))
self.endpoint.app_metadata = tobytes(app_metadata)

@property
def ticket(self):
    """Get the ticket in this endpoint.

    Returns
    -------
    Ticket
        A new Ticket object built from the ticket payload stored in the
        underlying C++ endpoint.
    """
    return Ticket(self.endpoint.ticket.ticket)

@property
def locations(self):
    """Get locations where this flight is available.

    Returns
    -------
    list
        One object produced by ``Location.wrap`` for each entry of the
        underlying C++ endpoint's location vector, in vector order.
    """
    return [Location.wrap(location)
            for location in self.endpoint.locations]

@property
def expiration_time(self):
    """Get the expiration time of this stream.

    If present, clients may assume they can retry DoGet requests.
    Otherwise, clients should avoid retrying DoGet requests.

    Returns
    -------
    scalar or None
        A timestamp scalar with nanosecond unit and "UTC" timezone holding
        the expiration time, or None when the underlying endpoint has no
        expiration time set.
    """
    cdef:
        int64_t ns_since_epoch
    # Nothing to convert when the underlying optional is empty.
    if not self.endpoint.expiration_time.has_value():
        return None
    # Convert from OS-dependent std::chrono::system_clock::time_point into
    # std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds>
    # See Timestamp in cpp/src/arrow/flight/types.h
    ns_since_epoch = TimePoint_to_ns(
        TimePoint_from_system_time(self.endpoint.expiration_time.value()))
    return lib.scalar(ns_since_epoch, timestamp("ns", "UTC"))

@property
def app_metadata(self):
    """Get application-defined opaque metadata.

    The value is read directly from the underlying C++ endpoint, where the
    constructor stored it after passing it through ``tobytes``.
    NOTE(review): presumably this is returned as ``bytes`` (Cython's
    default conversion of a C++ string) -- confirm.
    """
    return self.endpoint.app_metadata

def serialize(self):
"""Get the wire-format representation of this type.
Expand All @@ -770,7 +823,9 @@ cdef class FlightEndpoint(_Weakrefable):

def __repr__(self):
return (f"<pyarrow.flight.FlightEndpoint ticket={self.ticket!r} "
f"locations={self.locations!r}>")
f"locations={self.locations!r} "
f"expiration_time={self.expiration_time} "
f"app_metadata={self.app_metadata}>")

def __eq__(self, FlightEndpoint other):
    # Equality is delegated to the ``==`` operator of the wrapped C++
    # endpoint objects; ``other`` is declared as a FlightEndpoint in the
    # signature.
    return self.endpoint == other.endpoint
Expand Down Expand Up @@ -844,7 +899,7 @@ cdef class FlightInfo(_Weakrefable):
return obj

def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints,
total_records, total_bytes):
total_records=None, total_bytes=None, ordered=False, app_metadata=""):
"""Create a FlightInfo object from a schema, descriptor, and endpoints.
Parameters
Expand All @@ -855,10 +910,14 @@ cdef class FlightInfo(_Weakrefable):
the descriptor for this flight.
endpoints : list of FlightEndpoint
a list of endpoints where this flight is available.
total_records : int
the total records in this flight, or -1 if unknown
total_bytes : int
the total bytes in this flight, or -1 if unknown
total_records : int, default None
the total records in this flight, -1 or None if unknown.
total_bytes : int, default None
the total bytes in this flight, -1 or None if unknown.
ordered : boolean, default False
Whether endpoints are in the same order as the data.
app_metadata : bytes or str, default ""
Application-defined opaque metadata.
"""
cdef:
shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
Expand All @@ -874,8 +933,10 @@ cdef class FlightInfo(_Weakrefable):
check_flight_status(CreateFlightInfo(c_schema,
descriptor.descriptor,
c_endpoints,
total_records,
total_bytes, &self.info))
total_records if total_records is not None else -1,
total_bytes if total_bytes is not None else -1,
ordered,
tobytes(app_metadata), &self.info))

@property
def total_records(self):
Expand All @@ -887,6 +948,25 @@ cdef class FlightInfo(_Weakrefable):
"""The size in bytes of the data in this flight, or -1 if unknown."""
return self.info.get().total_bytes()

@property
def ordered(self):
    """Whether endpoints are in the same order as the data.

    The value is read from the wrapped C++ FlightInfo's ``ordered()``
    accessor.
    """
    return self.info.get().ordered()

@property
def app_metadata(self):
    """
    Application-defined opaque metadata.

    There is no inherent or required relationship between this and the
    app_metadata fields in the FlightEndpoints or resulting FlightData
    messages. Since this metadata is application-defined, a given
    application could define there to be a relationship, but there is
    none required by the spec.

    The value is read from the wrapped C++ FlightInfo's
    ``app_metadata()`` accessor.
    NOTE(review): presumably returned as ``bytes`` (Cython's default
    conversion of a C++ string) -- confirm.
    """
    return self.info.get().app_metadata()

@property
def schema(self):
"""The schema of the data in this flight."""
Expand Down Expand Up @@ -950,7 +1030,9 @@ cdef class FlightInfo(_Weakrefable):
f"descriptor={self.descriptor} "
f"endpoints={self.endpoints} "
f"total_records={self.total_records} "
f"total_bytes={self.total_bytes}>")
f"total_bytes={self.total_bytes} "
f"ordered={self.ordered} "
f"app_metadata={self.app_metadata}>")


cdef class FlightStreamChunk(_Weakrefable):
Expand Down
23 changes: 23 additions & 0 deletions python/pyarrow/includes/chrono.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# distutils: language = c++


# Opaque declaration of std::chrono::system_clock::time_point so that Cython
# code can name the type and pass values of it around. No members are
# declared, so instances can only be handed to C++ helpers.
cdef extern from "<chrono>" namespace "std::chrono::system_clock":
    cdef cppclass time_point:
        pass
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow_flight.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.chrono cimport time_point


cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
Expand Down Expand Up @@ -134,6 +135,8 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:

CTicket ticket
vector[CLocation] locations
optional[time_point] expiration_time
c_string app_metadata

bint operator==(CFlightEndpoint)
CResult[c_string] SerializeToString()
Expand All @@ -146,6 +149,8 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
CFlightInfo(CFlightInfo info)
int64_t total_records()
int64_t total_bytes()
c_bool ordered()
c_string app_metadata()
CResult[shared_ptr[CSchema]] GetSchema(CDictionaryMemo* memo)
CFlightDescriptor& descriptor()
const vector[CFlightEndpoint]& endpoints()
Expand Down Expand Up @@ -608,6 +613,8 @@ cdef extern from "arrow/python/flight.h" namespace "arrow::py::flight" nogil:
vector[CFlightEndpoint] endpoints,
int64_t total_records,
int64_t total_bytes,
c_bool ordered,
const c_string& app_metadata,
unique_ptr[CFlightInfo]* out)

cdef CStatus CreateSchemaResult" arrow::py::flight::CreateSchemaResult"(
Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow_python.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# distutils: language = c++

from pyarrow.includes.chrono cimport time_point
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *

Expand Down Expand Up @@ -244,6 +245,9 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py::internal" nogil:
CTimePoint TimePoint_from_s(double val)
CTimePoint TimePoint_from_ns(int64_t val)

CTimePoint TimePoint_from_system_time(time_point val)
time_point TimePoint_to_system_time(CTimePoint val)

CResult[c_string] TzinfoToString(PyObject* pytzinfo)
CResult[PyObject*] StringToTzinfo(c_string)

Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/src/arrow/python/datetime.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ inline TimePoint TimePoint_from_ns(int64_t val) {
return TimePoint(TimePoint::duration(val));
}

ARROW_PYTHON_EXPORT
// Convert a TimePoint into the platform's native
// std::chrono::system_clock::time_point, as needed by
// FlightEndpoint.expiration_time. The resolution of
// std::chrono::system_clock::duration is implementation-defined, so
// time_point_cast may truncate (toward zero) when it is coarser than
// TimePoint::duration.
inline std::chrono::system_clock::time_point TimePoint_to_system_time(TimePoint val) {
  using SystemDuration = std::chrono::system_clock::duration;
  return std::chrono::time_point_cast<SystemDuration>(val);
}

ARROW_PYTHON_EXPORT
// Convert the platform's native std::chrono::system_clock::time_point (the
// type used by FlightEndpoint.expiration_time) into a TimePoint by casting
// its duration to TimePoint::duration.
inline TimePoint TimePoint_from_system_time(std::chrono::system_clock::time_point val) {
  using TargetDuration = TimePoint::duration;
  return std::chrono::time_point_cast<TargetDuration>(val);
}

ARROW_PYTHON_EXPORT
inline int64_t PyDelta_to_s(PyDateTime_Delta* pytimedelta) {
return (PyDateTime_DELTA_GET_DAYS(pytimedelta) * 86400LL +
Expand Down
9 changes: 5 additions & 4 deletions python/pyarrow/src/arrow/python/flight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,12 @@ void PyClientMiddleware::CallCompleted(const Status& call_status) {
Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
const arrow::flight::FlightDescriptor& descriptor,
const std::vector<arrow::flight::FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes,
int64_t total_records, int64_t total_bytes, bool ordered,
const std::string& app_metadata,
std::unique_ptr<arrow::flight::FlightInfo>* out) {
ARROW_ASSIGN_OR_RAISE(auto result,
arrow::flight::FlightInfo::Make(*schema, descriptor, endpoints,
total_records, total_bytes));
ARROW_ASSIGN_OR_RAISE(auto result, arrow::flight::FlightInfo::Make(
*schema, descriptor, endpoints, total_records,
total_bytes, ordered, app_metadata));
*out = std::unique_ptr<arrow::flight::FlightInfo>(
new arrow::flight::FlightInfo(std::move(result)));
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/src/arrow/python/flight.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ ARROW_PYFLIGHT_EXPORT
Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
const arrow::flight::FlightDescriptor& descriptor,
const std::vector<arrow::flight::FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes,
int64_t total_records, int64_t total_bytes, bool ordered,
const std::string& app_metadata,
std::unique_ptr<arrow::flight::FlightInfo>* out);

/// \brief Create a SchemaResult from schema.
Expand Down
Loading

0 comments on commit b193c4f

Please sign in to comment.