ImportSqoop step using sqoop log to find --lastvalue in incrimental append

Description

if i change any sqoop logging properties it wont work

/**

  • Class to handle a stream, and clear it to avoid filling up buffer
    */
    class SqoopThreadedStreamHandler extends Thread {

private InputStream inputStream;
private Boolean logLineWithRetrievedRecordsFound = false;
private Boolean logLineWithNewHighWaterMarkFound = false;

private ComponentLog logger = null;

private SqoopLoadStrategy sourceLoadStrategy;

private String[] logLines;
private CountDownLatch latch;

/**

  • Constructor
    *

  • @param inputStream input stream

  • @param logger logger

  • @param logLines log lines

  • @param latch countdown latch

  • @param sourceLoadStrategy load strategy
    */
    public SqoopThreadedStreamHandler(InputStream inputStream,
    ComponentLog logger,
    String[] logLines,
    CountDownLatch latch,
    SqoopLoadStrategy sourceLoadStrategy) {
    if (inputStream == null) {
    throw new IllegalArgumentException("Input stream has invalid value of null");
    }
    checkNotNull(inputStream);
    checkNotNull(logger);
    checkNotNull(logLines);
    checkNotNull(latch);
    checkNotNull(sourceLoadStrategy);
    this.inputStream = inputStream;
    this.logger = logger;
    this.logLines = logLines;
    this.latch = latch;
    this.sourceLoadStrategy = sourceLoadStrategy;

this.logger.info("Input stream initialized for type: " + inputStream.getClass().toString());
}

/**

  • Run the thread
    */
    public void run() {
    BufferedReader bufferedReader;
    bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
    String line;

/* Clear the buffer */
try {
while ((line = bufferedReader.readLine()) != null) {

String SEARCH_STRING_FOR_RETRIEVED_RECORDS_FOUND = "INFO mapreduce.ImportJobBase: Retrieved";
String SEARCH_STRING_FOR_NO_NEW_RECORDS_FOUND = "No new rows detected since last import";
if ((!logLineWithRetrievedRecordsFound)
&&
(line.contains(SEARCH_STRING_FOR_RETRIEVED_RECORDS_FOUND)

line.contains(SEARCH_STRING_FOR_NO_NEW_RECORDS_FOUND))) {
logLineWithRetrievedRecordsFound = true;
logLines[0] = line;
latch.countDown();
}

if (this.sourceLoadStrategy != SqoopLoadStrategy.FULL_LOAD) {
String SEARCH_STRING_FOR_NEW_HIGH_WATERMARK_FOUND = "INFO tool.ImportTool: --last-value";
if ((!logLineWithNewHighWaterMarkFound)
&& (line.contains(SEARCH_STRING_FOR_NEW_HIGH_WATERMARK_FOUND))) {
logLineWithNewHighWaterMarkFound = true;
logLines[1] = line;
latch.countDown();
}
}

logger.info(line);
}
} catch (IOException ioe) {
logger.warn("I/O error occurred while handling stream. [{}]", new Object[]{ioe.getMessage()});
} catch (Throwable t) {
logger.warn("An error occurred handling stream. [{}]", new Object[]{t.getMessage()});
} finally {
for (long i = 0; i < latch.getCount(); i++) {
latch.countDown();
}
try {
bufferedReader.close();
} catch (IOException ioe) {
logger.warn("I/O error closing buffered reader for stream");
}
}
}
}

Status

Assignee

Unassigned

Reporter

Subin Soman

Labels

None

Reviewer

None

Components

Priority

Medium
Configure