subtract

SubtractTransformer

SubtractTransformer(
    inputCol=None,
    inputCols=None,
    outputCol=None,
    inputDtype=None,
    outputDtype=None,
    layerName=None,
    mathFloatConstant=None,
)

Bases: BaseTransformer, SingleInputSingleOutputParams, MultiInputSingleOutputParams, MathFloatConstantParams

SubtractLayer Spark Transformer for use in Spark pipelines. This transformer subtracts a constant or another column from a column.

Initializes a SubtractTransformer transformer.

Parameters:

Name Type Description Default
inputCol Optional[str]

Input column name. Only used if inputCols is not specified. If specified, we subtract the mathFloatConstant from this column.

None
inputCols Optional[List[str]]

Input column names.

None
outputCol Optional[str]

Output column name.

None
inputDtype Optional[str]

Input data type to cast input column(s) to before transforming.

None
outputDtype Optional[str]

Output data type to cast the output column to after transforming.

None
layerName Optional[str]

Name of the layer. Used as the name of the tensorflow layer in the keras model. If not set, we use the uid of the Spark transformer.

None
mathFloatConstant Optional[float]

Optional constant to subtract. If not provided, then two input columns are required.

None

Returns:

Type Description
None

None - class instantiated.
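For intuition, the per-row arithmetic in the two configurations described above can be sketched in plain Python (all values below are made up for illustration):

```python
# Mode 1: a single inputCol with mathFloatConstant set.
# The constant is subtracted from the column value.
feature = 10.0             # hypothetical row value of inputCol
constant = 2.5             # hypothetical mathFloatConstant
print(feature - constant)  # 7.5

# Mode 2: two (or more) inputCols and no constant.
# Subsequent columns are subtracted from the first, left to right.
a, b, c = 10.0, 3.0, 1.0   # hypothetical row values of inputCols
print(a - b - c)           # 6.0
```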

Source code in src/kamae/spark/transformers/subtract.py
@keyword_only
def __init__(
    self,
    inputCol: Optional[str] = None,
    inputCols: Optional[List[str]] = None,
    outputCol: Optional[str] = None,
    inputDtype: Optional[str] = None,
    outputDtype: Optional[str] = None,
    layerName: Optional[str] = None,
    mathFloatConstant: Optional[float] = None,
) -> None:
    """
    Initializes a SubtractTransformer transformer.

    :param inputCol: Input column name. Only used if inputCols is not specified.
    If specified, we subtract the mathFloatConstant from this column.
    :param inputCols: Input column names.
    :param outputCol: Output column name.
    :param inputDtype: Input data type to cast input column(s) to before
    transforming.
    :param outputDtype: Output data type to cast the output column to after
    transforming.
    :param layerName: Name of the layer. Used as the name of the tensorflow layer
    in the keras model. If not set, we use the uid of the Spark transformer.
    :param mathFloatConstant: Optional constant to subtract. If not provided,
    then two input columns are required.
    :returns: None - class instantiated.
    """
    super().__init__()
    self._setDefault(mathFloatConstant=None)
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

compatible_dtypes property

compatible_dtypes

List of compatible data types for the layer. If the computation can be performed on any data type, return None.

Returns:

Type Description
Optional[List[DataType]]

List of compatible data types for the layer.

_transform

_transform(dataset)

Transforms the input dataset. Creates a new column with name outputCol, which is the result of subtracting the subsequent input columns (or the constant) from the first.

Parameters:

Name Type Description Default
dataset DataFrame

Pyspark dataframe to transform.

required

Returns:

Type Description
DataFrame

Transformed pyspark dataframe.

Source code in src/kamae/spark/transformers/subtract.py
def _transform(self, dataset: DataFrame) -> DataFrame:
    """
    Transforms the input dataset. Creates a new column with name `outputCol`,
    which is the result of subtracting the subsequent input columns
    (or the constant) from the first.

    :param dataset: Pyspark dataframe to transform.
    :returns: Transformed pyspark dataframe.
    """
    input_cols = self.get_multiple_input_cols(
        constant_param_name="mathFloatConstant",
    )
    # input_cols can contain either actual columns or lit(constants). In order to
    # determine the datatype of the input columns, we select them from the dataset
    # first.
    input_col_names = dataset.select(input_cols).columns
    input_col_datatypes = [
        self.get_column_datatype(dataset=dataset.select(input_cols), column_name=c)
        for c in input_col_names
    ]
    output_col = multi_input_single_output_scalar_transform(
        input_cols=input_cols,
        input_col_names=input_col_names,
        input_col_datatypes=input_col_datatypes,
        func=lambda x: reduce(sub, [x[c] for c in input_col_names]),
    )

    return dataset.withColumn(self.getOutputCol(), output_col)
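The `reduce(sub, ...)` call above folds the inputs left to right, so the first column is the minuend and every subsequent column (or the constant) is subtracted from the running result. A minimal standalone illustration of that folding, with made-up values:

```python
from functools import reduce
from operator import sub

# reduce(sub, ...) subtracts each subsequent value from the running
# result, left to right: ((10.0 - 3.0) - 2.0).
values = [10.0, 3.0, 2.0]
result = reduce(sub, values)
print(result)  # 5.0
```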

get_tf_layer

get_tf_layer()

Gets the tensorflow layer for the subtract transformer.

Returns:

Type Description
Layer

Tensorflow keras layer with name equal to the layerName parameter that performs a subtract operation.

Source code in src/kamae/spark/transformers/subtract.py
def get_tf_layer(self) -> tf.keras.layers.Layer:
    """
    Gets the tensorflow layer for the subtract transformer.

    :returns: Tensorflow keras layer with name equal to the layerName parameter that
     performs a subtract operation.
    """
    return SubtractLayer(
        name=self.getLayerName(),
        input_dtype=self.getInputTFDtype(),
        output_dtype=self.getOutputTFDtype(),
        subtrahend=self.getMathFloatConstant(),
    )