Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create propagatable connection, add Transaction accessor #1231

Merged
merged 7 commits into from
Feb 28, 2023
39 changes: 39 additions & 0 deletions vertx-sql-client/src/main/java/io/vertx/sqlclient/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ static Pool pool(Vertx vertx, SqlConnectOptions database, PoolOptions options) {
*/
Future<SqlConnection> getConnection();

Future<SqlConnection> getPropagatableConnection();

boolean propagatableConnectionIsActive();

Future<Void> setPropagatableConnection(SqlConnection propagatableConnection);

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -164,6 +170,39 @@ default <T> void withTransaction(Function<SqlConnection, Future<@Nullable T>> fu
.onComplete(ar -> conn.close()));
}


default <T> Future<@Nullable T> withPropagatedTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
if (propagatableConnectionIsActive()) {
return getPropagatableConnection()
.flatMap(conn -> function.apply(conn)
.onFailure(err -> {
if (!(err instanceof TransactionRollbackException)) {
conn.getTransaction().rollback();
}
}));
} else {
return getPropagatableConnection()
.flatMap(conn -> conn
.begin()
.flatMap(tx -> function
.apply(conn)
.compose(
res -> tx
.commit()
.flatMap(v -> Future.succeededFuture(res)),
err -> {
if (err instanceof TransactionRollbackException) {
return Future.failedFuture(err);
} else {
return tx
.rollback()
.compose(v -> Future.failedFuture(err), failure -> Future.failedFuture(err));
}
}))
.onComplete(ar -> conn.close(v -> setPropagatableConnection(null))));
}
}

/**
* Get a connection from the pool and execute the given {@code function}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public interface SqlConnection extends SqlClient {
*/
Future<Transaction> begin();

Transaction getTransaction();
technical-debt-collector marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return whether the connection uses SSL
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,21 @@ public Future<SqlConnection> getConnection() {
return delegate.getConnection();
}

@Override
public Future<SqlConnection> getPropagatableConnection() {
return delegate.getPropagatableConnection();
}

@Override
public boolean propagatableConnectionIsActive() {
return delegate.propagatableConnectionIsActive();
}

@Override
public Future<Void> setPropagatableConnection(SqlConnection propagatableConnection) {
return delegate.setPropagatableConnection(propagatableConnection);
}

@Override
public Query<RowSet<Row>> query(String sql) {
return delegate.query(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private long timerID;
private volatile Function<Context, Future<SqlConnection>> connectionProvider;

private volatile SqlConnection propagatableConnection;

public PoolImpl(VertxInternal vertx,
Driver driver,
QueryTracer tracer,
Expand Down Expand Up @@ -155,6 +157,24 @@ public Future<SqlConnection> getConnection() {
});
}

@Override
public Future<SqlConnection> getPropagatableConnection() {
if (propagatableConnection == null) {
technical-debt-collector marked this conversation as resolved.
Show resolved Hide resolved
return getConnection().onComplete(c -> setPropagatableConnection(c.result()));
}
return Future.succeededFuture(propagatableConnection);
}

@Override
public boolean propagatableConnectionIsActive() {
return propagatableConnection != null;
}

@Override
public Future<Void> setPropagatableConnection(SqlConnection propagatableConnection) {
return Future.future(handler -> this.propagatableConnection = propagatableConnection);
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
Object metric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public Future<Transaction> begin() {
return tx.begin();
}

@Override
public Transaction getTransaction() {
return tx;
}

@Override
boolean autoCommit() {
return tx == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,36 @@ public void testWithTransactionImplicitRollback(TestContext ctx) {
}));
}));
}

@Test
public void testWithPropagatedConnectionTransactionCommit(TestContext ctx) {
Async async = ctx.async();
Pool pool = createPool();
pool.withPropagatedTransaction(c ->
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty()).flatMap(v ->
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (2, 'hello-2')").execute().mapEmpty())).flatMap(v2 ->
c.query("INSERT INTO mutable (id, val) VALUES (3, 'hello-3')").execute().mapEmpty())
).onComplete(ctx.asyncAssertSuccess(v -> pool
.query("SELECT id, val FROM mutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(3, rows.size());
ctx.assertFalse(pool.propagatableConnectionIsActive());
async.complete();
}))));
}

@Test
public void testWithPropagatedConnectionTransactionRollback(TestContext ctx) {
Async async = ctx.async();
Pool pool = createPool();
Throwable failure = new Throwable();
pool.withPropagatedTransaction(c ->
pool.withPropagatedTransaction(conn -> conn.query("INSERT INTO mutable (id, val) VALUES (1, 'hello-1')").execute().mapEmpty().flatMap(v -> Future.failedFuture(failure)))
).onComplete(ctx.asyncAssertFailure(v -> pool
.query("SELECT id, val FROM mutable")
.execute(ctx.asyncAssertSuccess(rows -> {
ctx.assertEquals(0, rows.size());
async.complete();
}))));
}
}