I followed the Debezium tutorial (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-postgres), and all CDC data received from Postgres is sent to the Kafka topic as JSON with an embedded schema. How can I get rid of the schema?
Here is the connector configuration (launched in a Docker container):
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory"
  }
}
The JSON schema is still in the message. I managed to get rid of it only when I launched the Docker container with the following environment variables:
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
Why can't I achieve exactly the same result from the connector configuration?
Example of a Kafka message (key, then value) with schema:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611918971029,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":602,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1611918971032,"transaction":null}}
Desired example (key, then value) without schema:
{"id":1001}
{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611920304594,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":597,"lsn":33809448,"xmin":null},"op":"r","ts_ms":1611920304596,"transaction":null}
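To make the difference between the two formats concrete: with schemas enabled, each key and value is wrapped in an outer object with exactly two fields, `schema` and `payload`, and the desired output is just the `payload` part. The following is a minimal Python sketch of unwrapping that envelope on the consumer side. The function name `strip_envelope` is hypothetical and not part of Debezium or Kafka Connect, and this is only an illustration of the structure, not a replacement for fixing the converter configuration.

```python
import json


def strip_envelope(raw: str):
    """Return the payload of a schema-wrapped JSON message.

    A message of the form {"schema": {...}, "payload": {...}} is reduced to
    its payload; any other JSON document is returned unchanged.
    """
    message = json.loads(raw)
    if isinstance(message, dict) and set(message) == {"schema", "payload"}:
        return message["payload"]
    return message


# The message key from the example above, with the schema envelope.
key_with_schema = (
    '{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,'
    '"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},'
    '"payload":{"id":1001}}'
)

print(strip_envelope(key_with_schema))  # envelope removed
print(strip_envelope('{"id":1001}'))    # already schemaless, unchanged
```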
The Debezium container is run with the following command:
docker run -it --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false \
  -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
  --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql \
  debezium/connect:1.3
or with docker-compose:
connect:
  image: debezium/connect:${DEBEZIUM_VERSION}
  ports:
    - 8083:8083
  links:
    - kafka
    - postgres
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - GROUP_ID=1
    - CONFIG_STORAGE_TOPIC=my_connect_configs
    - OFFSET_STORAGE_TOPIC=my_connect_offsets
    - STATUS_STORAGE_TOPIC=my_connect_statuses
    - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
    - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
I added CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false and CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false later, but without them I cannot get rid of the schema.
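For context, the debezium/connect image is documented to turn environment variables that start with `CONNECT_` into Kafka Connect worker properties by dropping the prefix, lowercasing the rest, and replacing underscores with dots. The sketch below only illustrates that naming rule; the function name `env_to_worker_property` is hypothetical, and this is not the image's actual startup script.

```python
def env_to_worker_property(env_name: str, prefix: str = "CONNECT_") -> str:
    """Map an environment variable name to the worker property it would set.

    Example: CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE -> key.converter.schemas.enable
    """
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix!r}")
    # Drop the prefix, lowercase, and turn underscores into dots.
    return env_name[len(prefix):].lower().replace("_", ".")


for name in (
    "CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE",
    "CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE",
):
    print(name, "->", env_to_worker_property(name))
```

Under that rule, the two variables set `key.converter.schemas.enable` and `value.converter.schemas.enable` at the worker level, which is a different scope from the connector configuration posted through the REST API.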
The connect Docker container (the Kafka Connect server cluster, if I understand it correctly) is started without any connector. I create the connector manually.
Logs from docker-compose for connect when the connector is created:
connect_1 | 2021-01-29 18:04:57,395 INFO || JsonConverterConfig values:
connect_1 | converter.type = key
connect_1 | decimal.format = BASE64
connect_1 | schemas.cache.size = 1000
connect_1 | schemas.enable = true
connect_1 | [org.apache.kafka.connect.json.JsonConverterConfig]
connect_1 | 2021-01-29 18:04:57,396 INFO || Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task inventory-connector-0 using the worker config [org.apache.kafka.connect.runtime.Worker]
connect_1 | 2021-01-29 18:04:57,396 INFO || JsonConverterConfig values:
connect_1 | converter.type = value
connect_1 | decimal.format = BASE64
connect_1 | schemas.cache.size = 1000
connect_1 | schemas.enable = true
connect_1 | [org.apache.kafka.connect.json.JsonConverterConfig]
...
connect_1 | 2021-01-29 18:04:57,458 INFO || Starting PostgresConnectorTask with configuration: [io.debezium.connector.common.BaseSourceTask]
connect_1 | 2021-01-29 18:04:57,460 INFO || key.converter.schemas.enable = false [io.debezium.connector.common.BaseSourceTask]
connect_1 | 2021-01-29 18:04:57,460 INFO || value.converter.schemas.enable = false [io.debezium.connector.common.BaseSourceTask]
Here is the output of the command that gets the connector:
$ curl -i http://localhost:8083/connectors/inventory-connector
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","key.converter.schemas.enable":"false","database.user":"postgres","database.dbname":"postgres","tasks.max":"1","database.hostname":"postgres","database.password":"postgres","value.converter.schemas.enable":"false","name":"inventory-connector","database.server.name":"dbserver1","database.port":"5432","schema.include":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}
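One detail in the logs may be relevant: the Worker reports setting up the key converter "using the worker config", with `schemas.enable = true`, even though the task configuration lists `key.converter.schemas.enable = false`. A likely explanation is that Kafka Connect only reads `key.converter.*` and `value.converter.*` settings from a connector configuration when that configuration also names the converter class itself (`key.converter` and `value.converter`). Otherwise it falls back to the worker-level converter. The following Python sketch, under that assumption, builds a connector configuration that declares the converter classes explicitly. The helper name `with_connector_level_converters` is hypothetical, and the converter class name is taken from the log output above.

```python
import json

# Converter class as reported in the worker log.
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def with_connector_level_converters(connector: dict) -> dict:
    """Return a copy of the connector definition that sets the key and value
    converter classes explicitly and disables schemas for both."""
    config = dict(connector["config"])
    for kind in ("key", "value"):
        # Assumption: the class must be present for the sub-properties to apply.
        config.setdefault(f"{kind}.converter", JSON_CONVERTER)
        config[f"{kind}.converter.schemas.enable"] = "false"
    return {"name": connector["name"], "config": config}


connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "dbserver1",
        "schema.include": "inventory",
    },
}

# JSON body that could be sent to the Kafka Connect REST API.
print(json.dumps(with_connector_level_converters(connector), indent=2))
```

If the assumption holds, the Worker log should then report the converter as set up from the connector config rather than the worker config.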
https://stackoverflow.com/questions/65954394/how-to-disable-json-schema-in-kafka-source-connector-e-g-debezium January 29, 2021 at 08:22PM