I ran into some Spark weirdness while working on an ETL job. Specifically, when reducing the number of output files of a Parquet write with coalesce(), the parallelism of the entire job, including the upstream tasks, was constrained by the number of coalesce partitions. I expected the upstream tasks to use all available cores, and the number of file partitions to become a limit only when it's time to actually write the files.
It's probably easier if I demonstrate. Below I'll create a series of small example DataFrames with 0 to 9 rows and map a slow function over them in a few different ways. I'd expect each calculation to take roughly a fixed amount of time, since the rows are processed in parallel. However, in one of the examples, execution time increases linearly with the number of rows.
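To make that expectation concrete outside Spark, here is a plain-Python analogy. It is only an illustration under assumptions of its own, not Spark code. A thread pool stands in for Spark's executor cores, and the delay is shortened to 0.1 s (the assumed `DELAY` constant) so it runs quickly. Serial execution grows linearly with the number of items, while the pooled version stays close to a single delay.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.1  # assumed stand-in for the 1 s sleep used in the Spark examples


def slow_func(ping):
    """Identity function that takes DELAY seconds to return."""
    time.sleep(DELAY)
    return ping


def timer(func):
    """Times the execution of a zero-argument function, in seconds."""
    start = time.perf_counter()
    func()
    return time.perf_counter() - start


def run_serial(items):
    # One worker: total time grows linearly with len(items).
    return [slow_func(x) for x in items]


def run_parallel(items, workers=32):
    # 32 workers, mirroring sc.defaultParallelism below: while
    # len(items) <= workers, the sleeps overlap and total time stays near DELAY.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(slow_func, items))


if __name__ == "__main__":
    for n in range(1, 10, 4):
        serial = timer(lambda: run_serial(range(n)))
        parallel = timer(lambda: run_parallel(range(n)))
        print(f"{n} items: serial {serial:.2f}s, parallel {parallel:.2f}s")
```

Threads are enough here because `time.sleep` releases the GIL. For CPU-bound work, a process pool would be the closer analogy.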
```python
import time

from pyspark.sql.types import LongType

# `sc` (SparkContext) and `sqlContext` come from the PySpark shell/notebook session.
path = "~/tmp.parquet"

sc.defaultParallelism
# 32


def slow_func(ping):
    """Identity function that takes 1s to return."""
    time.sleep(1)
    return ping


def timer(func):
    """Times the execution of a zero-argument function, in seconds."""
    start_time = time.time()
    func()
    return time.time() - start_time


# Example usage:
timer(lambda: slow_func(10))
# 1.001082181930542


def create_frame(rdd):
    """Wrap an RDD of integers in a single-column DataFrame."""
    return sqlContext.createDataFrame(rdd, schema=LongType())
```
First, let's look at a plain RDD. Everything seems to work as expected here. Apart from a 118 s outlier on the second run, execution time levels off at roughly 2.3 to 3.3 seconds as the dataset grows:
```python
[timer(lambda: sc.parallelize(range(x)).map(slow_func).take(x)) for x in range(10)]
```

```
[0.07758498191833496, 118.664391040802, 2.453991174697876, 2.390385866165161, 2.3567309379577637, 2.3262758255004883, 2.3200111389160156, 3.3115720748901367, 3.3115429878234863, 3.274951934814453]
```
Next, let's create a Spark DataFrame and write its contents to Parquet without mapping the slow function first, still coalescing to a single file. Again, things behave as expected: execution time is fairly flat.
```python
[
    timer(lambda: create_frame(sc.parallelize(range(x)))
          .coalesce(1)
          .write.mode("overwrite")
          .parquet(path))
    for x in range(10)
]
```

```
[5.700469017028809, 1.5091090202331543, 1.4622771739959717, 1.448883056640625, 1.4437789916992188, 1.4351229667663574, 1.4368910789489746, 1.4349958896636963, 1.4199819564819336, 1.4395389556884766]
```
Now, let's map the slow function over the data before saving. This should add about one second to every run, regardless of its size. Instead, execution time increases by about one second per row.
```python
[
    timer(lambda: create_frame(sc.parallelize(range(x)).map(slow_func))
          .coalesce(1)
          .write.mode("overwrite")
          .parquet(path))
    for x in range(10)
]
```

```
[1.42529296875, 2.436065912246704, 3.3423829078674316, 4.332568883895874, 5.268526077270508, 6.280202865600586, 7.169728994369507, 8.18229603767395, 9.098582029342651, 10.119444131851196]
```
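To put a number on "one second per row", fit those timings against row count with least squares. `slope_per_row` is a hypothetical helper written for this illustration. The timings are copied from the output above.

```python
def slope_per_row(timings):
    """Least-squares slope of timings[i] against row count i, in seconds per row."""
    n = len(timings)
    xs = list(range(n))
    x_mean = sum(xs) / n
    y_mean = sum(timings) / n
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, timings))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    return sxy / sxx


# Timings from the map + coalesce(1) run above, for 0..9 rows.
coalesce_timings = [
    1.42529296875, 2.436065912246704, 3.3423829078674316, 4.332568883895874,
    5.268526077270508, 6.280202865600586, 7.169728994369507, 8.18229603767395,
    9.098582029342651, 10.119444131851196,
]

print(f"{slope_per_row(coalesce_timings):.2f} s/row")  # prints 0.96 s/row
```

A slope this close to the 1 s sleep suggests the slow map ran one row after another in a single task. The same fit on the repartition() timings later in the post comes out close to zero.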
Using repartition() instead of coalesce() fixes the issue:
```python
[
    timer(lambda: create_frame(sc.parallelize(range(x)).map(slow_func))
          .repartition(1)
          .write.mode("overwrite")
          .parquet(path))
    for x in range(10)
]
```

```
[0.8304200172424316, 1.276075839996338, 1.2515549659729004, 1.2429919242858887, 1.2587580680847168, 1.2490499019622803, 1.6439399719238281, 1.229665994644165, 1.2340660095214844, 1.2454640865325928]
```
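Why does repartition() help? The PySpark documentation for DataFrame.coalesce warns about a drastic coalesce, for example to a single partition. It can make the computation run on fewer nodes than you'd like. The docs suggest calling repartition() instead: that adds a shuffle step, but the upstream partitions are then executed in parallel.

coalesce() is a narrow transformation, so the slow map is pipelined into the same single task that writes the file. repartition() ends the upstream stage at a shuffle boundary instead.

The sketch below is a simplified model of that difference, not Spark code. `upstream_tasks` and `estimated_map_seconds` are hypothetical names. The model assumes rows are spread evenly across partitions and ignores shuffle and write costs.

```python
import math


def upstream_tasks(upstream_partitions, target_partitions, shuffle):
    """Tasks available to the work *before* a coalesce/repartition.

    shuffle=False models DataFrame.coalesce(): no stage boundary, so the
    upstream work runs inside the (fewer) output tasks.
    shuffle=True models DataFrame.repartition(): the shuffle splits the job
    into two stages, and the upstream stage keeps its own partitions.
    """
    if shuffle:
        return upstream_partitions
    return min(upstream_partitions, target_partitions)


def estimated_map_seconds(rows, upstream_partitions, target_partitions,
                          shuffle, cores=32, per_row_seconds=1.0):
    """Rough time spent in the slow per-row map only."""
    if rows == 0:
        return 0.0
    tasks = upstream_tasks(upstream_partitions, target_partitions, shuffle)
    rows_per_task = math.ceil(rows / tasks)       # each task runs its rows serially
    busy_tasks = math.ceil(rows / rows_per_task)  # tasks that received any rows
    waves = math.ceil(busy_tasks / cores)         # at most `cores` tasks at once
    return waves * rows_per_task * per_row_seconds


for rows in (1, 5, 9):
    coalesced = estimated_map_seconds(rows, 32, 1, shuffle=False)
    repartitioned = estimated_map_seconds(rows, 32, 1, shuffle=True)
    print(f"{rows} rows: coalesce(1) ~{coalesced:.0f}s, repartition(1) ~{repartitioned:.0f}s")
```

The model reproduces the shape of the measurements: linear growth with coalesce(1) and roughly flat time with repartition(1). On plain RDDs, `rdd.coalesce(1, shuffle=True)` requests the same shuffle explicitly.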
```python
sc.cancelAllJobs()
```