Skip to content

Commit 556ceff

Browse files
committed
Ensure reactive transaction function gets rolled back on cancellation
This update also ensures that session cancellation does not result in multiple rollback attempts.
1 parent a44346c commit 556ceff

File tree

2 files changed

+130
-137
lines changed

2 files changed

+130
-137
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

+129-136
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.EnumSet;
2323
import java.util.concurrent.CompletionException;
2424
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.locks.Lock;
26+
import java.util.concurrent.locks.ReentrantLock;
2527
import java.util.function.BiFunction;
2628

2729
import org.neo4j.driver.Bookmark;
@@ -41,87 +43,45 @@
4143

4244
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4345
import static org.neo4j.driver.internal.util.Futures.failedFuture;
46+
import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
4447

4548
public class UnmanagedTransaction
4649
{
4750
private enum State
4851
{
49-
/** The transaction is running with no explicit success or failure marked */
52+
/**
53+
* The transaction is running with no explicit success or failure marked
54+
*/
5055
ACTIVE,
5156

5257
/**
53-
* This transaction has been terminated either because of explicit {@link Session#reset()} or because of a
54-
* fatal connection error.
58+
* This transaction has been terminated either because of explicit {@link Session#reset()} or because of a fatal connection error.
5559
*/
5660
TERMINATED,
5761

58-
/** This transaction has successfully committed */
59-
COMMITTED,
60-
61-
/** This transaction has been rolled back */
62-
ROLLED_BACK
63-
}
64-
65-
/**
66-
* This is a holder so that we can have ony the state volatile in the tx without having to synchronize the whole block.
67-
*/
68-
private static final class StateHolder
69-
{
70-
private static final EnumSet<State> OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED );
71-
private static final StateHolder ACTIVE_HOLDER = new StateHolder( State.ACTIVE, null );
72-
private static final StateHolder COMMITTED_HOLDER = new StateHolder( State.COMMITTED, null );
73-
private static final StateHolder ROLLED_BACK_HOLDER = new StateHolder( State.ROLLED_BACK, null );
74-
7562
/**
76-
* The actual state.
63+
* This transaction has successfully committed
7764
*/
78-
final State value;
65+
COMMITTED,
7966

8067
/**
81-
* If this holder contains a state of {@link State#TERMINATED}, this represents the cause if any.
68+
* This transaction has been rolled back
8269
*/
83-
final Throwable causeOfTermination;
84-
85-
static StateHolder of( State value )
86-
{
87-
switch ( value )
88-
{
89-
case ACTIVE:
90-
return ACTIVE_HOLDER;
91-
case COMMITTED:
92-
return COMMITTED_HOLDER;
93-
case ROLLED_BACK:
94-
return ROLLED_BACK_HOLDER;
95-
case TERMINATED:
96-
default:
97-
throw new IllegalArgumentException( "Cannot provide a default state holder for state " + value );
98-
}
99-
}
100-
101-
static StateHolder terminatedWith( Throwable cause )
102-
{
103-
return new StateHolder( State.TERMINATED, cause );
104-
}
105-
106-
private StateHolder( State value, Throwable causeOfTermination )
107-
{
108-
this.value = value;
109-
this.causeOfTermination = causeOfTermination;
110-
}
111-
112-
boolean isOpen()
113-
{
114-
return OPEN_STATES.contains( this.value );
115-
}
70+
ROLLED_BACK
11671
}
11772

73+
private static final EnumSet<State> OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED );
74+
11875
private final Connection connection;
11976
private final BoltProtocol protocol;
12077
private final BookmarkHolder bookmarkHolder;
12178
private final ResultCursorsHolder resultCursors;
12279
private final long fetchSize;
123-
124-
private volatile StateHolder state = StateHolder.of( State.ACTIVE );
80+
private final Lock lock = new ReentrantLock();
81+
private State state = State.ACTIVE;
82+
private CompletionStage<Void> commitStage;
83+
private CompletionStage<Void> rollbackStage;
84+
private Throwable causeOfTermination;
12585

