diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java index 85587a430..dd3222927 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java @@ -18,9 +18,14 @@ package org.apache.xtable.model.exception; +/** Exception thrown when refresh operation (updating table format metadata) in catalog fails. */ public class CatalogRefreshException extends InternalException { - protected CatalogRefreshException(String message, Throwable e) { + public CatalogRefreshException(String message, Throwable e) { super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e); } + + public CatalogRefreshException(String message) { + super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message); + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java index 27c730a36..ef06b9cae 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java @@ -60,12 +60,8 @@ public static class SyncStatus { SyncStatus.builder().statusCode(SyncStatusCode.SUCCESS).build(); // Status code SyncStatusCode statusCode; - // error Message if any - String errorMessage; - // Readable description of the error - String errorDescription; - // Can the client retry for this type of error (Transient error=true, persistent error=false) - boolean canRetryOnFailure; + // errorDetails if any + ErrorDetails errorDetails; } /** Represents status for catalog sync status operation */ diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java index a2c2151c3..a06cde392 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java @@ -19,15 +19,14 @@ package org.apache.xtable.spi.sync; import java.net.URI; -import java.util.Arrays; -import java.util.List; import java.util.Objects; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; +import org.apache.xtable.model.exception.CatalogRefreshException; + public class CatalogUtils { - private static final List S3_FS_SCHEMES = Arrays.asList("s3", "s3a", "s3n"); static boolean hasStorageDescriptorLocationChanged( String storageDescriptorLocation, String tableBasePath) { @@ -38,19 +37,17 @@ static boolean hasStorageDescriptorLocationChanged( URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri(); URI basePathUri = new Path(tableBasePath).toUri(); - // In case of s3 path, compare without schemes - boolean includeScheme = - !S3_FS_SCHEMES.contains(basePathUri.getScheme()) - || !S3_FS_SCHEMES.contains(storageDescriptorUri.getScheme()); - storageDescriptorLocation = getPath(storageDescriptorUri, includeScheme); - tableBasePath = getPath(basePathUri, includeScheme); - return !Objects.equals(storageDescriptorLocation, tableBasePath); - } - - private static String getPath(URI uri, boolean includeScheme) { - if (includeScheme) { - return uri.toString(); + if (storageDescriptorUri.equals(basePathUri) + || storageDescriptorUri.getScheme().startsWith(basePathUri.getScheme()) + || basePathUri.getScheme().startsWith(storageDescriptorUri.getScheme())) { + String storageDescriptorLocationIdentifier = + storageDescriptorUri.getAuthority() + storageDescriptorUri.getPath(); + String tableBasePathIdentifier = basePathUri.getAuthority() + basePathUri.getPath(); + return !Objects.equals(storageDescriptorLocationIdentifier, tableBasePathIdentifier); } - return uri.getAuthority() + uri.getPath(); + throw new CatalogRefreshException( + String.format( + "Storage scheme has changed for table catalogStorageDescriptorUri %s basePathUri %s", + storageDescriptorUri, basePathUri)); } } diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java index 7cd0b384f..42612ff21 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java @@ -184,9 +184,12 @@ private SyncResult buildResultForError(SyncMode mode, Instant startTime, Excepti .status( SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage(e.getMessage()) - .errorDescription("Failed to sync " + mode.name()) - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage(e.getMessage()) + .errorDescription("Failed to sync " + mode.name()) + .canRetryOnFailure(true) + .build()) .build()) .syncStartTime(startTime) .syncDuration(Duration.between(startTime, Instant.now())) diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java new file mode 100644 index 000000000..69575e8e7 --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.xtable.model.exception.CatalogRefreshException; + +public class TestCatalogUtils { + + static Stream storageLocationTestArgs() { + return Stream.of( + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v2", true), + Arguments.of("s3://bucket/table1/v1", "s3://bucket/table2/v1", true), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v2/", true), + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v1", false), + Arguments.of("s3a://bucket/table/v1", "s3://bucket/table/v1/", false), + Arguments.of("s3://bucket/table/v1", "s3a://bucket/table/v1", false), + Arguments.of("s3://bucket/table/v1/", "s3a://bucket/table/v1", false), + Arguments.of("/var/lib/bucket/table/v1", "/var/lib/bucket/table/v1/", false), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v1/", false)); + } + + static Stream storageLocationTestArgsException() { + return Stream.of( + Arguments.of( + "s3://bucket/table/v1", + "gs://bucket/table/v1", + new CatalogRefreshException( + "Storage scheme has changed for table catalogStorageDescriptorUri s3://bucket/table/v1 basePathUri gs://bucket/table/v1"))); + } + + @ParameterizedTest + @MethodSource("storageLocationTestArgs") + void testHasStorageLocationChanged(String storageLocation, String basePath, boolean expected) { + assertEquals(expected, hasStorageDescriptorLocationChanged(storageLocation, basePath)); + } + + @ParameterizedTest + @MethodSource("storageLocationTestArgsException") + void testHasStorageLocationChangedException( + String storageLocation, String basePath, Exception exception) { + assertThrows( + exception.getClass(), + () -> hasStorageDescriptorLocationChanged(storageLocation, basePath), + exception.getMessage()); + } +} diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java index 2a9e05886..d3f31891f 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java @@ -99,9 +99,12 @@ void syncSnapshotWithFailureForOneFormat() { assertEquals( SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage("Failure") - .errorDescription("Failed to sync FULL") - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage("Failure") + .errorDescription("Failed to sync FULL") + .canRetryOnFailure(true) + .build()) .build(), failureResult.getStatus()); @@ -176,9 +179,12 @@ void syncChangesWithFailureForOneFormat() { assertEquals( SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage("Failure") - .errorDescription("Failed to sync INCREMENTAL") - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage("Failure") + .errorDescription("Failed to sync INCREMENTAL") + .canRetryOnFailure(true) + .build()) .build(), partialSuccessResults.get(1).getStatus());