I am working with Kafka in C#. I am trying to figure out whether there is a race condition between processing an event and clearing the queue.
This is how I start processing messages:

    Task.Run(() => { return kafkaService.StartProcessing(); });

This is the method where I consume data from Kafka and process it. If I am able to successfully process the data, then I commit the offset.

    public async Task StartProcessing()
    {
        foreach (var message in _kafkaListener.Read())
        {
            bool success = await ProcessData(message);
            if (success)
            {
                try
                {
                    await _kafkaListener.CommitOffset();
                }
                catch (Exception ex)
                {
                    // log error
                }
            }
        }
    }

And this is my Read method. How does the while (true) logic work in the Read method?

    private List<ConsumeResult<Ignore, byte[]>> _messageHolder;

    public IEnumerable<Message<Ignore, byte[]>> Read()
    {
        try
        {
            while (true)
            {
                var consumeResult = _consumer.Consume();
                _messageHolder.Add(consumeResult);
                yield return consumeResult.Message;
            }
        }
        finally
        {
            _consumer.Close();
        }
    }

And here is how we commit the offset and also clear the queue:

    private async Task CommitOffset()
    {
        if (_messageHolder.Count == 0) return;

        var offsetsPickedForCommit = _messageHolder.GroupBy(m => m.Partition)
            .Select(p => new TopicPartitionOffset(_topic, p.Key, new Offset(p.Max(m => m.Offset.Value))));
        try
        {
            _consumer.Commit(offsetsPickedForCommit);
            // Will this cause a problem where I haven't processed a message yet but it is still cleared from the queue?
            _messageHolder.Clear();
        }
        catch (KafkaException e)
        {
            // log error
        }
    }

Problem Statement
Now my confusion is this: in the StartProcessing method I get data by calling the Read method, and Read has a while (true) loop that keeps adding data to the _messageHolder list. Then, when we commit the offset, that list is cleared. Is there a race condition here where I can clear a message from the list that I haven't processed yet?
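
To make the question concrete, here is a minimal sketch, with no Kafka involved, of how a while (true) loop containing yield return interleaves with the foreach that consumes it. The names LazyIteratorDemo and Holder are hypothetical stand-ins (Holder plays the role of _messageHolder, and plain ints replace the consume results). It only illustrates standard C# iterator behavior and is not the real listener.

```csharp
using System;
using System.Collections.Generic;

public static class LazyIteratorDemo
{
    // Hypothetical stand-in for _messageHolder; ints replace Kafka consume results.
    private static readonly List<int> Holder = new List<int>();

    // Mirrors the shape of Read(): an endless loop that yields one item at a time.
    private static IEnumerable<int> Read()
    {
        int next = 0;
        while (true)
        {
            int item = next++;   // stands in for _consumer.Consume()
            Holder.Add(item);
            yield return item;   // execution pauses here until the caller requests the next item
        }
    }

    public static void Main()
    {
        foreach (int item in Read())
        {
            // Stands in for ProcessData(message).
            Console.WriteLine($"processing {item}, holder count = {Holder.Count}");

            Holder.Clear();      // stands in for CommitOffset()

            if (item == 2) break; // stop the otherwise endless loop
        }
    }
}
```

Because a C# iterator body runs only when the foreach asks for the next element, each Add in this sketch happens only after the previous loop body has finished, so it prints a holder count of 1 on every iteration. This assumes a single thread calling Read, as in StartProcessing above. It says nothing about what happens if another thread touches the list.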