I occasionally get the following error while pushing data into a state store with kvstore.put(key, val) from my CustomProcessor's process() method. After this error the stream thread moves to the DEAD state.
org.apache.kafka.streams.errors.StreamsException: task [0_1] Abort sending since an error caught with a previous record (timestamp 1610582731123) to topic ... due to java.lang.IllegalMonitorStateException
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:245)
	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:62)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:116)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
Any thoughts on what the issue could be?
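For context, the write path looks roughly like this. This is a minimal sketch against the Kafka Streams 2.5.x Processor API; the store name "my-store" and the String key/value types are hypothetical placeholders, and only the kvstore.put(key, val) call inside process() reflects the actual code path shown in the stack trace.

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class CustomProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> kvstore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        // The store must be attached to this processor node in the topology.
        kvstore = (KeyValueStore<String, String>) context.getStateStore("my-store");
    }

    @Override
    public void process(final String key, final String val) {
        // With a change-logged store, put() also produces to the changelog
        // topic; that send is where the StreamsException above is raised.
        kvstore.put(key, val);
    }
}
```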
Complete stack trace:
org.apache.kafka.streams.errors.StreamsException: task [0_9] Abort sending since an error caught with a previous record (timestamp 1610556909445) to topic ... due to java.lang.IllegalMonitorStateException
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:245)
	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:62)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:116)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
	at com.aruba.central.svc.Kafka.CustomProcessor.push_to_store_and_check_for_failure(CustomProcessor.java:146)
	at com.aruba.central.svc.Kafka.CustomProcessor.process(CustomProcessor.java:210)
	at com.aruba.central.svc.Kafka.CustomProcessor.process(CustomProcessor.java:45)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:432)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: java.lang.IllegalMonitorStateException
	at java.base/java.lang.Object.wait(Native Method)
	at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)
	at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:97)
	at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1029)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:885)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
Source: https://stackoverflow.com/questions/65743083/kafka-streams-abort-sending-since-an-error-caught-with-a-previous-record-error (January 16, 2021 at 04:23 AM)