Skip to content

Commit

Permalink
Allow searching messages in a specific partition (#678)
Browse files Browse the repository at this point in the history
  • Loading branch information
krejcp19 authored Nov 26, 2024
1 parent 10d4537 commit d6d43a7
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public String searchMessageForm(@PathVariable("name") String topicName,
defaultForm.setSearchText("");
defaultForm.setFormat(defaultFormat);
defaultForm.setKeyFormat(defaultKeyFormat);
defaultForm.setPartition(-1);
defaultForm.setMaximumCount(100);
defaultForm.setStartTimestamp(new Date(0));
model.addAttribute("searchMessageForm", defaultForm);
Expand All @@ -231,6 +232,7 @@ public String searchMessageForm(@PathVariable("name") String topicName,
model.addAttribute("messageFormats", MessageFormat.values());
model.addAttribute("defaultKeyFormat", defaultKeyFormat);
model.addAttribute("keyFormats", KeyFormat.values());
model.addAttribute("partitions", topic.getPartitions().stream().map(TopicPartitionVO::getId).toList());
model.addAttribute("descFiles", protobufProperties.getDescFilesList());

if (!searchMessageForm.isEmpty() && !errors.hasErrors()) {
Expand All @@ -247,6 +249,7 @@ public String searchMessageForm(@PathVariable("name") String topicName,
var searchResults = kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getPartition(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/kafdrop/form/SearchMessageForm.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class SearchMessageForm {
@Max(1000)
private Integer maximumCount;

private Integer partition;

private MessageFormat format;

private MessageFormat keyFormat;
Expand Down Expand Up @@ -103,4 +105,12 @@ public String getMsgTypeName() {
public void setMsgTypeName(String msgTypeName) {
this.msgTypeName = msgTypeName;
}

public Integer getPartition() {
return partition;
}

public void setPartition(Integer partition) {
this.partition = partition;
}
}
10 changes: 9 additions & 1 deletion src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,19 @@ synchronized List<ConsumerRecord<String, String>> getLatestRecords(String topic,
*/
synchronized SearchResults searchRecords(String topic,
String searchString,
Integer partition,
Integer maximumCount,
Date startTimestamp,
Deserializers deserializers) {
initializeClient();
final List<TopicPartition> partitions = determinePartitionsForTopic(topic);
List<TopicPartition> partitions = determinePartitionsForTopic(topic);
if (partition != -1) {
var partitionOpt = partitions.stream().filter(p -> p.partition() == partition).findAny();
if (partitionOpt.isEmpty()) {
throw new IllegalArgumentException("Partition does not exist in topic");
}
partitions = List.of(partitionOpt.get());
}
kafkaConsumer.assign(partitions);
seekToTimestamp(partitions, startTimestamp);

Expand Down
1 change: 1 addition & 0 deletions src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int coun

SearchResultsVO searchMessages(String topic,
String searchString,
Integer partition,
Integer maximumCount,
Date startTimestamp,
Deserializers deserializers);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,11 @@ public List<ConsumerVO> getConsumersByTopics(Collection<TopicVO> topicVos) {
@Override
public SearchResultsVO searchMessages(String topic,
String searchString,
Integer maxmuimCount,
Integer partition,
Integer maximumCount,
Date startTimestamp,
Deserializers deserializers) {
final var records = highLevelConsumer.searchRecords(topic, searchString, maxmuimCount, startTimestamp,
final var records = highLevelConsumer.searchRecords(topic, searchString, partition, maximumCount, startTimestamp,
deserializers);
final var results = new SearchResultsVO();

Expand Down
14 changes: 12 additions & 2 deletions src/main/resources/templates/search-message.ftlh
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,23 @@
<form method="GET" action="<@spring.url '/topic/${topic.name}/search-messages'/>" id="searchMessageForm" class="card-body">
<div class="row g-3">
<@spring.bind path="searchMessageForm.searchText"/>
<div class="col-7 form-group ${spring.status.error?string("has-error", "")}">
<div class="col-5 form-group ${spring.status.error?string("has-error", "")}">
<label class="control-label" for="searchText">Search text</label>
<@spring.formInput path="searchMessageForm.searchText" attributes='class="form-control" size="40"'/>
<#if spring.status.error>
<span class="text-danger"><i class="fa fa-times-circle"></i><@spring.showErrors "<br/>"/></span>
</#if>
</div>
<@spring.bind path="searchMessageForm.partition"/>
<div class="col-2 form-group ${spring.status.error?string("has-error", "")}">
<label class="control-label" for="partition">Partition</label>
<select class="form-control" id="partition" name="partition">
<option value="-1">All</option>
<#list partitions as p>
<option value="${p}">${p}</option>
</#list>
</select>
</div>
<@spring.bind path="searchMessageForm.startTimestamp"/>
<div class="col form-group ${spring.status.error?string("has-error", "")}">
<label class=control-label" for="maximumCount">Start Timestamp</label>
Expand Down Expand Up @@ -169,4 +179,4 @@
</#if>
</div>

<@template.footer/>
<@template.footer/>

0 comments on commit d6d43a7

Please sign in to comment.