Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create propagatable connection, add Transaction accessor #1231

Merged
merged 7 commits into from
Feb 28, 2023
8 changes: 7 additions & 1 deletion vertx-sql-client/src/main/java/io/vertx/sqlclient/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
}

/**
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result.
*/
default <T> Future<@Nullable T> withTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
return getConnection()
Expand All @@ -164,6 +164,12 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
.onComplete(ar -> conn.close()));
}

/**
* Like {@link #withTransaction(Function, Handler)} but allows for setting the mode, defining how the acquired
* connection is managed during the execution of the function.
*/
<T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation, Function<SqlConnection, Future<@Nullable T>> function);

/**
* Get a connection from the pool and execute the given {@code function}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public interface SqlConnection extends SqlClient {
*/
Future<Transaction> begin();

/**
* @return the current transaction if it exists, otherwise null
*/
Transaction transaction();

/**
* @return whether the connection uses SSL
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.sqlclient;

import java.util.function.Function;

/**
* Defines how the acquired connection will be managed during the execution of the function provided in
* {@link Pool#withTransaction(TransactionPropagation, Function)}.
*/
public enum TransactionPropagation {

/**
* The acquired connection is not stored anywhere, making it local to the provided function execution and to
* wherever it is passed.
*/
NONE,

/**
* Keeps the acquired connection stored in the local context for as long as the given function executes.
* Any subsequent calls to {@link Pool#withTransaction} with this mode during the function execution
* will retrieve this connection from the context instead of creating another.
* The connection is removed from the local context when the function block has completed.
*/
CONTEXT

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.vertx.sqlclient.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
Expand All @@ -23,14 +24,7 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PrepareOptions;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.*;
import io.vertx.sqlclient.spi.Driver;

import java.util.function.Function;
Expand Down Expand Up @@ -77,6 +71,12 @@ public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
return delegate.preparedQuery(sql);
}

@Override
public <T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation,
Function<SqlConnection, Future<@Nullable T>> function) {
return delegate.withTransaction(txPropagation, function);
}

@Override
public P connectHandler(Handler<SqlConnection> handler) {
delegate.connectHandler(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@

package io.vertx.sqlclient.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.*;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
import io.vertx.sqlclient.impl.tracing.QueryTracer;
import io.vertx.sqlclient.spi.Driver;

import java.util.Objects;
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -53,6 +51,8 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private long timerID;
private volatile Function<Context, Future<SqlConnection>> connectionProvider;

private static final String PROPAGATABLE_CONNECTION = "propagatable_connection";

public PoolImpl(VertxInternal vertx,
Driver driver,
QueryTracer tracer,
Expand Down Expand Up @@ -155,6 +155,48 @@ public Future<SqlConnection> getConnection() {
});
}

public <T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation,
Function<SqlConnection, Future<@Nullable T>> function) {
if (txPropagation == TransactionPropagation.CONTEXT) {
ContextInternal context = (ContextInternal) Vertx.currentContext();
SqlConnection sqlConnection = context.getLocal(PROPAGATABLE_CONNECTION);
if (sqlConnection == null) {
return startPropagatableConnection(function);
}
return context.succeededFuture(sqlConnection)
.flatMap(conn -> function.apply(conn)
.onFailure(err -> {
if (!(err instanceof TransactionRollbackException)) {
conn.transaction().rollback();
}
}));
}
return withTransaction(function);
}

private <T> Future<@Nullable T> startPropagatableConnection(Function<SqlConnection, Future<@Nullable T>> function) {
ContextInternal context = (ContextInternal) Vertx.currentContext();
return getConnection().onComplete(handler -> context.putLocal(PROPAGATABLE_CONNECTION, handler.result()))
.flatMap(conn -> conn
.begin()
.flatMap(tx -> function
.apply(conn)
.compose(
res -> tx
.commit()
.flatMap(v -> context.succeededFuture(res)),
err -> {
if (err instanceof TransactionRollbackException) {
return context.failedFuture(err);
} else {
return tx
.rollback()
.compose(v -> context.failedFuture(err), failure -> context.failedFuture(err));
}
}))
.onComplete(ar -> conn.close(v -> context.removeLocal(PROPAGATABLE_CONNECTION))));
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
Object metric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public Future<Transaction> begin() {
return tx.begin();
}

@Override
public Transaction transaction() {
return tx;
}

@Override
boolean autoCommit() {
return tx == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,45 @@ public void testWithTransactionImplicitRollback(TestContext ctx) {
}));
}));
}

@Test
public void testWithPropagatableConnectionTransactionCommit(TestContext ctx) {
Async async = ctx.async();
Pool pool = createPool();
vertx.runOnContext(handler -> {
pool.withTransaction(TransactionPropagation.CONTEXT, c ->
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty()).flatMap(v ->
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
conn.query("INSERT INTO mutable (id, val) VALUES (2, 'hello-2')").execute().mapEmpty())).flatMap(v2 ->
c.query("INSERT INTO mutable (id, val) VALUES (3, 'hello-3')").execute().mapEmpty())
).onComplete(ctx.asyncAssertSuccess(v -> pool
.query("SELECT id, val FROM mutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(3, rows.size());
ctx.assertNull(Vertx.currentContext().getLocal("propagatable_connection"));
async.complete();
}))));
});
}

@Test
public void testWithPropagatableConnectionTransactionRollback(TestContext ctx) {
Async async = ctx.async();
Pool pool = createPool();
Throwable failure = new Throwable();
vertx.runOnContext(handler -> {
pool.withTransaction(TransactionPropagation.CONTEXT, c ->
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty().flatMap(
v -> Future.failedFuture(failure)))
).onComplete(ctx.asyncAssertFailure(v -> pool
.query("SELECT id, val FROM mutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(0, rows.size());
ctx.assertNull(Vertx.currentContext().getLocal("propagatable_connection"));
async.complete();
}))));
});
}
}