Skip to content

Commit

Permalink
Upsert small segment merger task in minions (#14477)
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 authored Dec 12, 2024
1 parent 7965055 commit 626c45d
Show file tree
Hide file tree
Showing 10 changed files with 1,155 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, SegmentZ
}

@Nullable
private static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
public static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) {
// A fast path to get partition id if the segmentName is in a known format like LLC.
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
if (llcSegmentName != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private MinionConstants() {
*/
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
public static final long DEFAULT_TABLE_MAX_NUM_TASKS = 1;

/**
* Job configs
Expand Down Expand Up @@ -223,4 +224,59 @@ public static class UpsertCompactionTask {
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";
}

/**
* Constants for the upsert compact-merge minion task: the task type name, the task config keys and
* the default values associated with some of those keys.
*/
public static class UpsertCompactMergeTask {
/**
* Name of the task type. It is also used as the prefix of the custom-map keys that the task executor
* writes into the segment ZK metadata (task time and merged-segments entries).
*/
public static final String TASK_TYPE = "UpsertCompactMergeTask";

/**
* The time period to wait before picking segments for this task
* e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
*/
public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";

/**
* number of segments to query in one batch to fetch valid doc id metadata, by default 500
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

/**
* prefix for the new segment name that is created,
* {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} will add __ as delimiter
* so not adding _ as a suffix here.
*/
public static final String MERGED_SEGMENT_NAME_PREFIX = "compacted";

/**
* maximum number of records to process in a single task, sum of all docs in to-be-merged segments
*/
public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";

/**
* default maximum number of records to process in a single task, same as the value in {@link MergeRollupTask}
*/
public static final long DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;

/**
* maximum number of records in the output segment
*/
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";

/**
* default maximum number of records in output segment, same as the value in
* {@link org.apache.pinot.core.segment.processing.framework.SegmentConfig}
*/
public static final long DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;

/**
* maximum number of segments to process in a single task
*/
public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = "maxNumSegmentsPerTask";

/**
* default maximum number of segments to process in a single task
*/
public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;

/**
* suffix appended to {@link #TASK_TYPE} to build the segment ZK metadata custom-map key under which the
* task executor records the names of the input segments that were merged into the new segment
*/
public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
Expand All @@ -47,12 +49,15 @@ public class SegmentProcessorConfig {
private final Map<String, Map<String, String>> _aggregationFunctionParameters;
private final SegmentConfig _segmentConfig;
private final Consumer<Object> _progressObserver;
private final SegmentNameGenerator _segmentNameGenerator;
private final Long _customCreationTime;

private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig,
List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
Map<String, AggregationFunctionType> aggregationTypes,
Map<String, Map<String, String>> aggregationFunctionParameters, SegmentConfig segmentConfig,
Consumer<Object> progressObserver) {
Consumer<Object> progressObserver, @Nullable SegmentNameGenerator segmentNameGenerator,
@Nullable Long customCreationTime) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
_tableConfig = tableConfig;
_schema = schema;
Expand All @@ -65,6 +70,8 @@ private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandl
_progressObserver = (progressObserver != null) ? progressObserver : p -> {
// Do nothing.
};
_segmentNameGenerator = segmentNameGenerator;
_customCreationTime = customCreationTime;
}

/**
Expand Down Expand Up @@ -127,11 +134,20 @@ public Consumer<Object> getProgressObserver() {
return _progressObserver;
}

/**
 * Returns the segment name generator supplied through the builder.
 *
 * @return the configured generator, or {@code null} when none was provided, in which case the caller is
 *         expected to pick a generator by other means
 */
@Nullable
public SegmentNameGenerator getSegmentNameGenerator() {
  return _segmentNameGenerator;
}

/**
 * Returns the creation time to stamp on generated segments.
 *
 * @return the creation time supplied through the builder, or the current wall-clock time in milliseconds
 *         when no custom value was configured
 */
public long getCustomCreationTime() {
  if (_customCreationTime == null) {
    return System.currentTimeMillis();
  }
  return _customCreationTime;
}

@Override
public String toString() {
return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _timeHandlerConfig="
+ _timeHandlerConfig + ", _partitionerConfigs=" + _partitionerConfigs + ", _mergeType=" + _mergeType
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig + '}';
+ ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig
+ ", _segmentNameGenerator=" + _segmentNameGenerator + ", _customCreationTime=" + _customCreationTime + '}';
}

/**
Expand All @@ -147,6 +163,8 @@ public static class Builder {
private Map<String, Map<String, String>> _aggregationFunctionParameters;
private SegmentConfig _segmentConfig;
private Consumer<Object> _progressObserver;
private SegmentNameGenerator _segmentNameGenerator;
private Long _customCreationTime;

public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
Expand Down Expand Up @@ -193,6 +211,16 @@ public Builder setProgressObserver(Consumer<Object> progressObserver) {
return this;
}

/**
 * Sets an optional segment name generator for the segments produced by the processor.
 *
 * @param segmentNameGenerator generator to use, or {@code null} to leave it unset
 * @return this builder
 */
public Builder setSegmentNameGenerator(@Nullable SegmentNameGenerator segmentNameGenerator) {
  _segmentNameGenerator = segmentNameGenerator;
  return this;
}

/**
 * Sets an optional creation time for the segments produced by the processor.
 *
 * @param customCreationTime creation time to use, or {@code null} to leave it unset
 * @return this builder
 */
public Builder setCustomCreationTime(@Nullable Long customCreationTime) {
  _customCreationTime = customCreationTime;
  return this;
}

public SegmentProcessorConfig build() {
Preconditions.checkState(_tableConfig != null, "Must provide table config in SegmentProcessorConfig");
Preconditions.checkState(_schema != null, "Must provide schema in SegmentProcessorConfig");
Expand All @@ -216,7 +244,8 @@ public SegmentProcessorConfig build() {
_segmentConfig = new SegmentConfig.Builder().build();
}
return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType,
_aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver);
_aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver,
_segmentNameGenerator, _customCreationTime);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,11 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime()));

