Skip to content

Commit

Permalink
more work in progress #11057
Browse files Browse the repository at this point in the history
  • Loading branch information
landreev committed Dec 17, 2024
1 parent 8bf4156 commit f9e90e9
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 75 deletions.
2 changes: 1 addition & 1 deletion src/main/java/edu/harvard/iq/dataverse/api/Datasets.java
Original file line number Diff line number Diff line change
Expand Up @@ -4389,7 +4389,7 @@ public Response requestGlobusDownload(@Context ContainerRequestContext crc, @Pat
case 400:
return badRequest("Unable to grant permission");
case 409:
return conflict("Permission already exists");
return conflict("Permission already exists or no more permissions allowed");
default:
return error(null, "Unexpected error when granting permission");
}
Expand Down
259 changes: 193 additions & 66 deletions src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,20 @@ public class GlobusServiceBean implements java.io.Serializable {
private static final SimpleDateFormat logFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm-ss");

private String getRuleId(GlobusEndpoint endpoint, String principal, String permissions)
throws MalformedURLException {
/*throws MalformedURLException*/ {
// @todo the method should probably swallow this MalformedURLException, rather
// than throw it?

String principalType = "identity";

URL url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId() + "/access_list");
String apiUrlString = "https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId() + "/access_list";
URL url = null;

try {
url = new URL(apiUrlString);
} catch (MalformedURLException mue) {
logger.severe("Malformed URL exception when attempting to look up ACL rule via Globus API: " + apiUrlString);
return null;
}
MakeRequestResponse result = makeRequest(url, "Bearer", endpoint.getClientToken(), "GET", null);
if (result.status == 200) {
AccessList al = parseJson(result.jsonResponse, AccessList.class, false);
Expand Down Expand Up @@ -160,21 +169,39 @@ private void deletePermission(String ruleId, Dataset dataset, Logger globusLogge
if (dataset != null) {
GlobusEndpoint endpoint = getGlobusEndpoint(dataset);
if (endpoint != null) {
String accessToken = endpoint.getClientToken();
globusLogger.info("Start deleting permissions.");
try {
URL url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId()
+ "/access/" + ruleId);
MakeRequestResponse result = makeRequest(url, "Bearer", accessToken, "DELETE", null);
if (result.status != 200) {
globusLogger.warning("Cannot delete access rule " + ruleId);
} else {
globusLogger.info("Access rule " + ruleId + " was deleted successfully");
}
} catch (MalformedURLException ex) {
globusLogger.log(Level.WARNING,
"Failed to delete access rule " + ruleId + " on endpoint " + endpoint.getId(), ex);
deletePermission(ruleId, endpoint, globusLogger);
}
}
}
}

/**
* Call to delete a globus rule, via the ruleId and supplied endpoint
*
* @param ruleId - Globus rule id - assumed to be associated with the
* dataset's file path (should not be called with a user
* specified rule id w/o further checking)
* @param endpoint - the Globus endpoint associated with the rule
* @param globusLogger - a separate logger instance; must not be null (it is used unconditionally on entry)
*/
private void deletePermission(String ruleId, GlobusEndpoint endpoint, Logger globusLogger) {
globusLogger.fine("Start deleting rule " + ruleId + " for endpoint " + endpoint.getBasePath());
if (ruleId.length() > 0) {
if (endpoint != null) {
String accessToken = endpoint.getClientToken();
globusLogger.info("Start deleting permissions.");
try {
URL url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId()
+ "/access/" + ruleId);
MakeRequestResponse result = makeRequest(url, "Bearer", accessToken, "DELETE", null);
if (result.status != 200) {
globusLogger.warning("Cannot delete access rule " + ruleId);
} else {
globusLogger.info("Access rule " + ruleId + " was deleted successfully");
}
} catch (MalformedURLException ex) {
globusLogger.log(Level.WARNING,
"Failed to delete access rule " + ruleId + " on endpoint " + endpoint.getId(), ex);
}
}
}
Expand Down Expand Up @@ -209,6 +236,20 @@ public JsonObject requestAccessiblePaths(String principal, Dataset dataset, int
}
//The dir for the dataset's data exists, so try to request permission for the principal
int requestPermStatus = requestPermission(endpoint, dataset, permissions);

if (requestPermStatus == 409) {
// This is a special case - a 409 *may* mean that the rule already
// exists for this endpoint and for this user (if, for example,
// Dataverse failed to remove it after the last upload has completed).
// That should be ok with us (but let's confirm that is indeed the
// case; alternatively it may mean that permissions cannot be issued
// for some other reason):
String ruleId = getRuleId(endpoint, principal, "rw");
if (ruleId != null) {
requestPermStatus = 201;
}
}

response.add("status", requestPermStatus);
if (requestPermStatus == 201) {
String driverId = dataset.getEffectiveStorageDriverId();
Expand Down Expand Up @@ -461,21 +502,26 @@ public GlobusTaskState getTask(String accessToken, String taskId, Logger globusL

MakeRequestResponse result = makeRequest(url, "Bearer", accessToken, "GET", null);

GlobusTaskState task = null;
GlobusTaskState taskState = null;

if (result.status == 200) {
task = parseJson(result.jsonResponse, GlobusTaskState.class, false);
taskState = parseJson(result.jsonResponse, GlobusTaskState.class, false);
}
// @todo some provision for a 403/permission denied here, due to an
// expired token maybe?

if (result.status != 200) {
// @todo It should probably retry it 2-3 times before giving up;
// similarly, it should probably differentiate between a "no such task"
// response and something intermittent like a server/network error or
// an expired token... i.e. something that's recoverable (?)
// edit: yes, but, should be done outside of this method, in the code
// that uses it
myLogger.warning("Cannot find information for the task " + taskId + " : Reason : "
+ result.jsonResponse.toString());
}

return task;
return taskState;
}

/**
Expand Down Expand Up @@ -638,7 +684,23 @@ public int setPermissionForDownload(Dataset dataset, String principal) {
permissions.setPath(endpoint.getBasePath() + "/");
permissions.setPermissions("r");

return requestPermission(endpoint, dataset, permissions);
// @todo: check specifically for a 409 here, which *may* indicate
// that the permission already exists on the collection - in which case
// we should confirm and proceed with the download, rather than give up.
int status = requestPermission(endpoint, dataset, permissions);

if (status == 409) {
// It's possible that the permission already exists (if, for example,
// Dataverse failed to delete it after the last download by this
// user, for whatever reason). We'll confirm that, and if that's the
// case, we'll just assume that it's ok to proceed with the download
// (rather than give up, as this code used to).
String ruleId = getRuleId(endpoint, principal, "r");
if (ruleId != null) {
return 201;
}
}
return status;
}

// Generates the URL to launch the Globus app for upload
Expand Down Expand Up @@ -760,15 +822,29 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques
String taskIdentifier = jsonData.getString("taskIdentifier");

GlobusEndpoint endpoint = getGlobusEndpoint(dataset);

// The first check on the status of the task:
// It is important to be careful here, and not give up on the task
// prematurely if anything goes wrong during this initial api call!
// So, perhaps a @todo - make a number of attempts until we get a
// valid response ?
GlobusTaskState taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger);
String ruleId = getRuleId(endpoint, taskState.getOwner_id(), "rw");
logger.fine("Found rule: " + ruleId);
String ruleId = taskState != null
? getRuleId(endpoint, taskState.getOwner_id(), "rw")
: null;

if (ruleId != null) {
logger.fine("Found rule: " + ruleId);
Long datasetId = rulesCache.getIfPresent(ruleId);
if (datasetId != null) {
// Will not delete rule
// This will only "invalidate" the local cache entry, will not
// delete or invalidate the actual Globus rule
rulesCache.invalidate(ruleId);
}
} else {
// @todo warning, etc.
// we'll proceed anyway, under the assumption that we will make
// another attempt to look it up later
}

// Wait before first check
Expand Down Expand Up @@ -810,7 +886,7 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques
// finish one way or another!)
taskState = globusStatusCheck(endpoint, taskIdentifier, globusLogger);
// @todo null check, or make sure it's never null
String taskStatus = GlobusUtil.getTaskStatus(taskState);
String taskStatus = GlobusUtil.getCompletedTaskStatus(taskState);

boolean taskSuccess = GlobusUtil.isTaskCompleted(taskState);

Expand Down Expand Up @@ -1204,30 +1280,57 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser
// If the rules_cache times out, the permission will be deleted. Presumably that
// doesn't affect a
// globus task status check
GlobusTaskState taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger);
String ruleId = getRuleId(endpoint, taskState.getOwner_id(), "r");

// Wait before first check:
try {
Thread.sleep(3000);
} catch (InterruptedException ie) {
logger.warning("caught an Interrupted Exception while trying to sleep for 3 sec. in globusDownload()");
}

// The first check on the status of the task:
// It is important to be careful here, and not give up on the task
// prematurely if anything goes wrong during this initial api call!

GlobusTaskState taskState = null;

int retriesLimit = 3;
int retries = 0;

while (taskState == null && retries < retriesLimit) {
taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger);
retries++;
try {
Thread.sleep(3000);
} catch (InterruptedException ie) {
logger.warning("caught an Interrupted Exception while trying to sleep for 3 sec. in globusDownload()");
}
}

String ruleId = taskState != null
? getRuleId(endpoint, taskState.getOwner_id(), "r")
: null;

if (ruleId != null) {
logger.fine("Found rule: " + ruleId);
Long datasetId = rulesCache.getIfPresent(ruleId);
if (datasetId != null) {
logger.fine("Deleting from cache: rule: " + ruleId);
// Will not delete rule
logger.fine("Deleting from local cache: rule: " + ruleId);
// This will only "invalidate" the local cache entry, will not
// delete or invalidate the actual Globus rule
rulesCache.invalidate(ruleId);
}
} else {
// Something is wrong - the rule should be there (a race with the cache timing
// out?)
logger.warning("ruleId not found for taskId: " + taskIdentifier);
// @todo: do we need to bail out then, or ...?
logger.warning("ruleId not found for download taskId: " + taskIdentifier);
// We will proceed monitoring the transfer, even though the ruleId
// is null at the moment. The whole point of monitoring a download
// task is to remove the rule on the collection side once it's done,
// and we will need the rule id for that. But let's hope this was a
// temporary condition and we will eventually be able to look it up.
}

// Wait before first check
try {
Thread.sleep(3000);
} catch (InterruptedException ie) {
logger.warning("caught an Interrupted Exception while trying to sleep for 3 sec. in globusDownload()");
}


if (FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) {

// Save the task information in the database so that the Globus monitoring
Expand All @@ -1244,11 +1347,14 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser

fileHandler.close();

// return and forget
// return and forget; the Monitoring Service will pick it up on
// the next scheduled check
return;
}

// Check again:
// Old implementation:
// globusStatusCheck will loop continuously, until it determines that the
// task has completed - i.e., for the duration of the task
taskState = globusStatusCheck(endpoint, taskIdentifier, globusLogger);

processCompletedDownloadTask(taskState,
Expand Down Expand Up @@ -1541,33 +1647,38 @@ public void processCompletedTask(GlobusTaskInProgress globusTask, GlobusTaskStat
String ruleId = globusTask.getRuleId();
Dataset dataset = globusTask.getDataset();
AuthenticatedUser authUser = globusTask.getLocalUser();

switch (globusTask.getTaskType()) {

if (GlobusTaskInProgress.TaskType.UPLOAD.equals(globusTask.getTaskType())) {
List<ExternalFileUploadInProgress> fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId());
case UPLOAD:
List<ExternalFileUploadInProgress> fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId());

if (fileUploadsInProgress == null || fileUploadsInProgress.size() < 1) {
// @todo log error message; do nothing
// (will this ever happen though?)
return;
}
if (fileUploadsInProgress == null || fileUploadsInProgress.size() < 1) {
// @todo log error message; do nothing
// (will this ever happen though?)
return;
}

JsonArrayBuilder filesJsonArrayBuilder = Json.createArrayBuilder();
JsonArrayBuilder filesJsonArrayBuilder = Json.createArrayBuilder();

for (ExternalFileUploadInProgress pendingFile : fileUploadsInProgress) {
String jsonInfoString = pendingFile.getFileInfo();
JsonObject fileObject = JsonUtil.getJsonObject(jsonInfoString);
filesJsonArrayBuilder.add(fileObject);
}
for (ExternalFileUploadInProgress pendingFile : fileUploadsInProgress) {
String jsonInfoString = pendingFile.getFileInfo();
JsonObject fileObject = JsonUtil.getJsonObject(jsonInfoString);
filesJsonArrayBuilder.add(fileObject);
}

JsonArray filesJsonArray = filesJsonArrayBuilder.build();
JsonArray filesJsonArray = filesJsonArrayBuilder.build();

processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus);
} else if (GlobusTaskInProgress.TaskType.DOWNLOAD.equals(globusTask.getTaskType())) {

processCompletedDownloadTask(taskState, authUser, dataset, ruleId, taskLogger);

} else {
logger.warning("Unknown or null TaskType passed to processCompletedTask()");
processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus);
break;

case DOWNLOAD:

processCompletedDownloadTask(taskState, authUser, dataset, ruleId, taskLogger);
break;

default:
logger.warning("Unknown or null TaskType passed to processCompletedTask()");
}

}
Expand All @@ -1579,13 +1690,29 @@ private void processCompletedDownloadTask(GlobusTaskState taskState,
Logger taskLogger) {
// The only thing to do on completion of a remote download
// transfer is to delete the permission ACL that Dataverse
// had negotiated for the user before the task was initialized:
// had negotiated for the user before the task was initialized ...

if (ruleId != null) {
deletePermission(ruleId, dataset, taskLogger);
GlobusEndpoint endpoint = getGlobusEndpoint(dataset);

if (endpoint != null) {
if (ruleId == null) {
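// It is possible that, for whatever reason, we failed to look up
// the rule id when the monitoring of the task was initiated - but
// now that it has completed, let's try and look it up again.
// NOTE(review): the value returned by the lookup below is not assigned
// to ruleId, so the null check that follows will still see null and the
// rule will not be deleted - the result should be stored in ruleId.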
//try {
getRuleId(endpoint, taskState.getOwner_id(), "r");
//} catch (MalformedURLException mue) {
// taskLogger.warning("Malformed URL Exception when looking up the rule for download task " + taskState.getTask_id());
// //@todo: the exception should probably be swallowed inside the method
//}
}

if (ruleId != null) {
deletePermission(ruleId, endpoint, taskLogger);
}
}

String taskStatus = GlobusUtil.getTaskStatus(taskState);
String taskStatus = GlobusUtil.getCompletedTaskStatus(taskState);

// ... plus log the outcome and send any notifications:
if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public String toString() {
@JoinColumn
private AuthenticatedUser user;

@Column(nullable=false)
// @Column(nullable=false) @todo we will need a flyway script in order to make
// this field nullable
private String ruleId;

@JoinColumn(nullable = false)
Expand Down
Loading

0 comments on commit f9e90e9

Please sign in to comment.