# mlforecast
s
This message was deleted.
j
Hey. We don't have it implemented, but it should be straightforward to do. Here's an example:
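```python
import pandas as pd
from fugue import transform
from triad import Schema

from mlforecast.lag_transforms import ExpandingMean
from mlforecast.feature_engineering import transform_exog

# `prices` is your local pandas DataFrame and `ddf` is the distributed
# (e.g. Spark or Dask) DataFrame with the same columns.

# compute locally on a few rows to get the output schema
params = {
    'lags': [1, 2],
    'lag_transforms': {1: [ExpandingMean()]},
    'num_threads': 2,
}
transformed = transform_exog(prices.head(), **params)

# fugue wrapper: the type annotations tell fugue that the function
# takes and returns pandas DataFrames
def transform_exog_wrapper(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
    return transform_exog(df, **kwargs)

# distributed version: fugue applies the wrapper to the partitions of ddf
transformed_ddf = transform(
    ddf,
    transform_exog_wrapper,
    schema=Schema(transformed),
    params=params,
)
transformed_ddf
```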
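For intuition, here is a rough pandas-only sketch of the kind of features those params describe: lags 1 and 2 of each exogenous column, plus an expanding mean over the lag-1 values, computed separately for each series. It assumes the usual `unique_id`/`ds` column names. The helper `sketch_exog_features` and its output column names are hypothetical; this is not mlforecast's implementation, and `transform_exog` names its columns differently.

```python
import pandas as pd


def sketch_exog_features(df, id_col="unique_id", time_col="ds", lags=(1, 2)):
    """Hypothetical helper: per-series lags plus an expanding mean of lag 1."""
    df = df.sort_values([id_col, time_col]).reset_index(drop=True)
    value_cols = [c for c in df.columns if c not in (id_col, time_col)]
    grouped = df.groupby(id_col)
    out = df.copy()
    for col in value_cols:
        # plain lags, shifted within each series only
        for lag in lags:
            out[f"{col}_lag{lag}"] = grouped[col].shift(lag)
        # expanding mean over the lag-1 values, i.e. the mean of all
        # strictly earlier observations of the same series
        lag1 = grouped[col].shift(1)
        out[f"{col}_expanding_mean_lag1"] = lag1.groupby(df[id_col]).transform(
            lambda s: s.expanding().mean()
        )
    return out


# made-up example data with two series
prices = pd.DataFrame({
    "unique_id": ["a"] * 4 + ["b"] * 3,
    "ds": [1, 2, 3, 4, 1, 2, 3],
    "price": [10.0, 20.0, 30.0, 40.0, 1.0, 2.0, 3.0],
})
features = sketch_exog_features(prices)
print(features)
```

Every feature only looks backwards within its own series, which is why the whole history of a series needs to be available wherever the function runs.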
m
Awesome, thank you!
j
Btw, the same partitioning setup is required for this to work properly: you need to make sure that each series is entirely within a single partition. You can refer to this for Spark and this for Dask.
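To see why this matters: the function is applied to each partition independently, so if a series is split across partitions, the rows at the start of the second chunk lose their history. A minimal pandas simulation, assuming each partition is processed on its own (as in a map-over-partitions call); the helper name and data are hypothetical:

```python
import pandas as pd


def lag1_per_partition(partitions):
    """Hypothetical helper: compute a per-series lag 1 separately in each partition."""
    results = [
        p.assign(lag1=p.groupby("unique_id")["y"].shift(1)) for p in partitions
    ]
    return pd.concat(results, ignore_index=True)


df = pd.DataFrame({
    "unique_id": ["a"] * 4,
    "ds": [1, 2, 3, 4],
    "y": [10.0, 20.0, 30.0, 40.0],
})

# series 'a' split across two partitions: the lag at ds=3 becomes NaN instead of 20.0
split = lag1_per_partition([df.iloc[:2], df.iloc[2:]])
# whole series in a single partition: lags are correct
whole = lag1_per_partition([df])
print(split)
print(whole)
```

The same boundary problem affects longer lags and expanding statistics, which would silently be computed over only part of the series.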