Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Dec 10, 2024
1 parent 7297c34 commit 8d519b4
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public static class UpsertCompactMergeTask {
* {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} will add __ as delimiter
* so not adding _ as a suffix here.
*/
public static final String MERGED_SEGMENT_NAME_PREFIX = "compactmerged";
public static final String MERGED_SEGMENT_NAME_PREFIX = "compacted";

/**
* maximum number of records to process in a single task, sum of all docs in to-be-merged segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public SegmentNameGenerator getSegmentNameGenerator() {
}

public long getCustomCreationTime() {
return (_customCreationTime != null ? _customCreationTime : System.currentTimeMillis());
return _customCreationTime != null ? _customCreationTime : System.currentTimeMillis();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
import org.slf4j.LoggerFactory;


/**
* Minion task that compacts and merges multiple segments of an upsert table and uploads it back as one single
* segment. This helps in keeping the segment count in check and also prevents a lot of small segments created over
* time.
*/
public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);
Expand All @@ -59,7 +64,6 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
File workingDir)
throws Exception {
int numInputSegments = segmentDirs.size();
List<SegmentConversionResult> results = new ArrayList<>();
_eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + numInputSegments);
String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
Expand All @@ -77,15 +81,14 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
int count = 1;
int partitionId = -1;
long maxCreationTimeOfMergingSegments = 0;
List<String> originalSegmentCrcFromTaskGenerator =
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
for (int i = 0; i < numInputSegments; i++) {
File segmentDir = segmentDirs.get(i);
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments));
String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, (i + 1), numInputSegments));

SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segmentDir);
String segmentName = segmentMetadata.getName();
Expand Down Expand Up @@ -129,6 +132,11 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
if (maxCreationTimeOfMergingSegments != 0) {
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments);
} else {
String message = "No valid creation time found for the new merged segment. This might be due to "
+ "missing creation time for merging segments";
LOGGER.error(message);
throw new IllegalStateException(message);
}
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
List<File> outputSegmentDirs;
Expand All @@ -144,6 +152,7 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));

List<SegmentConversionResult> results = new ArrayList<>();
for (File outputSegmentDir : outputSegmentDirs) {
String outputSegmentName = outputSegmentDir.getName();
results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -55,13 +56,13 @@
public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {

private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskGenerator.class);
private static final String DEFAULT_BUFFER_PERIOD = "7d";
private static final String DEFAULT_BUFFER_PERIOD = "2d";
private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;

public static class SegmentMergerMetadata {
SegmentZKMetadata _segmentZKMetadata;
long _validDocIds;
long _invalidDocIds;
private final SegmentZKMetadata _segmentZKMetadata;
private final long _validDocIds;
private final long _invalidDocIds;

SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long validDocIds, long invalidDocIds) {
_segmentZKMetadata = segmentZKMetadata;
Expand All @@ -84,18 +85,18 @@ public long getInvalidDocIds() {

public static class SegmentSelectionResult {

private final Map<Integer, List<List<SegmentMergerMetadata>>> _segmentsForCompactMerge;
private final Map<Integer, List<List<SegmentMergerMetadata>>> _segmentsForCompactMergeByPartition;

private final List<String> _segmentsForDeletion;

SegmentSelectionResult(Map<Integer, List<List<SegmentMergerMetadata>>> segmentsForCompactMerge,
SegmentSelectionResult(Map<Integer, List<List<SegmentMergerMetadata>>> segmentsForCompactMergeByPartition,
List<String> segmentsForDeletion) {
_segmentsForCompactMerge = segmentsForCompactMerge;
_segmentsForCompactMergeByPartition = segmentsForCompactMergeByPartition;
_segmentsForDeletion = segmentsForDeletion;
}

public Map<Integer, List<List<SegmentMergerMetadata>>> getSegmentsForCompactMerge() {
return _segmentsForCompactMerge;
public Map<Integer, List<List<SegmentMergerMetadata>>> getSegmentsForCompactMergeByPartition() {
return _segmentsForCompactMergeByPartition;
}

public List<String> getSegmentsForDeletion() {
Expand Down Expand Up @@ -188,7 +189,7 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
int maxTasks = Integer.parseInt(taskConfigs.getOrDefault(MinionConstants.TABLE_MAX_NUM_TASKS_KEY,
String.valueOf(MinionConstants.DEFAULT_TABLE_MAX_NUM_TASKS)));
for (Map.Entry<Integer, List<List<SegmentMergerMetadata>>> entry
: segmentSelectionResult.getSegmentsForCompactMerge().entrySet()) {
: segmentSelectionResult.getSegmentsForCompactMergeByPartition().entrySet()) {
if (numTasks == maxTasks) {
break;
}
Expand All @@ -198,20 +199,17 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
continue;
}
// there are no groups with more than 1 segment to merge
// this can be later removed if we want to just do single-segment compaction from this task
// TODO this can be later removed if we want to just do single-segment compaction from this task
if (groups.get(0).size() <= 1) {
continue;
}
// TODO see if multiple groups of same partition can be added
Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig,
groups.get(0).stream()
.map(x -> x.getSegmentZKMetadata().getSegmentName()).collect(Collectors.toList())));
configs.put(MinionConstants.DOWNLOAD_URL_KEY,
StringUtils.join(groups.get(0).stream().map(x -> x.getSegmentZKMetadata().getDownloadUrl())
.collect(Collectors.toList()), ","));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, getDownloadUrl(groups.get(0)));
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, StringUtils.join(groups.get(0).stream()
.map(x -> String.valueOf(x.getSegmentZKMetadata().getCrc())).collect(Collectors.toList()), ","));
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, getSegmentCrcList(groups.get(0)));
pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, configs));
numTasks++;
}
Expand Down Expand Up @@ -269,15 +267,9 @@ public static SegmentSelectionResult processValidDocIdsMetadata(
}
}

