Marsellino Prawiro Halim
05/02/2024, 2:35 AM

!pip3 install -U statsforecast==1.7.4
!pip3 install -U ray==2.20.0
!pip3 install -U dask[dataframe]==2023.8.1
import os
os.environ['NIXTLA_ID_AS_COL'] = '1'

from statsforecast.core import StatsForecast
from statsforecast.models import (
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series
from sklearn.preprocessing import RobustScaler
max_orders = 30
FIT_SETTINGS = dict(
    d=None,
    D=None,
    max_order=max_orders,
    max_p=max_orders,
    max_d=max_orders,
    max_q=max_orders,
    max_P=max_orders,
    max_D=max_orders,
    max_Q=max_orders,
    start_p=2,
    start_q=2,
    start_P=2,
    start_Q=2,
    test="kpss",
    stepwise=True,
    method='lbfgs',
    seasonal=True,
)
n_series = 1000
horizon = 180
models = [AutoARIMA(season_length=7, **FIT_SETTINGS)]
series = generate_series(n_series, min_length=365*3, max_length=365*3)
sf = StatsForecast(
    verbose=True,
    models=models,
    freq='D',
    n_jobs=-1,
)
# This runs smoothly on a pandas DataFrame
p_statsforecast = sf.forecast(df=series, h=horizon)
import ray
import logging

ray.init(logging_level=logging.ERROR)

# Make unique_id a column and cast it to string for the distributed engine
series = series.reset_index()
series['unique_id'] = series['unique_id'].astype(str)

# Turn off the streaming executor and split the data into 150 partitions
ctx = ray.data.context.DatasetContext.get_current()
ctx.use_streaming_executor = False
ray_series = ray.data.from_pandas(series).repartition(150)
# This call fails on the Ray dataset
p = sf.forecast(df=ray_series, h=horizon)
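For later readers: as far as I can tell, StatsForecast hands distributed inputs (Ray, Spark, Dask) to a fugue backend, so ray alone may not be enough. A minimal sketch, assuming the fugue[ray] extra (absent from the install cells above) is the missing piece:

!pip3 install -U "fugue[ray]"

# With the fugue Ray backend available, the same call should dispatch to Ray
p = sf.forecast(df=ray_series, h=horizon)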
José Morales
05/02/2024, 5:19 PM

Marsellino Prawiro Halim
05/03/2024, 12:17 PM

Marsellino Prawiro Halim
05/04/2024, 2:30 PM

from statsforecast.core import StatsForecast
from statsforecast.models import (
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series
n_series = 4
horizon = 7
series = generate_series(n_series)
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
sf.forecast(df=series, h=horizon).head()
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('forecast') \
    .config('spark.ui.port', 'xxxx') \
    .config('spark.executor.memory', 'xg') \
    .config('spark.driver.memory', 'xxg') \
    .master('local[*]') \
    .enableHiveSupport() \
    .getOrCreate()
# Make unique_id a column
series = series.reset_index()
series['unique_id'] = series['unique_id'].astype(str)
# Convert to Spark
sdf = spark.createDataFrame(series)
# Returns a Spark DataFrame
sf.forecast(df=sdf, h=horizon, level=[90]).show(5)
When I ran the code, I got an error like the one in the attachment. Am I doing something wrong? What can I do to make it run in the Spark environment?
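Same pattern as the Ray case above: the Spark DataFrame input appears to be dispatched through fugue as well, so the Spark extra has to be present. A minimal sketch, assuming fugue[spark] (not shown in the snippets above) is what is missing:

!pip3 install -U "fugue[spark]"

# With the backend installed, forecast should return a Spark DataFrame
sf.forecast(df=sdf, h=horizon, level=[90]).show(5)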
José Morales
05/06/2024, 5:10 PM

Marsellino Prawiro Halim
05/08/2024, 3:02 AM