Skip to content

Commit

Permalink
address comments 2
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Dec 10, 2024
1 parent 8d519b4 commit f82aa93
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
Expand Down Expand Up @@ -80,40 +84,37 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
int partitionId = -1;
long maxCreationTimeOfMergingSegments = 0;
// get list of segment metadata
List<SegmentMetadataImpl> segmentMetadataList = segmentDirs.stream().map(x -> {
try {
return new SegmentMetadataImpl(x);
} catch (Exception e) {
throw new RuntimeException(String.format("Error fetching segment-metadata for segmentDir: %s", x), e);
}
}).collect(Collectors.toList());

// validate if partitionID is same for all small segments. Get partition id value for new segment.
int partitionID = getCommonPartitionIDForSegments(segmentMetadataList);

// get the max creation time of the small segments. This will be the index creation time for the new segment.
Optional<Long> maxCreationTimeOfMergingSegments = segmentMetadataList.stream().map(
SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max);
if (maxCreationTimeOfMergingSegments.isEmpty()) {
String message = "No valid creation time found for the new merged segment. This might be due to "
+ "missing creation time for merging segments";
LOGGER.error(message);
throw new RuntimeException(message);
}

// validate if crc of deepstore copies is same as that in ZK of segments
List<String> originalSegmentCrcFromTaskGenerator =
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
for (int i = 0; i < numInputSegments; i++) {
File segmentDir = segmentDirs.get(i);
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, (i + 1), numInputSegments));

SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segmentDir);
String segmentName = segmentMetadata.getName();
Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
if (segmentPartitionId == null) {
throw new IllegalStateException(String.format("Partition id not found for %s", segmentName));
}
if (partitionId != -1 && partitionId != segmentPartitionId) {
throw new IllegalStateException(String.format("Partition id mismatched for %s, expected partition id: %d",
segmentName, partitionId));
}
partitionId = segmentPartitionId;
maxCreationTimeOfMergingSegments = Math.max(maxCreationTimeOfMergingSegments,
segmentMetadata.getIndexCreationTime());
validateCRCForInputSegments(segmentMetadataList, originalSegmentCrcFromTaskGenerator);

String crcFromDeepStorageSegment = segmentMetadata.getCrc();
if (!originalSegmentCrcFromTaskGenerator.get(i).equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator.get(i),
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName,
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, crcFromDeepStorageSegment);
// Fetch validDocID snapshot from server and get record-reader for compacted reader.
List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, x.getName(),
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
Expand All @@ -123,21 +124,14 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
LOGGER.error(message);
throw new IllegalStateException(message);
}
return new CompactedPinotSegmentRecordReader(x.getIndexDir(), validDocIds);
}).collect(Collectors.toList());

recordReaders.add(new CompactedPinotSegmentRecordReader(segmentDir, validDocIds));
}

// create new UploadedRealtimeSegment
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments.get());
segmentProcessorConfigBuilder.setSegmentNameGenerator(
new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionId,
new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionID,
System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
if (maxCreationTimeOfMergingSegments != 0) {
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments);
} else {
String message = "No valid creation time found for the new merged segment. This might be due to "
+ "missing creation time for merging segments";
LOGGER.error(message);
throw new IllegalStateException(message);
}
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
List<File> outputSegmentDirs;
try {
Expand Down Expand Up @@ -172,4 +166,34 @@ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifi
pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY));
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap);
}

int getCommonPartitionIDForSegments(List<SegmentMetadataImpl> segmentMetadataList) {
List<String> segmentNames = segmentMetadataList.stream().map(SegmentMetadataImpl::getName)
.collect(Collectors.toList());
Set<Integer> partitionIDSet = segmentNames.stream().map(x -> {
Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(x);
if (segmentPartitionId == null) {
throw new IllegalStateException(String.format("Partition id not found for %s", x));
}
return segmentPartitionId;
}).collect(Collectors.toSet());
if (partitionIDSet.size() > 1) {
throw new IllegalStateException("Found segments with different partition ids during task execution: "
+ partitionIDSet);
}
return partitionIDSet.iterator().next();
}

void validateCRCForInputSegments(List<SegmentMetadataImpl> segmentMetadataList, List<String> expectedCRCList) {
for (int i = 0; i < segmentMetadataList.size(); i++) {
SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentMetadata.getName(), expectedCRCList.get(i),
segmentMetadata.getCrc());
LOGGER.error(message);
throw new IllegalStateException(message);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
Map<String, SegmentZKMetadata> candidateSegmentsMap =
candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity()));

List<String> alreadyMergedSegments = getAlreadyMergedSegments(allSegments);
Set<String> alreadyMergedSegments = getAlreadyMergedSegments(allSegments);

SegmentSelectionResult segmentSelectionResult =
processValidDocIdsMetadata(taskConfigs, candidateSegmentsMap, validDocIdsMetadataList, alreadyMergedSegments);
Expand Down Expand Up @@ -223,7 +223,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(
Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> candidateSegmentsMap,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap,
List<String> alreadyMergedSegments) {
Set<String> alreadyMergedSegments) {
Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge = new HashMap<>();
List<String> segmentsForDeletion = new ArrayList<>();
for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
Expand Down Expand Up @@ -374,7 +374,7 @@ public static List<SegmentZKMetadata> getCandidateSegments(Map<String, String> t
}

@VisibleForTesting
protected static List<String> getAlreadyMergedSegments(List<SegmentZKMetadata> allSegments) {
protected static Set<String> getAlreadyMergedSegments(List<SegmentZKMetadata> allSegments) {
Set<String> alreadyMergedSegments = new HashSet<>();
for (SegmentZKMetadata segment : allSegments) {
// check if the segment has custom map having list of segments which merged to form this. we will later
Expand All @@ -387,7 +387,7 @@ protected static List<String> getAlreadyMergedSegments(List<SegmentZKMetadata> a
+ MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX), ",")));
}
}
return new ArrayList<>(alreadyMergedSegments);
return alreadyMergedSegments;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.util.Arrays;
import java.util.List;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;



public class UpsertCompactMergeTaskExecutorTest {
private UpsertCompactMergeTaskExecutor _taskExecutor;

@BeforeClass
public void setUp() {
_taskExecutor = new UpsertCompactMergeTaskExecutor(null);
}

@Test
public void testValidateCRCForInputSegments() {
SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);

Mockito.when(segment1.getCrc()).thenReturn("1000");
Mockito.when(segment2.getCrc()).thenReturn("2000");

List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2);
List<String> expectedCRCList = Arrays.asList("1000", "2000");

_taskExecutor.validateCRCForInputSegments(segmentMetadataList, expectedCRCList);
}

@Test(expectedExceptions = IllegalStateException.class)
public void testValidateCRCForInputSegmentsWithMismatchedCRC() {
SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);

Mockito.when(segment1.getCrc()).thenReturn("1000");
Mockito.when(segment2.getCrc()).thenReturn("3000");

List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2);
List<String> expectedCRCList = Arrays.asList("1000", "2000");

_taskExecutor.validateCRCForInputSegments(segmentMetadataList, expectedCRCList);
}

@Test
public void testGetCommonPartitionIDForSegments() {
SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);
SegmentMetadataImpl segment3 = Mockito.mock(SegmentMetadataImpl.class);

Mockito.when(segment1.getName()).thenReturn("testTable__0__0__0");
Mockito.when(segment2.getName()).thenReturn("testTable__0__1__0");
Mockito.when(segment3.getName()).thenReturn("testTable__0__2__0");

List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2, segment3);

int partitionID = _taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList);
Assert.assertEquals(partitionID, 0);
}

@Test(expectedExceptions = IllegalStateException.class)
public void testGetCommonPartitionIDForSegmentsWithDifferentPartitionIDs() {
SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);

Mockito.when(segment1.getName()).thenReturn("testTable__0__0__0");
Mockito.when(segment2.getName()).thenReturn("testTable__1__0__0");

List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2);

_taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.core.common.MinionConstants;
Expand Down Expand Up @@ -147,7 +148,7 @@ public void testGetAlreadyMergedSegments() {

// merged segment present
List<SegmentZKMetadata> allSegments = Arrays.asList(_completedSegment, _completedSegment2, mergedSegment);
List<String> alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(allSegments);
Set<String> alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(allSegments);
Assert.assertEquals(alreadyMergedSegments.size(), 2);
Assert.assertTrue(alreadyMergedSegments.contains("testTable__0"));
Assert.assertTrue(alreadyMergedSegments.contains("testTable__1"));
Expand Down

0 comments on commit f82aa93

Please sign in to comment.