[SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone #19607

Closed
wants to merge 29 commits into from

Conversation

ueshin
Member

@ueshin ueshin commented Oct 30, 2017

What changes were proposed in this pull request?

When converting a Pandas DataFrame/Series from/to a Spark DataFrame using toPandas() or pandas UDFs, timestamp values respect the Python system timezone instead of the session timezone.

For example, let's say we use "America/Los_Angeles" as the session timezone and have a timestamp value "1970-01-01 00:00:01" in that timezone. Btw, I'm in Japan, so the Python timezone would be "Asia/Tokyo".

The timestamp value from the current toPandas() will be the following:

>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01

As you can see, the value becomes "1970-01-01 17:00:01" because it respects the Python system timezone.
As we discussed in #18664, we consider this behavior a bug, and the value should be "1970-01-01 00:00:01".
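
For reference, a minimal sketch of the intended conversion (not the actual patch; the helper name is only illustrative): interpret the internal values as UTC and render them in the session timezone rather than in the Python system timezone.

import pandas as pd

def render_in_session_tz(series, session_tz):
    # Interpret the naive timestamps as UTC, convert them to the session
    # timezone, then drop the tz info so they display as plain local values.
    return series.dt.tz_localize("UTC").dt.tz_convert(session_tz).dt.tz_localize(None)

s = pd.Series(pd.to_datetime([28801], unit="s"))        # 1970-01-01 08:00:01 UTC
print(render_in_session_tz(s, "America/Los_Angeles"))   # 1970-01-01 00:00:01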

How was this patch tested?

Added new tests and ran the existing tests.

@SparkQA

SparkQA commented Oct 30, 2017

Test build #83205 has finished for PR 19607 at commit 5c08ecf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented Oct 30, 2017

Jenkins, retest this please.

@SparkQA

SparkQA commented Oct 30, 2017

Test build #83207 has finished for PR 19607 at commit 5c08ecf.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 30, 2017

Test build #83212 has finished for PR 19607 at commit 5c08ecf.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 30, 2017

Test build #83211 has finished for PR 19607 at commit e28fc87.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 1, 2017

Test build #83280 has finished for PR 19607 at commit ee1a1c8.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 1, 2017

Test build #83286 has finished for PR 19607 at commit b1436b8.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented Nov 1, 2017

@BryanCutler I'm fixing the behavior of toPandas() and pandas UDFs as we discussed in #18664, but I guess we still need to support old Pandas as well.
I tried to find a workaround for old Pandas, but I haven't found one yet.
Do you have any ideas for the workaround? cc @wesm @HyukjinKwon @viirya

@SparkQA

SparkQA commented Nov 1, 2017

Test build #83295 has finished for PR 19607 at commit 6872516.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member

Hi @ueshin, what is the oldest version of Pandas that we're required to support, and what exactly wasn't working with it?

@ueshin
Member Author

ueshin commented Nov 2, 2017

@BryanCutler I guess the oldest supported version of Pandas is currently 0.13.0, according to #18403. cc @HyukjinKwon.

@HyukjinKwon
Member

HyukjinKwon commented Nov 2, 2017

Yea, that was my proposal. If anything is blocked by this, I think we can consider bumping it up as an option because, IMHO, the pinned version requirement technically hasn't been released and published yet.

^ cc @cloud-fan, @srowen and @viirya

@HyukjinKwon
Member

I tried to find a workaround for old Pandas, but I haven't found one yet.

I haven't looked at this closely yet, but I will definitely try to take a look and help soon. I would appreciate it if the problem (or just the symptoms, or a pointer ..) could be described here, though, if it is not too complex.

@ueshin ueshin changed the title [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone [WIP][SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone Nov 2, 2017
@SparkQA

SparkQA commented Nov 2, 2017

Test build #83320 has finished for PR 19607 at commit 1f096bf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@ueshin
Member Author

ueshin commented Nov 2, 2017

Jenkins, retest this please.

@SparkQA

SparkQA commented Nov 2, 2017

Test build #83324 has finished for PR 19607 at commit 1f096bf.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1629,35 +1629,121 @@ def to_arrow_type(dt):
return arrow_type


def _check_dataframe_localize_timestamps(pdf):
def to_arrow_schema(schema):
Contributor

where do we use this method?

Member Author

Ah, currently it isn't used. I'll remove it for now.

return s.dt.tz_convert('UTC')
else:
return s
except ImportError:
Contributor

I think we should bump up the pandas version if we can't find a workaround.

Member Author

Sure, let me look into it a little more and summarize what version we can support.
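
For context, a rough sketch of the kind of import guard being discussed (the helper names, body, and error message are illustrative, not the exact code in the patch): pandas.api.types only exists in newer Pandas, so old versions fail the import and get a clear error pointing at the minimum supported version.

def _old_pandas_exception_message(e):
    # Illustrative message; the wording in the patch may differ.
    return "Pandas >= 0.19.2 must be installed; however: %s" % str(e)

def _localize_series_timestamps(s):
    try:
        from pandas.api.types import is_datetime64tz_dtype
    except ImportError as e:
        # Old Pandas (< 0.19) has no pandas.api.types, so fail loudly here.
        raise ImportError(_old_pandas_exception_message(e))
    if is_datetime64tz_dtype(s.dtype):
        # Normalize timezone-aware values to UTC before handing them to Arrow.
        return s.dt.tz_convert('UTC')
    return s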

@@ -948,6 +948,14 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
Contributor

can we clean up the code more if we don't have this config?

Member Author

Sure, I'll try it.

@ueshin ueshin changed the title [WIP][SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone Nov 22, 2017
buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
.internal()
.doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
"timezone when converting to/from Pandas DataFrame.")
Member

Emphasize the conf will be deprecated?

When true, make Pandas DataFrame with timestamp type respecting session local timezone when converting to/from Pandas DataFrame. This configuration will be deprecated in the future releases.

Member Author

Sure, I'll update it.
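
For reference, a sketch of how the flag interacts with the session timezone from the user's side, using the conf names from this PR (illustrative usage, not a test from the patch):

>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
>>> df.toPandas()   # falls back to rendering timestamps in the Python system timezone
>>> spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
>>> df.toPandas()   # new behavior: timestamps are rendered in the session timezone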

@@ -201,7 +201,7 @@ def _supports_symlinks():
extras_require={
'ml': ['numpy>=1.7'],
'mllib': ['numpy>=1.7'],
'sql': ['pandas>=0.13.0']
'sql': ['pandas>=0.19.2']
Member

Document this requirement and the behavior changes in the Migration Guide?

Member Author

Sure, I'll add it.
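
For context, a runtime check for such a requirement commonly looks roughly like the following (a sketch only; not necessarily how PySpark enforces it):

from distutils.version import LooseVersion
import pandas

if LooseVersion(pandas.__version__) < LooseVersion("0.19.2"):
    # Refuse to run the Pandas conversion paths on versions older than the
    # minimum declared in setup.py.
    raise ImportError("Pandas >= 0.19.2 must be installed; found %s" % pandas.__version__)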

@SparkQA

SparkQA commented Nov 27, 2017

Test build #84205 has finished for PR 19607 at commit 40a9735.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class VectorIndexer(JavaEstimator, HasInputCol, HasOutputCol, HasHandleInvalid, JavaMLReadable,
  • class _ImageSchema(object):
  • raise RuntimeError(\"Creating instance of _ImageSchema class is disallowed.\")

@SparkQA

SparkQA commented Nov 27, 2017

Test build #84204 has finished for PR 19607 at commit f92eae3.

  • This patch fails due to an unknown error code, -9.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

Member

@HyukjinKwon HyukjinKwon left a comment

Looks fine to me.

elif is_datetime64tz_dtype(s.dtype):
return s.dt.tz_convert('UTC')
else:
return s


def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
Member

Nit: maybe from_timezone .

Member Author

Thanks, I'll update it. Maybe toTimezone -> to_timezone as well.

from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
except ImportError as e:
raise ImportError(_old_pandas_exception_message(e))
fromTz = fromTimezone or 'tzlocal()'
Member

Ditto.

Member Author

I'll update it.
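
For reference, a sketch of the renamed helper under discussion; the parameter names follow the snake_case suggestion, and the body is illustrative rather than the merged code:

from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype

def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone):
    if is_datetime64tz_dtype(s.dtype):
        # Already timezone-aware: re-render the values in the target timezone.
        return s.dt.tz_convert(to_timezone).dt.tz_localize(None)
    elif is_datetime64_dtype(s.dtype):
        # Naive values: interpret them in from_timezone, then render in to_timezone.
        return s.dt.tz_localize(from_timezone).dt.tz_convert(to_timezone).dt.tz_localize(None)
    else:
        return s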


self.assertNotEqual(result_ny, result_la)

result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v
Member

Small comments here would be helpful .. BTW, to be clear, is this 3-hour timedelta from the time difference between America/Los_Angeles and America/New_York?

Member Author

Yes, the 3-hour timedelta is the time difference between them.
I'll add some comments.
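
For reference, the kind of comment being requested could simply spell out the offset (a sketch, not the test code itself):

from datetime import datetime, timedelta

# America/New_York is 3 hours ahead of America/Los_Angeles, so the same
# instant renders 3 hours later as a New York wall-clock time.
la_rendering = datetime(1970, 1, 1, 0, 0, 1)        # 1970-01-01 00:00:01 in LA
ny_rendering = la_rendering + timedelta(hours=3)    # 1970-01-01 03:00:01 in NY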

df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
.withColumn("internal_value", internal_value(col("timestamp")))
result_la = df_la.select(col("idx"), col("internal_value")).collect()
diff = 3 * 60 * 60 * 1000 * 1000 * 1000
Member

Here too. It took me a while to figure out where this 3 came from ..

Member Author

Yes, I'll add some comments.
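
For reference, the comment being asked for here might read like this (a sketch):

# Pandas stores timestamps as datetime64[ns], so the 3-hour offset between
# America/Los_Angeles and America/New_York is expressed in nanoseconds when
# comparing internal values.
diff = 3 * 60 * 60 * 1000 * 1000 * 1000   # 3 hours in nanoseconds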

s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
if not copied and s is not pdf[field.name]:
pdf = pdf.copy()
copied = True
Member

Would you mind if I ask why we should copy here? Some comments explaining it would probably be helpful. To be clear, is it to prevent the original Pandas DataFrame from being updated?

Member Author

Yes, it's to prevent the original one from being updated.
I'll add some comments.
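
For reference, a sketch of the copy-once pattern under discussion (the helper name and variables come from the diff above; the loop itself is illustrative):

copied = False   # copy the caller's DataFrame at most once
for field in schema:
    s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
    if s is not pdf[field.name]:
        if not copied:
            # Only copy when a column actually changes, so the Pandas
            # DataFrame handed in by the caller is never mutated.
            pdf = pdf.copy()
            copied = True
        pdf[field.name] = s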

@SparkQA

SparkQA commented Nov 27, 2017

Test build #84210 has finished for PR 19607 at commit 40a9735.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class VectorIndexer(JavaEstimator, HasInputCol, HasOutputCol, HasHandleInvalid, JavaMLReadable,
  • class _ImageSchema(object):
  • raise RuntimeError(\"Creating instance of _ImageSchema class is disallowed.\")

@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
record_type_list.append((str(col_names[i]), curr_type))
return np.dtype(record_type_list) if has_rec_fix else None

def _convert_from_pandas(self, pdf):
def _convert_from_pandas(self, pdf, schema, timezone):
Member

Just an idea, not blocking this PR. We probably have enough code to make a separate Python file / class that puts the Pandas / Arrow stuff in one place.

Member Author

Thanks, I agree, but I'll leave those as they are in this PR.

@HyukjinKwon
Member

LGTM

@SparkQA

SparkQA commented Nov 28, 2017

Test build #84242 has finished for PR 19607 at commit 9200f38.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

felixcheung commented Nov 28, 2017

I guess we should look at R to see if it should behave similarly. WDYT @HyukjinKwon?

@HyukjinKwon
Member

HyukjinKwon commented Nov 28, 2017

Yup, I think we should take a look at POSIXct / POSIXlt in R versus timestamp within Spark too. From a quick look, it doesn't seem to respect the session timezone.

@cloud-fan
Contributor

LGTM, merging to master!

Let's fix the R timestamp issue in a new ticket.

@HyukjinKwon
Member

Yup, I was testing and trying to produce details. Let me describe this in the JIRA, not here :D.

@asfgit asfgit closed this in 64817c4 Nov 28, 2017
@HyukjinKwon
Member

Sorry, but does anyone remember how we are going to deal with df.collect() in PySpark? The R fix would be more like df.collect(). It would be good to file a JIRA for df.collect() in PySpark too while we are here, if I haven't missed some discussion about it.

Filed for R anyway - https://issues.apache.org/jira/browse/SPARK-22632.

@ueshin
Member Author

ueshin commented Nov 28, 2017

Unfortunately, df.collect() is out of the scope of this PR. Its timestamp values will still respect the Python system timezone.

@HyukjinKwon
Member

HyukjinKwon commented Nov 28, 2017

Yup, it should be separate. I meant to file another JIRA while we are here if it is something we need to fix, before we forget. If df.collect() is not meant to be fixed, I think I should reread the discussion and maybe resolve the R JIRA.

@ueshin
Member Author

ueshin commented Nov 28, 2017

We'd probably need at least two external libraries that Pandas uses, like dateutil and pytz, to handle timezones properly, but I have no idea how to handle timezones properly outside of Pandas for now.
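
For context, a hedged sketch of the kind of conversion that would be needed outside Pandas, e.g. with pytz (PySpark does not do this here):

import pytz
from datetime import datetime

session_tz = pytz.timezone("America/Los_Angeles")
utc_value = datetime(1970, 1, 1, 8, 0, 1, tzinfo=pytz.utc)   # internal value 28801 seconds
print(utc_value.astimezone(session_tz))                      # 1970-01-01 00:00:01-08:00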

@HyukjinKwon
Member

Hm .. I see. But is this something we should ideally fix, though? I am asking because I am checking the R-related code now ..

@ueshin
Member Author

ueshin commented Nov 28, 2017

Ah, yes, I think so.

@HyukjinKwon
Member

Thanks. I was just worried that I had missed some discussion somewhere and wanted to double-check.
