Issues with Kafka connectors

Hi everyone,

As established in one of the forum topics (Requisition data feed), in SELV v3 we've started using Kafka to transfer data from the requisition and referencedata schemas to the newly created requisitionbatch schema. Unfortunately, we've recently discovered that the data is no longer being transferred at all. In the logs we found the following message for almost all of the sink workers:

Task is being killed and will not recover until manually restarted

It turns out that an unrecoverable exception thrown during sink task execution causes the task to shut down silently. The whole service keeps running, but the sink tasks no longer transfer any data, and as a result our schema is out of date. We identified two main causes of these exceptions, and I'm hoping that once they are resolved no others will occur.

The first one is related to a duplicate key issue:

duplicate key value violates unique constraint "batch_req_prod_fac_per"

and the second one refers to an issue with mapping one of the columns:

column "golivedate" is of type date but expression is of type integer

(more detailed logs can be found there)

We would like to open a discussion on how to best approach resolving them.

  1. What is the best option to ensure that messages are processed only once, or do you have another idea why the duplicate key issue is present?
  2. How should we configure the mapping of certain columns, or do you know why it isn't done automatically? (Both columns have the same name and type: date.)
  3. Do you have any other recommendations for using data-pumps in a production environment? We would like to monitor whether any of the tasks are down. Our current ideas are to execute API calls to all connectors regularly as a health check, or to watch the logs and catch such messages with some tool. In both cases, for now, we are thinking of notifying the administrators if any of the connectors is down.

Thanks in advance for sharing your thoughts.

Best Regards,
Oskar

Hey @ohinc,

Thanks for bringing this to the attention of the community. It is good to know about the different snags that can happen with the data-pump approach.

For the first issue, I am curious how it came to that point. That error would only come up if the requisition ID changed somehow, as Kafka Connect will attempt to update instead of insert when it finds that the ID already exists in the sink (batch requisition) database. Did the requisition database get cleared without clearing the batch requisition database?
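
For reference, this is roughly the sink-side configuration that controls that update-instead-of-insert behaviour; just a minimal sketch, assuming the Confluent JDBC sink connector is in use and that the requisition ID travels in the record key (the pk.fields value here is illustrative, not taken from your actual config):

"config": {
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",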

For the second issue, could it be solved with a different configuration? I see something here about time.precision.mode (https://debezium.io/documentation/reference/0.10/connectors/postgresql.html#temporal-values).
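
I have not tried it in this setup, but since that page documents the Debezium PostgreSQL connector, the property would presumably go on the source connector's JSON config, something along these lines (a sketch only):

"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"time.precision.mode": "connect",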

As for knowing when a connector is down, either idea seems fine to me.
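
If you go the API route, the Kafka Connect REST interface exposes a status endpoint per connector (GET /connectors and GET /connectors/{name}/status). A minimal sketch in Python, assuming Connect listens on localhost:8083; the notification step is a placeholder you would replace with whatever alerts your administrators:

import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST endpoint

def failed_tasks():
    # Query every connector's status and collect tasks that are not RUNNING.
    failures = []
    with urllib.request.urlopen(CONNECT_URL + "/connectors") as resp:
        connectors = json.load(resp)
    for name in connectors:
        with urllib.request.urlopen(CONNECT_URL + "/connectors/" + name + "/status") as resp:
            status = json.load(resp)
        for task in status.get("tasks", []):
            if task.get("state") != "RUNNING":
                failures.append((name, task.get("id"), task.get("state")))
    return failures

if __name__ == "__main__":
    for name, task_id, state in failed_tasks():
        # Placeholder: replace with a real notification to the administrators.
        print("Connector %s task %s is %s" % (name, task_id, state))

Run on a schedule (cron or similar), that would catch the "Task is being killed and will not recover until manually restarted" situation without having to parse the logs.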

Shalom,
Chongsun

Thanks for the quick reply @Chongsun_Ahn!

We certainly did clear the database a few times since the initial configuration, but as far as I know it was always done by removing the whole DB volume, so when the requisition table was cleared, requisitionbatch was cleared as well.

Regarding the second issue, I agree that it's a configuration problem, but I was having difficulties configuring a certain column. Using the property you mentioned led me to another issue. I added it to the sink JSON config:

"config": {
"time.precision.mode": "connect",

But then I encountered:

Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1467)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:149)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:143)
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:78)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
… 10 more

Best,
Oskar

Hey @ohinc,

I would monitor the issue with the database to see how the IDs ended up mismatched.

As for the STRUCT issue, is this because some tables (facilities and geographic_zones) have some PostGIS fields like Geometry? We might need to exclude those from the data-pump. Can you clarify which fields are complaining?

Shalom,
Chongsun

Hi @Chongsun_Ahn,

I've uploaded the full log of the third issue to Google Drive. The error does not point to any particular column, but it occurs only for the geographic_zones and facilities tables, which are the only ones with geometry columns, so I think that might be the case.

Best,
Oskar

Hey @ohinc,

It looks like Kafka Connect does not support that type currently. I assume those columns are not necessary for the requisition batch service, so it seems best to exclude them from the data-pump.
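
For what it's worth, a sketch of how that exclusion might look on the source side, assuming the Debezium 0.10 PostgreSQL connector from the page linked earlier (which lists a column.blacklist property taking fully-qualified schema.table.column names); the column names below are only illustrative, they would need to match the actual geometry columns in referencedata:

"config": {
"column.blacklist": "referencedata.facilities.location,referencedata.geographic_zones.boundary",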

Shalom,
Chongsun