PARQUET-34: implement Size() filter for repeated columns #3098
base: master
Conversation
```java
// If all values have repetition level 0, then no array has more than 1 element
if (repetitionLevelHistogram.size() == 1
    || repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream()
        .allMatch(l -> l == 0)) {

  // Null list fields are treated as having size 0
  if (( // all lists are nulls
      definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream()
          .allMatch(l -> l == 0))
      || // all lists are size 0
      (definitionLevelHistogram.get(0) == 0
          && definitionLevelHistogram.subList(2, definitionLevelHistogram.size()).stream()
              .allMatch(l -> l == 0))) {
    final boolean blockCannotMatch =
        size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0);
    return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
  }

  long maxDefinitionLevel = definitionLevelHistogram.get(definitionLevelHistogram.size() - 1);

  // If all repetition levels are zero and all definition levels are > maxDefinitionLevel - 1,
  // all lists are of size 1
  if (definitionLevelHistogram.stream().allMatch(l -> l > maxDefinitionLevel - 1)) {
    final boolean blockCannotMatch = size.filter(
        (eq) -> eq != 1, (lt) -> lt <= 1, (lte) -> lte < 1, (gt) -> gt >= 1, (gte) -> gte > 1);

    return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
  }
}

long nonNullElementCount =
    repetitionLevelHistogram.stream().mapToLong(l -> l).sum() - definitionLevelHistogram.get(0);
long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0);

// Given the total number of elements and non-null fields, we can compute the max size of any array field
long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
final boolean blockCannotMatch = size.filter(
    (eq) -> eq > maxArrayElementCount,
    (lt) -> false,
    (lte) -> false,
    (gt) -> gt >= maxArrayElementCount,
    (gte) -> gte > maxArrayElementCount);

return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
```
Hopefully this is a faithful transcription of the logic outlined here: #1452 (comment)
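The final bound in that logic can be illustrated with a small standalone sketch. This is a hypothetical helper with made-up histograms, not the actual Parquet code; in the real implementation the histograms come from `SizeStatistics`:

```java
import java.util.List;

public class MaxListSizeBound {
    // Upper bound on the size of any single list, derived from the level histograms.
    // Assumptions mirroring the snippet above: repetitionLevelHistogram.get(0) counts
    // records, and definitionLevelHistogram.get(0) counts null records.
    static long maxArrayElementCount(List<Long> repetitionLevelHistogram, List<Long> definitionLevelHistogram) {
        long nonNullElementCount = repetitionLevelHistogram.stream().mapToLong(Long::longValue).sum()
                - definitionLevelHistogram.get(0);
        long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0);
        // Worst case: every non-null record holds one element except a single
        // record that holds all the remaining elements.
        return 1 + (nonNullElementCount - numNonNullRecords);
    }

    public static void main(String[] args) {
        // Hypothetical: 3 records total, 1 null record, 7 values overall,
        // so 6 non-null elements spread over 2 non-null records.
        long bound = maxArrayElementCount(List.of(3L, 4L), List.of(1L, 0L, 6L));
        System.out.println(bound); // prints 5: one record could hold up to 5 elements
    }
}
```

Any `size(eq, n)` with `n` above this bound can safely report `BLOCK_CANNOT_MATCH`.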
```java
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
}

private static SizeStatistics createSizeStatisticsForRepeatedField(
```
I'm dynamically generating `SizeStatistics` for each test case, which does add a lot of LOC to the file. I could also just replace it with the precomputed `SizeStatistics` for each test case if that's simpler. I just wrote it this way originally because I wasn't that confident in my ability to translate the striping algorithm by hand for all these cases 😅
```java
public CountingValueInspector(ValueInspector delegate, Function<Long, Boolean> shouldUpdateDelegate) {
  this.observedValueCount = 0;
  this.delegate = delegate;
  this.shouldUpdateDelegate = shouldUpdateDelegate;
```
Note: the `shouldUpdateDelegate` is needed since we don't want to terminate prematurely with a false positive. For example, if we're filtering on `size(eq(3))` but the input array has 4 elements, we want to prevent the delegated `Eq` from returning true after it hits the third element because it thinks the condition is satisfied.
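That false-positive scenario can be sketched with a simplified standalone illustration (these are hypothetical helper methods, not the actual `CountingValueInspector`/`ValueInspector` classes):

```java
import java.util.function.LongPredicate;

public class SizeGuardDemo {
    // Naive approach: consult the size predicate after each element arrives.
    // eq(3) "matches" as soon as the 3rd element is seen, even if a 4th follows.
    static boolean naiveMatches(int numElements, LongPredicate sizePredicate) {
        for (long seen = 1; seen <= numElements; seen++) {
            if (sizePredicate.test(seen)) {
                return true; // premature false positive
            }
        }
        return false;
    }

    // Guarded approach (what the shouldUpdateDelegate check enables): only
    // consult the predicate once the record's full element count is known.
    static boolean guardedMatches(int numElements, LongPredicate sizePredicate) {
        return sizePredicate.test(numElements);
    }

    public static void main(String[] args) {
        LongPredicate eq3 = n -> n == 3;
        System.out.println(naiveMatches(4, eq3));   // prints true (wrong)
        System.out.println(guardedMatches(4, eq3)); // prints false (correct)
    }
}
```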
```diff
@@ -378,6 +379,11 @@ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Contains<T> conta
           indices -> IndexIterator.all(getPageCount()));
     }
 
+    @Override
+    public PrimitiveIterator.OfInt visit(Size size) {
+      return IndexIterator.all(getPageCount());
```
`repetitionLevelHistogram` and `definitionLevelHistogram` are both in scope here. Should I repeat the logic from `StatisticsFilter`, or is that completely redundant?
```java
final boolean blockCannotMatch = size.filter(
    (eq) -> eq < numDistinctValues,
    (lt) -> lt <= numDistinctValues,
    (lte) -> lte < numDistinctValues,
    (gt) -> false,
    (gte) -> false);
```
Actually, now that I think about it, this isn't accurate, since we don't know the distribution of values. I guess we could combine it with `SizeStatistics` to get the number of elements and work out the minimum size from there.
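A concrete counterexample (hypothetical data, not Parquet's dictionary API) shows why the distinct-value count alone can't bound list sizes: two distinct dictionary values spread across two single-element lists means `size(eq(1))` should match, even though 1 < numDistinctValues.

```java
import java.util.List;

public class DistinctCountCounterexample {
    // Hypothetical page data: two single-element lists with two distinct values.
    static List<List<String>> lists() {
        return List.of(List.of("a"), List.of("b"));
    }

    // What the dictionary sees: the number of distinct values across all lists.
    static long distinctValueCount(List<List<String>> lists) {
        return lists.stream().flatMap(List::stream).distinct().count();
    }

    public static void main(String[] args) {
        List<List<String>> lists = lists();
        System.out.println(distinctValueCount(lists)); // prints 2
        // Every list has size 1, so size(eq(1)) must match; dropping the block
        // because eq (1) < numDistinctValues (2) would be incorrect.
        System.out.println(lists.stream().allMatch(l -> l.size() == 1)); // prints true
    }
}
```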
Thanks for adding this! This is a large PR that I need to take some time to review. It would be good if @emkornfield @gszadovszky could take a look to see if this is a good use case for SizeStatistics.

Thanks, no rush on reviewing it! 👍
Rationale for this change

This PR continues the work outlined in #1452. It implements a `size()` predicate for filtering on the number of elements in repeated fields.

What changes are included in this PR?

`Size()` and `not(size())` are implemented for all list fields with a `required` element type. Attempting to filter on a list of optional elements will throw an exception in the schema validator. This is because the existing record-level filtering setup (`IncrementallyUpdatedFilterPredicateEvaluator`) only feeds non-null values to the `ValueInspector`s; thus, if you had an array [1, 2, null, 4], it would only count 3 elements. I can file a ticket to support this eventually, but I think we'd have to rework the `FilteringRecordMaterializer` to be aware of repetition/definition levels.

The list group itself can be `optional` or `required`. Null lists are treated as having size 0. Again, this is due to the difficulty of disambiguating them at the record-level filtering step. (Would love feedback on both of these design decisions!)

Are these changes tested?

Unit tests, plus a snapshot build tested locally with real datasets.

Are there any user-facing changes?

New Operators API

Part of #1452
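The null-list semantics described above (null lists treated as size 0) can be sketched as follows; `effectiveSize` is a hypothetical helper illustrating the documented behavior, not part of the PR's API:

```java
import java.util.Arrays;
import java.util.List;

public class NullListSizeSemantics {
    // A null list is treated the same as an empty list: size 0.
    static long effectiveSize(List<Integer> list) {
        return list == null ? 0 : list.size();
    }

    public static void main(String[] args) {
        // size(eq(0)) would therefore match both the null record and the empty record.
        for (List<Integer> record : Arrays.asList(List.of(1, 2), null, List.<Integer>of())) {
            System.out.println(effectiveSize(record)); // prints 2, then 0, then 0
        }
    }
}
```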