Using the Pipeline Designer
Pipelines consist of a sequence of steps that are applied to the data in order. Steps can be of two kinds:

Field-level steps define one transform and/or filter for each field of the value of the Kafka records.

Record-level steps process the entire record with one transform and/or filter.
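The two step kinds differ in the shape of the Python function they expect. As a minimal sketch (the function names here are illustrative; both callbacks are shown with their real signatures later in this tutorial):

```python
# Field-level: receives the value of one field plus the entire record,
# and returns the new value for the selected field.
def field_transform(field, record: dict):
    return field

# Record-level: receives the entire record as a dict and returns
# the (possibly modified) record.
def record_transform(record: dict) -> dict:
    return record
```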
Navigate to the pipeline you have just created in your browser:
Click on Edit in Pipeline Designer.
The pipeline designer shows a sample of the most recent records of the source stream in a spreadsheet. In the following example, you can see that records have four fields in their value: type, createdAt, data, and id.
If you are interested in the raw records, please click on Raw:
Let's build a first transform that turns the UNIX timestamp in the field createdAt into a human-readable formatted timestamp.
First, we need to add a step. We choose Transform single fields:
Second, click on Apply transform in the header of the column createdAt. Now we see the list of the pre-defined transforms that are available in DataCater. You can view them on our GitHub.
Among them is User-defined transform, which allows us to provide a custom Python transform that is applied to the selected field.
We can provide the following Python code to turn the UNIX timestamp into a formatted timestamp:
from datetime import datetime

# field: Value of the field that the transform is applied to.
# record: The entire record as dict.
def transform(field, record: dict):
    # Turn timestamp from milliseconds into seconds
    ts_sec = int(field / 1000)
    # Format the timestamp, e.g., 2021-01-01 00:00:00
    return datetime \
        .utcfromtimestamp(ts_sec) \
        .strftime("%Y-%m-%d %H:%M:%S")
Please click Save & Run to see the preview of the transform, which shows how the transform would change the sample data:
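If you want to sanity-check such a transform outside DataCater, the same function runs as plain Python. The sample timestamp below is hypothetical:

```python
from datetime import datetime

# Field-level transform: UNIX timestamp in milliseconds -> formatted string
def transform(field, record: dict):
    # Turn timestamp from milliseconds into seconds
    ts_sec = int(field / 1000)
    return datetime \
        .utcfromtimestamp(ts_sec) \
        .strftime("%Y-%m-%d %H:%M:%S")

# Hypothetical sample: 2021-01-01 00:00:00 UTC in milliseconds
print(transform(1609459200000, {}))  # → 2021-01-01 00:00:00
```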
Next, let's have a look at how to apply transforms to entire records. Add a new record-level step.

In the example data, the field data holds a deeply-nested JSON structure. We want to flatten it and replace it with the value of one of its keys.
We just switched to the Raw view, which might be helpful for displaying fields with large values, e.g., JSON objects.
You can switch back and forth between the Grid and Raw view as needed.
Please click Apply transform and select User-defined transform from the sidebar.
We can provide the following Python transform to replace the field data with the content of one of its keys (the key name below is a placeholder; use the key from your data):

# record["key"]: The key of the Apache Kafka record. Can be overwritten.
# record["value"]: The value of the Apache Kafka record. Can be overwritten.
# record["metadata"]: The metadata of the Apache Kafka record, e.g., the offset or the timestamp. Cannot be overwritten.
def transform(record: dict) -> dict:
    # Replace the nested JSON object with the value of the desired key
    # ("name" is a placeholder key)
    record["value"]["data"] = record["value"]["data"]["name"]
    return record
Click Save & Run to preview the transform:
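As with the field-level transform, you can dry-run a record-level function as plain Python. The sample record and the key name below are hypothetical, chosen only to mimic the key/value/metadata structure described above:

```python
# Record-level transform: replaces the nested "data" object with the
# value of one of its keys ("name" is a placeholder key for illustration).
def transform(record: dict) -> dict:
    record["value"]["data"] = record["value"]["data"]["name"]
    return record

# Hypothetical sample record mimicking the Kafka record structure
record = {
    "key": None,
    "value": {"type": "event", "data": {"name": "example", "nested": {"a": 1}}},
    "metadata": {"offset": 42},
}
print(transform(record)["value"]["data"])  # → example
```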