Skip to content

Commit

Permalink
[590] Add interface for CatalogSyncClient and CatalogSync
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 18, 2024
1 parent a360aff commit 115abc5
Show file tree
Hide file tree
Showing 26 changed files with 970 additions and 43 deletions.
15 changes: 14 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
<maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version>
<maven-release-plugin.version>2.5.3</maven-release-plugin.version>
<mockito.version>5.2.0</mockito.version>
<parquet.version>1.12.2</parquet.version>
<protobuf.version>3.25.5</protobuf.version>
<scala12.version>2.12.20</scala12.version>
Expand Down Expand Up @@ -438,7 +439,19 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.8.0</version>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
<version>${mockito.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

Expand Down
4 changes: 4 additions & 0 deletions xtable-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,9 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@EqualsAndHashCode(callSuper = true)
@Getter
public class SourceTable extends ExternalTable {
/** The path to the data files, defaults to the metadataPath */
/** The path to the data files, defaults to the basePath */
@NonNull private final String dataPath;

@Builder(toBuilder = true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.model.catalog;

import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

/** This class represents the unique identifier for a table in a catalog. */
@Value
@Builder
public class CatalogTableIdentifier {
/**
* Catalogs have the ability to group tables logically, databaseName is the identifier for such
* logical classification. The alternate names for this field include namespace, schemaName etc.
*/
@NonNull String databaseName;

/**
* The table name used when syncing the table to the catalog. NOTE: This name can be different
* from the table name in storage.
*/
@NonNull String tableName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.model.exception;

/** Exception thrown when refresh operation (updating table format metadata) in catalog fails. */
public class CatalogRefreshException extends InternalException {

public CatalogRefreshException(String message, Throwable e) {
super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e);
}

public CatalogRefreshException(String message) {
super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public enum ErrorCode {
INVALID_SCHEMA(10006),
UNSUPPORTED_SCHEMA_TYPE(10007),
UNSUPPORTED_FEATURE(10008),
PARSE_EXCEPTION(10009);
PARSE_EXCEPTION(10009),
CATALOG_REFRESH_EXCEPTION(10010);

private final int errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.List;

import lombok.Builder;
import lombok.Value;
Expand All @@ -30,18 +31,20 @@
* @since 0.1
*/
@Value
@Builder
@Builder(toBuilder = true)
public class SyncResult {
// Mode used for the sync
SyncMode mode;
Instant lastInstantSynced;
Instant syncStartTime;
// Duration
Duration syncDuration;
// Status of the sync
SyncStatus status;
// Status of the tableFormat sync
SyncStatus tableFormatSyncStatus;
// The Sync Mode recommended for the next sync (Usually filled on an error)
SyncMode recommendedSyncMode;
// The sync status for each catalog.
List<CatalogSyncStatus> catalogSyncStatusList;

public enum SyncStatusCode {
SUCCESS,
Expand All @@ -57,6 +60,25 @@ public static class SyncStatus {
SyncStatus.builder().statusCode(SyncStatusCode.SUCCESS).build();
// Status code
SyncStatusCode statusCode;
// errorDetails if any
ErrorDetails errorDetails;
}

/** Represents status for catalog sync status operation */
@Value
@Builder
public static class CatalogSyncStatus {
// Catalog Identifier.
String catalogName;
// Status code
SyncStatusCode statusCode;
// errorDetails if any
ErrorDetails errorDetails;
}

@Value
@Builder
public static class ErrorDetails {
// error Message if any
String errorMessage;
// Readable description of the error
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.spi.extractor;

import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;

/**
* A client for converting the table with tableIdentifier {@link CatalogTableIdentifier} in {@link
* org.apache.xtable.conversion.SourceCatalog} to SourceTable object. {@link SourceTable} can be
* used by downstream consumers for syncing it to multiple {@link
* org.apache.xtable.conversion.TargetTable}
*/
public interface CatalogConversionSource {
/** Returns the source table object present in the catalog. */
SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ public interface ConversionSource<COMMIT> extends Closeable {
*/
InternalTable getTable(COMMIT commit);

/**
* Extracts the {@link InternalTable} as of latest state. This method is less expensive as
* compared to {@link ConversionSource#getCurrentSnapshot()} as it doesn't load the files present
* in the table.
*
* @return {@link InternalTable} representing the current table.
*/
InternalTable getCurrentTable();

/**
* Extracts the {@link InternalSnapshot} as of latest state.
*
Expand Down
129 changes: 129 additions & 0 deletions xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.spi.sync;

import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.apache.commons.lang3.StringUtils;

import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;

/** Provides the functionality to sync metadata from InternalTable to multiple target catalogs */
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CatalogSync {
private static final CatalogSync INSTANCE = new CatalogSync();

public static CatalogSync getInstance() {
return INSTANCE;
}

public SyncResult syncTable(
Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients, InternalTable table) {
List<CatalogSyncStatus> results = new ArrayList<>();
Instant startTime = Instant.now();
catalogSyncClients.forEach(
((tableIdentifier, catalogSyncClient) -> {
try {
results.add(syncCatalog(catalogSyncClient, tableIdentifier, table));
log.info(
"Catalog sync is successful for table {} with format {} using catalogSync {}",
table.getBasePath(),
table.getTableFormat(),
catalogSyncClient.getClass().getName());
} catch (Exception e) {
log.error(
"Catalog sync failed for table {} with format {} using catalogSync {}",
table.getBasePath(),
table.getTableFormat(),
catalogSyncClient.getClass().getName());
results.add(
getCatalogSyncFailureStatus(
catalogSyncClient.getCatalogName(), catalogSyncClient.getClass().getName(), e));
}
}));
return SyncResult.builder()
.lastInstantSynced(table.getLatestCommitTime())
.syncStartTime(startTime)
.syncDuration(Duration.between(startTime, Instant.now()))
.catalogSyncStatusList(results)
.build();
}

private <TABLE> CatalogSyncStatus syncCatalog(
CatalogSyncClient<TABLE> catalogSyncClient,
CatalogTableIdentifier tableIdentifier,
InternalTable table) {
if (!catalogSyncClient.hasDatabase(tableIdentifier.getDatabaseName())) {
catalogSyncClient.createDatabase(tableIdentifier.getDatabaseName());
}
TABLE catalogTable = catalogSyncClient.getTable(tableIdentifier);
String storageDescriptorLocation = catalogSyncClient.getStorageDescriptorLocation(catalogTable);
if (catalogTable == null) {
catalogSyncClient.createTable(table, tableIdentifier);
} else if (hasStorageDescriptorLocationChanged(
storageDescriptorLocation, table.getBasePath())) {
// Replace table if there is a mismatch between hmsTable location and Xtable basePath.
// Possible reasons could be:
// 1) hms table (manually) created with a different location before and need to be
// re-created with a new basePath
// 2) XTable basePath changes due to migration or other reasons
String oldLocation =
StringUtils.isEmpty(storageDescriptorLocation) ? "null" : storageDescriptorLocation;
log.warn(
"StorageDescriptor location changed from {} to {}, re-creating table",
oldLocation,
table.getBasePath());
catalogSyncClient.createOrReplaceTable(table, tableIdentifier);
} else {
log.debug("Table metadata changed, refreshing table");
catalogSyncClient.refreshTable(table, catalogTable, tableIdentifier);
}
return CatalogSyncStatus.builder()
.catalogName(catalogSyncClient.getCatalogName())
.statusCode(SyncResult.SyncStatusCode.SUCCESS)
.build();
}

private CatalogSyncStatus getCatalogSyncFailureStatus(
String catalogName, String catalogImpl, Exception e) {
return CatalogSyncStatus.builder()
.catalogName(catalogName)
.statusCode(SyncResult.SyncStatusCode.ERROR)
.errorDetails(
SyncResult.ErrorDetails.builder()
.errorMessage(e.getMessage())
.errorDescription("catalogSync failed for " + catalogImpl)
.build())
.build();
}
}
Loading

0 comments on commit 115abc5

Please sign in to comment.