Data transformation feed fails if a column of type Date has a 'Date/Time' standardizer or a default value set

Description

Steps to reproduce:
1. Create a Hive table with a date or timestamp column, e.g.
create table foo (
  id string,
  start_date timestamp
);

2. Insert two records, ("a", "2012-02-12 10:20:00") and ("b", null), into the table 'foo'
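For example (a HiveQL sketch, assuming Hive 0.14+ where INSERT ... VALUES is available; on older versions the rows can be loaded via INSERT ... SELECT with an explicit cast for the null):
insert into foo values ('a', '2012-02-12 10:20:00');
insert into foo values ('b', null);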

3. Create a 'Data Transformation' feed, selecting the 'foo' table as the data source

4. Add a default value standardizer with the default value set to '1970-01-01 00:00:00'

5. Run the feed

Expected result:
The feed should run successfully.

Actual result:

The feed fails with the following error:
8/11/13 15:28:39 ERROR executor.Executor: Exception in task 1.0 in stage 1.0 (TID 3)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.sql.Date is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), StringType), true) AS id#88
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, date), StringType), true) AS date#89
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, dlp_reject_reason), StringType), true) AS dlp_reject_reason#90
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, processing_dttm), StringType), true) AS processing_dttm#91
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.sql.Date is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
... 21 more

We are using Spark 2.x instead of Spark 1.6.

After some debugging, I think the root cause is the following code snippet in ModifiedSchema.java, getValidTableSchema():

if (dataType != null && dataType.isDateOrTimestamp()) {
    field = new StructField(field.name(), DataTypes.StringType, field.nullable(), field.metadata());
}

Kylo explicitly rewrites the schema field of type Date to StringType, but the rows read from Hive still carry java.sql.Date values, and it seems a java.sql.Date cannot be converted to String in Spark 2.x: the encoder validates the external type of each field against the schema and throws the error above.
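To illustrate the mismatch, here is a minimal standalone sketch (not Kylo code; class and variable names are made up, and the Spark 2.x Java API is assumed). The rows keep a java.sql.Date while the schema, like the one produced by getValidTableSchema(), declares the column as StringType:

import java.sql.Date;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class DateAsStringRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("date-as-string-repro").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // The rows still carry a java.sql.Date, as read from the Hive table ...
        List<Row> rows = Arrays.asList(
                RowFactory.create("a", Date.valueOf("2012-02-12")),
                RowFactory.create("b", null));
        JavaRDD<Row> rdd = jsc.parallelize(rows);

        // ... but the schema (like the one rewritten by getValidTableSchema()) says StringType.
        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.StringType, true),
                DataTypes.createStructField("start_date", DataTypes.StringType, true)});

        // In Spark 2.x the row encoder validates external types per field, so this job fails
        // at execution time with "java.sql.Date is not a valid external type for schema of string".
        Dataset<Row> df = spark.createDataFrame(rdd, schema);
        df.show();
    }
}

A possible direction for a fix (only an idea, not a tested patch) would be to keep the original DateType/TimestampType in the schema, or to convert the Date/Timestamp values in the rows to strings before createDataFrame is called, so that the row contents and the declared schema agree.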

Environment

None

Status

Assignee

Unassigned

Reporter

Boying Lu

Labels

None

Reviewer

None

Story point estimate

None

Components

Priority

Medium