12686
public UnmanagedTransaction( Connection connection, BookmarkHolder bookmarkHolder, long fetchSize )
12787
{
@@ -164,50 +124,63 @@ else if ( beginError instanceof ConnectionReadTimeoutException )
164124

165125
public CompletionStage<Void> closeAsync()
166126
{
167-
if ( isOpen() )
168-
{
169-
return rollbackAsync();
170-
}
171-
else
172-
{
173-
return completedWithNull();
174-
}
127+
return executeWithLock( lock, () -> isOpen() ? rollbackAsync() : completedWithNull() );
175128
}
176129

177130
public CompletionStage<Void> commitAsync()
178131
{
179-
if ( state.value == State.COMMITTED )
132+
return executeWithLock( lock, () ->
180133
{
181-
return failedFuture( new ClientException( "Can't commit, transaction has been committed" ) );
182-
}
183-
else if ( state.value == State.ROLLED_BACK )
184-
{
185-
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
186-
}
187-
else
188-
{
189-
return resultCursors.retrieveNotConsumedError()
190-
.thenCompose( error -> doCommitAsync( error ).handle( handleCommitOrRollback( error ) ) )
191-
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( true, error ) );
192-
}
134+
if ( state == State.COMMITTED )
135+
{
136+
return failedFuture( new ClientException( "Can't commit, transaction has been committed" ) );
137+
}
138+
else if ( state == State.ROLLED_BACK )
139+
{
140+
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
141+
}
142+
else if ( commitStage != null )
143+
{
144+
return commitStage;
145+
}
146+
else
147+
{
148+
CompletionStage<Void> stage = resultCursors.retrieveNotConsumedError()
149+
.thenCompose( error -> doCommitAsync( error ).handle( handleCommitOrRollback( error ) ) )
150+
.whenComplete( ( ignore, error ) -> releaseConnection( error ) );
151+
commitStage = stage;
152+
stage.whenComplete( ( ignored, error ) -> updateStateAfterCommitOrRollback( true, error ) );
153+
return stage;
154+
}
155+
} );
193156
}
194157

195158
public CompletionStage<Void> rollbackAsync()
196159
{
197-
if ( state.value == State.COMMITTED )
198-
{
199-
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
200-
}
201-
else if ( state.value == State.ROLLED_BACK )
160+
return executeWithLock( lock, () ->
202161
{
203-
return failedFuture( new ClientException( "Can't rollback, transaction has been rolled back" ) );
204-
}
205-
else
206-
{
207-
return resultCursors.retrieveNotConsumedError()
208-
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
209-
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( false, error ) );
210-
}
162+
if ( state == State.COMMITTED )
163+
{
164+
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
165+
}
166+
else if ( state == State.ROLLED_BACK )
167+
{
168+
return failedFuture( new ClientException( "Can't rollback, transaction has been rolled back" ) );
169+
}
170+
else if ( rollbackStage != null )
171+
{
172+
return rollbackStage;
173+
}
174+
else
175+
{
176+
CompletionStage<Void> stage = resultCursors.retrieveNotConsumedError()
177+
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
178+
.whenComplete( ( ignore, error ) -> releaseConnection( error ) );
179+
rollbackStage = stage;
180+
stage.whenComplete( ( ignored, error ) -> updateStateAfterCommitOrRollback( false, error ) );
181+
return stage;
182+
}
183+
} );
211184
}
212185

213186
public CompletionStage<ResultCursor> runAsync( Query query )
@@ -219,7 +192,7 @@ public CompletionStage<ResultCursor> runAsync( Query query )
219192
return cursorStage.thenCompose( AsyncResultCursor::mapSuccessfulRunCompletionAsync ).thenApply( cursor -> cursor );
220193
}
221194

222-
public CompletionStage<RxResultCursor> runRx(Query query)
195+
public CompletionStage<RxResultCursor> runRx( Query query )
223196
{
224197
ensureCanRunQueries();
225198
CompletionStage<RxResultCursor> cursorStage =
@@ -230,22 +203,27 @@ public CompletionStage<RxResultCursor> runRx(Query query)
230203

231204
public boolean isOpen()
232205
{
233-
return state.isOpen();
206+
State currentState = executeWithLock( lock, () -> state );
207+
return OPEN_STATES.contains( currentState );
234208
}
235209

236210
public void markTerminated( Throwable cause )
237211
{
238-
if ( state.value == State.TERMINATED )
212+
executeWithLock( lock, () ->
239213
{
240-
if ( state.causeOfTermination != null )
214+
if ( state == State.TERMINATED )
241215
{
242-
addSuppressedWhenNotCaptured( state.causeOfTermination, cause );
216+
if ( causeOfTermination != null )
217+
{
218+
addSuppressedWhenNotCaptured( causeOfTermination, cause );
219+
}
243220
}
244-
}
245-
else
246-
{
247-
state = StateHolder.terminatedWith( cause );
248-
}
221+
else
222+
{
223+
state = State.TERMINATED;
224+
causeOfTermination = cause;
225+
}
226+
} );
249227
}
250228

