-
Notifications
You must be signed in to change notification settings - Fork 855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
searching messages in specific partition #678
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this contribution and sorry for the long delay in responding.
The change looks good, except for some small formatting and spelling topics. See my comments.
Integer maximumCount, | ||
Date startTimestamp, | ||
Deserializers deserializers) { | ||
initializeClient(); | ||
final List<TopicPartition> partitions = determinePartitionsForTopic(topic); | ||
List<TopicPartition> partitions = determinePartitionsForTopic(topic); | ||
if(partition != -1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if(partition != -1) { | |
if (partition != -1) { |
List<TopicPartition> partitions = determinePartitionsForTopic(topic); | ||
if(partition != -1) { | ||
var partitionOpt = partitions.stream().filter(p -> p.partition() == partition).findAny(); | ||
if(partitionOpt.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if(partitionOpt.isEmpty()) { | |
if (partitionOpt.isEmpty()) { |
if(partition != -1) { | ||
var partitionOpt = partitions.stream().filter(p -> p.partition() == partition).findAny(); | ||
if(partitionOpt.isEmpty()) { | ||
throw new IllegalArgumentException("Partition does not exists in the topic"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new IllegalArgumentException("Partition does not exists in the topic"); | |
throw new IllegalArgumentException("Partition does not exist in topic"); |
Integer maxmuimCount, | ||
Date startTimestamp, | ||
Deserializers deserializers) { | ||
final var records = highLevelConsumer.searchRecords(topic, searchString, maxmuimCount, startTimestamp, | ||
final var records = highLevelConsumer.searchRecords(topic, searchString, partition, maxmuimCount, startTimestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you are making a change here anyway, do you mind fixing the spelling error in maximumCount
(now spelled as maxmuimCount
)?
Hi, thank you for the review. All issues should be fixed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot!
Hi team,
in my usecase I usually know in which partition I need to search data. This will help me to reduce message consumption from partition that surely doesn't contain messages I want to search.
Option to search across all parition is preserved.