Improve Transformation template to process only new data


As a user, I would like to apply transformations only to the subset of my source data that has not been transformed yet.

This is needed when the source data grows linearly over time (it is constantly ingested), so that transformation time does not grow with it.

Merge strategy would become "Merge"


  • 1 big data table (joined to several small/metadata tables):
    Include the processing_dttm from the source_table in the transformation_table, and only select rows from source_table where processing_dttm > max(transformation_table.source_table_processing_dttm). This assumes that processing_dttm is a UNIX timestamp; otherwise, select the rows whose processing_dttm is not in the distinct list of source_table_processing_dttm values.

  • N big data table joins: no solution yet
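The two selection rules for the single-big-table case can be sketched on plain in-memory collections, without Spark. This is only a model of the intended semantics: SourceRow, IncrementalSelection and both function names are hypothetical and not part of Kylo.

```scala
// Hypothetical in-memory model of the selection rules; none of these names
// exist in Kylo. processingDttm mirrors the processing_dttm column.
final case class SourceRow(id: Int, processingDttm: String)

object IncrementalSelection {
  // Rule 1: processing_dttm is a UNIX timestamp, so "new" means strictly
  // greater than the largest value already recorded in the transformation table.
  def newByTimestamp(source: Seq[SourceRow], transformedDttms: Seq[String]): Seq[SourceRow] =
    if (transformedDttms.isEmpty) source // nothing transformed yet: take every row
    else {
      val maxTransformed = transformedDttms.map(_.toLong).max
      source.filter(_.processingDttm.toLong > maxTransformed)
    }

  // Rule 2: processing_dttm is an opaque string, so "new" means the value is
  // not among the distinct values already recorded.
  def newByMembership(source: Seq[SourceRow], transformedDttms: Seq[String]): Seq[SourceRow] = {
    val seen = transformedDttms.toSet
    source.filterNot(row => seen.contains(row.processingDttm))
  }
}
```

Rule 1 only needs a single max() over the transformation table, while rule 2 needs the full distinct list, which is why the timestamp comparison is preferable whenever processing_dttm allows it.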

Some interesting docs:


Claudiu Stanciu
October 19, 2017, 2:59 PM

My current workaround for this is:

  • Modify the Transformation template to permit the modification of the "metadata.dataTransformation.dataTransformScript" attribute.

  • Generate the transformation feed with the visual query and disable the feed upon creation.

  • Copy the generated Scala script from NiFi -> Feed flow -> "Update flow parameters" processor -> "metadata.dataTransformation.dataTransformScript" attribute.

  • In the Scala script, add a filter after the table alias declaration, based on the data that has already been transformed (I include the processing_dttm of the source data in my transformation table, so it's easy to get the difference, i.e. the unprocessed source data):
    import org.apache.spark.sql._
    var tbl10 = sqlContext.table("category.source_table").alias("tbl10")
    // Highest source processing_dttm already present in the transformation table
    val maxTS = sqlContext.sql("select max(transformation_table.source_table_processing_dttm) from category2.transformation_table").collect()
    // max() yields null when the transformation table is empty; in that case read every source row
    if (maxTS(0).get(0) != null) {
      tbl10 = tbl10.filter("processing_dttm > " + maxTS(0).get(0).toString())
    }
    var df = tbl10.select(tbl10.col("some_column"), tbl10.col("processing_dttm").as("source_table_processing_dttm"))
    // The rest of the generated script is elided in the original post:
    // df = ...("*"), functions.------)

  • Add the modified Scala script to the "metadata.dataTransformation.dataTransformScript" attribute, via Kylo.

There could be issues if processing_dttm is not a valid timestamp (we don't enforce this in the schema, but we do require it for the MergeTable processor, which selects min(processing_dttm)).
If processing_dttm is an arbitrary string, we can filter the source rows with "where source_processing_dttm not in (select distinct transformation_table.source_table_processing_dttm from category2.transformation_table)".
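A minimal sketch of how the script could choose between the two filters, assuming the distinct source_table_processing_dttm values have already been collected from the transformation table. ProcessingDttmFilter and its methods are hypothetical names, the source column is assumed to be called processing_dttm, and the sample values in the usage note are made up. Whether a "not in (select ...)" subquery is accepted depends on the SQL engine and Spark version in use, so treat the fallback clause as an assumption to verify.

```scala
import scala.util.Try

// Hypothetical helper, not part of Kylo: builds the WHERE clause for the
// source table from the values already present in the transformation table.
object ProcessingDttmFilter {
  // True when the value parses as a whole number, i.e. looks like a UNIX timestamp.
  def isUnixTimestamp(value: String): Boolean = Try(value.trim.toLong).isSuccess

  // processedValues: distinct source_table_processing_dttm values already transformed.
  // Returns None when nothing has been transformed yet (no filter needed).
  def whereClause(processedValues: Seq[String], transformationTable: String): Option[String] =
    if (processedValues.isEmpty) None
    else if (processedValues.forall(isUnixTimestamp))
      // Every value is numeric: compare against the maximum.
      Some(s"processing_dttm > ${processedValues.map(_.trim.toLong).max}")
    else
      // At least one value is not numeric: fall back to excluding seen values.
      Some(s"processing_dttm not in (select distinct source_table_processing_dttm from $transformationTable)")
}
```

For example, whereClause(Seq("1508425140000", "1508511540000"), "category2.transformation_table") yields the numeric comparison, while a list containing a value such as "batch-a" yields the not-in clause.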

Greg Hart
October 24, 2017, 6:37 PM

A couple options:

  • Use the HighWaterMark processors to record the last processing_dttm that was read from the source table. When creating the transformation, add in your filter but use a placeholder value such as the first processing_dttm of the source table. Edit the Data Transformation template to add in the HighWaterMark processors and use the ExecuteScript processor to find and replace your placeholder value with the high water mark value in the dataTransformScript attribute.

  • Create a client-spark-functions.json file with your source table filter as a function. It will then be added to the script, so no manual modifications are necessary.
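The find-and-replace step of the first option could look roughly like the sketch below. It only shows the string logic: HighWaterMarkPatch and patch are hypothetical names, the timestamp values in the usage note are made up, and in NiFi the same logic would have to be written in a scripting language that ExecuteScript supports (for example Groovy or Jython) rather than Scala.

```scala
// Hypothetical helper, not part of Kylo or NiFi: swaps the placeholder in the
// filter of the dataTransformScript for the recorded high water mark.
object HighWaterMarkPatch {
  // Replaces the whole "processing_dttm > <placeholder>" expression rather than
  // the bare placeholder, so the same digits elsewhere in the script are untouched.
  def patch(script: String, placeholder: String, highWaterMark: Option[String]): String =
    highWaterMark match {
      case Some(value) =>
        script.replace(s"processing_dttm > $placeholder", s"processing_dttm > $value")
      case None =>
        script // no high water mark recorded yet: keep the placeholder filter
    }
}
```

With a script containing filter("processing_dttm > 1508425140000") and a high water mark of 1508511540000, patch returns the same script with the newer value in the filter; with no high water mark it returns the script unchanged.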

