Data transformation feed fails to run if a column of type Date has a 'Date/Time' standardizer or a default value set

Description

Steps to reproduce:
1. Create a Hive table with a date/time column, e.g.
create table foo (
id string,
start_date timestamp
);

2. Insert the records ("a", "2012-02-12 10:20:00") and ("b", null) into the table 'foo'

3. Create a 'Data Transformation' feed, selecting the 'foo' table as the data source

4. Add a default value standardizer with the default value set to '1970-01-01 00:00:00'

5. Run the feed

Expected result:
The feed runs successfully

Actual result:

The feed fails with the following error:
8/11/13 15:28:39 ERROR executor.Executor: Exception in task 1.0 in stage 1.0 (TID 3)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.sql.Date is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), StringType), true) AS id#88
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, date), StringType), true) AS date#89
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, dlp_reject_reason), StringType), true) AS dlp_reject_reason#90
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, processing_dttm), StringType), true) AS processing_dttm#91
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.sql.Date is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
... 21 more

We use Spark 2.x instead of Spark 1.6.

After some debugging, I think the root cause is the following code snippet in ModifiedSchema.java, getValidTableSchema():

if (dataType != null && dataType.isDateOrTimestamp()) {
    field = new StructField(field.name(), DataTypes.StringType, field.nullable(), field.metadata());
}

Kylo explicitly rewrites date/timestamp fields to StringType, but the underlying row values are still java.sql.Date objects, and it seems that Spark 2.x does not implicitly convert java.sql.Date to String the way Spark 1.6 did, so the encoder rejects the row.
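A minimal sketch of the mismatch, using only the JDK (no Spark dependency): when the declared schema says StringType, the external row value must actually be a String, so a java.sql.Date or java.sql.Timestamp value would have to be converted explicitly before encoding. The helper name below is hypothetical, not Kylo's actual fix:

```java
import java.sql.Date;
import java.sql.Timestamp;

public class DateToStringSketch {

    // Hypothetical helper: coerce a date/time value to the String that an
    // overridden StringType schema promises. Spark 2.x's ExpressionEncoder
    // validates external types strictly, so a conversion like this would
    // have to happen before the row reaches the encoder.
    static String coerceToString(Object value) {
        if (value == null) {
            return null;                 // preserve SQL NULLs for the default-value standardizer
        }
        if (value instanceof Date || value instanceof Timestamp) {
            return value.toString();     // JDBC types render as "yyyy-MM-dd" / "yyyy-MM-dd HH:mm:ss.S"
        }
        return value.toString();
    }

    public static void main(String[] args) {
        System.out.println(coerceToString(Date.valueOf("2012-02-12")));
        System.out.println(coerceToString(Timestamp.valueOf("2012-02-12 10:20:00")));
        System.out.println(coerceToString(null));
    }
}
```

The point is only that changing the StructField's data type does not change the Java objects in the rows; the values themselves need the matching conversion.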

Environment

None

Activity

Boying Lu
November 28, 2018, 12:42 PM

I created a PR for this issue: https://github.com/Teradata/kylo/pull/95

Boying Lu
December 25, 2018, 4:27 PM

I can't reproduce the issue on Kylo 0.10.0, so please close this JIRA.

Done

Assignee

Unassigned

Reporter

Boying Lu

Labels

None

Reviewer

None

Story point estimate

None

Components

Priority

Medium