Monday, March 22, 2021

Potential race condition while processing data from Kafka and clearing the queue?

I am working with Kafka in C#. I am trying to figure out whether there is a race condition between processing an event and clearing the queue.

This is how I start processing messages:

    Task.Run(() =>
    {
        return kafkaService.StartProcessing();
    });
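Because the lambda returns a Task, this call binds to the Task.Run(Func<Task>) overload, and the Task it returns represents the whole StartProcessing loop; the call does not run loop iterations in parallel. The sketch below is a hypothetical illustration (TaskRunDemo and FakeStartProcessing are made-up names standing in for kafkaService.StartProcessing) showing that an exception thrown inside the async method only becomes visible if that returned Task is kept and awaited.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskRunDemo
{
    // Hypothetical stand-in for kafkaService.StartProcessing(): does some
    // asynchronous work, then fails the way a broken processing loop might.
    public static async Task FakeStartProcessing()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("processing loop stopped");
    }

    // Task.Run(Func<Task>) returns a proxy Task that completes only when the
    // inner Task completes, so awaiting it rethrows the inner exception.
    public static async Task<string> RunAndObserve()
    {
        Task processing = Task.Run(() => FakeStartProcessing());
        try
        {
            await processing;
            return "completed";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }
}
```

If the Task returned by Task.Run is discarded, as in the snippet above, a failure of the loop goes unobserved; keeping a reference to it is one way to notice that processing has stopped.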

This is the method where I consume data from Kafka and process it. If the data is processed successfully, I commit the offset.

    public async Task StartProcessing()
    {
        foreach (var message in _kafkaListener.Read())
        {
            bool success = await ProcessData(message);
            if (success)
            {
                try
                {
                    await _kafkaListener.CommitOffset();
                }
                catch (Exception ex)
                {
                    // log error
                }
            }
        }
    }

And this is my Read method. How does the while (true) loop in Read actually work?

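    private readonly List<ConsumeResult<Ignore, byte[]>> _messageHolder = new List<ConsumeResult<Ignore, byte[]>>();

    public IEnumerable<Message<Ignore, byte[]>> Read()
    {
        try
        {
            while (true)
            {
                // Blocks until the next message is available.
                var consumeResult = _consumer.Consume();
                _messageHolder.Add(consumeResult);
                yield return consumeResult.Message;
            }
        }
        finally
        {
            // Runs when the caller's foreach exits and disposes the enumerator.
            _consumer.Close();
        }
    }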
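The while (true) loop does not run on its own. Read is an iterator (it uses yield return), so each time the foreach in StartProcessing asks for the next element, the body runs up to the next yield return and then pauses; the finally block runs when the foreach exits and disposes the enumerator. In the real method each step also blocks inside _consumer.Consume() until a message is available. The sketch below is hypothetical (LazyReadDemo uses integers instead of Kafka messages, and a Clear call stands in for CommitOffset) and records the order in which things happen.

```csharp
using System;
using System.Collections.Generic;

public sealed class LazyReadDemo
{
    // Plays the role of _messageHolder in the question.
    private readonly List<int> _holder = new List<int>();

    public List<string> Log { get; } = new List<string>();

    // Infinite iterator, like Read(): each element is produced only on demand.
    private IEnumerable<int> Read()
    {
        int next = 0;
        try
        {
            while (true)
            {
                _holder.Add(next);
                Log.Add($"read {next}");
                yield return next++; // pauses here until the caller asks again
            }
        }
        finally
        {
            // Corresponds to _consumer.Close(): runs when the foreach exits.
            Log.Add("closed");
        }
    }

    // Consumes `count` items, clearing the holder after each one the way a
    // successful CommitOffset() would.
    public void Run(int count)
    {
        if (count <= 0) return;
        foreach (var item in Read())
        {
            Log.Add($"process {item} (holder has {_holder.Count})");
            _holder.Clear();
            if (item + 1 >= count) break;
        }
    }
}
```

Calling new LazyReadDemo().Run(2) logs read 0, process 0 (holder has 1), read 1, process 1 (holder has 1), closed: reading and processing alternate on the same thread rather than running concurrently.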

And here is how I commit the offsets and clear the queue:

    public async Task CommitOffset()
    {
        if (_messageHolder.Count == 0) return;

        // One offset per partition: the highest consumed offset + 1, because
        // Kafka expects the committed offset to be the next message to read.
        var offsetsPickedForCommit = _messageHolder
            .GroupBy(m => m.Partition)
            .Select(p => new TopicPartitionOffset(_topic, p.Key, new Offset(p.Max(m => m.Offset.Value) + 1)))
            .ToList();

        try
        {
            _consumer.Commit(offsetsPickedForCommit);
            // Could this clear a message from the holder that I haven't processed yet?
            _messageHolder.Clear();
        }
        catch (KafkaException e)
        {
            // log error
        }
    }
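CommitOffset builds one offset per partition from whatever is currently in the holder. The sketch below reproduces that grouping with plain (partition, offset) tuples instead of Confluent.Kafka's ConsumeResult and TopicPartitionOffset types, so it runs without the library; CommitOffsetDemo is a hypothetical name. Kafka treats a committed offset as the position of the next message to read, so the sketch adds 1 to the highest offset seen on each partition (to my understanding, Confluent.Kafka's Commit(ConsumeResult) overload applies the same + 1 internally).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CommitOffsetDemo
{
    // Groups consumed messages by partition and returns, per partition,
    // the offset to commit: highest consumed offset + 1.
    public static Dictionary<int, long> OffsetsToCommit(
        IEnumerable<(int Partition, long Offset)> consumed)
    {
        return consumed
            .GroupBy(m => m.Partition)
            .ToDictionary(g => g.Key, g => g.Max(m => m.Offset) + 1);
    }
}
```

For the batch (0, 5), (1, 2), (0, 7), (1, 3) it returns 8 for partition 0 and 4 for partition 1.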

Problem Statement

My confusion is this: StartProcessing gets data by calling Read, and Read has a while (true) loop that keeps adding data to the _messageHolder list. Then, when I commit the offset, that list is cleared. Is there a race condition here where I could clear a message from the list that I haven't processed yet?
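One way to reason about this is to replay the loop on a single thread and record what _messageHolder contains each time it is committed and cleared. The sketch below is hypothetical: LoopReplay stands in for the listener and service, a finite list of offsets on one partition replaces _consumer.Consume(), and a predicate replaces ProcessData. Under these assumptions, Add, processing, and Clear happen in strict sequence, so a commit can only clear messages that Read has already handed to the loop; a message whose processing returned false stays in the holder and is included in the next successful commit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class LoopReplay
{
    // Plays the role of _messageHolder (offsets on a single partition).
    private readonly List<long> _messageHolder = new List<long>();

    // One entry per commit: holder contents and the offset that would be committed.
    public List<string> Commits { get; } = new List<string>();

    // Like Read(), but over a finite source instead of while (true) + Consume().
    private IEnumerable<long> Read(IEnumerable<long> source)
    {
        foreach (var offset in source)
        {
            _messageHolder.Add(offset);
            yield return offset; // control returns to StartProcessing here
        }
    }

    // Like CommitOffset(): records the holder, "commits" max + 1, then clears.
    private void CommitOffset()
    {
        if (_messageHolder.Count == 0) return;
        Commits.Add(string.Join("+", _messageHolder) + $" -> commit {_messageHolder.Max() + 1}");
        _messageHolder.Clear();
    }

    // Like StartProcessing(): commit only after successful processing.
    public void StartProcessing(IEnumerable<long> source, Func<long, bool> processData)
    {
        foreach (var offset in Read(source))
        {
            if (processData(offset))
            {
                CommitOffset();
            }
        }
    }
}
```

With offsets 0, 1, 2, 3 and processing failing for offset 1, the recorded commits are 0 -> commit 1, 1+2 -> commit 3, and 3 -> commit 4.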

https://stackoverflow.com/questions/66756296/potential-race-condition-while-processing-data-from-kafka-and-clearing-the-queue March 23, 2021 at 10:08AM
