Antoine SCHWARTZ -CROIX-
11/20/2023, 5:44 PM
AutoETS works fine for me on the spark backend (code used for the run):
from statsforecast import StatsForecast
from statsforecast.models import AutoETS, AutoARIMA, SeasonalNaive
from statsforecast.utils import ConformalIntervals

# conformal prediction intervals, shared by the candidate model and the fallback
prediction_intervals = ConformalIntervals(h=forecast_horizon, n_windows=n_windows_conformal)

df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet")
futr_df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/futr_df.parquet")

sf = StatsForecast(
    # `algo` holds the model name as a string (e.g. "AutoETS"), instantiated via eval
    models=[eval(f"{algo}(season_length=season_length, alias=algo, prediction_intervals=prediction_intervals)")],
    freq=freq,
    fallback_model=SeasonalNaive(season_length=season_length),
    # n_jobs=1,
    # verbose=True,
)

predictions = sf.forecast(
    h=forecast_horizon,
    df=df,
    X_df=futr_df,
    level=[50, 80, 90, 95, 99],
    prediction_intervals=prediction_intervals,
).toPandas()
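A side note on the eval call above: the same thing can be done with a plain attribute lookup on statsforecast.models. A minimal sketch, assuming algo holds the class name (e.g. "AutoETS") and season_length / prediction_intervals are defined as in the snippet:

import statsforecast.models as sf_models

# look the model class up by its name instead of eval-ing a constructor string
ModelCls = getattr(sf_models, algo)  # e.g. algo = "AutoETS"
model = ModelCls(
    season_length=season_length,
    alias=algo,
    prediction_intervals=prediction_intervals,
)
# then pass it as models=[model] to StatsForecast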
However, for AutoARIMA the computation time was still too high for me, so I dug a little deeper into the logs and found that only 30% of the CPUs were used on average during the run.
• I've tried tweaking the repartitioning of the input spark dfs and modifying a few spark confs, but nothing changes. Do you have any ideas?
• Is the n_jobs parameter used when the detected backend is spark?
• Would you advise me to constrain ARIMA's search space a little to save some computing time? If so, which parameters should be edited first?
Thanks in advance!
José Morales
11/21/2023, 11:34 PM
n_jobs controls the parallelism within each partition, but since I think each partition holds a single series it wouldn't make a difference (I'll also verify this).
• There's an nmodels argument that controls how many combinations are tried, so that's an easy one to limit the search space.
n_jobs=1 is hardcoded. How many partitions did you have when you were using 320 CPUs?
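A minimal sketch of how AutoARIMA's search could be constrained along those lines, using nmodels together with the max_* bounds (the values below are illustrative, not recommendations; season_length and prediction_intervals are as in the earlier snippet):

from statsforecast.models import AutoARIMA

# nmodels caps how many candidate models the stepwise search evaluates;
# the max_* bounds shrink the (p, q, P, Q) grid, and approximation=True
# uses an approximate likelihood during model selection to speed it up.
constrained_arima = AutoARIMA(
    season_length=season_length,
    nmodels=20,
    max_p=3, max_q=3,
    max_P=1, max_Q=1,
    approximation=True,
    alias="AutoARIMA",
    prediction_intervals=prediction_intervals,
)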
11/23/2023, 9:48 AM
df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet").repartition("unique_id")
df.rdd.getNumPartitions()  # 162
spark.conf.get("spark.sql.shuffle.partitions")  # 200
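One way to see how the series are actually spread across those partitions is to group by the partition id. A small diagnostic sketch, assuming df and the unique_id column as above:

from pyspark.sql.functions import spark_partition_id, countDistinct, count

# rows and distinct series per partition; idle cores usually mean fewer
# (or unevenly filled) partitions than available cores
(
    df.withColumn("pid", spark_partition_id())
    .groupBy("pid")
    .agg(countDistinct("unique_id").alias("n_series"), count("*").alias("n_rows"))
    .orderBy("pid")
    .show(200)
)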
José Morales
11/23/2023, 3:39 PM

Antoine SCHWARTZ -CROIX-
11/24/2023, 10:09 AM

José Morales
11/24/2023, 4:03 PM
spark.conf.set("spark.sql.shuffle.partitions", 640)
Antoine SCHWARTZ -CROIX-
11/24/2023, 6:30 PM
num_worker_cores = 320
# 4 shuffle partitions per worker core, with AQE disabled so adaptive execution
# doesn't coalesce them back down
spark.conf.set("spark.sql.shuffle.partitions", num_worker_cores * 4)
spark.conf.set("spark.sql.adaptive.enabled", "false")
José Morales
11/24/2023, 6:54 PM