Masalah yang saya alami adalah ketika konektor wastafel jdbc menggunakan pesan kafka, variabel kunci saat menulis ke db adalah nol.

Namun, ketika saya mengkonsumsi langsung melalui kafka-avro-consumer - saya dapat melihat variabel kunci dan nilai dengan nilainya karena saya menggunakan konfigurasi ini: --property print.key=true.

TANYAKAN: apakah ada cara untuk memastikan bahwa konektor jdbc memproses nilai variabel kunci pesan?

Konfigurasi konsol kafka-avro

/opt/confluent-5.4.1/bin/kafka-avro-console-consumer \
    --bootstrap-server "localhost:9092" \
    --topic equipmentidentifier.persist \
    --property parse.key=true \
    --property key.separator=~ \
    --property print.key=true \
    --property schema.registry.url="http://localhost:8081" \
    --property key.schema=[$KEY_SCHEMA] \
    --property value.schema=[$IDENTIFIER_SCHEMA,$VALUE_SCHEMA]

Kesalahan:

org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "assignment_table" ("created_date","custome
r","id_type","id_value") VALUES('1970-01-01 03:25:44.567+00'::timestamp,123,'BILL_OF_LADING','BOL-123') was aborted: ERROR: null value in column "equipment_ide
ntifier_type" violates not-null constraint
  Detail: Failing row contains (null, null, null, null, 1970-01-01 03:25:44.567, 123, id, 56).  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: null value in column "equipment_identifier_type" violates not-null constraint

Konfigurasi wastafel:

task.max=1
topic=assignment
connect.class=io.confluet.connect.jdbc.JdbcSinkConnector

connection.url=jdbc:postgresql://localhost:5432/db
connection.user=test
connection.password=test

table.name.format=assignment_table
auto.create=false
insert.mode=insert
pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId

transforms=flatten

transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Key
transforms.flattenKey.delimiter=_

transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flattenKey.delimiter=_

Kunci Kafka:

{
  "assignmentKey": {
    "cpId": {
      "long": 1001
    },
    "equip": {
      "Identifier": {
        "type": "eq",
        "value": "eq_45"
      }
    },
    "vendorId": {
      "string": "vendor"
    }
  }
}

Nilai Kafka:


{
  "assigmentValue": {
    "id": {
      "Identifier": {
        "type": "id",
        "value": "56"
      }
    },
    "timestamp": {
      "long": 1234456756
    },
    "customer": {
      "long": 123
    }
  }
}

0
iissa1 14 Mei 2020, 23:27

1 menjawab

Jawaban Terbaik

Anda perlu memberi tahu konektor untuk menggunakan bidang dari kunci, karena secara default tidak.

pk.mode=record_key

Namun Anda perlu menggunakan bidang dari baik Kunci atau Nilai, bukan keduanya seperti yang Anda miliki di konfigurasi Anda saat ini:

pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId

Jika Anda menyetel pk.mode=record_key maka pk.fields akan merujuk ke bidang di kunci pesan.

Ref: https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html#sink-pk-config-options

Lihat juga https://rmoff.dev/kafka-jdbc-video

1
Robin Moffatt 14 Mei 2020, 20:58