# statsforecast
j
Hey. Thanks for using statsforecast.
• I believe the df is repartitioned again after you pass it to the forecast method, so I think the partitioning wouldn't make a difference; this is something I'm going to work on this week.
• `n_jobs` controls the parallelism within each partition, but since I think each partition holds a single series it wouldn't make a difference (I'll also verify this).
• There's an `nmodels` argument that controls how many model combinations are tried, so that's an easy way to limit the search space (rough sketch below).
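As a rough sketch of that last point (not your exact setup; the season length, frequency, and horizon below are placeholders):
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA

# Cap the stepwise search at 20 model combinations per series; a smaller
# nmodels means faster fits at the cost of a less exhaustive search.
sf = StatsForecast(
    models=[AutoARIMA(season_length=7, nmodels=20)],  # placeholder season_length
    freq="D",  # placeholder frequency
    n_jobs=1,  # parallelism within a partition; Spark handles the distribution
)
forecasts = sf.forecast(df=spark_df, h=28)  # spark_df: Spark DataFrame with unique_id, ds, y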
Hey. I looked into this today and I think the number of partitions is preserved, but each partition has `n_jobs=1` hardcoded. How many partitions did you have when you were using 320 CPUs?
a
Hi @José Morales, are you talking about the default partition config in Spark, or the DataFrame's partitioning after reading? I don't know if the data is distributed differently later in the code, but at the time of reading it has only one partition (because it's read from a single file, written with pandas), and 162 if I use the repartition method on the TS id:
df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet").repartition("unique_id")
df.rdd.getNumPartitions() # 162
and 200 by default?
spark.conf.get("spark.sql.shuffle.partitions") # 200
Oh, by forcing a repartition to 320, I've increased CPU utilisation from 30% to 60%!
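Roughly what the read looks like now, with an explicit partition count (320 matches my worker cores):
# Same read as before, but with an explicit target partition count so the work
# is spread across all worker cores instead of the 162 id-based partitions.
df = (
    spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet")
    .repartition(320, "unique_id")  # numPartitions plus the partitioning column
)
df.rdd.getNumPartitions()  # 320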
j
Hey. I meant the DF partitioning, so exactly what you did is what I'd expect to help reduce the time. How many CPUs does each executor have? I'm not sure if two tasks can run concurrently on the same executor.
a
Hello José, I can't find the precise answer, but during the ARIMA fit I can see that the maximum number of tasks is 200 (20 out of 32 per executor, where the executors are the compute workers on Databricks).
j
I think it's doing a shuffle first and then the training, so that's probably the shuffle partitions. Can you increase them to a multiple of your cores? e.g.
spark.conf.set("spark.sql.shuffle.partitions", 640)
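You could also derive it from the cluster instead of hardcoding a number, something like this (just a sketch; the x2 multiplier is arbitrary):
# Size the shuffle partitions from the cluster's task slots rather than a fixed value.
total_cores = spark.sparkContext.defaultParallelism  # total cores across executors on most cluster managers
spark.conf.set("spark.sql.shuffle.partitions", str(total_cores * 2))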
a
It seems to work, up to 98% CPU usage with these settings:
num_worker_cores = 320
spark.conf.set("spark.sql.shuffle.partitions", num_worker_cores*4)  # raise the default of 200
spark.conf.set("spark.sql.adaptive.enabled", "false")  # disable adaptive query execution
I don't know if the last one is useful, or if the x4 multiplier is right, but anyway.
j
Nice!
a
I retested, and
spark.conf.set("spark.sql.shuffle.partitions", num_worker_cores)
is enough; the problem was simply getting past the default limit of 200. Thanks again for your help @José Morales!
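For anyone finding this thread later, a rough end-to-end sketch of the setup that ended up working (the season length, frequency, and horizon are placeholders):
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA

num_worker_cores = 320  # total cores across the cluster's workers

# One shuffle partition per core was enough; the key is to get past the default of 200.
spark.conf.set("spark.sql.shuffle.partitions", num_worker_cores)

df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet")

sf = StatsForecast(models=[AutoARIMA(season_length=7)], freq="D", n_jobs=1)
forecasts = sf.forecast(df=df, h=28)  # runs distributed because df is a Spark DataFrame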