Skip to content

Commit

Permalink
Address comments Part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 9, 2024
1 parent d5e61a7 commit 06501c1
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@

package org.apache.xtable.model.exception;

/** Exception thrown when refresh operation (updating table format metadata) in catalog fails. */
public class CatalogRefreshException extends InternalException {

protected CatalogRefreshException(String message, Throwable e) {
public CatalogRefreshException(String message, Throwable e) {
super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e);
}

public CatalogRefreshException(String message) {
super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ public static class SyncStatus {
SyncStatus.builder().statusCode(SyncStatusCode.SUCCESS).build();
// Status code
SyncStatusCode statusCode;
// error Message if any
String errorMessage;
// Readable description of the error
String errorDescription;
// Can the client retry for this type of error (Transient error=true, persistent error=false)
boolean canRetryOnFailure;
// errorDetails if any
ErrorDetails errorDetails;
}

/** Represents status for catalog sync status operation */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
package org.apache.xtable.spi.sync;

import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;

import org.apache.xtable.model.exception.CatalogRefreshException;

public class CatalogUtils {
private static final List<String> S3_FS_SCHEMES = Arrays.asList("s3", "s3a", "s3n");

static boolean hasStorageDescriptorLocationChanged(
String storageDescriptorLocation, String tableBasePath) {
Expand All @@ -38,19 +37,17 @@ static boolean hasStorageDescriptorLocationChanged(
URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri();
URI basePathUri = new Path(tableBasePath).toUri();

// In case of s3 path, compare without schemes
boolean includeScheme =
!S3_FS_SCHEMES.contains(basePathUri.getScheme())
|| !S3_FS_SCHEMES.contains(storageDescriptorUri.getScheme());
storageDescriptorLocation = getPath(storageDescriptorUri, includeScheme);
tableBasePath = getPath(basePathUri, includeScheme);
return !Objects.equals(storageDescriptorLocation, tableBasePath);
}

private static String getPath(URI uri, boolean includeScheme) {
if (includeScheme) {
return uri.toString();
if (storageDescriptorUri.equals(basePathUri)
|| storageDescriptorUri.getScheme().startsWith(basePathUri.getScheme())
|| basePathUri.getScheme().startsWith(storageDescriptorUri.getScheme())) {
String storageDescriptorLocationIdentifier =
storageDescriptorUri.getAuthority() + storageDescriptorUri.getPath();
String tableBasePathIdentifier = basePathUri.getAuthority() + basePathUri.getPath();
return !Objects.equals(storageDescriptorLocationIdentifier, tableBasePathIdentifier);
}
return uri.getAuthority() + uri.getPath();
throw new CatalogRefreshException(
String.format(
"Storage scheme has changed for table catalogStorageDescriptorUri %s basePathUri %s",
storageDescriptorUri, basePathUri));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,12 @@ private SyncResult buildResultForError(SyncMode mode, Instant startTime, Excepti
.status(
SyncResult.SyncStatus.builder()
.statusCode(SyncResult.SyncStatusCode.ERROR)
.errorMessage(e.getMessage())
.errorDescription("Failed to sync " + mode.name())
.canRetryOnFailure(true)
.errorDetails(
SyncResult.ErrorDetails.builder()
.errorMessage(e.getMessage())
.errorDescription("Failed to sync " + mode.name())
.canRetryOnFailure(true)
.build())
.build())
.syncStartTime(startTime)
.syncDuration(Duration.between(startTime, Instant.now()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.spi.sync;

import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.xtable.model.exception.CatalogRefreshException;

public class TestCatalogUtils {

static Stream<Arguments> storageLocationTestArgs() {
return Stream.of(
Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v2", true),
Arguments.of("s3://bucket/table1/v1", "s3://bucket/table2/v1", true),
Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v2/", true),
Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v1", false),
Arguments.of("s3a://bucket/table/v1", "s3://bucket/table/v1/", false),
Arguments.of("s3://bucket/table/v1", "s3a://bucket/table/v1", false),
Arguments.of("s3://bucket/table/v1/", "s3a://bucket/table/v1", false),
Arguments.of("/var/lib/bucket/table/v1", "/var/lib/bucket/table/v1/", false),
Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v1/", false));
}

static Stream<Arguments> storageLocationTestArgsException() {
return Stream.of(
Arguments.of(
"s3://bucket/table/v1",
"gs://bucket/table/v1",
new CatalogRefreshException(
"Storage scheme has changed for table catalogStorageDescriptorUri s3://bucket/table/v1 basePathUri gs://bucket/table/v1")));
}

@ParameterizedTest
@MethodSource("storageLocationTestArgs")
void testHasStorageLocationChanged(String storageLocation, String basePath, boolean expected) {
assertEquals(expected, hasStorageDescriptorLocationChanged(storageLocation, basePath));
}

@ParameterizedTest
@MethodSource("storageLocationTestArgsException")
void testHasStorageLocationChangedException(
String storageLocation, String basePath, Exception exception) {
assertThrows(
exception.getClass(),
() -> hasStorageDescriptorLocationChanged(storageLocation, basePath),
exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,12 @@ void syncSnapshotWithFailureForOneFormat() {
assertEquals(
SyncResult.SyncStatus.builder()
.statusCode(SyncResult.SyncStatusCode.ERROR)
.errorMessage("Failure")
.errorDescription("Failed to sync FULL")
.canRetryOnFailure(true)
.errorDetails(
SyncResult.ErrorDetails.builder()
.errorMessage("Failure")
.errorDescription("Failed to sync FULL")
.canRetryOnFailure(true)
.build())
.build(),
failureResult.getStatus());

Expand Down Expand Up @@ -176,9 +179,12 @@ void syncChangesWithFailureForOneFormat() {
assertEquals(
SyncResult.SyncStatus.builder()
.statusCode(SyncResult.SyncStatusCode.ERROR)
.errorMessage("Failure")
.errorDescription("Failed to sync INCREMENTAL")
.canRetryOnFailure(true)
.errorDetails(
SyncResult.ErrorDetails.builder()
.errorMessage("Failure")
.errorDescription("Failed to sync INCREMENTAL")
.canRetryOnFailure(true)
.build())
.build(),
partialSuccessResults.get(1).getStatus());

Expand Down

0 comments on commit 06501c1

Please sign in to comment.