Friday, January 29, 2021

How to disable JSON schema in Kafka Source Connector (e.g. Debezium)

I followed the Debezium tutorial (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-postgres), and all CDC data received from Postgres is sent to the Kafka topic in JSON format with the schema embedded. How do I get rid of the schema?

Here is the connector config (the connector runs in a Docker container):

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory"
  }
}

The JSON schema is still in the messages. I only managed to get rid of it by launching the Docker container with the following environment variables:

 - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
 - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false

Why can't I achieve exactly the same result through the connector configuration?
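The environment variables probably work because, as far as I understand the debezium/connect image, its entrypoint turns every CONNECT_* variable into a worker property: strip the CONNECT_ prefix, lowercase the rest, and replace underscores with dots. The Python sketch below models that assumed rule; it is not the image's actual script, and the helper name env_to_worker_property is hypothetical.

```python
from typing import Dict, Optional, Tuple


def env_to_worker_property(name: str, value: str) -> Optional[Tuple[str, str]]:
    """Map a CONNECT_* environment variable to a worker property (assumed rule)."""
    prefix = "CONNECT_"
    if not name.startswith(prefix):
        return None  # not a CONNECT_* variable, so this rule ignores it
    # Strip the prefix, lowercase, and turn underscores into dots.
    prop = name[len(prefix):].lower().replace("_", ".")
    return prop, value


env = {
    "CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE": "false",
    "CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE": "false",
    "GROUP_ID": "1",
}

worker_props: Dict[str, str] = {}
for env_name, env_value in env.items():
    mapped = env_to_worker_property(env_name, env_value)
    if mapped is not None:
        prop_name, prop_value = mapped
        worker_props[prop_name] = prop_value

for prop_name, prop_value in worker_props.items():
    print(f"{prop_name}={prop_value}")
```

Under this assumption, the two variables end up as key.converter.schemas.enable and value.converter.schemas.enable in the worker configuration, not in the connector configuration.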

Example of a Kafka message with the schema (key, then value):

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611918971029,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":602,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1611918971032,"transaction":null}}

Desired output, without the schema (key, then value):

{"id":1001}
{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611920304594,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":597,"lsn":33809448,"xmin":null},"op":"r","ts_ms":1611920304596,"transaction":null}
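The two forms differ only in the outer wrapper: with schemas.enable=true the JSON converter writes {"schema": ..., "payload": ...}, and the desired message is just the payload. As a consumer-side workaround (not a fix for the connector configuration), a sketch like the following strips the wrapper. The function name strip_schema is hypothetical, and the sample is the key message from the example above.

```python
import json
from typing import Any


def strip_schema(raw: str) -> Any:
    """Return the payload of a message written with schemas.enable=true.

    Messages that are not wrapped in {"schema": ..., "payload": ...}
    (including null tombstones) are returned unchanged.
    """
    message = json.loads(raw)
    if isinstance(message, dict) and set(message) == {"schema", "payload"}:
        return message["payload"]
    return message


key_with_schema = (
    '{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],'
    '"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}'
)

print(json.dumps(strip_schema(key_with_schema)))
print(json.dumps(strip_schema('{"id":1001}')))  # already unwrapped: left as-is
```

This only hides the schema after the fact; every message still carries it on the topic.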

The Debezium container is run with the following command:

docker run -it --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false \
  -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
  --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql \
  debezium/connect:1.3

or as a docker-compose service:

  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - postgres
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
     - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false

I added CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false and CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false later; without them I cannot get rid of the schema.

The connect Docker container (the Kafka Connect worker cluster, if I understood it correctly) starts without any connector; I create the connector manually.
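Creating the connector manually amounts to POSTing its JSON to the Kafka Connect REST API at /connectors (the tutorial does this with curl). A minimal standard-library Python sketch that builds that request without sending it; the localhost:8083 address is assumed from the published port, and build_create_request is a hypothetical helper name.

```python
import json
import urllib.request

# Assumed address: port 8083 is published by the connect container.
CONNECT_URL = "http://localhost:8083/connectors"


def build_create_request(name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Same settings as the connector config shown earlier.
connector_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory",
}

request = build_create_request("inventory-connector", connector_config)
print(request.get_method(), request.full_url)
# To actually create the connector: urllib.request.urlopen(request)
```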

Logs from docker-compose for the connect service when the connector is created:

connect_1    | 2021-01-29 18:04:57,395 INFO   ||  JsonConverterConfig values:
connect_1    |  converter.type = key
connect_1    |  decimal.format = BASE64
connect_1    |  schemas.cache.size = 1000
connect_1    |  schemas.enable = true
connect_1    |    [org.apache.kafka.connect.json.JsonConverterConfig]
connect_1    | 2021-01-29 18:04:57,396 INFO   ||  Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task inventory-connector-0 using the worker config   [org.apache.kafka.connect.runtime.Worker]
connect_1    | 2021-01-29 18:04:57,396 INFO   ||  JsonConverterConfig values:
connect_1    |  converter.type = value
connect_1    |  decimal.format = BASE64
connect_1    |  schemas.cache.size = 1000
connect_1    |  schemas.enable = true
connect_1    |    [org.apache.kafka.connect.json.JsonConverterConfig]
...
connect_1    | 2021-01-29 18:04:57,458 INFO   ||  Starting PostgresConnectorTask with configuration:   [io.debezium.connector.common.BaseSourceTask]
connect_1    | 2021-01-29 18:04:57,460 INFO   ||     key.converter.schemas.enable = false   [io.debezium.connector.common.BaseSourceTask]
connect_1    | 2021-01-29 18:04:57,460 INFO   ||     value.converter.schemas.enable = false   [io.debezium.connector.common.BaseSourceTask]
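The log says the key converter was set up "using the worker config", with schemas.enable = true, even though the task configuration lists key.converter.schemas.enable = false. My understanding of the Kafka Connect worker (an assumption to verify against the version in use) is that a task only builds its own converter from the connector's key.converter.* / value.converter.* properties when the connector config also names the converter class (key.converter / value.converter); otherwise it uses the worker's converter and the worker's properties. The Python sketch below is a simplified model of that rule, not Kafka Connect code; resolve_converter and connector_with_classes are hypothetical names.

```python
from typing import Dict, Optional, Tuple

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def resolve_converter(
    kind: str, connector_config: Dict[str, str], worker_config: Dict[str, str]
) -> Tuple[str, Optional[str], Dict[str, str]]:
    """Simplified model of where a task's 'key' or 'value' converter comes from.

    Returns (origin, converter_class, converter_settings).
    """
    class_key = f"{kind}.converter"
    prefix = class_key + "."
    # Assumed rule: connector-level converter settings only count when the
    # connector config also names the converter class itself.
    use_connector = class_key in connector_config
    source = connector_config if use_connector else worker_config
    settings = {k[len(prefix):]: v for k, v in source.items() if k.startswith(prefix)}
    return ("connector" if use_connector else "worker"), source.get(class_key), settings


# Worker started without the CONNECT_*_SCHEMAS_ENABLE variables.
worker = {"key.converter": JSON_CONVERTER, "value.converter": JSON_CONVERTER}

# Connector config as posted: only the *.schemas.enable overrides.
connector = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
}

# Hypothetical variant that also names the converter classes.
connector_with_classes = {
    **connector,
    "key.converter": JSON_CONVERTER,
    "value.converter": JSON_CONVERTER,
}

print(resolve_converter("key", connector, worker))
print(resolve_converter("key", connector_with_classes, worker))
```

In this model the posted config falls back to the worker's JsonConverter with no schemas.enable override, so the converter's default (true, as in the log) applies, while the variant that also sets the converter classes would pick up schemas.enable=false.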

Here is the output of the GET connector command:

$ curl -i http://localhost:8083/connectors/inventory-connector
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","key.converter.schemas.enable":"false","database.user":"postgres","database.dbname":"postgres","tasks.max":"1","database.hostname":"postgres","database.password":"postgres","value.converter.schemas.enable":"false","name":"inventory-connector","database.server.name":"dbserver1","database.port":"5432","schema.include":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}
https://stackoverflow.com/questions/65954394/how-to-disable-json-schema-in-kafka-source-connector-e-g-debezium January 29, 2021 at 08:22PM
