Slackbot
02/06/2024, 10:30 PM
José Morales
02/06/2024, 11:04 PM
from fugue import transform
from triad import Schema
from mlforecast.lag_transforms import ExpandingMean
from mlforecast.feature_engineering import transform_exog
# Run the transformation locally on a small sample (``prices.head()``) so the
# resulting frame can be used to build the output schema for the distributed
# call.
params = {
    "lags": [1, 2],
    "lag_transforms": {1: [ExpandingMean()]},
    "num_threads": 2,
}
# The same keyword arguments are reused for the distributed run.
transformed = transform_exog(prices.head(), **params)
# fugue wrapper
def transform_exog_wrapper(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
    """Forward ``df`` and every keyword argument to ``transform_exog``.

    Thin pass-through with pandas type annotations; it is the callable
    handed to fugue's ``transform``.

    Args:
        df: Input frame, passed unchanged as the first positional argument.
        **kwargs: Keyword arguments forwarded verbatim to ``transform_exog``.

    Returns:
        Whatever ``transform_exog`` returns for the given inputs.
    """
    result = transform_exog(df, **kwargs)
    return result
# Distributed version: apply the wrapper to ``ddf`` through fugue, using the
# locally computed ``transformed`` frame to describe the output schema and
# the same ``params`` as the local run.
transformed_ddf = transform(
    ddf,
    using=transform_exog_wrapper,
    schema=Schema(transformed),
    params=params,
)
# Bare expression: presumably left as the last line so a notebook cell
# displays the result.
transformed_ddf
Mike C
02/06/2024, 11:34 PM