251229
private void addSuppressedWhenNotCaptured( Throwable currentCause, Throwable newCause )
@@ -267,39 +245,40 @@ public Connection connection()
267245

268246
private void ensureCanRunQueries()
269247
{
270-
if ( state.value == State.COMMITTED )
271-
{
272-
throw new ClientException( "Cannot run more queries in this transaction, it has been committed" );
273-
}
274-
else if ( state.value == State.ROLLED_BACK )
275-
{
276-
throw new ClientException( "Cannot run more queries in this transaction, it has been rolled back" );
277-
}
278-
else if ( state.value == State.TERMINATED )
248+
executeWithLock( lock, () ->
279249
{
280-
throw new ClientException( "Cannot run more queries in this transaction, " +
281-
"it has either experienced an fatal error or was explicitly terminated", state.causeOfTermination );
282-
}
250+
if ( state == State.COMMITTED )
251+
{
252+
throw new ClientException( "Cannot run more queries in this transaction, it has been committed" );
253+
}
254+
else if ( state == State.ROLLED_BACK )
255+
{
256+
throw new ClientException( "Cannot run more queries in this transaction, it has been rolled back" );
257+
}
258+
else if ( state == State.TERMINATED )
259+
{
260+
throw new ClientException( "Cannot run more queries in this transaction, " +
261+
"it has either experienced an fatal error or was explicitly terminated", causeOfTermination );
262+
}
263+
} );
283264
}
284265

285266
private CompletionStage<Void> doCommitAsync( Throwable cursorFailure )
286267
{
287-
if ( state.value == State.TERMINATED )
288-
{
289-
return failedFuture( new ClientException( "Transaction can't be committed. " +
290-
"It has been rolled back either because of an error or explicit termination",
291-
cursorFailure != state.causeOfTermination ? state.causeOfTermination : null ) );
292-
}
293-
return protocol.commitTransaction( connection ).thenAccept( bookmarkHolder::setBookmark );
268+
ClientException exception = executeWithLock(
269+
lock, () -> state == State.TERMINATED
270+
? new ClientException( "Transaction can't be committed. " +
271+
"It has been rolled back either because of an error or explicit termination",
272+
cursorFailure != causeOfTermination ? causeOfTermination : null )
273+
: null
274+
);
275+
return exception != null ? failedFuture( exception ) : protocol.commitTransaction( connection ).thenAccept( bookmarkHolder::setBookmark );
294276
}
295277

296278
private CompletionStage<Void> doRollbackAsync()
297279
{
298-
if ( state.value == State.TERMINATED )
299-
{
300-
return completedWithNull();
301-
}
302-
return protocol.rollbackTransaction( connection );
280+
State currentState = executeWithLock( lock, () -> state );
281+
return currentState == State.TERMINATED ? completedWithNull() : protocol.rollbackTransaction( connection );
303282
}
304283

305284
private static BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursorFailure )
@@ -315,17 +294,8 @@ private static BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable
315294
};
316295
}
317296

318-
private void handleTransactionCompletion( boolean commitOnSuccess, Throwable throwable )
297+
private void releaseConnection( Throwable throwable )
319298
{
320-
if ( commitOnSuccess && throwable == null )
321-
{
322-
state = StateHolder.of( State.COMMITTED );
323-
}
324-
else
325-
{
326-
state = StateHolder.of( State.ROLLED_BACK );
327-
}
328-
329299
if ( throwable instanceof AuthorizationExpiredException )
330300
{
331301
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
@@ -339,4 +309,27 @@ else if ( throwable instanceof ConnectionReadTimeoutException )
339309
connection.release(); // release in background
340310
}
341311
}
312+
313+
private void updateStateAfterCommitOrRollback( boolean commitAttempt, Throwable throwable )
314+
{
315+
executeWithLock( lock, () ->
316+
{
317+
if ( commitAttempt && throwable == null )
318+
{
319+
state = State.COMMITTED;
320+
}
321+
else
322+
{
323+
state = State.ROLLED_BACK;
324+
}
325+
if ( commitAttempt )
326+
{
327+
commitStage = null;
328+
}
329+
else
330+
{
331+
rollbackStage = null;
332+
}
333+
} );
334+
}
342335
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
130130
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
131131
{
132132
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
133-
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), null );
133+
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), InternalRxTransaction::close );
134134
return session.retryLogic().retryRx( repeatableWork );
135135
}
136136

0 commit comments

Comments
 (0)