Skip to content

Commit

Permalink
Fixes RealtimeQuickStartWithMinion (#14642)
Browse files Browse the repository at this point in the history
  • Loading branch information
Harnoor7 authored Dec 12, 2024
1 parent 167fb67 commit 7965055
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public class BootstrapTableTool {
private final AuthProvider _authProvider;
private final String _tableDir;
private final MinionClient _minionClient;
private final String _validationTypesToSkip;

public BootstrapTableTool(String controllerProtocol, String controllerHost, int controllerPort, String tableDir,
AuthProvider authProvider) {
AuthProvider authProvider, String validationTypesToSkip) {
Preconditions.checkNotNull(controllerProtocol);
Preconditions.checkNotNull(controllerHost);
Preconditions.checkNotNull(tableDir);
Expand All @@ -75,6 +76,13 @@ public BootstrapTableTool(String controllerProtocol, String controllerHost, int
_minionClient =
new MinionClient(String.format("%s://%s:%s", controllerProtocol, controllerHost, controllerPort), authProvider);
_authProvider = authProvider;
_validationTypesToSkip = validationTypesToSkip;
}

public BootstrapTableTool(String controllerProtocol, String controllerHost, int controllerPort, String tableDir,
AuthProvider authProvider) {
this(controllerProtocol, controllerHost, controllerPort, tableDir,
authProvider, null);
}

public boolean execute()
Expand Down Expand Up @@ -137,6 +145,7 @@ private boolean createTable(File schemaFile, File offlineTableConfigFile, File r
return new AddTableCommand().setSchemaFile(schemaFile.getAbsolutePath())
.setOfflineTableConfigFile(offlineTableConfigFile.getAbsolutePath())
.setRealtimeTableConfigFile(realtimeTableConfigFile.getAbsolutePath())
.setValidationTypesToSkip(_validationTypesToSkip)
.setControllerProtocol(_controllerProtocol).setControllerHost(_controllerHost)
.setControllerPort(String.valueOf(_controllerPort)).setExecute(true).setAuthProvider(_authProvider).execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ public static void printStatus(Quickstart.Color color, String message) {
public abstract void execute()
throws Exception;

protected String getValidationTypesToSkip() {
return null;
}

protected List<QuickstartTableRequest> bootstrapOfflineTableDirectories(File quickstartTmpDir)
throws IOException {
List<QuickstartTableRequest> quickstartTableRequests = new ArrayList<>();
Expand All @@ -173,13 +177,13 @@ protected List<QuickstartTableRequest> bootstrapOfflineTableDirectories(File qui
File dataDir = new File(baseDir, "rawdata");
Preconditions.checkState(dataDir.mkdirs());
copyResourceTableToTmpDirectory(directory, tableName, baseDir, dataDir, false);
quickstartTableRequests.add(new QuickstartTableRequest(baseDir.getAbsolutePath()));
quickstartTableRequests.add(new QuickstartTableRequest(baseDir.getAbsolutePath(), getValidationTypesToSkip()));
}
} else {
String tableName = getTableName();
File baseDir = new File(quickstartTmpDir, tableName);
copyFilesystemTableToTmpDirectory(getBootstrapDataDir(), tableName, baseDir);
quickstartTableRequests.add(new QuickstartTableRequest(baseDir.getAbsolutePath()));
quickstartTableRequests.add(new QuickstartTableRequest(baseDir.getAbsolutePath(), getValidationTypesToSkip()));
}
return quickstartTableRequests;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@ public class QuickstartTableRequest {
private File _tableRequestFile;
private File _ingestionJobFile;
private String _bootstrapTableDir;
private String _validationTypesToSkip;

public QuickstartTableRequest(String bootstrapTableDir) {
_bootstrapTableDir = bootstrapTableDir;
}

public QuickstartTableRequest(String bootstrapTableDir, String validationTypesToSkip) {
_bootstrapTableDir = bootstrapTableDir;
_validationTypesToSkip = validationTypesToSkip;
}

public QuickstartTableRequest(String tableName, File schemaFile, File tableRequest, File ingestionJobFile) {
_tableName = tableName;
_schemaFile = schemaFile;
Expand Down Expand Up @@ -95,6 +101,14 @@ public String getBootstrapTableDir() {
return _bootstrapTableDir;
}

public String getValidationTypesToSkip() {
return _validationTypesToSkip;
}

public void setValidationTypesToSkip(String validationTypesToSkip) {
_validationTypesToSkip = validationTypesToSkip;
}

public void setBootstrapTableDir(String bootstrapTableDir) {
_bootstrapTableDir = bootstrapTableDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package org.apache.pinot.tools;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;

Expand All @@ -32,7 +32,7 @@
* This quickstart shows how RealtimeToOfflineSegmentsTask and MergeRollupTask minion
* tasks continuously optimize segments as data gets ingested into Realtime table.
*/
public class RealtimeQuickStartWithMinion extends RealtimeQuickStart {
public class RealtimeQuickStartWithMinion extends HybridQuickstart {
@Override
public List<String> types() {
return Arrays.asList("REALTIME_MINION", "REALTIME-MINION");
Expand Down Expand Up @@ -66,4 +66,21 @@ public Map<String, Object> getConfigOverrides() {
properties.putIfAbsent("controller.task.scheduler.enabled", true);
return properties;
}

@Override
protected String[] getDefaultBatchTableDirectories() {
return new String[]{"examples/minions/stream/githubEvents"};
}

@Override
protected Map<String, String> getDefaultStreamTableDirectories() {
return ImmutableMap.<String, String>builder()
.put("githubEvents", "examples/minions/stream/githubEvents")
.build();
}

@Override
protected String getValidationTypesToSkip() {
return "TASK";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public class AddTableCommand extends AbstractDatabaseBaseAdminCommand {
+ "creating new one")
private boolean _update = false;

@CommandLine.Option(names = {"-validationTypesToSkip"}, required = false, description =
"comma separated list of validation type(s) to skip. supported types: (ALL|TASK|UPSERT)")
private String _validationTypesToSkip = null;

private String _controllerAddress;

@Override
Expand Down Expand Up @@ -112,15 +116,32 @@ public AddTableCommand setSchemaFile(String schemaFile) {
return this;
}

public String getValidationTypesToSkip() {
return _validationTypesToSkip;
}

public AddTableCommand setValidationTypesToSkip(String validationTypesToSkip) {
_validationTypesToSkip = validationTypesToSkip;
return this;
}

public boolean sendTableCreationRequest(JsonNode node)
throws IOException {
String res = AbstractBaseAdminCommand.sendRequest("POST",
ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableConfigsCreate(), node.toString(), getHeaders(),
getTableCreationRequestURL(), node.toString(), getHeaders(),
makeTrustAllSSLContext());
LOGGER.info(res);
return res.contains("successfully added");
}

private String getTableCreationRequestURL() {
String baseURL = ControllerRequestURLBuilder.baseUrl(_controllerAddress).forTableConfigsCreate();
if (_validationTypesToSkip != null) {
return String.format(baseURL + "?validationTypesToSkip=%s", _validationTypesToSkip);
}
return baseURL;
}

public boolean sendTableUpdateRequest(JsonNode node, String tableName)
throws IOException {
String res = AbstractBaseAdminCommand.sendRequest("PUT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void bootstrapTable()
throws Exception {
for (QuickstartTableRequest request : _tableRequests) {
if (!new BootstrapTableTool("http", "localhost", _controllerPorts.get(0),
request.getBootstrapTableDir(), _authProvider).execute()) {
request.getBootstrapTableDir(), _authProvider, request.getValidationTypesToSkip()).execute()) {
throw new RuntimeException("Failed to bootstrap table with request - " + request);
}
}
Expand Down

0 comments on commit 7965055

Please sign in to comment.