> [with the default enable.auto.commit=true] Kafka consumers may automatically mark offsets as committed, regardless of whether they have actually been processed by the application. This means that a consumer can poll a series of records, mark them as committed, then crash—effectively causing those records to be lost
That's never been my understanding of auto-commit; that would be a crazy default, wouldn't it?
The docs say this:
> when auto-commit is enabled, every time the poll method is called and data is fetched, the consumer is ready to automatically commit the offsets of messages that have been returned by the poll. If the processing of these messages is not completed before the next auto-commit interval, there’s a risk of losing the message’s progress if the consumer crashes or is otherwise restarted. In this case, when the consumer restarts, it will begin consuming from the last committed offset. When this happens, the last committed position can be as old as the auto-commit interval. Any messages that have arrived since the last commit are read again. If you want to reduce the window for duplicates, you can reduce the auto-commit interval
I don't find it amazingly clear, but overall my understanding from this is that offsets are committed _only_ if the processing finishes. Tuning the auto-commit interval helps with duplicate processing, not with lost messages, as you'd expect for at-least-once processing.
It is a little surprising, and I agree, the docs here are not doing a particularly good job of explaining it. It might help to ask: if you don't explicitly commit, how does Kafka know when you've processed the messages it gave you? It doesn't! It assumes any message it hands you is instantaneously processed.
Auto-commit is a bit like handing someone an ice cream cone, then immediately walking away and assuming they ate it. Sometimes people drop their ice cream immediately after you hand it to them, and never get a bite.
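For contrast, here is a minimal sketch (not anyone's production code) of what telling Kafka "I actually ate the ice cream" looks like: auto-commit disabled, and the application committing only after it has finished the batch. The topic name, config values, and the process() helper are placeholders.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false"); // the application decides when to commit
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                process(record);
            }
            // Only now does Kafka learn the batch was handled. Crash before this line
            // and the uncommitted records are re-delivered on restart: duplicates, not loss.
            consumer.commitSync();
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        // hypothetical application logic
    }
}

With the default auto-commit, that final commitSync() effectively happens for you inside a later poll(), whether or not your processing ever ran.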
Information on the internet about this seems unreliable, confusing and contradictory... It's crazy for something so critical, especially when it's enabled by default.
Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that you must consume all data returned from each call to poll(Duration) before any subsequent calls, or before closing the consumer.
E.g. the following commits roughly every 10s, on each call to `poll`; it doesn't automagically commit every 5s in the background.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            // The auto-commit check runs inside poll(): offsets from earlier batches
            // are committed here if auto.commit.interval.ms has elapsed.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(10_000); // slow "processing"; the next poll() is what actually commits
        }
    }
}
Just a note: I am not claiming it is working correctly, only that the way the client decides when to commit is clearly documented, and that it works as expected in a simple scenario.
> if you don't explicitly commit, how does Kafka know when you've processed the messages it gave you?
I did expect that auto-commit still involved an explicit commit. I expected that it meant that the consumer side would commit _after_ processing a message/batch _if_ it had been >= autocommit_interval since the last commit. In other words, that it was a functionality baked into the Kafka client library (which does know when a message has been processed by the application). I don't know if it really makes sense, I never really thought hard about it before!
I'm still a bit skeptical... I'm pretty sure (although not positive) that I've seen consumers with auto-commit get stuck because of timeouts much greater than the auto-commit interval, and yet retry the same message in a loop.
Auto commit has always seemed super shady. Manual commit I have assumed is safe though - something something vector clocks - and it’d be really interesting to know if that trust is misplaced.
What is the process and cost for having you do a Jepsen test for something like that?
It is a bit of hair-splitting in some sense, but the key concept here is that just because the message was delivered to the Kafka client successfully does not mean it was processed by the application.
You will have to explicitly ack if you want that guarantee. For a concrete example, let's say all you do with a message is write it to a database. With auto-commit, as soon as that message reaches your client handler, it is effectively ack'ed, but you probably only want that ack to happen after a successful insert into the DB. The most likely scenario to cause unprocessed messages is that the DB is down for whatever reason (maybe a network link is down, or k8s or even a firewall config now prevents you from reaching it), and at some point during the outage your client goes down too, maybe because an engineer restarts it to see if the problem goes away.
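A rough sketch of that commit-after-successful-insert pattern with the plain Java consumer; the saveToDatabase() helper, topic name, and class name are made up for illustration, and it assumes a consumer created with enable.auto.commit=false:

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitAfterDbWrite {
    // Assumes a consumer created with enable.auto.commit=false.
    static void consumeIntoDatabase(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                saveToDatabase(record); // hypothetical: throws if the DB is unreachable
                // Ack (commit) this record only after the insert succeeded.
                // offset + 1 because the committed offset is the next record to read.
                consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }

    static void saveToDatabase(ConsumerRecord<String, String> record) {
        // hypothetical placeholder for a JDBC insert or similar
    }
}

Committing every record like this is slow; in practice you would usually write the whole batch and commit once per poll, which widens the duplicate window but keeps the at-least-once guarantee.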
It is my understanding that the reason this exists is high-performance situations. You have some other system that can figure out if something failed, but with this feature you can keep moving the high-water mark forward so that you don't have to redo as much. And if you get the timing right and there is a failure, you can assume that when you restart you'll be getting some stuff that you already processed. The problem is when you haven't finished handling a message before the auto-commit fires. In my reading it is meant to commit well after processing, but it does certainly seem like there's a contradiction: it auto-commits, but only covers stuff from so many milliseconds before the auto-commit time?
I can maybe give some justification for why this feature exists. It's designed for synchronous, single-threaded consumers which do something like this:
loop {
    1. Call poll
    2. Durably process the messages
}
I think a point of confusion here is that the auto-commit check happens on the next call to poll—not asynchronously after the timeout. So you should only be able to drop writes if you are storing the messages without durably processing them (which includes any kind of async/defer/queues/etc.) before calling poll again.
(I should say—this is the documented behavior for the Java client library[0]—it's possible that it's not actually the behavior that's implemented today.)
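To make that concrete, here is a sketch (assumed names, not from the docs) of the async hand-off pattern that can drop writes even though auto-commit "only" fires inside poll():

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class AsyncHandoffAntiPattern {
    // Assumes a consumer created with the default enable.auto.commit=true.
    static void riskyAsyncConsume(KafkaConsumer<String, String> consumer) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Hand-off only: the record sits in the pool's queue, not yet processed.
                workers.submit(() -> handleRecord(record));
            }
            // The loop returns to poll() immediately. Once auto.commit.interval.ms has
            // elapsed, that poll() commits offsets for the records above, even if the
            // workers never get to them. Crash now and this group skips those records.
        }
    }

    static void handleRecord(ConsumerRecord<String, String> record) {
        // hypothetical work (DB write, HTTP call, etc.)
    }
}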
The Kafka protocol is torn between being high-level and low-level, and as a result it does neither particularly well. Auto commit is a high-level feature that aims to make it easier to build simple applications without needing to really understand all of the moving pieces, but obviously can fail if you don't use it as expected.
I'd argue that today end users shouldn't be using the Kafka client directly: use a proper high-level implementation that will get the details right for you (for data use cases this is probably a stream processing engine, for application use cases it's something like a durable execution engine).
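For example, my understanding is that a Kafka Streams app never touches offsets directly; with the default at-least-once guarantee the framework commits only after records have gone through your processing step. A rough sketch (application id and topic are placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

public class StreamsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-topic", Consumed.with(Serdes.String(), Serdes.String()))
               // The framework decides when offsets are committed; by default it does so
               // only after records have flowed through the processing step below.
               .foreach((key, value) -> System.out.println(key + " -> " + value));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}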