if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
if (_segmentProcessorConfig.getSegmentNameGenerator() != null) {
generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator());
} else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
generatorConfig.setSegmentNameGenerator(
SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix,
segmentNamePostfix, fixedSegmentName, false));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.DefaultSegmentNumRowProvider;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Minion task that compacts and merges multiple segments of an upsert table and uploads it back as one single
* segment. This helps in keeping the segment count in check and also prevents a lot of small segments created over
* time.
*/
public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);

public UpsertCompactMergeTaskExecutor(MinionConf minionConf) {
super(minionConf);
}

@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
throws Exception {
int numInputSegments = segmentDirs.size();
_eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + numInputSegments);
String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
long startMillis = System.currentTimeMillis();

String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
TableConfig tableConfig = getTableConfig(tableNameWithType);
Schema schema = getSchema(tableNameWithType);

SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);

// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

// get list of segment metadata
List<SegmentMetadataImpl> segmentMetadataList = segmentDirs.stream().map(x -> {
try {
return new SegmentMetadataImpl(x);
} catch (Exception e) {
throw new RuntimeException(String.format("Error fetching segment-metadata for segmentDir: %s", x), e);
}
}).collect(Collectors.toList());

// validate if partitionID is same for all small segments. Get partition id value for new segment.
int partitionID = getCommonPartitionIDForSegments(segmentMetadataList);

// get the max creation time of the small segments. This will be the index creation time for the new segment.
Optional<Long> maxCreationTimeOfMergingSegments =
segmentMetadataList.stream().map(SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max);
if (maxCreationTimeOfMergingSegments.isEmpty()) {
String message = "No valid creation time found for the new merged segment. This might be due to "
+ "missing creation time for merging segments";
LOGGER.error(message);
throw new RuntimeException(message);
}

// validate if crc of deepstore copies is same as that in ZK of segments
List<String> originalSegmentCrcFromTaskGenerator =
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
validateCRCForInputSegments(segmentMetadataList, originalSegmentCrcFromTaskGenerator);

// Fetch validDocID snapshot from server and get record-reader for compacted reader.
List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, x.getName(),
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s", "");
LOGGER.error(message);
throw new IllegalStateException(message);
}
return new CompactedPinotSegmentRecordReader(x.getIndexDir(), validDocIds);
}).collect(Collectors.toList());

// create new UploadedRealtimeSegment
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments.get());
segmentProcessorConfigBuilder.setSegmentNameGenerator(
new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionID,
System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
List<File> outputSegmentDirs;
try {
_eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
outputSegmentDirs = new SegmentProcessorFramework(segmentProcessorConfig, workingDir,
SegmentProcessorFramework.convertRecordReadersToRecordReaderFileConfig(recordReaders),
Collections.emptyList(), new DefaultSegmentNumRowProvider(Integer.parseInt(
configs.get(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY)))).process();
} finally {
for (RecordReader recordReader : recordReaders) {
recordReader.close();
}
}

long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));

List<SegmentConversionResult> results = new ArrayList<>();
for (File outputSegmentDir : outputSegmentDirs) {
String outputSegmentName = outputSegmentDir.getName();
results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
}

@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
Map<String, String> updateMap = new TreeMap<>();
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(System.currentTimeMillis()));
updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+ MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX,
pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY));
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap);
}

int getCommonPartitionIDForSegments(List<SegmentMetadataImpl> segmentMetadataList) {
List<String> segmentNames =
segmentMetadataList.stream().map(SegmentMetadataImpl::getName).collect(Collectors.toList());
Set<Integer> partitionIDSet = segmentNames.stream().map(x -> {
Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(x);
if (segmentPartitionId == null) {
throw new IllegalStateException(String.format("Partition id not found for %s", x));
}
return segmentPartitionId;
}).collect(Collectors.toSet());
if (partitionIDSet.size() > 1) {
throw new IllegalStateException(
"Found segments with different partition ids during task execution: " + partitionIDSet);
}
return partitionIDSet.iterator().next();
}

void validateCRCForInputSegments(List<SegmentMetadataImpl> segmentMetadataList, List<String> expectedCRCList) {
for (int i = 0; i < segmentMetadataList.size(); i++) {
SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentMetadata.getName(), expectedCRCList.get(i),
segmentMetadata.getCrc());
LOGGER.error(message);
throw new IllegalStateException(message);
}
}
}
}
Loading

0 comments on commit 626c45d

Please sign in to comment.