Skip to content

Commit

Permalink
Merge pull request #1231 from technical-debt-collector/master
Browse files Browse the repository at this point in the history
Create propagatable connection, add Transaction accessor
  • Loading branch information
vietj authored Feb 28, 2023
2 parents cad7a73 + 54d1bdd commit 8b28a02
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 13 deletions.
8 changes: 7 additions & 1 deletion vertx-sql-client/src/main/java/io/vertx/sqlclient/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
}

/**
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result
* Like {@link #withTransaction(Function, Handler)} but returns a {@code Future} of the asynchronous result.
*/
default <T> Future<@Nullable T> withTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
return getConnection()
Expand All @@ -164,6 +164,12 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
.onComplete(ar -> conn.close()));
}

/**
* Like {@link #withTransaction(Function, Handler)} but allows for setting the mode, defining how the acquired
* connection is managed during the execution of the function.
*/
<T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation, Function<SqlConnection, Future<@Nullable T>> function);

/**
* Get a connection from the pool and execute the given {@code function}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public interface SqlConnection extends SqlClient {
*/
Future<Transaction> begin();

/**
* @return the current transaction if it exists, otherwise null
*/
Transaction transaction();

/**
* @return whether the connection uses SSL
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.sqlclient;

import java.util.function.Function;

/**
* Defines how the acquired connection will be managed during the execution of the function provided in
* {@link Pool#withTransaction(TransactionPropagation, Function)}.
*/
public enum TransactionPropagation {

/**
* The acquired connection is not stored anywhere, making it local to the provided function execution and to
* wherever it is passed.
*/
NONE,

/**
* Keeps the acquired connection stored in the local context for as long as the given function executes.
* Any subsequent calls to {@link Pool#withTransaction} with this mode during the function execution
* will retrieve this connection from the context instead of creating another.
* The connection is removed from the local context when the function block has completed.
*/
CONTEXT

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.vertx.sqlclient.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
Expand All @@ -23,14 +24,7 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PrepareOptions;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.*;
import io.vertx.sqlclient.spi.Driver;

import java.util.function.Function;
Expand Down Expand Up @@ -77,6 +71,12 @@ public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
return delegate.preparedQuery(sql);
}

@Override
public <T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation,
Function<SqlConnection, Future<@Nullable T>> function) {
return delegate.withTransaction(txPropagation, function);
}

@Override
public P connectHandler(Handler<SqlConnection> handler) {
delegate.connectHandler(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@

package io.vertx.sqlclient.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.*;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
import io.vertx.sqlclient.impl.tracing.QueryTracer;
import io.vertx.sqlclient.spi.Driver;

import java.util.Objects;
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -53,6 +51,8 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private long timerID;
private volatile Function<Context, Future<SqlConnection>> connectionProvider;

private static final String PROPAGATABLE_CONNECTION = "propagatable_connection";

public PoolImpl(VertxInternal vertx,
Driver driver,
QueryTracer tracer,
Expand Down Expand Up @@ -155,6 +155,48 @@ public Future<SqlConnection> getConnection() {
});
}

public <T> Future<@Nullable T> withTransaction(TransactionPropagation txPropagation,
Function<SqlConnection, Future<@Nullable T>> function) {
if (txPropagation == TransactionPropagation.CONTEXT) {
ContextInternal context = (ContextInternal) Vertx.currentContext();
SqlConnection sqlConnection = context.getLocal(PROPAGATABLE_CONNECTION);
if (sqlConnection == null) {
return startPropagatableConnection(function);
}
return context.succeededFuture(sqlConnection)
.flatMap(conn -> function.apply(conn)
.onFailure(err -> {
if (!(err instanceof TransactionRollbackException)) {
conn.transaction().rollback();
}
}));
}
return withTransaction(function);
}

private <T> Future<@Nullable T> startPropagatableConnection(Function<SqlConnection, Future<@Nullable T>> function) {
ContextInternal context = (ContextInternal) Vertx.currentContext();
return getConnection().onComplete(handler -> context.putLocal(PROPAGATABLE_CONNECTION, handler.result()))
.flatMap(conn -> conn
.begin()
.flatMap(tx -> function
.apply(conn)
.compose(
res -> tx
.commit()
.flatMap(v -> context.succeededFuture(res)),
err -> {
if (err instanceof TransactionRollbackException) {
return context.failedFuture(err);
} else {
return tx
.rollback()
.compose(v -> context.failedFuture(err), failure -> context.failedFuture(err));
}
}))
.onComplete(ar -> conn.close(v -> context.removeLocal(PROPAGATABLE_CONNECTION))));
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
Object metric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public Future<Transaction> begin() {
return tx.begin();
}

@Override
public Transaction transaction() {
return tx;
}

@Override
boolean autoCommit() {
return tx == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,45 @@ public void testWithTransactionImplicitRollback(TestContext ctx) {
}));
}));
}

@Test
public void testWithPropagatableConnectionTransactionCommit(TestContext ctx) {
Async async = ctx.async();
Pool pool = createPool();
vertx.runOnContext(handler -> {
pool.withTransaction(TransactionPropagation.CONTEXT, c ->
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty()).flatMap(v ->
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
conn.query("INSERT INTO mutable (id, val) VALUES (2, 'hello-2')").execute().mapEmpty())).flatMap(v2 ->
c.query("INSERT INTO mutable (id, val) VALUES (3, 'hello-3')").execute().mapEmpty())
).onComplete(ctx.asyncAssertSuccess(v -> pool
.query("SELECT id, val FROM mutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(3, rows.size());
ctx.assertNull(Vertx.currentContext().getLocal("propagatable_connection"));
async.complete();
}))));
});
}

@Test
public void testWithPropagatableConnectionTransactionRollback(TestContext ctx) {
Async async = ctx.async();
Pool pool = createPool();
Throwable failure = new Throwable();
vertx.runOnContext(handler -> {
pool.withTransaction(TransactionPropagation.CONTEXT, c ->
pool.withTransaction(TransactionPropagation.CONTEXT, conn ->
conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty().flatMap(
v -> Future.failedFuture(failure)))
).onComplete(ctx.asyncAssertFailure(v -> pool
.query("SELECT id, val FROM mutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(0, rows.size());
ctx.assertNull(Vertx.currentContext().getLocal("propagatable_connection"));
async.complete();
}))));
});
}
}

0 comments on commit 8b28a02

Please sign in to comment.