From 8d519b406a3989d8a9a78ec4345a265fff6f5812 Mon Sep 17 00:00:00 2001 From: tibrewalpratik Date: Thu, 5 Dec 2024 14:48:09 +0530 Subject: [PATCH] address comments --- .../pinot/core/common/MinionConstants.java | 2 +- .../framework/SegmentProcessorConfig.java | 2 +- .../UpsertCompactMergeTaskExecutor.java | 15 ++++- .../UpsertCompactMergeTaskGenerator.java | 57 ++++++++++--------- .../UpsertCompactMergeTaskGeneratorTest.java | 39 +++++++++++++ 5 files changed, 84 insertions(+), 31 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 2daaabd92429..08b0eca90907 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -244,7 +244,7 @@ public static class UpsertCompactMergeTask { * {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} will add __ as delimiter * so not adding _ as a suffix here. */ - public static final String MERGED_SEGMENT_NAME_PREFIX = "compactmerged"; + public static final String MERGED_SEGMENT_NAME_PREFIX = "compacted"; /** * maximum number of records to process in a single task, sum of all docs in to-be-merged segments diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java index 367534a8da7f..3b22ce84a1d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java @@ -139,7 +139,7 @@ public SegmentNameGenerator getSegmentNameGenerator() { } public long getCustomCreationTime() { - return (_customCreationTime != null ? _customCreationTime : System.currentTimeMillis()); + return _customCreationTime != null ? _customCreationTime : System.currentTimeMillis(); } @Override diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java index dacd26218a44..956346eca256 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java @@ -46,6 +46,11 @@ import org.slf4j.LoggerFactory; +/** + * Minion task that compacts and merges multiple segments of an upsert table and uploads it back as one single + * segment. This helps in keeping the segment count in check and also prevents a lot of small segments created over + * time. + */ public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class); @@ -59,7 +64,6 @@ protected List convert(PinotTaskConfig pinotTaskConfig, File workingDir) throws Exception { int numInputSegments = segmentDirs.size(); - List results = new ArrayList<>(); _eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + numInputSegments); String taskType = pinotTaskConfig.getTaskType(); Map configs = pinotTaskConfig.getConfigs(); @@ -77,7 +81,6 @@ protected List convert(PinotTaskConfig pinotTaskConfig, segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p)); List recordReaders = new ArrayList<>(numInputSegments); - int count = 1; int partitionId = -1; long maxCreationTimeOfMergingSegments = 0; List originalSegmentCrcFromTaskGenerator = @@ -85,7 +88,7 @@ protected List convert(PinotTaskConfig pinotTaskConfig, for (int i = 0; i < numInputSegments; i++) { File segmentDir = segmentDirs.get(i); _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments)); + String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, (i + 1), numInputSegments)); SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segmentDir); String segmentName = segmentMetadata.getName(); @@ -129,6 +132,11 @@ protected List convert(PinotTaskConfig pinotTaskConfig, System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null)); if (maxCreationTimeOfMergingSegments != 0) { segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments); + } else { + String message = "No valid creation time found for the new merged segment. This might be due to " + + "missing creation time for merging segments"; + LOGGER.error(message); + throw new IllegalStateException(message); } SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build(); List outputSegmentDirs; @@ -144,6 +152,7 @@ protected List convert(PinotTaskConfig pinotTaskConfig, long endMillis = System.currentTimeMillis(); LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis)); + List results = new ArrayList<>(); for (File outputSegmentDir : outputSegmentDirs) { String outputSegmentName = outputSegmentDir.getName(); results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index 380c75285f08..6bdbf65aa81c 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -55,13 +56,13 @@ public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskGenerator.class); - private static final String DEFAULT_BUFFER_PERIOD = "7d"; + private static final String DEFAULT_BUFFER_PERIOD = "2d"; private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500; public static class SegmentMergerMetadata { - SegmentZKMetadata _segmentZKMetadata; - long _validDocIds; - long _invalidDocIds; + private final SegmentZKMetadata _segmentZKMetadata; + private final long _validDocIds; + private final long _invalidDocIds; SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long validDocIds, long invalidDocIds) { _segmentZKMetadata = segmentZKMetadata; @@ -84,18 +85,18 @@ public long getInvalidDocIds() { public static class SegmentSelectionResult { - private final Map>> _segmentsForCompactMerge; + private final Map>> _segmentsForCompactMergeByPartition; private final List _segmentsForDeletion; - SegmentSelectionResult(Map>> segmentsForCompactMerge, + SegmentSelectionResult(Map>> segmentsForCompactMergeByPartition, List segmentsForDeletion) { - _segmentsForCompactMerge = segmentsForCompactMerge; + _segmentsForCompactMergeByPartition = segmentsForCompactMergeByPartition; _segmentsForDeletion = segmentsForDeletion; } - public Map>> getSegmentsForCompactMerge() { - return _segmentsForCompactMerge; + public Map>> getSegmentsForCompactMergeByPartition() { + return _segmentsForCompactMergeByPartition; } public List getSegmentsForDeletion() { @@ -188,7 +189,7 @@ public List generateTasks(List tableConfigs) { int maxTasks = Integer.parseInt(taskConfigs.getOrDefault(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, String.valueOf(MinionConstants.DEFAULT_TABLE_MAX_NUM_TASKS))); for (Map.Entry>> entry - : segmentSelectionResult.getSegmentsForCompactMerge().entrySet()) { + : segmentSelectionResult.getSegmentsForCompactMergeByPartition().entrySet()) { if (numTasks == maxTasks) { break; } @@ -198,7 +199,7 @@ public List generateTasks(List tableConfigs) { continue; } // there are no groups with more than 1 segment to merge - // this can be later removed if we want to just do single-segment compaction from this task + // TODO this can be later removed if we want to just do single-segment compaction from this task if (groups.get(0).size() <= 1) { continue; } @@ -206,12 +207,9 @@ public List generateTasks(List tableConfigs) { Map configs = new HashMap<>(getBaseTaskConfigs(tableConfig, groups.get(0).stream() .map(x -> x.getSegmentZKMetadata().getSegmentName()).collect(Collectors.toList()))); - configs.put(MinionConstants.DOWNLOAD_URL_KEY, - StringUtils.join(groups.get(0).stream().map(x -> x.getSegmentZKMetadata().getDownloadUrl()) - .collect(Collectors.toList()), ",")); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, getDownloadUrl(groups.get(0))); configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); - configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, StringUtils.join(groups.get(0).stream() - .map(x -> String.valueOf(x.getSegmentZKMetadata().getCrc())).collect(Collectors.toList()), ",")); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, getSegmentCrcList(groups.get(0))); pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, configs)); numTasks++; } @@ -269,15 +267,9 @@ public static SegmentSelectionResult processValidDocIdsMetadata( } } - segmentsEligibleForCompactMerge.forEach((partitionID, segmentList) -> segmentList.sort((o1, o2) -> { - // Sort primarily by creationTime in ascending order - if (o1.getSegmentZKMetadata().getCreationTime() < o2.getSegmentZKMetadata().getCreationTime()) { - return -1; - } else if (o1.getSegmentZKMetadata().getCreationTime() == o2.getSegmentZKMetadata().getCreationTime()) { - return 0; - } - return 1; - })); + segmentsEligibleForCompactMerge.forEach((partitionID, segmentList) -> + segmentList.sort(Comparator.comparingLong(o -> o.getSegmentZKMetadata().getCreationTime())) + ); // Map to store the result: each key (partition) will have a list of groups Map>> groupedSegments = new HashMap<>(); @@ -287,6 +279,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata( int partitionID = entry.getKey(); List segments = entry.getValue(); // task config thresholds + // TODO add output segment size as one of the thresholds long validDocsThreshold = Long.parseLong(taskConfigs.getOrDefault(MinionConstants .UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT))); @@ -381,7 +374,7 @@ public static List getCandidateSegments(Map t } @VisibleForTesting - public static List getAlreadyMergedSegments(List allSegments) { + protected static List getAlreadyMergedSegments(List allSegments) { Set alreadyMergedSegments = new HashSet<>(); for (SegmentZKMetadata segment : allSegments) { // check if the segment has custom map having list of segments which merged to form this. we will later @@ -416,4 +409,16 @@ public void validateTaskConfigs(TableConfig tableConfig, Map tas Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format( "'enableSnapshot' from UpsertConfig must be enabled for %s", MinionConstants.UpsertCompactMergeTask.TASK_TYPE)); } + + @VisibleForTesting + protected String getDownloadUrl(List segmentMergerMetadataList) { + return StringUtils.join(segmentMergerMetadataList.stream().map(x -> x.getSegmentZKMetadata().getDownloadUrl()) + .collect(Collectors.toList()), ","); + } + + @VisibleForTesting + protected String getSegmentCrcList(List segmentMergerMetadataList) { + return StringUtils.join(segmentMergerMetadataList.stream() + .map(x -> String.valueOf(x.getSegmentZKMetadata().getCrc())).collect(Collectors.toList()), ","); + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java index 93409657ee1c..c9c17bccbe32 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java @@ -211,4 +211,43 @@ public void testGetCandidateSegments() { List.of(incompleteSegment), System.currentTimeMillis()); Assert.assertEquals(candidateSegments.size(), 0); } + + @Test + public void testGetDownloadUrl() { + // empty list + List segmentMergerMetadataList = Arrays.asList(); + Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), ""); + + // single segment + segmentMergerMetadataList = + List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10)); + Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "fs://testTable__0"); + + // multiple segments + segmentMergerMetadataList = Arrays.asList( + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10), + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20) + ); + Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), + "fs://testTable__0,fs://testTable__1"); + } + + @Test + public void testGetSegmentCrcList() { + // empty list + List segmentMergerMetadataList = Arrays.asList(); + Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), ""); + + // single segment + segmentMergerMetadataList = + List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10)); + Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000"); + + // multiple segments + segmentMergerMetadataList = Arrays.asList( + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10), + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20) + ); + Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000,2000"); + } }