# statsforecast

Antoine SCHWARTZ -CROIX-

11/20/2023, 5:44 PM
Hello Nixtla community! I have a few questions regarding distributed compute over Spark for statsforecast models. I have 30k series to forecast, and staying on a single machine (even a big one) takes too long, especially for the "Auto" methods. So I launched a large 320-core cluster on Databricks, and the code below works very well (for example, I go from more than 2 hours to less than 15 minutes for an AutoETS run).
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS, SeasonalNaive
from statsforecast.utils import ConformalIntervals

# Conformal prediction intervals shared by the models and the forecast call
prediction_intervals = ConformalIntervals(h=forecast_horizon, n_windows=n_windows_conformal)

df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet")
futr_df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/futr_df.parquet")

sf = StatsForecast(
    # `algo` is a string naming the model class, e.g. "AutoETS" or "AutoARIMA"
    models=[eval(f"{algo}(season_length=season_length, alias=algo, prediction_intervals=prediction_intervals)")],
    freq=freq,
    fallback_model=SeasonalNaive(season_length=season_length),
    #n_jobs=1,
    #verbose=True
)

predictions = sf.forecast(
    h=forecast_horizon,
    df=df, 
    X_df=futr_df,
    level=[50, 80, 90, 95, 99],
    prediction_intervals=prediction_intervals,
).toPandas()
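Side note: the eval call works, but a plain dictionary lookup gives the same string-driven model selection without executing arbitrary code. A minimal sketch, reusing the algo, season_length and prediction_intervals variables from the snippet above:

from statsforecast.models import AutoARIMA, AutoETS

# Map the model-name string to its class instead of eval-ing it
model_classes = {"AutoARIMA": AutoARIMA, "AutoETS": AutoETS}

model = model_classes[algo](
    season_length=season_length,
    alias=algo,
    prediction_intervals=prediction_intervals,
)
# `model` can then be passed as StatsForecast(models=[model], ...)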
However, for AutoARIMA the computation time was still too high for me, so I dug a little deeper into the logs and found that only 30% of the CPUs were used on average during the run.
• I've tried tweaking the repartitioning of the input Spark DataFrames and modifying a few Spark confs, but nothing changes. Do you have any ideas?
• Is the n_jobs parameter used when the detected backend is Spark?
• Would you advise me to constrain ARIMA's search space a little to save some computing time? If so, which parameters should be edited first?
Thanks in advance!

José Morales

11/21/2023, 11:34 PM
Hey. Thanks for using statsforecast.
• I believe the df is repartitioned again after you pass it to the forecast method, so I think the partitioning wouldn't make a difference; this is something I'm going to work on this week.
• n_jobs controls the parallelism within each partition, but since I think each partition holds a single series, it wouldn't make a difference (I'll also verify this).
• There's an nmodels argument that controls how many combinations are tried, so that's an easy way to limit the search space (see the sketch below).
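For illustration, a hedged sketch of what a narrowed AutoARIMA search could look like; nmodels caps the number of candidates tried, and the max_p/max_q/approximation values below are placeholder choices, not recommendations (season_length is assumed to come from the earlier snippet):

from statsforecast.models import AutoARIMA

# Placeholder bounds to shrink the stepwise search; tune to your data
model = AutoARIMA(
    season_length=season_length,
    nmodels=20,          # cap on the number of ARIMA candidates tried (the default is much larger)
    max_p=3, max_q=3,    # tighter bounds on the non-seasonal AR/MA orders
    approximation=True,  # approximate likelihood during model selection, exact fit at the end
    alias="AutoARIMA",
)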
Hey. I looked into this today and I think the number of partitions is preserved, but each partition has n_jobs=1 hardcoded. How many partitions did you have when you were using 320 CPUs?

Antoine SCHWARTZ -CROIX-

11/23/2023, 9:48 AM
Hi @José Morales, are you talking about the default partition config in Spark, or the DataFrame's partitioning after reading? I don't know if the data is distributed differently later in the code, but at the time of reading it has only one partition (because it's read from a single file, written with pandas), and 162 when using the repartition method on the series id:
df = spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet").repartition("unique_id")
df.rdd.getNumPartitions() # 162
and there are 200 shuffle partitions by default:
spark.conf.get("spark.sql.shuffle.partitions") # 200
Oh, by forcing a repartition to 320, I've increased CPU utilisation from 30% to 60%!
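A sketch of one way to force that repartitioning, assuming num_worker_cores = 320 and the same read path as before:

num_worker_cores = 320  # total cores across the Databricks workers

df = (
    spark.read.parquet(f"{base_s3_dir_path}/{cutoff}/{input_dfs_dir_name}/df.parquet")
    .repartition(num_worker_cores, "unique_id")  # one partition per core, keyed by series id
)
df.rdd.getNumPartitions()  # 320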

José Morales

11/23/2023, 3:39 PM
Hey. I meant the DF partitioning, so exactly what you did is what I would expect to help reduce the time. How many CPUs does each executor have? I'm not sure if two tasks can run concurrently on the same executor.
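One way to inspect this from a notebook (assuming the usual Spark config keys are set on the cluster; on Databricks the spark session is already available):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already provided as `spark` on Databricks

print(spark.conf.get("spark.executor.cores", "not set"))  # cores per executor, if configured
print(spark.sparkContext.defaultParallelism)              # total task slots the scheduler sees
print(spark.conf.get("spark.sql.shuffle.partitions"))     # partitions produced by a shuffle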

Antoine SCHWARTZ -CROIX-

11/24/2023, 10:09 AM
Hello José, I can't find the precise answer, but during the ARIMA fit I can see that the maximum number of tasks is 200 (20/32 per executor; executors = compute workers on Databricks).

José Morales

11/24/2023, 4:03 PM
I think it's doing a shuffle first and then the training, so that's probably the shuffle partitions. Can you increase them to a multiple of your cores? e.g.
spark.conf.set("spark.sql.shuffle.partitions", 640)

Antoine SCHWARTZ -CROIX-

11/24/2023, 6:30 PM
It seems to work, up to 98% CPU usage with these settings:
num_worker_cores = 320
spark.conf.set("spark.sql.shuffle.partitions", num_worker_cores*4)
spark.conf.set("spark.sql.adaptive.enabled", "false")
I don't know if the last one is useful, or if the x4 multiplier is right, but anyway.
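A small sketch of how the hardcoded 320 could instead be derived from the cluster, assuming the Databricks-provided spark session; defaultParallelism usually matches the total worker cores, but that depends on the cluster config:

num_worker_cores = spark.sparkContext.defaultParallelism  # usually the total worker cores on Databricks

spark.conf.set("spark.sql.shuffle.partitions", num_worker_cores * 4)
# AQE may coalesce small shuffle partitions back together, which would undo the setting above,
# so disabling it is a reasonable precaution here
spark.conf.set("spark.sql.adaptive.enabled", "false")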

José Morales

11/24/2023, 6:54 PM
Nice!