diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java index 956346eca256..174ea5c5a57d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java @@ -22,7 +22,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; import org.apache.pinot.common.restlet.resources.ValidDocIdsType; import org.apache.pinot.common.utils.SegmentUtils; @@ -80,40 +84,37 @@ protected List convert(PinotTaskConfig pinotTaskConfig, // Progress observer segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p)); - List recordReaders = new ArrayList<>(numInputSegments); - int partitionId = -1; - long maxCreationTimeOfMergingSegments = 0; + // get list of segment metadata + List segmentMetadataList = segmentDirs.stream().map(x -> { + try { + return new SegmentMetadataImpl(x); + } catch (Exception e) { + throw new RuntimeException(String.format("Error fetching segment-metadata for segmentDir: %s", x), e); + } + }).collect(Collectors.toList()); + + // validate if partitionID is same for all small segments. Get partition id value for new segment. + int partitionID = getCommonPartitionIDForSegments(segmentMetadataList); + + // get the max creation time of the small segments. This will be the index creation time for the new segment. + Optional maxCreationTimeOfMergingSegments = segmentMetadataList.stream().map( + SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max); + if (maxCreationTimeOfMergingSegments.isEmpty()) { + String message = "No valid creation time found for the new merged segment. This might be due to " + + "missing creation time for merging segments"; + LOGGER.error(message); + throw new RuntimeException(message); + } + + // validate if crc of deepstore copies is same as that in ZK of segments List originalSegmentCrcFromTaskGenerator = List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(",")); - for (int i = 0; i < numInputSegments; i++) { - File segmentDir = segmentDirs.get(i); - _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, (i + 1), numInputSegments)); - - SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segmentDir); - String segmentName = segmentMetadata.getName(); - Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName); - if (segmentPartitionId == null) { - throw new IllegalStateException(String.format("Partition id not found for %s", segmentName)); - } - if (partitionId != -1 && partitionId != segmentPartitionId) { - throw new IllegalStateException(String.format("Partition id mismatched for %s, expected partition id: %d", - segmentName, partitionId)); - } - partitionId = segmentPartitionId; - maxCreationTimeOfMergingSegments = Math.max(maxCreationTimeOfMergingSegments, - segmentMetadata.getIndexCreationTime()); + validateCRCForInputSegments(segmentMetadataList, originalSegmentCrcFromTaskGenerator); - String crcFromDeepStorageSegment = segmentMetadata.getCrc(); - if (!originalSegmentCrcFromTaskGenerator.get(i).equals(crcFromDeepStorageSegment)) { - String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc " - + "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator.get(i), - crcFromDeepStorageSegment); - LOGGER.error(message); - throw new IllegalStateException(message); - } - RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, - ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, crcFromDeepStorageSegment); + // Fetch validDocID snapshot from server and get record-reader for compacted reader. + List recordReaders = segmentMetadataList.stream().map(x -> { + RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, x.getName(), + ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc()); if (validDocIds == null) { // no valid crc match found or no validDocIds obtained from all servers // error out the task instead of silently failing so that we can track it via task-error metrics @@ -123,21 +124,14 @@ protected List convert(PinotTaskConfig pinotTaskConfig, LOGGER.error(message); throw new IllegalStateException(message); } + return new CompactedPinotSegmentRecordReader(x.getIndexDir(), validDocIds); + }).collect(Collectors.toList()); - recordReaders.add(new CompactedPinotSegmentRecordReader(segmentDir, validDocIds)); - } - + // create new UploadedRealtimeSegment + segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments.get()); segmentProcessorConfigBuilder.setSegmentNameGenerator( - new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionId, + new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionID, System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null)); - if (maxCreationTimeOfMergingSegments != 0) { - segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments); - } else { - String message = "No valid creation time found for the new merged segment. This might be due to " - + "missing creation time for merging segments"; - LOGGER.error(message); - throw new IllegalStateException(message); - } SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build(); List outputSegmentDirs; try { @@ -172,4 +166,35 @@ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifi pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY)); return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap); } + + private int getCommonPartitionIDForSegments(List segmentMetadataList) { + List segmentNames = segmentMetadataList.stream().map(SegmentMetadataImpl::getName) + .collect(Collectors.toList()); + Set partitionIDSet = segmentNames.stream().map(x -> { + Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(x); + if (segmentPartitionId == null) { + throw new IllegalStateException(String.format("Partition id not found for %s", x)); + } + return segmentPartitionId; + }).collect(Collectors.toSet()); + if (partitionIDSet.size() > 1) { + throw new IllegalStateException("Found segments with different partition ids during task execution: " + + partitionIDSet); + } + return partitionIDSet.iterator().next(); + } + + private void validateCRCForInputSegments(List segmentMetadataList, + List expectedCRCList) { + for (int i = 0; i < segmentMetadataList.size(); i++) { + SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i); + if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) { + String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc " + + "from ZK: %s, crc from deepstore: %s", segmentMetadata.getName(), expectedCRCList.get(i), + segmentMetadata.getCrc()); + LOGGER.error(message); + throw new IllegalStateException(message); + } + } + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index 6bdbf65aa81c..89c730b68575 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -172,7 +172,7 @@ public List generateTasks(List tableConfigs) { Map candidateSegmentsMap = candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); - List alreadyMergedSegments = getAlreadyMergedSegments(allSegments); + Set alreadyMergedSegments = getAlreadyMergedSegments(allSegments); SegmentSelectionResult segmentSelectionResult = processValidDocIdsMetadata(taskConfigs, candidateSegmentsMap, validDocIdsMetadataList, alreadyMergedSegments); @@ -223,7 +223,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata( Map taskConfigs, Map candidateSegmentsMap, Map> validDocIdsMetadataInfoMap, - List alreadyMergedSegments) { + Set alreadyMergedSegments) { Map> segmentsEligibleForCompactMerge = new HashMap<>(); List segmentsForDeletion = new ArrayList<>(); for (String segmentName : validDocIdsMetadataInfoMap.keySet()) { @@ -374,7 +374,7 @@ public static List getCandidateSegments(Map t } @VisibleForTesting - protected static List getAlreadyMergedSegments(List allSegments) { + protected static Set getAlreadyMergedSegments(List allSegments) { Set alreadyMergedSegments = new HashSet<>(); for (SegmentZKMetadata segment : allSegments) { // check if the segment has custom map having list of segments which merged to form this. we will later @@ -387,7 +387,7 @@ protected static List getAlreadyMergedSegments(List a + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX), ","))); } } - return new ArrayList<>(alreadyMergedSegments); + return alreadyMergedSegments; } @Override diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java index c9c17bccbe32..5556ac53cd20 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.core.common.MinionConstants; @@ -147,7 +148,7 @@ public void testGetAlreadyMergedSegments() { // merged segment present List allSegments = Arrays.asList(_completedSegment, _completedSegment2, mergedSegment); - List alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(allSegments); + Set alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(allSegments); Assert.assertEquals(alreadyMergedSegments.size(), 2); Assert.assertTrue(alreadyMergedSegments.contains("testTable__0")); Assert.assertTrue(alreadyMergedSegments.contains("testTable__1"));