Dedupe and merge can create duplicated rows if input is processed out of order

Description

There is a corner case in the MergeTable NiFi processor when multiple inputs to a feed are processed at the same time: if two inputs contain the same record but are processed out of order (the one with the larger processing_dttm value is merged first), the target table can end up with multiple copies of the record.

We found this issue while processing a large amount of backfill data consisting of daily table snapshots. The "Validate and Split Records" processor was configured to execute up to 10 flowfiles in parallel, so a given day's input would occasionally finish in Spark before a previous day's, and the merges into the target table would then be executed with processing_dttm values out of sequence.

The root cause is the "having min(processing_dttm) = ${feedts}" clause of the Hive merge query. In an out-of-order execution, the target table already contains the row with a later processing_dttm value, but the "having..." clause still selects the row as if it were not in the target. The fix is to add a "count(processing_dttm) = 1" condition to the "having..." clause so that only rows from groups consisting solely of the incoming input data are inserted; a sketch of the change is shown below.
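
A minimal sketch of the corrected dedupe merge query, assuming illustrative names (target_table, source_table, columns id and col1); the real query is generated by the MergeTable processor and differs in detail:

    -- Union the incoming batch with the existing target, group identical records,
    -- and insert only groups whose earliest processing_dttm is the current batch.
    -- The added "count(processing_dttm) = 1" condition rejects any group that
    -- already has a copy in the target (e.g. one merged earlier by a batch with
    -- a later processing_dttm), so out-of-order batches cannot re-insert duplicates.
    insert into target_table
    select id, col1, min(processing_dttm) as processing_dttm
    from (
      select id, col1, processing_dttm from source_table
        where processing_dttm = ${feedts}
      union all
      select id, col1, processing_dttm from target_table
    ) combined
    group by id, col1
    having min(processing_dttm) = ${feedts}
       and count(processing_dttm) = 1;

With the extra condition, a record that was already merged by a later-timestamped batch forms a group of size two and is skipped instead of being inserted again.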

I have a pull request with this fix ready to go and will submit it as soon as I get an issue number for this ticket.

Environment

Ubuntu 16.04.3 LTS
AWS EC2 m4.2xlarge instance

Assignee

Greg Hart

Reporter

Jonathan Traupman

Labels

Reviewer

None

Story point estimate

None

Components

Sprint

None

Fix versions

Affects versions

Priority

Medium