Many factors contribute to a PySpark program's performance. PySpark supports various profiling tools to expose tight loops of your program and allow you to make performance improvement decisions. However, memory, as one of the key factors of a program's performance, was missing from PySpark profiling. A PySpark program on the Spark driver can be profiled with Memory Profiler as a normal Python process, but there was no easy way to profile memory on Spark executors.
PySpark UDFs, among the most popular Python APIs, are executed by Python worker subprocesses spawned by Spark executors. They are powerful because they enable users to run custom code on top of the Apache Spark™ engine. However, it is difficult to optimize UDFs without understanding their memory consumption. To help optimize PySpark UDFs and reduce the likelihood of out-of-memory errors, the PySpark memory profiler provides information about total memory usage. It pinpoints which lines of code in a UDF contribute the most to memory usage.
Implementing memory profiling on executors is challenging. Because executors are distributed across the cluster, the resulting memory profiles need to be collected from each executor and aggregated properly to show the total memory usage. Meanwhile, a mapping between memory consumption and each source code line has to be provided for debugging and pruning purposes. In Databricks Runtime 12.0, PySpark overcame all these technical difficulties, and memory profiling was enabled on executors. In this blog, we provide an overview of user-defined functions (UDFs) and demonstrate how to use the memory profiler with UDFs.
User-defined Functions (UDFs) overview
There are two main categories of UDFs supported in PySpark: Python UDFs and Pandas UDFs.
- Python UDFs are user-defined scalar functions that take/return Python objects serialized/deserialized by Pickle and operate one row at a time.
- Pandas UDFs (a.k.a. Vectorized UDFs) are UDFs that take/return pandas Series or DataFrames serialized/deserialized by Apache Arrow and operate block by block. Pandas UDFs have some variations categorized by usage, with specific input and output types: Series to Series, Series to Scalar, and Iterator to Iterator.
Based on the Pandas UDFs implementation, there are also Pandas Function APIs: Map (i.e., mapInPandas) and (Co)Grouped Map (i.e., applyInPandas), as well as an Arrow Function API, mapInArrow. The memory profiler applies to all UDF types mentioned above unless the function takes in/outputs an iterator.
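To make the two main categories concrete, here is a minimal sketch (the function names and logic are our own illustration, not from the original example):
import pandas as pd
from pyspark.sql.functions import udf, pandas_udf

# Python UDF: takes/returns pickled Python objects and operates one row at a time.
@udf("double")
def plus_one(v):
    return v + 1.0

# Pandas UDF (Series to Series): takes/returns Arrow-serialized pandas Series
# and operates on a whole block of rows at once.
@pandas_udf("double")
def pandas_plus_one(s: pd.Series) -> pd.Series:
    return s + 1.0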
Enable Memory Profiling
To enable memory profiling on a cluster, we should install the Memory Profiler library and set the Spark config "spark.python.profile.memory" to "true", as shown below.
- Install the Memory Profiler library on the cluster.
- Enable the "spark.python.profile.memory" Spark configuration.
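For reference, a minimal sketch of the equivalent setup for a self-managed PySpark session looks like the following (assuming the memory-profiler package is already installed, e.g. via pip install memory-profiler; on Databricks, both steps are done through the cluster configuration instead):
from pyspark.sql import SparkSession

# The profiling config must be in place when the SparkContext is created.
spark = (
    SparkSession.builder
    .config("spark.python.profile.memory", "true")  # enable UDF memory profiling
    .getOrCreate()
)
sc = spark.sparkContext  # sc.show_profiles() / sc.dump_profiles(path) come from here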
Then, we can profile the memory of a UDF. We will illustrate the memory profiler with GroupedData.applyInPandas.
First, a PySpark DataFrame with 4,000,000 rows is generated, as shown below. Later, we will group by the id column, which results in 4 groups with 1,000,000 rows per group.
import pandas as pd
from pyspark.sql.functions import col, rand

sdf = spark.range(0, 4 * 1000000).withColumn(
    'id', (col('id') % 4).cast('integer')
).withColumn('v', rand())
Then a function arith_op is defined and applied to sdf as shown below.
def arith_op(pdf: pd.DataFrame) -> pd.DataFrame:
    new_v = []
    for x in pdf.v:
        new_v.append(x * 10 + 1)
    pdf.v = pd.Series(new_v)
    return pdf
res = sdf.groupby("id").applyInPandas(arith_op, schema=sdf.schema)
res.collect()
Executing the code above and running sc.show_profiles() prints the result profile below. The result profile can also be dumped to disk by sc.dump_profiles(path).
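The output follows the familiar Memory Profiler layout; a rough, illustrative mock-up of its shape (with the actual figures elided) looks like this:
============================================================
Profile of UDF<id=245>
============================================================
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     1        ...          ...          ...     def arith_op(pdf: pd.DataFrame) -> pd.DataFrame:
     2        ...          ...          ...         new_v = []
     3        ...          ...          ...         for x in pdf.v:
   ...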
The UDF id in the above result profile, 245, matches that in the following Spark plan for res, which can be shown by calling res.explain().
== Physical Plan ==
...
FlatMapGroupsInPandas [...], arith_op(...)#245, [...]
In the body of the result profile of sc.show_profiles(), the column headings include:
- Line #, the line number of the code that has been profiled
- Mem usage, the memory usage of the Python interpreter after that line has been executed
- Increment, the difference in memory of the current line with respect to the last one
- Occurrences, the number of times this line has been executed
- Line Contents, the code that has been profiled
We can tell from the result profile that Line 3 ("for x in pdf.v") consumes the most memory: ~125 MiB; the total memory usage of the function is ~185 MiB.
We can optimize the function to be more memory-efficient by removing the iteration over pdf.v, as shown below.
def optimized_arith_op(pdf: pd.DataFrame) -> pd.DataFrame:
    # Vectorized arithmetic avoids materializing an intermediate Python list.
    pdf.v = pdf.v * 10 + 1
    return pdf
res = sdf.groupby("id").applyInPandas(optimized_arith_op, schema=sdf.schema)
res.collect()
The updated result profile shows that the total memory usage of optimized_arith_op is reduced to ~61 MiB, roughly a third of the original ~185 MiB.
The example above demonstrates how the memory profiler helps us deeply understand the memory consumption of a UDF, identify the memory bottleneck, and make the function more memory-efficient.
Conclusion
The PySpark memory profiler is implemented on top of Memory Profiler. Spark Accumulators also play an important role in collecting result profiles from Python workers. The memory profiler calculates the total memory usage of a UDF and pinpoints which lines of code contribute the most to it. It is easy to use and is available starting from Databricks Runtime 12.0.
In addition, we have open sourced the PySpark memory profiler to the Apache Spark™ community. The memory profiler will be available starting from Spark 3.4; see SPARK-40281 for more information.