This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[DNM] update sparklog #952

Open · wants to merge 6 commits into main
8 changes: 4 additions & 4 deletions tools/sparklog.ipynb
@@ -273,7 +273,7 @@
" self.dfacc=self.dfacc.select(\"queryid\",(F.col(\"col\")[0]).alias(\"ID\"),(F.col(\"col\")[1]).alias(\"Update\")).join(amsdf,on=[\"ID\"])\n",
" \n",
" if self.queryplans is not None:\n",
" self.metricscollect=[l for l in self.allmetrics if l[1] in ['nsTiming','timing'] and (l[2].startswith(\"totaltime_\") or l[2].startswith(\"scan time\") or l[2].startswith(\"shuffle write time\")) and l[2] not in(\"totaltime_collectbatch\") ]\n",
" self.metricscollect=[l for l in self.allmetrics if l[1] in ['nsTiming','timing'] and (l[2].startswith(\"totaltime_\") or l[2].startswith(\"scan time\") or l[2].startswith(\"shuffle write time\") or l[2].startswith(\"shuffle spill time\")) and l[2] not in(\"totaltime_collectbatch\") ]\n",
" \n",
" #config=df.where(\"event='SparkListenerJobStart' and Properties.`spark.executor.cores` is not null\").select(\"Properties.*\").limit(1).collect()\n",
" config=df.select(\"`Spark Properties`.*\").where(\"`spark.app.id` is not null\").limit(1).collect()\n",
@@ -1638,7 +1638,7 @@
"\n",
" value=accid2stageid[m[\"accumulatorId\"]][1]\n",
" stdev_value=accid2stageid[m[\"accumulatorId\"]][2]\n",
" stdev_value=0 if stdev_value is null else stdev_value\n",
" stdev_value=0 if stdev_value is None else stdev_value\n",
" if m[\"metricType\"] in ['nsTiming','timing']:\n",
" totaltime=value/1000 if m[\"metricType\"] == 'timing' else value/1000000000\n",
" stdev_value=stdev_value/1000 if m[\"metricType\"] == 'timing' else stdev_value/1000000000\n",
@@ -1970,7 +1970,7 @@
" exchangedf=self.get_metrics_by_node(\"ColumnarExchange\")\n",
" exchangedf.cache()\n",
" exchangedf.count()\n",
" mapdf=exchangedf.where(\"totaltime_split is not null\").select(\"nodeID\",F.col(\"Stage ID\").alias(\"map_stageid\"),\"real_queryid\",F.floor(F.col(\"totaltime_split\")/F.col(\"totaltime_split_mean\")).alias(\"map_partnum\"),\"totaltime_compress\",\"totaltime_computepid\",\"totaltime_split\",\"shuffle write time\",'shuffle records written','data size','shuffle bytes written','shuffle bytes written_mean','shuffle bytes written_stddev','shuffle bytes spilled','number of input rows')\n",
" mapdf=exchangedf.where(\"totaltime_split is not null\").select(\"nodeID\",F.col(\"Stage ID\").alias(\"map_stageid\"),\"real_queryid\",F.floor(F.col(\"totaltime_split\")/F.col(\"totaltime_split_mean\")).alias(\"map_partnum\"),\"totaltime_compress\",\"totaltime_computepid\",\"totaltime_split\",\"shuffle write time\",\"shuffle spill time\",'shuffle records written','data size','shuffle bytes written','shuffle bytes written_mean','shuffle bytes written_stddev','shuffle bytes spilled','number of input rows')\n",
" reducerdf=exchangedf.where(\"totaltime_split is null\").select(\"nodeID\",F.col(\"Stage ID\").alias(\"reducer_stageid\"),\"real_queryid\",'local blocks read','local bytes read',F.floor(F.col(\"records read\")/F.col(\"records read_mean\")).alias(\"reducer_partnum\"),(F.col('avg read batch num rows')/10).alias(\"avg read batch num rows\"),'remote bytes read','records read','remote blocks read',(F.col(\"number of output rows\")/F.col(\"records read\")).alias(\"avg rows per split recordbatch\"))\n",
" shuffledf=mapdf.join(reducerdf,on=[\"nodeID\",\"real_queryid\"],how=\"full\")\n",
" if queryid is not None:\n",
@@ -1989,7 +1989,7 @@
" ax=shuffle_pdf.plot(y=[\"shuffle bytes written\",\"compress_ratio\"],secondary_y=[\"compress_ratio\"],figsize=(30,8),style=\"-*\",title=\"compress ratio\")\n",
" self.print_real_queryid(ax,shuffle_pdf)\n",
" shufflewritepdf=shuffle_pdf\n",
" ax=shufflewritepdf.plot.bar(y=[\"shuffle write time\",\"totaltime_compress\",\"totaltime_split\",\"totaltime_computepid\"],stacked=True,figsize=(30,8),title=\"split time + shuffle write time vs. shuffle bytes written\")\n",
" ax=shufflewritepdf.plot.bar(y=[\"shuffle write time\",\"shuffle spill time\",\"totaltime_compress\",\"totaltime_split\",\"totaltime_computepid\"],stacked=True,figsize=(30,8),title=\"split time + shuffle write time vs. shuffle bytes written\")\n",
" ax=shufflewritepdf.plot(ax=ax,y=[\"shuffle bytes written\"],secondary_y=[\"shuffle bytes written\"],style=\"-*\")\n",
" self.print_real_queryid(ax,shufflewritepdf)\n",
" \n",