segmentsEligibleForCompactMerge.forEach((partitionID, segmentList) -> segmentList.sort((o1, o2) -> {
// Sort primarily by creationTime in ascending order
if (o1.getSegmentZKMetadata().getCreationTime() < o2.getSegmentZKMetadata().getCreationTime()) {
return -1;
} else if (o1.getSegmentZKMetadata().getCreationTime() == o2.getSegmentZKMetadata().getCreationTime()) {
return 0;
}
return 1;
}));
segmentsEligibleForCompactMerge.forEach((partitionID, segmentList) ->
segmentList.sort(Comparator.comparingLong(o -> o.getSegmentZKMetadata().getCreationTime()))
);

// Map to store the result: each key (partition) will have a list of groups
Map<Integer, List<List<SegmentMergerMetadata>>> groupedSegments = new HashMap<>();
Expand All @@ -287,6 +279,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(
int partitionID = entry.getKey();
List<SegmentMergerMetadata> segments = entry.getValue();
// task config thresholds
// TODO add output segment size as one of the thresholds
long validDocsThreshold = Long.parseLong(taskConfigs.getOrDefault(MinionConstants
.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
Expand Down Expand Up @@ -381,7 +374,7 @@ public static List<SegmentZKMetadata> getCandidateSegments(Map<String, String> t
}

@VisibleForTesting
public static List<String> getAlreadyMergedSegments(List<SegmentZKMetadata> allSegments) {
protected static List<String> getAlreadyMergedSegments(List<SegmentZKMetadata> allSegments) {
Set<String> alreadyMergedSegments = new HashSet<>();
for (SegmentZKMetadata segment : allSegments) {
// check if the segment has custom map having list of segments which merged to form this. we will later
Expand Down Expand Up @@ -416,4 +409,16 @@ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> tas
Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format(
"'enableSnapshot' from UpsertConfig must be enabled for %s", MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
}

@VisibleForTesting
protected String getDownloadUrl(List<SegmentMergerMetadata> segmentMergerMetadataList) {
return StringUtils.join(segmentMergerMetadataList.stream().map(x -> x.getSegmentZKMetadata().getDownloadUrl())
.collect(Collectors.toList()), ",");
}

@VisibleForTesting
protected String getSegmentCrcList(List<SegmentMergerMetadata> segmentMergerMetadataList) {
return StringUtils.join(segmentMergerMetadataList.stream()
.map(x -> String.valueOf(x.getSegmentZKMetadata().getCrc())).collect(Collectors.toList()), ",");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,43 @@ public void testGetCandidateSegments() {
List.of(incompleteSegment), System.currentTimeMillis());
Assert.assertEquals(candidateSegments.size(), 0);
}

@Test
public void testGetDownloadUrl() {
// empty list
List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> segmentMergerMetadataList = Arrays.asList();
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "");

// single segment
segmentMergerMetadataList =
List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "fs://testTable__0");

// multiple segments
segmentMergerMetadataList = Arrays.asList(
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20)
);
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
"fs://testTable__0,fs://testTable__1");
}

@Test
public void testGetSegmentCrcList() {
// empty list
List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> segmentMergerMetadataList = Arrays.asList();
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "");

// single segment
segmentMergerMetadataList =
List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000");

// multiple segments
segmentMergerMetadataList = Arrays.asList(
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20)
);
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000,2000");
}
}

0 comments on commit 8d519b4

Please sign in to comment.