
standard_scale

StandardScaleEstimator

StandardScaleEstimator(
    inputCol=None,
    outputCol=None,
    inputDtype=None,
    outputDtype=None,
    layerName=None,
    maskValue=None,
)

Bases: BaseEstimator, SingleInputSingleOutputParams, MaskValueParams

Standard scaler estimator for use in Spark pipelines. This estimator calculates the mean and standard deviation of the input feature column. When fit is called, it returns a StandardScaleTransformer which can be used to standardize/transform additional features.

WARNING: If the input is an array, we assume that the array has a constant shape across all rows.
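Below is a minimal usage sketch, assuming a running SparkSession and an installed kamae. The toy DataFrame and column names are illustrative, and fit/transform follow the standard Spark Estimator/Transformer API:

from pyspark.sql import SparkSession

from kamae.spark.estimators.standard_scale import StandardScaleEstimator

spark = SparkSession.builder.getOrCreate()

# Toy data: one numeric feature column to standardize.
df = spark.createDataFrame([(1.0,), (2.0,), (3.0,), (4.0,)], ["price"])

estimator = StandardScaleEstimator(inputCol="price", outputCol="scaled_price")

# fit computes the mean and population standard deviation of the input column
# and returns a fitted StandardScaleTransformer.
scaler = estimator.fit(df)
scaler.transform(df).show()

The fitted transformer carries the learned mean and standard deviation, so it can be reused to standardize additional DataFrames with the same input column.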

Initializes a StandardScaleEstimator, setting all parameters to the given inputs.

Parameters:

    inputCol (Optional[str], default None): Input column name to standardize.
    outputCol (Optional[str], default None): Output column name.
    inputDtype (Optional[str], default None): Input data type to cast the input column to before transforming.
    outputDtype (Optional[str], default None): Output data type to cast the output column to after transforming.
    layerName (Optional[str], default None): Name of the layer. Used as the name of the tensorflow layer in the keras model. If not set, we use the uid of the Spark transformer.
    maskValue (Optional[float], default None): Value to treat as masked; entries equal to this value are excluded when computing the mean and standard deviation.

Returns:

    None: Class instantiated.

Source code in src/kamae/spark/estimators/standard_scale.py
@keyword_only
def __init__(
    self,
    inputCol: Optional[str] = None,
    outputCol: Optional[str] = None,
    inputDtype: Optional[str] = None,
    outputDtype: Optional[str] = None,
    layerName: Optional[str] = None,
    maskValue: Optional[float] = None,
) -> None:
    """
    Initializes a StandardScaleEstimator estimator.
    Sets all parameters to given inputs.

    :param inputCol: Input column name to standardize.
    :param outputCol: Output column name.
    :param inputDtype: Input data type to cast input column to before
    transforming.
    :param outputDtype: Output data type to cast the output column to after
    transforming.
    :param layerName: Name of the layer. Used as the name of the tensorflow layer
    in the keras model. If not set, we use the uid of the Spark transformer.
    :param maskValue: Value to treat as masked when computing the mean and
    standard deviation.
    :returns: None - class instantiated.
    """
    super().__init__()
    self._setDefault(maskValue=None)
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

compatible_dtypes property

compatible_dtypes

List of compatible data types for the layer. If the computation can be performed on any data type, return None.

Returns:

    Optional[List[DataType]]: List of compatible data types for the layer.
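As a hedged sketch, this property can be used to sanity-check an input column before fitting; the estimator and df below are assumed to exist, and a return value of None means any data type is accepted:

allowed = estimator.compatible_dtypes
input_type = df.schema[estimator.getInputCol()].dataType
if allowed is not None and input_type not in allowed:
    raise ValueError(f"{input_type} not supported, expected one of {allowed}")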

_fit

_fit(dataset)

Fits the StandardScaleEstimator estimator to the given dataset. Calculates the mean and standard deviation of the input feature column and returns a StandardScaleTransformer with the mean and standard deviation set.

Parameters:

    dataset (DataFrame, required): Pyspark dataframe to fit the estimator to.

Returns:

    StandardScaleTransformer: Instance with mean and standard deviation set.

Source code in src/kamae/spark/estimators/standard_scale.py
def _fit(self, dataset: DataFrame) -> "StandardScaleTransformer":
    """
    Fits the StandardScaleEstimator estimator to the given dataset.
    Calculates the mean and standard deviation of the input feature column and
    returns a StandardScaleTransformer with the mean and standard deviation set.

    :param dataset: Pyspark dataframe to fit the estimator to.
    :returns: StandardScaleTransformer instance with mean & standard deviation set.
    """
    input_column_type = self.get_column_datatype(dataset, self.getInputCol())
    if not isinstance(input_column_type, ArrayType):
        input_col = F.array(F.col(self.getInputCol()))
        input_column_type = ArrayType(input_column_type)
    else:
        input_col = F.col(self.getInputCol())

    # Collect a single row to driver and get the length.
    # We assume all subsequent rows have the same length.
    array_size = np.array((dataset.select(input_col).first()[0])).shape[-1]

    element_struct = construct_nested_elements_for_scaling(
        column=input_col,
        column_datatype=input_column_type,
        array_dim=array_size,
    )

    mean_cols = [
        F.mean(
            F.when(
                F.col(f"element_struct.element_{i}") == F.lit(self.getMaskValue()),
                F.lit(None),
            ).otherwise(F.col(f"element_struct.element_{i}"))
        ).alias(f"mean_{i}")
        for i in range(1, array_size + 1)
    ]

    stddev_cols = [
        F.stddev_pop(
            F.when(
                F.col(f"element_struct.element_{i}") == F.lit(self.getMaskValue()),
                F.lit(None),
            ).otherwise(F.col(f"element_struct.element_{i}"))
        ).alias(f"stddev_{i}")
        for i in range(1, array_size + 1)
    ]

    metric_cols = mean_cols + stddev_cols

    mean_and_stddev_dict = (
        dataset.select(element_struct).agg(*metric_cols).first().asDict()
    )
    mean = [mean_and_stddev_dict[f"mean_{i}"] for i in range(1, array_size + 1)]
    stddev = [mean_and_stddev_dict[f"stddev_{i}"] for i in range(1, array_size + 1)]

    return StandardScaleTransformer(
        inputCol=self.getInputCol(),
        outputCol=self.getOutputCol(),
        layerName=self.getLayerName(),
        inputDtype=self.getInputDtype(),
        outputDtype=self.getOutputDtype(),
        mean=mean,
        stddev=stddev,
        maskValue=self.getMaskValue(),
    )
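For intuition, here is a small numpy sketch of the statistics the aggregation above produces for an array column with a mask value. The data are made up, and this mirrors the F.mean / F.stddev_pop logic rather than calling the library:

import numpy as np

# Three rows of a 3-element array feature; -1.0 plays the role of maskValue.
rows = np.array(
    [
        [1.0, 10.0, -1.0],
        [2.0, 20.0, 5.0],
        [3.0, -1.0, 7.0],
    ]
)
masked = np.ma.masked_equal(rows, -1.0)

# Per-element mean and population standard deviation, ignoring masked entries,
# matching the F.when(...).otherwise(None) null-out plus F.mean / F.stddev_pop.
mean = masked.mean(axis=0).filled(np.nan).tolist()
stddev = masked.std(axis=0).filled(np.nan).tolist()  # ddof=0, i.e. population stddev
print(mean)    # [2.0, 15.0, 6.0]
print(stddev)  # [0.816..., 5.0, 1.0]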