Skip to content

Commit

Permalink
Refactor some methods
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 6, 2024
1 parent af817aa commit d5e61a7
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
* limitations under the License.
*/

package org.apache.xtable.exception;

import org.apache.xtable.model.exception.ErrorCode;
import org.apache.xtable.model.exception.InternalException;
package org.apache.xtable.model.exception;

public class CatalogRefreshException extends InternalException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.List;

import lombok.Builder;
import lombok.Value;
Expand All @@ -42,6 +43,8 @@ public class SyncResult {
SyncStatus status;
// The Sync Mode recommended for the next sync (Usually filled on an error)
SyncMode recommendedSyncMode;
// The sync status for each catalog.
List<CatalogSyncStatus> catalogSyncStatusList;

public enum SyncStatusCode {
SUCCESS,
Expand All @@ -64,4 +67,27 @@ public static class SyncStatus {
// Can the client retry for this type of error (Transient error=true, persistent error=false)
boolean canRetryOnFailure;
}

/** Represents status for catalog sync status operation */
@Value
@Builder
public static class CatalogSyncStatus {
// Catalog Identifier.
String catalogIdentifier;
// Status code
SyncStatusCode statusCode;
// errorDetails if any
ErrorDetails errorDetails;
}

@Value
@Builder
public static class ErrorDetails {
// error Message if any
String errorMessage;
// Readable description of the error
String errorDescription;
// Can the client retry for this type of error (Transient error=true, persistent error=false)
boolean canRetryOnFailure;
}
}
103 changes: 103 additions & 0 deletions xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.spi.sync;

import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.apache.commons.lang3.StringUtils;

import org.apache.xtable.conversion.ExternalCatalog;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.exception.CatalogRefreshException;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;

@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CatalogSync {
private static final CatalogSync INSTANCE = new CatalogSync();

public static CatalogSync getInstance() {
return INSTANCE;
}

public Map<String, List<CatalogSyncStatus>> syncTable(
Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) {
Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) {
results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>());
results
.get(catalogSyncOperation.getTableFormat())
.add(getCatalogSyncStatus(catalogSyncOperation, table));
}
return results;
}

private CatalogSyncStatus getCatalogSyncStatus(
CatalogSyncOperations catalogSyncOperation, InternalTable table) {
ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier();
boolean doesDatabaseExists =
catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null;
if (!doesDatabaseExists) {
catalogSyncOperation.createDatabase(tableIdentifier.getDatabaseName());
}
Object catalogTable = catalogSyncOperation.getTable(tableIdentifier);
String storageDescriptorLocation =
catalogSyncOperation.getStorageDescriptorLocation(catalogTable);
if (catalogTable == null) {
catalogSyncOperation.createTable(table, tableIdentifier);
} else if (hasStorageDescriptorLocationChanged(
storageDescriptorLocation, table.getBasePath())) {
// Replace table if there is a mismatch between hmsTable location and Xtable basePath.
// Possible reasons could be:
// 1) hms table (manually) created with a different location before and need to be
// re-created with a new basePath
// 2) XTable basePath changes due to migration or other reasons
String oldLocation =
StringUtils.isEmpty(storageDescriptorLocation) ? "null" : storageDescriptorLocation;
log.warn(
"StorageDescriptor location changed from {} to {}, re-creating table",
oldLocation,
table.getBasePath());
catalogSyncOperation.createOrReplaceTable(table, tableIdentifier);
} else {
try {
log.debug("Table metadata changed, refreshing table");
catalogSyncOperation.refreshTable(table, catalogTable, tableIdentifier);
} catch (CatalogRefreshException e) {
log.warn("Table refresh failed, re-creating table", e);
catalogSyncOperation.createOrReplaceTable(table, tableIdentifier);
}
}
return CatalogSyncStatus.builder()
.catalogIdentifier(catalogSyncOperation.getCatalogIdentifier())
.statusCode(SyncResult.SyncStatusCode.SUCCESS)
.build();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
* limitations under the License.
*/

package org.apache.xtable.catalog;
package org.apache.xtable.spi.sync;

import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier;
import org.apache.xtable.exception.CatalogRefreshException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogType;

public interface CatalogSyncOperations<DATABASE, TABLE> extends AutoCloseable {
String getCatalogIdentifier();

CatalogType getCatalogType();

String getTableFormat();

Expand All @@ -38,8 +41,7 @@ public interface CatalogSyncOperations<DATABASE, TABLE> extends AutoCloseable {

void createTable(InternalTable table, TableIdentifier tableIdentifier);

void refreshTable(InternalTable table, TABLE catalogTable, TableIdentifier tableIdentifier)
throws CatalogRefreshException;
void refreshTable(InternalTable table, TABLE catalogTable, TableIdentifier tableIdentifier);

void createOrReplaceTable(InternalTable table, TableIdentifier tableIdentifier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.xtable.catalog;
package org.apache.xtable.spi.sync;

import java.net.URI;
import java.util.Arrays;
Expand All @@ -27,10 +27,9 @@
import org.apache.hadoop.fs.Path;

public class CatalogUtils {

private static final List<String> S3_FS_SCHEMES = Arrays.asList("s3", "s3a", "s3n");

public static boolean hasStorageDescriptorLocationChanged(
static boolean hasStorageDescriptorLocationChanged(
String storageDescriptorLocation, String tableBasePath) {

if (StringUtils.isEmpty(storageDescriptorLocation)) {
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit d5e61a7

Please sign in to comment.