Upsert small segment merger task in minions #14477

Merged 4 commits on Dec 12, 2024
@@ -63,7 +63,7 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, SegmentZ
}

@Nullable
private static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
public static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
// A fast path to get partition id if the segmentName is in a known format like LLC.
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
if (llcSegmentName != null) {
@@ -65,6 +65,7 @@ private MinionConstants() {
*/
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
public static final long DEFAULT_TABLE_MAX_NUM_TASKS = 1;

/**
* Job configs
@@ -223,4 +224,59 @@ public static class UpsertCompactionTask {
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";
}

public static class UpsertCompactMergeTask {
Contributor: Broad question: say we run both the merge-compaction task and the segment refresh task for a table. How do we ensure there are no race conditions, given that the two tasks could end up processing the same set of segments concurrently?

Author: With merge-compaction we end up creating a new segment, and refresh would just go through the replaceSegment flow in the metadata manager. For a refresh of an LLC segment we should be fine, as the newly generated UploadedRealtimeSegment would override the keys, which it should.
For a refresh of an UploadedRealtimeSegment there is an issue right now: we use creation time to resolve the latest uploaded segment, so there is an edge case where the creation time is the same for both and whichever gets refreshed / uploaded later dominates. Overall it does not matter in the broad sense, because for upserts we would still point to just one record per key.

Contributor: Correctness-wise we should be fine, I guess, but this will still impact the segment refresh task. Can you create a separate issue for this with some context on the problem so we can pick it up later?

public static final String TASK_TYPE = "UpsertCompactMergeTask";

/**
* The time period to wait before picking segments for this task
* e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
*/
public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";

/**
* number of segments to query in one batch to fetch valid doc id metadata, by default 500
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

/**
* Prefix for the new merged segment name.
* {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} adds "__" as the delimiter,
* so no "_" suffix is added here.
*/
public static final String MERGED_SEGMENT_NAME_PREFIX = "compacted";

/**
* maximum number of records to process in a single task, sum of all docs in to-be-merged segments
*/
public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";

/**
* default maximum number of records to process in a single task, same as the value in {@link MergeRollupTask}
*/
public static final long DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;

/**
* maximum number of records in the output segment
*/
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";

/**
* default maximum number of records in output segment, same as the value in
* {@link org.apache.pinot.core.segment.processing.framework.SegmentConfig}
*/
public static final long DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;

/**
* maximum number of segments to process in a single task
*/
public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = "maxNumSegmentsPerTask";

/**
* default maximum number of segments to process in a single task
*/
public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;

public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
}
}
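For reference, a minimal sketch (not part of this diff) of how a table could enable the new task using the keys defined above. The TableTaskConfig wiring is the standard Pinot task configuration; the class name and the values shown are illustrative.

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.TableTaskConfig;

public class UpsertCompactMergeTaskConfigSketch {
  public static TableTaskConfig buildTaskConfig() {
    Map<String, String> taskConfigs = new HashMap<>();
    // Skip segments whose time window is younger than 2 days.
    taskConfigs.put(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY, "2d");
    // Merge at most 10 input segments per task (same as the default above).
    taskConfigs.put(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY, "10");
    // Cap the output segment at 5M records (same as the default above).
    taskConfigs.put(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, "5000000");

    Map<String, Map<String, String>> taskTypeConfigs = new HashMap<>();
    taskTypeConfigs.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, taskConfigs);
    return new TableTaskConfig(taskTypeConfigs);
  }
}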
@@ -23,10 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
@@ -47,12 +49,15 @@ public class SegmentProcessorConfig {
private final Map<String, Map<String, String>> _aggregationFunctionParameters;
private final SegmentConfig _segmentConfig;
private final Consumer<Object> _progressObserver;
private final SegmentNameGenerator _segmentNameGenerator;
private final Long _customCreationTime;

private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig,
List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
Map<String, AggregationFunctionType> aggregationTypes,
Map<String, Map<String, String>> aggregationFunctionParameters, SegmentConfig segmentConfig,
Consumer<Object> progressObserver) {
Consumer<Object> progressObserver, @Nullable SegmentNameGenerator segmentNameGenerator,
@Nullable Long customCreationTime) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
_tableConfig = tableConfig;
_schema = schema;
@@ -65,6 +70,8 @@ private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandl
_progressObserver = (progressObserver != null) ? progressObserver : p -> {
// Do nothing.
};
_segmentNameGenerator = segmentNameGenerator;
_customCreationTime = customCreationTime;
}

/**
@@ -127,11 +134,20 @@ public Consumer<Object> getProgressObserver() {
return _progressObserver;
}

public SegmentNameGenerator getSegmentNameGenerator() {
return _segmentNameGenerator;
}

public long getCustomCreationTime() {
return _customCreationTime != null ? _customCreationTime : System.currentTimeMillis();
}

@Override
public String toString() {
return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _timeHandlerConfig="
+ _timeHandlerConfig + ", _partitionerConfigs=" + _partitionerConfigs + ", _mergeType=" + _mergeType
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig + '}';
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig
+ ", _segmentNameGenerator=" + _segmentNameGenerator + ", _customCreationTime=" + _customCreationTime + '}';
}

/**
@@ -147,6 +163,8 @@ public static class Builder {
private Map<String, Map<String, String>> _aggregationFunctionParameters;
private SegmentConfig _segmentConfig;
private Consumer<Object> _progressObserver;
private SegmentNameGenerator _segmentNameGenerator;
private Long _customCreationTime;

public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
@@ -193,6 +211,16 @@ public Builder setProgressObserver(Consumer<Object> progressObserver) {
return this;
}

public Builder setSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
_segmentNameGenerator = segmentNameGenerator;
return this;
}

public Builder setCustomCreationTime(Long customCreationTime) {
_customCreationTime = customCreationTime;
return this;
}

public SegmentProcessorConfig build() {
Preconditions.checkState(_tableConfig != null, "Must provide table config in SegmentProcessorConfig");
Preconditions.checkState(_schema != null, "Must provide schema in SegmentProcessorConfig");
@@ -216,7 +244,8 @@ public SegmentProcessorConfig build() {
_segmentConfig = new SegmentConfig.Builder().build();
}
return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType,
_aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver);
_aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver,
_segmentNameGenerator, _customCreationTime);
}
}
}
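For reference, a minimal sketch (not part of this diff) of how a caller might use the two new hooks added to SegmentProcessorConfig. The helper class, table config, schema, partition id, and creation time are assumed to be supplied by the caller; the name-generator arguments mirror the ones used by the task executor below.

import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

public class SegmentProcessorConfigUsageSketch {
  public static SegmentProcessorConfig build(TableConfig tableConfig, Schema schema, int partitionId,
      long maxInputCreationTimeMs) {
    return new SegmentProcessorConfig.Builder()
        .setTableConfig(tableConfig)
        .setSchema(schema)
        // Name the output like an uploaded realtime segment, reusing the prefix defined for the merge task.
        .setSegmentNameGenerator(new UploadedRealtimeSegmentNameGenerator(
            TableNameBuilder.extractRawTableName(tableConfig.getTableName()), partitionId,
            System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null))
        // Stamp the output segment with the max creation time of the inputs instead of "now".
        .setCustomCreationTime(maxInputCreationTimeMs)
        .build();
  }
}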
@@ -280,8 +280,11 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime()));

if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
if (_segmentProcessorConfig.getSegmentNameGenerator() != null) {
generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator());
} else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
generatorConfig.setSegmentNameGenerator(
SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix,
segmentNamePostfix, fixedSegmentName, false));
@@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Minion task that compacts and merges multiple segments of an upsert table and uploads the result back as a single
* segment. This keeps the segment count in check and prevents the accumulation of many small segments over time.
*/
public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);

public UpsertCompactMergeTaskExecutor(MinionConf minionConf) {
super(minionConf);
}

@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
throws Exception {
int numInputSegments = segmentDirs.size();
_eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + numInputSegments);
String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
long startMillis = System.currentTimeMillis();

String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
TableConfig tableConfig = getTableConfig(tableNameWithType);
Schema schema = getSchema(tableNameWithType);

SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);

// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

// get list of segment metadata
List<SegmentMetadataImpl> segmentMetadataList = segmentDirs.stream().map(x -> {
try {
return new SegmentMetadataImpl(x);
} catch (Exception e) {
throw new RuntimeException(String.format("Error fetching segment-metadata for segmentDir: %s", x), e);
}
}).collect(Collectors.toList());

// Validate that the partition id is the same for all input segments; it becomes the partition id of the new segment.
int partitionID = getCommonPartitionIDForSegments(segmentMetadataList);

// get the max creation time of the small segments. This will be the index creation time for the new segment.
Optional<Long> maxCreationTimeOfMergingSegments = segmentMetadataList.stream().map(
SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max);
if (maxCreationTimeOfMergingSegments.isEmpty()) {
Contributor: Just to confirm: numInputSegments is always greater than 0, right?

Author: Yes, we have added a check in the generator code to ensure numInputSegments > 1.

String message = "No valid creation time found for the new merged segment. This might be due to "
+ "missing creation time for merging segments";
LOGGER.error(message);
throw new RuntimeException(message);
}

// Validate that the crc of each deepstore copy matches the crc recorded in ZK for that segment
List<String> originalSegmentCrcFromTaskGenerator =
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
validateCRCForInputSegments(segmentMetadataList, originalSegmentCrcFromTaskGenerator);

// Fetch the validDocIds snapshot from the servers and create a compacted record reader for each segment.
List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, x.getName(),
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match the crc of the segment copy obtained from deepstore / servers. Expected crc: %s",
x.getCrc());
LOGGER.error(message);
throw new IllegalStateException(message);
}
return new CompactedPinotSegmentRecordReader(x.getIndexDir(), validDocIds);
}).collect(Collectors.toList());

// create new UploadedRealtimeSegment
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments.get());
segmentProcessorConfigBuilder.setSegmentNameGenerator(
new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionID,
System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
List<File> outputSegmentDirs;
try {
_eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
} finally {
for (RecordReader recordReader : recordReaders) {
recordReader.close();
}
}

long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));

List<SegmentConversionResult> results = new ArrayList<>();
for (File outputSegmentDir : outputSegmentDirs) {
Contributor: Just to be sure, could one task generate multiple new segments or just one? If multiple segments can be created, how do we handle cases where some segments fail to get uploaded?

Author: The current implementation ensures that only one segment is generated. But even if we generated multiple segments, we would not need to think much about rollback: validDocIds would be uploaded for whichever segments got uploaded, and the corresponding input segments would be discarded because their docs become invalid (assuming takeSnapshot has run at least once).

Contributor: I see. For M:N merging, I think the computation of alreadyMergedSegments might need some extensions, as you may want to retry some of the input segments instead of skipping them when they appear in some segment's custom map.

Author: Yeah, or we always ensure selecting complete 'x' segments while creating N segments such that the sum over x equals M. For the sake of simplicity, we currently resolve this in task generation itself by generating multiple X:1 subtasks instead.

String outputSegmentName = outputSegmentDir.getName();
results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
}

@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
Map<String, String> updateMap = new TreeMap<>();
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(System.currentTimeMillis()));
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+ MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX,
pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY));
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap);
}

int getCommonPartitionIDForSegments(List<SegmentMetadataImpl> segmentMetadataList) {
List<String> segmentNames = segmentMetadataList.stream().map(SegmentMetadataImpl::getName)
.collect(Collectors.toList());
Set<Integer> partitionIDSet = segmentNames.stream().map(x -> {
Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(x);
if (segmentPartitionId == null) {
throw new IllegalStateException(String.format("Partition id not found for %s", x));
}
return segmentPartitionId;
}).collect(Collectors.toSet());
if (partitionIDSet.size() > 1) {
throw new IllegalStateException("Found segments with different partition ids during task execution: "
+ partitionIDSet);
}
return partitionIDSet.iterator().next();
}

void validateCRCForInputSegments(List<SegmentMetadataImpl> segmentMetadataList, List<String> expectedCRCList) {
for (int i = 0; i < segmentMetadataList.size(); i++) {
SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
String message = String.format("Crc mismatch between the ZK and deepstore copies of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentMetadata.getName(), expectedCRCList.get(i),
segmentMetadata.getCrc());
LOGGER.error(message);
throw new IllegalStateException(message);
}
}
}
}
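For reference, a minimal sketch (not part of this diff) of how the package-private getCommonPartitionIDForSegments helper could be exercised. It assumes Mockito on the classpath, a class placed in the same package, a no-arg MinionConf constructor, and illustrative LLC-style segment names.

package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.util.List;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.mockito.Mockito;

public class UpsertCompactMergeTaskExecutorSketch {
  public static void main(String[] args) {
    // Two mocked segments with LLC-style names from the same partition (partition id 3).
    SegmentMetadataImpl seg1 = Mockito.mock(SegmentMetadataImpl.class);
    Mockito.when(seg1.getName()).thenReturn("myTable__3__0__20241201T0000Z");
    SegmentMetadataImpl seg2 = Mockito.mock(SegmentMetadataImpl.class);
    Mockito.when(seg2.getName()).thenReturn("myTable__3__1__20241201T0100Z");

    UpsertCompactMergeTaskExecutor executor = new UpsertCompactMergeTaskExecutor(new MinionConf());
    int partitionId = executor.getCommonPartitionIDForSegments(List.of(seg1, seg2));
    System.out.println(partitionId); // prints 3; mixing partitions would throw IllegalStateException
  }
}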