MaxTransformer

MaxTransformer(
    inputCol=None,
    inputCols=None,
    outputCol=None,
    inputDtype=None,
    outputDtype=None,
    layerName=None,
    mathFloatConstant=None,
)

Bases: BaseTransformer, SingleInputSingleOutputParams, MultiInputSingleOutputParams, MathFloatConstantParams

MaxLayer Spark transformer for use in Spark pipelines. This transformer computes the elementwise maximum of a column and either a constant or one or more other columns.

Initializes a MaxTransformer transformer.

Parameters:

Name Type Description Default
inputCol Optional[str]

Input column name. Only used if inputCols is not specified. If specified, the output is the maximum of this column and the mathFloatConstant.

None
inputCols Optional[List[str]]

Input column names.

None
outputCol Optional[str]

Output column name.

None
inputDtype Optional[str]

Input data type to cast input column(s) to before transforming.

None
outputDtype Optional[str]

Output data type to cast the output column to after transforming.

None
layerName Optional[str]

Name of the layer. Used as the name of the tensorflow layer in the keras model. If not set, we use the uid of the Spark transformer.

None
mathFloatConstant Optional[float]

Optional constant to use for the max op. If not provided, then two input columns are required.

None

Returns:

Type Description
None

None - class instantiated.

Source code in src/kamae/spark/transformers/max.py
@keyword_only
def __init__(
    self,
    inputCol: Optional[str] = None,
    inputCols: Optional[List[str]] = None,
    outputCol: Optional[str] = None,
    inputDtype: Optional[str] = None,
    outputDtype: Optional[str] = None,
    layerName: Optional[str] = None,
    mathFloatConstant: Optional[float] = None,
) -> None:
    """
    Initializes a MaxTransformer transformer.

    :param inputCol: Input column name. Only used if inputCols is not specified.
    If specified, the output is the maximum of this column and the mathFloatConstant.
    :param inputCols: Input column names.
    :param outputCol: Output column name.
    :param inputDtype: Input data type to cast input column(s) to before
    transforming.
    :param outputDtype: Output data type to cast the output column to after
    transforming.
    :param layerName: Name of the layer. Used as the name of the tensorflow layer
    in the keras model. If not set, we use the uid of the Spark transformer.
    :param mathFloatConstant: Optional constant to use for max op. If not provided,
    then two input columns are required.
    :returns: None - class instantiated.
    """
    super().__init__()
    self._setDefault(mathFloatConstant=None)
    kwargs = self._input_kwargs
    self.setParams(**kwargs)
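
The constructor accepts either route to the max: two (or more) input columns via inputCols, or a single inputCol paired with mathFloatConstant. A minimal pure-Python sketch of these valid combinations (the helper name and exact rules are assumptions for illustration, not the library's actual validation):

```python
def valid_max_params(input_col=None, input_cols=None, math_float_constant=None):
    """Hypothetical sketch of which parameter combinations make sense for
    MaxTransformer: either inputCols with at least two columns, or a single
    inputCol paired with mathFloatConstant."""
    if input_cols is not None:
        # Multi-column mode: need at least two columns to take a max over.
        return len(input_cols) >= 2
    # Single-column mode: the constant supplies the second operand.
    return input_col is not None and math_float_constant is not None
```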

compatible_dtypes property

compatible_dtypes

List of compatible data types for the layer. If the computation can be performed on any data type, return None.

Returns:

Type Description
Optional[List[DataType]]

List of compatible data types for the layer.

_transform

_transform(dataset)

Transforms the input dataset. Creates a new column with name outputCol, which is the maximum of either the inputCols if specified, or of the inputCol and the mathFloatConstant.

Parameters:

Name Type Description Default
dataset DataFrame

Pyspark dataframe to transform.

required

Returns:

Type Description
DataFrame

Transformed pyspark dataframe.

Source code in src/kamae/spark/transformers/max.py
def _transform(self, dataset: DataFrame) -> DataFrame:
    """
    Transforms the input dataset. Creates a new column with name `outputCol`,
    which is the maximum of either the `inputCols` if specified, or the `inputCol`
    and the `mathFloatConstant`.

    :param dataset: Pyspark dataframe to transform.
    :returns: Transformed pyspark dataframe.
    """
    input_cols = self.get_multiple_input_cols(
        constant_param_name="mathFloatConstant"
    )
    # input_cols can contain either actual columns or lit(constants). In order to
    # determine the datatype of the input columns, we select them from the dataset
    # first.
    input_col_names = dataset.select(input_cols).columns
    input_col_datatypes = [
        self.get_column_datatype(dataset=dataset.select(input_cols), column_name=c)
        for c in input_col_names
    ]

    output_col = multi_input_single_output_scalar_transform(
        input_cols=input_cols,
        input_col_names=input_col_names,
        input_col_datatypes=input_col_datatypes,
        func=lambda x: F.greatest(*[x[c] for c in input_col_names]),
    )
    return dataset.withColumn(self.getOutputCol(), output_col)
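
The expression above builds on Spark's F.greatest, which returns the largest non-null value across its arguments (null only if all arguments are null). A pure-Python sketch of the per-row semantics, with illustrative function names that are not part of kamae:

```python
def greatest(*vals):
    """Mimics Spark's F.greatest: the largest non-null value, or None if
    every argument is null."""
    non_null = [v for v in vals if v is not None]
    return max(non_null) if non_null else None


def max_transform(rows, input_cols=None, input_col=None, constant=None):
    """Per-row sketch of MaxTransformer._transform: max across input_cols if
    given, otherwise max of input_col and the constant."""
    if input_cols:
        return [greatest(*[row[c] for c in input_cols]) for row in rows]
    return [greatest(row[input_col], constant) for row in rows]
```

For example, with inputCol="a" and mathFloatConstant=2.5, a row where a=1.0 yields 2.5.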

get_tf_layer

get_tf_layer()

Gets the tensorflow layer for the max transformer.

Returns:

Type Description
Layer

Tensorflow keras layer with name equal to the layerName parameter that performs a max operation.

Source code in src/kamae/spark/transformers/max.py
def get_tf_layer(self) -> tf.keras.layers.Layer:
    """
    Gets the tensorflow layer for the max transformer.

    :returns: Tensorflow keras layer with name equal to the layerName parameter that
     performs a max operation.
    """
    return MaxLayer(
        name=self.getLayerName(),
        input_dtype=self.getInputTFDtype(),
        output_dtype=self.getOutputTFDtype(),
        max_constant=self.getMathFloatConstant(),
    )
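
At serving time, the MaxLayer presumably applies the same operation on tensors (e.g. via tf.maximum). A pure-Python stand-in for the elementwise behavior, offered as an illustration of the presumed semantics rather than the layer's real implementation:

```python
def max_layer_sketch(inputs, max_constant=None):
    """Elementwise sketch of the presumed MaxLayer op: clamp a single input
    from below by max_constant if set, otherwise take the elementwise maximum
    across all input lists."""
    if max_constant is not None:
        # Single-input mode: max each element against the constant.
        return [max(v, max_constant) for v in inputs[0]]
    # Multi-input mode: elementwise max across the inputs.
    return [max(vals) for vals in zip(*inputs)]
```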