Kamae¤
Kamae bridges the gap between offline data processing and online model serving. Build preprocessing pipelines in Spark for big data workloads, then export them as Keras models for low-latency inference.
Why Kamae?¤
Training and serving often happen on different platforms: Spark for batch processing at scale, TensorFlow for low-latency inference. Manually reimplementing preprocessing logic in both places creates:
- Training/serving skew: subtle bugs from inconsistent implementations
- Development overhead: writing and maintaining duplicate code
- Deployment friction: changes require updates in multiple systems
Kamae solves this by generating the inference model directly from your Spark pipeline, guaranteeing consistency between training and serving.
Installation¤
pip install kamae
Platform notes: Kamae supports tensorflow>=2.9.1,<2.19.0. For Mac ARM with tensorflow<2.13.0, install tensorflow-macos manually. TensorFlow no longer supports Mac x86_64 from version 2.18.0 onwards.
Quick Start¤
from pyspark.sql import SparkSession
from kamae.spark.estimators import StandardScaleEstimator, StringIndexEstimator
from kamae.spark.pipeline import KamaeSparkPipeline
from kamae.spark.transformers import LogTransformer, ArrayConcatenateTransformer
# Define preprocessing in Spark
spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(
    [(1, 2, "a"), (4, 5, "b"), (7, 8, "c")],
    ["col1", "col2", "category"]
)
pipeline = KamaeSparkPipeline(stages=[
    LogTransformer(inputCol="col1", outputCol="log_col1", alpha=1, inputDtype="float"),
    ArrayConcatenateTransformer(inputCols=["log_col1", "col2"], outputCol="features", inputDtype="float"),
    StandardScaleEstimator(inputCol="features", outputCol="scaled_features"),
    StringIndexEstimator(inputCol="category", outputCol="category_indexed"),
])
fitted_pipeline = pipeline.fit(data)
fitted_pipeline.transform(data).show() # Use in Spark
# Export for TensorFlow Serving
tf_input_schema = [
    {"name": "col1", "dtype": "int32", "shape": (None, 1)},
    {"name": "col2", "dtype": "int32", "shape": (None, 1)},
    {"name": "category", "dtype": "string", "shape": (None, 1)},
]
keras_model = fitted_pipeline.build_keras_model(tf_input_schema=tf_input_schema)
keras_model.save("./preprocessing_model.keras")
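Once exported, the model can be deployed behind TensorFlow Serving. As an illustrative sketch (the model name and endpoint below are assumptions, not part of Kamae), this is what a REST predict request body matching the Quick Start's `tf_input_schema` looks like, built with only the standard library:

```python
import json

# One dict per request row; keys must match the tf_input_schema names
# used when exporting the model. Each value is a single-element list
# because the schema shapes are (None, 1).
instances = [
    {"col1": [1], "col2": [2], "category": ["a"]},
    {"col1": [4], "col2": [5], "category": ["b"]},
]
payload = json.dumps({"instances": instances})

# POST this body to TensorFlow Serving's REST predict endpoint, e.g.
# http://localhost:8501/v1/models/preprocessing_model:predict
print(payload)
```

The `"instances"` key is TensorFlow Serving's row-oriented request format; each instance carries one value per named model input.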
Usage¤
Spark Pipeline (Recommended): Build preprocessing pipelines using Spark transformers and estimators, fit on DataFrames, then export as Keras models. See examples for common patterns.
Direct Keras Layers: Import and compose Keras layers directly for non-tabular data or custom workflows. Browse available layers in the transformation table below.
For Scikit-learn support (experimental, unmaintained), see sklearn examples.
Documentation¤
- Examples: Full working examples for common use cases
- Chaining models: Use Kamae preprocessing models as inputs to trainable models
- Type parity: Ensuring consistent dtypes between Spark and Keras
- Shape parity: Ensuring consistent shapes between Spark and Keras
- Testing inference: Validate model outputs with TensorFlow Serving
- Adding transformers: Contributing new transformations
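To make the shape-parity idea concrete, here is a small illustrative helper (not part of Kamae's API; the function name is hypothetical, and the schema format mirrors the Quick Start's `tf_input_schema`) that batches per-row scalar columns into the `(None, 1)`-shaped inputs the exported Keras model expects:

```python
def rows_to_batched_inputs(rows, schema):
    """Group per-row scalar values into per-feature batches.

    A tf_input_schema shape of (None, 1) means each feature arrives as
    a batch of single-element lists; this helper mirrors that layout.
    """
    return {
        spec["name"]: [[row[spec["name"]]] for row in rows]
        for spec in schema
    }

schema = [
    {"name": "col1", "dtype": "int32", "shape": (None, 1)},
    {"name": "category", "dtype": "string", "shape": (None, 1)},
]
rows = [{"col1": 1, "category": "a"}, {"col1": 4, "category": "b"}]
print(rows_to_batched_inputs(rows, schema))
# {'col1': [[1], [4]], 'category': [['a'], ['b']]}
```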
Supported Preprocessing Layers¤
| Transformation | Description | Keras Layer | Spark Transformer | Scikit-learn Transformer |
|---|---|---|---|---|
| AbsoluteValue | Applies the abs(x) transform. | Link | Link | Not yet implemented |
| ArrayConcatenate | Assembles multiple features into a single array. | Link | Link | Link |
| ArrayCrop | Crops or pads a feature array to a consistent size. | Link | Link | Not yet implemented |
| ArraySplit | Splits a feature array into multiple features. | Link | Link | Link |
| ArraySubtractMinimum | Subtracts the minimum element in an array from the rest to compute a timestamp difference. Ignores padded values. | Link | Link | Not yet implemented |
| BearingAngle | Computes the bearing angle (https://en.wikipedia.org/wiki/Bearing_(navigation)) between two pairs of lat/long. | Link | Link | Not yet implemented |
| Bin | Bins a numerical column into string categorical bins. Users can specify the bin values, labels and a default label. | Link | Link | Not yet implemented |
| BloomEncode | Hash encodes a string feature multiple times to create an array of indices. Useful for compressing input dimensions for embeddings. Paper: https://arxiv.org/pdf/1706.03993.pdf | Link | Link | Not yet implemented |
| Bucketize | Buckets a numerical column into integer bins. | Link | Link | Not yet implemented |
| ConditionalStandardScale | Normalises by the mean and standard deviation, with ability to: apply a mask on another column, not scale the zeros, and apply a non standard scaling function. | Link | Link | Not yet implemented |
| CosineSimilarity | Computes the cosine similarity between two array features. | Link | Link | Not yet implemented |
| CurrentDate | Returns the current date for use in other transformers. | Link | Link | Not yet implemented |
| CurrentDateTime | Returns the current date time in the format yyyy-MM-dd HH:mm:ss.SSS for use in other transformers. | Link | Link | Not yet implemented |
| CurrentUnixTimestamp | Returns the current unix timestamp in either seconds or milliseconds for use in other transformers. | Link | Link | Not yet implemented |
| DateAdd | Adds a static or dynamic number of days to a date feature. NOTE: Destroys any time component of the datetime if present. | Link | Link | Not yet implemented |
| DateDiff | Computes the number of days between two date features. | Link | Link | Not yet implemented |
| DateParse | Parses a string date of format YYYY-MM-DD to extract a given date part. E.g. day of year. | Link | Link | Not yet implemented |
| DateTimeToUnixTimestamp | Converts a UTC datetime string to unix timestamp. | Link | Link | Not yet implemented |
| Divide | Divides a single feature by a constant or divides multiple features against each other. | Link | Link | Not yet implemented |
| Exp | Applies the exp(x) operation to the feature. | Link | Link | Not yet implemented |
| Exponent | Applies the x^exponent to a single feature or x^y for multiple features. | Link | Link | Not yet implemented |
| HashIndex | Transforms strings to indices via a hash table of predetermined size. | Link | Link | Not yet implemented |
| HaversineDistance | Computes the haversine distance between latitude and longitude pairs. | Link | Link | Not yet implemented |
| Identity | Applies the identity operation, leaving the input the same. | Link | Link | Link |
| IfStatement | Computes a simple if statement on a set of columns/tensors and/or constants. | Link | Link | Not yet implemented |
| Impute | Performs imputation of either mean or median value of the data over a specified mask. | Link | Link | Not yet implemented |
| LambdaFunction | Transforms an input (or multiple inputs) to an output (or multiple outputs) with a user provided tensorflow function. | Link | Link | Not yet implemented |
| ListMax | Computes the listwise max of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented |
| ListMean | Computes the listwise mean of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented |
| ListMedian | Computes the listwise median of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented |
| ListMin | Computes the listwise min of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented |
| ListRank | Computes the listwise rank (ordering) of a feature. | Link | Link | Not yet implemented |
| ListStdDev | Computes the listwise standard deviation of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented |
| Log | Applies the natural logarithm log(alpha + x) transform. | Link | Link | Link |
| LogicalAnd | Performs an and(x, y) operation on multiple boolean features. | Link | Link | Not yet implemented |
| LogicalNot | Performs a not(x) operation on a single boolean feature. | Link | Link | Not yet implemented |
| LogicalOr | Performs an or(x, y) operation on multiple boolean features. | Link | Link | Not yet implemented |
| Max | Computes the maximum of a feature with a constant or multiple other features. | Link | Link | Not yet implemented |
| Mean | Computes the mean of a feature with a constant or multiple other features. | Link | Link | Not yet implemented |
| Min | Computes the minimum of a feature with a constant or multiple other features. | Link | Link | Not yet implemented |
| MinHashIndex | Creates an integer bit array from a set of strings using the MinHash algorithm. | Link | Link | Not yet implemented |
| MinMaxScale | Scales the input feature by the min/max resulting in a feature in [0, 1]. | Link | Link | Not yet implemented |
| Modulo | Computes the modulo of a feature with the mod divisor being a constant or another feature. | Link | Link | Not yet implemented |
| Multiply | Multiplies a single feature by a constant or multiplies multiple features together. | Link | Link | Not yet implemented |
| NumericalIfStatement | Performs a simple if-else statement with a given operator. The value to check and the results if true or false can be constants or features. | Link | Link | Not yet implemented |
| OneHotEncode | Transforms a string to a one-hot array. | Link | Link | Not yet implemented |
| OrdinalArrayEncode | Encodes strings in an array according to the order in which they appear. Only for 2D tensors. | Link | Link | Not yet implemented |
| Round | Rounds a floating feature to the nearest integer using ceil, floor or a standard round op. | Link | Link | Not yet implemented |
| RoundToDecimal | Rounds a floating feature to the nearest decimal precision. | Link | Link | Not yet implemented |
| SharedOneHotEncode | Transforms a string to a one-hot array, using labels across multiple inputs to determine the one-hot size. | Link | Link | Not yet implemented |
| SharedStringIndex | Transforms strings to indices via a vocabulary lookup, sharing the vocabulary across multiple inputs. | Link | Link | Not yet implemented |
| SingleFeatureArrayStandardScale | Normalises by the mean and standard deviation calculated over all elements of all inputs, with ability to mask a specified value. | Link | Link | Not yet implemented |
| StandardScale | Normalises by the mean and standard deviation, with ability to mask a specified value. | Link | Link | Link |
| StringAffix | Prefixes and suffixes a string with provided constants. | Link | Link | Not yet implemented |
| StringArrayConstant | Inserts provided string array constant into a column. | Link | Link | Not yet implemented |
| StringCase | Applies an upper or lower casing operation to the feature. | Link | Link | Not yet implemented |
| StringConcatenate | Joins string columns using the provided separator. | Link | Link | Not yet implemented |
| StringContains | Checks for the existence of a constant or tensor-element substring within a feature. | Link | Link | Not yet implemented |
| StringContainsList | Checks for the existence of any string from a list of string constants within a feature. | Link | Link | Not yet implemented |
| StringEqualsIfStatement | Performs a simple if-else statement on string equality. The value to check and the results if true or false can be constants or features. | Link | Link | Not yet implemented |
| StringIndex | Transforms strings to indices via a vocabulary lookup. | Link | Link | Not yet implemented |
| StringListToString | Concatenates a list of strings to a single string with a given delimiter. | Link | Link | Not yet implemented |
| StringMap | Maps a list of string values to a list of other string values with a standard CASE WHEN statement. Can provide a default value for ELSE. | Link | Link | Not yet implemented |
| StringIsInList | Checks if the feature is equal to at least one of the strings provided. | Link | Link | Not yet implemented |
| StringReplace | Performs a regex replace operation on a feature with constant params or between multiple features. | Link | Link | Not yet implemented |
| StringToStringList | Splits a string by a separator, returning a list of parametrised length (with a default value for missing inputs). | Link | Link | Not yet implemented |
| SubStringDelimAtIndex | Splits a string column using the provided delimiter, and returns the value at the given index. If the index is out of bounds, returns a given default value. | Link | Link | Not yet implemented |
| Subtract | Subtracts a constant from a single feature or subtracts multiple features from each other. | Link | Link | Not yet implemented |
| Sum | Adds a constant to a single feature or sums multiple features together. | Link | Link | Not yet implemented |
| UnixTimestampToDateTime | Converts a unix timestamp to a UTC datetime string. | Link | Link | Not yet implemented |
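To clarify what two of the table's numeric transforms compute, here is a plain-Python sketch of their semantics as described above (the function names are illustrative, not Kamae's actual classes; in MinMaxScale, the min/max would be fitted from the training data):

```python
import math

def log_transform(x, alpha=1.0):
    # Log: natural logarithm log(alpha + x), as in the table above.
    return math.log(alpha + x)

def min_max_scale(x, fitted_min, fitted_max):
    # MinMaxScale: rescales x into [0, 1] using the fitted min/max.
    return (x - fitted_min) / (fitted_max - fitted_min)

print(log_transform(0.0))             # log(1 + 0) = 0.0
print(min_max_scale(5.0, 0.0, 10.0))  # 0.5
```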
Development¤
Setup¤
Requirements: Python 3.10 (for development), pipx (installation instructions)
make setup # Install dependencies and pre-commit hooks
make all # Run tests, formatting, and linting
make help # See all available commands
The package supports Python 3.8-3.12 in production.
Common Commands¤
make run-example # Run example pipeline
make test-tf-serving # Test TensorFlow Serving inference
make test-end-to-end # Run example + test serving
Contributing¤
Create a branch from main and open a pull request. Follow the adding transformers guide for new transformers.
Code quality: Pre-commit hooks enforce formatting and linting. Install with uv run pre-commit install. PRs must pass all tests in tests/.
Versioning: Automated via semantic-release. Use conventional commit prefixes in PR titles: fix: (patch), feat: (minor), BREAKING CHANGE: (major).
Contact: Questions? Reach out to the Kamae team.