From 803c0e252fc78a424a181a34a93e689fa9aaaa09 Mon Sep 17 00:00:00 2001 From: dclandau <94046012+dclandau@users.noreply.github.com> Date: Wed, 3 Aug 2022 07:06:26 +0100 Subject: [PATCH] Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469) * Fix BaseSQLToGCSOperator approx_max_file_size_bytes When using the parquet file_format, `tmp_file_handle.tell()` always points to the beginning of the file after the data has been saved and therefore is not a good indicator for the file's current size. Save the current file pointer position and set the file pointer position to `os.SEEK_END`. file_size is set to the new position, and the file pointer's position goes back to the saved position. Currently, after a parquet write operation the pointer is set to 0, and therefore, simply executing `tmp_file_handle.tell()` is not sufficient to determine the current size. This sequence is added to allow file splitting when the export format is set to parquet. --- airflow/providers/google/cloud/transfers/sql_to_gcs.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py index c2044790242b0..bfee9dd1d2930 100644 --- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py @@ -198,6 +198,8 @@ def _write_local_data_files(self, cursor): names in GCS, and values are file handles to local files that contain the data for the GCS objects. """ + import os + org_schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description)) schema = [column for column in org_schema if column not in self.exclude_columns] @@ -250,7 +252,12 @@ def _write_local_data_files(self, cursor): tmp_file_handle.write(b'\n') # Stop if the file exceeds the file size limit. 
- if tmp_file_handle.tell() >= self.approx_max_file_size_bytes: + fppos = tmp_file_handle.tell() + tmp_file_handle.seek(0, os.SEEK_END) + file_size = tmp_file_handle.tell() + tmp_file_handle.seek(fppos, os.SEEK_SET) + + if file_size >= self.approx_max_file_size_bytes: file_no += 1 if self.export_format == 'parquet':