-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upsert small segment merger task in minions #14477
Upsert small segment merger task in minions #14477
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #14477 +/- ##
============================================
+ Coverage 61.75% 63.98% +2.23%
- Complexity 207 1600 +1393
============================================
Files 2436 2696 +260
Lines 133233 148389 +15156
Branches 20636 22740 +2104
============================================
+ Hits 82274 94946 +12672
- Misses 44911 46476 +1565
- Partials 6048 6967 +919
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
dc1df38
to
64cd7d6
Compare
Marking it ready for review for early feedback! |
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
Outdated
Show resolved
Hide resolved
.../org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
Outdated
Show resolved
Hide resolved
.../org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
Show resolved
Hide resolved
.../org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
Outdated
Show resolved
Hide resolved
.../org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
Outdated
Show resolved
Hide resolved
...org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
Outdated
Show resolved
Hide resolved
...org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
Outdated
Show resolved
Hide resolved
...org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
Outdated
Show resolved
Hide resolved
...org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
Outdated
Show resolved
Hide resolved
...org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
Outdated
Show resolved
Hide resolved
@@ -199,4 +200,59 @@ public static class UpsertCompactionTask { | |||
*/ | |||
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest"; | |||
} | |||
|
|||
public static class UpsertCompactMergeTask { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broad question: say we run both merge compaction and the segment refresh task for a table. How do we ensure that there aren't any race conditions, because it could be that the two tasks end up processing the same set of segments concurrently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So with merge-compaction we would end up creating a new segment and refresh would just go and do a replaceSegment flow in metadata-manager. For refresh of LLC segment we should be good as the newly generated UploadedRealtimeSegment would override the keys which it should.
For refresh of UploadedRealtimeSegment, there's an issue right now where we use creation time to resolve for the latest uploaded segment, now there can be an edge case where creation time is same for both then whichever gets refreshed / uploaded later would dominate. But overall if you see it doesn't matter in the broad sense because for upserts we would still point to just one record per key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correctness wise we should be fine I guess, but this will still impede or impact the Segment Refresh task. Can you create a separate issue for this with some context on the problem so we could pick it up later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
Outdated
Show resolved
Hide resolved
private static final String DEFAULT_BUFFER_PERIOD = "2d"; | ||
private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500; | ||
|
||
public static class SegmentMergerMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private class or VisibleForTests
?
...org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
Outdated
Show resolved
Hide resolved
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis)); | ||
|
||
List<SegmentConversionResult> results = new ArrayList<>(); | ||
for (File outputSegmentDir : outputSegmentDirs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to be sure, could one task generate multiple new segments or just one? If multiple segments could be created, then how to handle cases when some segments failed to get uploaded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current implementation ensures generating one segment. But if you think about it even if we generate multiple segments, we don't need to think about rollback much. It's just that whatever segments got uploaded, validDocIds will be uploaded for those and the corresponding segments would be discarded as their docs will be invalid (assuming takeSnapshot has run once).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.
For M:N merging, I think the computation of alreadyMergedSegments
might need some extensions, as you may want to retry some of the input segments, instead of skipping them if they are in some segment's custom map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For M:N merging, I think the computation of alreadyMergedSegments might need some extensions, as you may want to retry some of the input segments, instead of skipping them if they are in some segment's custom map.
Yeah or we ensure always selecting complete 'x' segments while creating N segments such that sum over x = M. For the sake of simplicity, we are resolving this in task-generation itself currently ensuring to generate multiple X:1 subtasks instead.
83f52a8
to
ed2f8ea
Compare
...org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
Show resolved
Hide resolved
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis)); | ||
|
||
List<SegmentConversionResult> results = new ArrayList<>(); | ||
for (File outputSegmentDir : outputSegmentDirs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.
For M:N merging, I think the computation of alreadyMergedSegments
might need some extensions, as you may want to retry some of the input segments, instead of skipping them if they are in some segment's custom map.
96c5e17
to
3487e06
Compare
3487e06
to
f82aa93
Compare
// get the max creation time of the small segments. This will be the index creation time for the new segment. | ||
Optional<Long> maxCreationTimeOfMergingSegments = segmentMetadataList.stream().map( | ||
SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max); | ||
if (maxCreationTimeOfMergingSegments.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm: numInputSegments
is always greater than 0 right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we have added the check in generator code to ensure numInputSegments > 1
int partitionID = entry.getKey(); | ||
List<SegmentMergerMetadata> segments = entry.getValue(); | ||
// task config thresholds | ||
// TODO add output segment size as one of the thresholds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create a issue for this for tracking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PR related to the PEP request: #14305
Here, we are adding a new minion task to merge small segments in an upsert table. More implementation details in the design doc of the linked issue.
Test plan: Enabled this in one of infinite retention tables in Uber. The tables had ~35k segments initially and after enabling this task for ~2 days we were able to reach ~2k segments. The curve also flattens post reaching ~2k segments. We are using the default configs of this task and the table is generating ~500 segments daily. See attached screenshot.
Few details: