escaping table names on spark connector #10577

Open · 1 task done · 0 comments
Labels: bug (Incorrect behavior inside of ibis)

akanz1 (Contributor) commented Dec 13, 2024
What happened?

When running against a Databricks instance using the Spark connector, I can work with all sorts of catalog names (containing backticks, hyphens, etc.), but I cannot get the same to work for table names. Escaping the table name with backticks before fetching the ibis table does not work, as shown below.

```python
import ibis
from databricks.connect import DatabricksSession

table_name = "my-table"
# `conf` is a Databricks SDK Config object (elided in this report)
connection = ibis.pyspark.connect(session=DatabricksSession.builder.sdkConfig(conf).getOrCreate())
table = connection.table(f"`{table_name}`")
```

The table looks like the following:

```
DatabaseTable: `my-table`
  col_1    int64
  col_2    string
```

Calling `table.count().execute()` fails with this traceback:

```
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/code/.venv/lib/python3.10/site-packages/ibis/expr/types/core.py", line 396, in execute
    return self._find_backend(use_default=True).execute(
  File "/code/.venv/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py", line 450, in execute
    with self._safe_raw_sql(sql) as query:
  File "/opt/homebrew/Cellar/[email protected]/3.10.16/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/code/.venv/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py", line 428, in _safe_raw_sql
    yield self.raw_sql(query)
  File "/code/.venv/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py", line 433, in raw_sql
    return self._session.sql(query, **kwargs)
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/session.py", line 653, in sql
    data, properties = self.client.execute_command(cmd.command(self._client))
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1205, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1627, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1604, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1913, in _handle_error
    self._handle_rpc_error(error)
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1988, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'my'. SQLSTATE: 42601 (line 1, pos 32)

== SQL ==
SELECT COUNT(*) AS `CountStar("`my-table"`)` FROM `"`my-table"`` AS `t0`
--------------------------------^^^


JVM stacktrace:
org.apache.spark.sql.catalyst.parser.ParseException
	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(parsers.scala:308)
	at org.apache.spark.sql.catalyst.parser.AbstractParser.parse(parsers.scala:114)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:137)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(AbstractSqlParser.scala:106)
	at com.databricks.sql.parser.DatabricksSqlParser.$anonfun$parsePlan$1(DatabricksSqlParser.scala:80)
	at com.databricks.sql.parser.DatabricksSqlParser.parse(DatabricksSqlParser.scala:101)
	at com.databricks.sql.parser.DatabricksSqlParser.parsePlan(DatabricksSqlParser.scala:77)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$5(SparkSession.scala:949)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:454)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:948)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:947)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.executeSQL(SparkConnectPlanner.scala:3095)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2929)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2866)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:348)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:262)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:192)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:192)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:574)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:574)
```

This seems to be an issue with how the sqlglot expression is turned into a SQL string: at L.156 of ibis/backends/sql/__init__.py the generated query is

```sql
SELECT COUNT(*) AS `CountStar("`my-table"`)` FROM `"`my-table"`` AS `t0`
```
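
For illustration, here is a minimal sketch of the suspected mechanism. This is not the exact ibis code path; it only uses sqlglot's public `to_identifier` helper. The point is that when a name that already contains backticks is quoted again, the backticks are carried into the identifier itself instead of acting as quoting:

```python
import sqlglot.expressions as sge

# The name arrives pre-escaped, so sqlglot treats the backticks as literal
# characters inside the identifier rather than as quoting.
pre_escaped = sge.to_identifier("`my-table`", quoted=True)
print(pre_escaped.sql(dialect="spark"))
# the backticks end up embedded inside the quoted identifier, which is not
# the plain `my-table` the caller intended

# Passing the raw name and letting sqlglot quote it yields the intended SQL.
raw = sge.to_identifier("my-table", quoted=True)
print(raw.sql(dialect="spark"))  # `my-table`
```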

Not escaping the table name leads to a different error:

```python
table_name = "my-table"
connection = ibis.pyspark.connect(session=DatabricksSession.builder.sdkConfig(conf).getOrCreate())
table = connection.table(table_name)
```

```
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/code/.venv/lib/python3.10/site-packages/ibis/backends/sql/__init__.py", line 137, in table
    table_schema = self.get_schema(name, catalog=catalog, database=database)
  File "/code/.venv/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py", line 546, in get_schema
    struct = PySparkType.to_ibis(df.schema)
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/dataframe.py", line 1887, in schema
    self._cached_schema = self._session.client.schema(query)
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1172, in schema
    schema = self._analyze(method="schema", plan=plan).schema
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1440, in _analyze
    self._handle_error(error)
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1913, in _handle_error
    self._handle_rpc_error(error)
  File "/code/.venv/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 1988, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.ParseException: 
[INVALID_IDENTIFIER] The unquoted identifier my-table is invalid and must be back quoted as: `my-table`.
Unquoted identifiers can only contain ASCII letters ('a' - 'z', 'A' - 'Z'), digits ('0' - '9'), and underbar ('_').
Unquoted identifiers must also not start with a digit.
Different data sources and meta stores may impose additional restrictions on valid identifiers. SQLSTATE: 42602 (line 1, pos 4)

== SQL ==
my-table
----^^^


JVM stacktrace:
org.apache.spark.sql.catalyst.parser.ParseException
	at org.apache.spark.sql.errors.QueryParsingErrors$.invalidIdentifierError(QueryParsingErrors.scala:486)
	at org.apache.spark.sql.catalyst.parser.PostProcessor$.exitErrorIdent(parsers.scala:344)
	at org.apache.spark.sql.catalyst.parser.SqlBaseParser$ErrorIdentContext.exitRule(SqlBaseParser.java:41144)
	at org.antlr.v4.runtime.Parser.triggerExitRuleEvent(Parser.java:408)
	at org.antlr.v4.runtime.Parser.exitRule(Parser.java:642)
	at org.apache.spark.sql.catalyst.parser.SqlBaseParser.singleTableIdentifier(SqlBaseParser.java:1113)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.$anonfun$parseTableIdentifier$1(AbstractSqlParser.scala:70)
	at org.apache.spark.sql.catalyst.parser.AbstractParser.parse(parsers.scala:96)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:137)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseTableIdentifier(AbstractSqlParser.scala:69)
	at com.databricks.sql.parser.DatabricksSqlParser.parseTableIdentifier(DatabricksSqlParser.scala:56)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformReadRel(SparkConnectPlanner.scala:1395)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:176)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:478)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:477)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:171)
	at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.transformRelation$1(SparkConnectAnalyzeHandler.scala:62)
	at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.process(SparkConnectAnalyzeHandler.scala:67)
	at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$1(SparkConnectAnalyzeHandler.scala:49)
	at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$1$adapted(SparkConnectAnalyzeHandler.scala:48)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.handle(SparkConnectAnalyzeHandler.scala:48)
	at org.apache.spark.sql.connect.service.SparkConnectService.analyzePlan(SparkConnectService.scala:102)
	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:801)
	at grpc_shaded.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
	at com.databricks.spark.connect.service.AuthenticationInterceptor$AuthenticatedServerCallListener.$anonfun$onHalfClose$1(AuthenticationInterceptor.scala:310)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$3(RequestContext.scala:286)
	at com.databricks.spark.connect.service.RequestContext$.com$databricks$spark$connect$service$RequestContext$$withLocalProperties(RequestContext.scala:473)
	at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$2(RequestContext.scala:286)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
	at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
	at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
	at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
	at com.databricks.spark.util.UniverseAttributionContextWrapper.withValue(AttributionContextUtils.scala:228)
	at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$1(RequestContext.scala:285)
	at com.databricks.spark.connect.service.RequestContext.withContext(RequestContext.scala:298)
	at com.databricks.spark.connect.service.RequestContext.runWith(RequestContext.scala:278)
	at com.databricks.spark.connect.service.AuthenticationInterceptor$AuthenticatedServerCallListener.onHalfClose(AuthenticationInterceptor.scala:310)
	at grpc_shaded.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
	at grpc_shaded.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
	at grpc_shaded.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
	at grpc_shaded.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:351)
	at grpc_shaded.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
	at grpc_shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at grpc_shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
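
As a stopgap, one possible workaround (an untested sketch; `connection.sql` is ibis's generic raw-SQL escape hatch, so this sidesteps the identifier handling in `connection.table` entirely) is to do the quoting yourself:

```python
# Untested workaround sketch: quote the identifier explicitly in raw SQL and
# let ibis infer the schema from the resulting query.
table_name = "my-table"
table = connection.sql(f"SELECT * FROM `{table_name}`")
table.count().execute()
```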

What version of ibis are you using?

ibis: 9.5.0
pyspark: 3.5.0
sqlglot: 25.20.2
sqlglotrs: 0.2.12

What backend(s) are you using, if any?

spark

Relevant log output

No response

Code of Conduct

  • I agree to follow this project's Code of Conduct
akanz1 added the bug (Incorrect behavior inside of ibis) label on Dec 13, 2024

Projects: Status: backlog
Development: no branches or pull requests
1 participant