auto.offset.reset ignored #104

Open
owahab opened this issue Jun 5, 2018 · 5 comments
owahab commented Jun 5, 2018

Thanks to your awesome project, my consumer was up and running in minutes.

However, I needed to re-consume the entire topic, so I reset the consumer group's offsets and then restarted my consumer, but it only picked up the newest messages.

This is my config:

import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
...
    @Override
    public void start() {
        Map<String, String> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        if (!kafkaUsername.isEmpty() && !kafkaPassword.isEmpty()) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            config.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
            config.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" " + "password=\"%s\";",
                kafkaUsername,
                kafkaPassword
            ));
        }

        kafkaConsumer = KafkaConsumer.create(vertx, config);
    }

I tried disabling auto commit and I tried changing the consumer group, but nothing worked.

What am I missing or doing wrong?
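(For reference, a consumer-group offset reset like the one described above is typically done with the standard `kafka-consumer-groups` tool; the broker address, group name, and topic below are placeholders:)

```shell
# Reset the group's committed offsets to the earliest available offset on
# every partition of the topic. Requires Kafka >= 0.11, and the group must
# have no active members while the reset runs.
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --reset-offsets --to-earliest --execute
```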

@ppatierno (Member)

I see this question on Stack Overflow as well; please don't cross-post the same issue :-)
Have you already tried the native Kafka client as a consumer to see whether it behaves the same way?

owahab commented Jun 5, 2018

@ppatierno just tried native consumer and it works as expected.

@ppatierno (Member)

Can you post the configuration of the native consumer, please?

owahab commented Jun 6, 2018

@ppatierno it's the exact same config I used with vertx-kafka-client.

@ppatierno (Member)

@owahab The auto.offset.reset parameter defines what happens when there is no initial offset in Kafka, or when the current committed offset no longer exists on the server (as stated in the Kafka documentation). The first time you start a consumer in a given group, no offset has been committed yet, so with "earliest" it reads from the beginning. If you stop that consumer and start it again with "earliest", a committed offset is already available, so the setting is ignored and no messages are re-read.
Of course this only applies while the consumer stays in the same consumer group as before; if you start it in a different consumer group, it reads from the beginning again.
If you want to keep the same consumer group, you have to call the seekToBeginning method explicitly.
I tried this with the native Kafka client as well, and the behavior is the same, because this is simply how Kafka works.
How did you run your test with the native Kafka client?
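A minimal sketch of re-reading a topic from the start while keeping the same consumer group: seek once the partitions have been assigned. This assumes the `vertx`, `config`, and `kafkaConsumer` fields from the first comment, and `"my-topic"` is a placeholder topic name:

```java
import io.vertx.kafka.client.consumer.KafkaConsumer;

// ... inside start(), after building the config map as above ...
kafkaConsumer = KafkaConsumer.create(vertx, config);

// Once the broker assigns partitions to this consumer, move its position
// back to the earliest available offset on each assigned partition.
kafkaConsumer.partitionsAssignedHandler(partitions ->
    kafkaConsumer.seekToBeginning(partitions)
);

kafkaConsumer.handler(record -> System.out.println(record.value()));
kafkaConsumer.subscribe("my-topic");
```

Note that the seek happens per assignment, so a rebalance will also rewind; for a one-time replay you would guard the handler with a flag or remove it after the first assignment.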
