Skip to content

Commit 7d16b81

Browse files
committed
Call close with the appropriate flag to commit or rollback on UnmanagedTransaction where possible to avoid double state acquisition (neo4j#1065)
* Call close with the appropriate flag to commit or rollback on UnmanagedTransaction where possible to avoid double state acquisition Calling `close` instead of separate `isOpen` and `commitAsync` requires less lock acquisitions and is safer. * Update tests
1 parent e820b34 commit 7d16b81

File tree

7 files changed

+77
-95
lines changed

7 files changed

+77
-95
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java

+22-32
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private <T> void executeWork(CompletableFuture<T> resultFuture, UnmanagedTransac
146146
Throwable error = Futures.completionExceptionCause( completionError );
147147
if ( error != null )
148148
{
149-
rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
149+
closeTxAfterFailedTransactionWork( tx, resultFuture, error );
150150
}
151151
else
152152
{
@@ -174,43 +174,33 @@ private <T> CompletionStage<T> safeExecuteWork(UnmanagedTransaction tx, AsyncTra
174174
}
175175
}
176176

177-
private <T> void rollbackTxAfterFailedTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
177+
private <T> void closeTxAfterFailedTransactionWork( UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
178178
{
179-
if ( tx.isOpen() )
180-
{
181-
tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) -> {
182-
if ( rollbackError != null )
179+
tx.closeAsync().whenComplete(
180+
( ignored, rollbackError ) ->
183181
{
184-
error.addSuppressed( rollbackError );
185-
}
186-
resultFuture.completeExceptionally( error );
187-
} );
188-
}
189-
else
190-
{
191-
resultFuture.completeExceptionally( error );
192-
}
182+
if ( rollbackError != null )
183+
{
184+
error.addSuppressed( rollbackError );
185+
}
186+
resultFuture.completeExceptionally( error );
187+
} );
193188
}
194189

195190
private <T> void closeTxAfterSucceededTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, T result )
196191
{
197-
if ( tx.isOpen() )
198-
{
199-
tx.commitAsync().whenComplete( ( ignore, completionError ) -> {
200-
Throwable commitError = Futures.completionExceptionCause( completionError );
201-
if ( commitError != null )
192+
tx.closeAsync( true ).whenComplete(
193+
( ignored, completionError ) ->
202194
{
203-
resultFuture.completeExceptionally( commitError );
204-
}
205-
else
206-
{
207-
resultFuture.complete( result );
208-
}
209-
} );
210-
}
211-
else
212-
{
213-
resultFuture.complete( result );
214-
}
195+
Throwable commitError = Futures.completionExceptionCause( completionError );
196+
if ( commitError != null )
197+
{
198+
resultFuture.completeExceptionally( commitError );
199+
}
200+
else
201+
{
202+
resultFuture.complete( result );
203+
}
204+
} );
215205
}
216206
}

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,12 @@ else if ( beginError instanceof ConnectionReadTimeoutException )
134134

135135
public CompletionStage<Void> closeAsync()
136136
{
137-
return closeAsync( false, true );
137+
return closeAsync( false );
138+
}
139+
140+
public CompletionStage<Void> closeAsync( boolean commit )
141+
{
142+
return closeAsync( commit, true );
138143
}
139144

140145
public CompletionStage<Void> commitAsync()

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
130130
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
131131
{
132132
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
133-
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), InternalRxTransaction::close );
133+
tx -> tx.close( true ), ( tx, error ) -> tx.close(), InternalRxTransaction::close );
134134
return session.retryLogic().retryRx( repeatableWork );
135135
}
136136

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.neo4j.driver.reactive.RxTransaction;
3131

3232
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
33-
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
3433

3534
public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction
3635
{
@@ -77,13 +76,13 @@ public <T> Publisher<T> rollback()
7776
return createEmptyPublisher( tx::rollbackAsync );
7877
}
7978

80-
Publisher<Void> commitIfOpen()
79+
Publisher<Void> close()
8180
{
82-
return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() );
81+
return close( false );
8382
}
8483

85-
Publisher<Void> close()
84+
Publisher<Void> close( boolean commit )
8685
{
87-
return createEmptyPublisher( tx::closeAsync );
86+
return createEmptyPublisher( () -> tx.closeAsync( commit ) );
8887
}
8988
}

driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -413,15 +413,21 @@ void shouldReturnFailingStageOnConflictingCompletingAction( boolean protocolComm
413413
private static Stream<Arguments> closingNotActionTransactionArgs()
414414
{
415415
return Stream.of(
416-
Arguments.of( true, 1, "commit" ),
417-
Arguments.of( false, 1, "rollback" ),
418-
Arguments.of( false, 0, "terminate" )
416+
Arguments.of( true, 1, "commit", null ),
417+
Arguments.of( false, 1, "rollback", null ),
418+
Arguments.of( false, 0, "terminate", null ),
419+
Arguments.of( true, 1, "commit", true ),
420+
Arguments.of( false, 1, "rollback", true ),
421+
Arguments.of( true, 1, "commit", false ),
422+
Arguments.of( false, 1, "rollback", false ),
423+
Arguments.of( false, 0, "terminate", false )
419424
);
420425
}
421426

422427
@ParameterizedTest
423428
@MethodSource( "closingNotActionTransactionArgs" )
424-
void shouldReturnCompletedWithNullStageOnClosingNotActiveTransaction( boolean protocolCommit, int expectedProtocolInvocations, String originalAction )
429+
void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommittingAborted(
430+
boolean protocolCommit, int expectedProtocolInvocations, String originalAction, Boolean commitOnClose )
425431
{
426432
Connection connection = mock( Connection.class );
427433
BoltProtocol protocol = mock( BoltProtocol.class );
@@ -431,7 +437,7 @@ void shouldReturnCompletedWithNullStageOnClosingNotActiveTransaction( boolean pr
431437
UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE );
432438

433439
CompletionStage<Void> originalActionStage = mapTransactionAction( originalAction, tx ).get();
434-
CompletionStage<Void> closeStage = tx.closeAsync();
440+
CompletionStage<Void> closeStage = commitOnClose != null ? tx.closeAsync( commitOnClose ) : tx.closeAsync();
435441

436442
assertTrue( originalActionStage.toCompletableFuture().isDone() );
437443
assertFalse( originalActionStage.toCompletableFuture().isCompletedExceptionally() );

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java

+27-29
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,7 @@ void shouldDelegateRunTx( Function<RxSession,Publisher<String>> runTx ) throws T
199199
// Given
200200
NetworkSession session = mock( NetworkSession.class );
201201
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
202-
when( tx.isOpen() ).thenReturn( true );
203-
when( tx.commitAsync() ).thenReturn( completedWithNull() );
204-
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
202+
when( tx.closeAsync( true ) ).thenReturn( completedWithNull() );
205203

206204
when( session.beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ) ).thenReturn( completedFuture( tx ) );
207205
when( session.retryLogic() ).thenReturn( new FixedRetryLogic( 1 ) );
@@ -213,7 +211,7 @@ void shouldDelegateRunTx( Function<RxSession,Publisher<String>> runTx ) throws T
213211

214212
// Then
215213
verify( session ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
216-
verify( tx ).commitAsync();
214+
verify( tx ).closeAsync( true );
217215
}
218216

219217
@Test
@@ -223,25 +221,24 @@ void shouldRetryOnError() throws Throwable
223221
int retryCount = 2;
224222
NetworkSession session = mock( NetworkSession.class );
225223
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
226-
when( tx.isOpen() ).thenReturn( true );
227-
when( tx.commitAsync() ).thenReturn( completedWithNull() );
228-
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
224+
when( tx.closeAsync( false ) ).thenReturn( completedWithNull() );
229225

230226
when( session.beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ) ).thenReturn( completedFuture( tx ) );
231227
when( session.retryLogic() ).thenReturn( new FixedRetryLogic( retryCount ) );
232228
InternalRxSession rxSession = new InternalRxSession( session );
233229

234230
// When
235-
Publisher<String> strings = rxSession.readTransaction( t ->
236-
Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) ) );
231+
Publisher<String> strings = rxSession.readTransaction(
232+
t ->
233+
Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) ) );
237234
StepVerifier.create( Flux.from( strings ) )
238-
// we lost the "a"s too as the user only see the last failure
239-
.expectError( RuntimeException.class )
240-
.verify();
235+
// we lost the "a"s too as the user only see the last failure
236+
.expectError( RuntimeException.class )
237+
.verify();
241238

242239
// Then
243240
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
244-
verify( tx, times( retryCount + 1 ) ).closeAsync();
241+
verify( tx, times( retryCount + 1 ) ).closeAsync( false );
245242
}
246243

247244
@Test
@@ -251,33 +248,34 @@ void shouldObtainResultIfRetrySucceed() throws Throwable
251248
int retryCount = 2;
252249
NetworkSession session = mock( NetworkSession.class );
253250
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
254-
when( tx.isOpen() ).thenReturn( true );
255-
when( tx.commitAsync() ).thenReturn( completedWithNull() );
256-
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );
251+
when( tx.closeAsync( false ) ).thenReturn( completedWithNull() );
252+
when( tx.closeAsync( true ) ).thenReturn( completedWithNull() );
257253

258254
when( session.beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ) ).thenReturn( completedFuture( tx ) );
259255
when( session.retryLogic() ).thenReturn( new FixedRetryLogic( retryCount ) );
260256
InternalRxSession rxSession = new InternalRxSession( session );
261257

262258
// When
263259
AtomicInteger count = new AtomicInteger();
264-
Publisher<String> strings = rxSession.readTransaction( t -> {
265-
// we fail for the first few retries, and then success on the last run.
266-
if ( count.getAndIncrement() == retryCount )
267-
{
268-
return Flux.just( "a" );
269-
}
270-
else
271-
{
272-
return Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) );
273-
}
274-
} );
260+
Publisher<String> strings = rxSession.readTransaction(
261+
t ->
262+
{
263+
// we fail for the first few retries, and then success on the last run.
264+
if ( count.getAndIncrement() == retryCount )
265+
{
266+
return Flux.just( "a" );
267+
}
268+
else
269+
{
270+
return Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) );
271+
}
272+
} );
275273
StepVerifier.create( Flux.from( strings ) ).expectNext( "a" ).verifyComplete();
276274

277275
// Then
278276
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
279-
verify( tx, times( retryCount ) ).closeAsync();
280-
verify( tx ).commitAsync();
277+
verify( tx, times( retryCount ) ).closeAsync( false );
278+
verify( tx ).closeAsync( true );
281279
}
282280

283281
@Test

driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java

+6-22
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import static org.junit.jupiter.api.Assertions.assertThrows;
4949
import static org.mockito.ArgumentMatchers.any;
5050
import static org.mockito.Mockito.mock;
51-
import static org.mockito.Mockito.never;
5251
import static org.mockito.Mockito.verify;
5352
import static org.mockito.Mockito.when;
5453
import static org.neo4j.driver.Values.parameters;
@@ -140,43 +139,28 @@ void shouldMarkTxIfFailedToRun( Function<RxTransaction, RxResult> runReturnOne )
140139
}
141140

142141
@Test
143-
void shouldCommitWhenOpen()
142+
void shouldDelegateConditionalClose()
144143
{
145144
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
146-
when( tx.isOpen() ).thenReturn( true );
147-
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
148-
149-
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
150-
Publisher<Void> publisher = rxTx.commitIfOpen();
151-
StepVerifier.create( publisher ).verifyComplete();
152-
153-
verify( tx ).commitAsync();
154-
}
155-
156-
@Test
157-
void shouldNotCommitWhenNotOpen()
158-
{
159-
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
160-
when( tx.isOpen() ).thenReturn( false );
161-
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );
145+
when( tx.closeAsync( true ) ).thenReturn( Futures.completedWithNull() );
162146

163147
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
164-
Publisher<Void> publisher = rxTx.commitIfOpen();
148+
Publisher<Void> publisher = rxTx.close( true );
165149
StepVerifier.create( publisher ).verifyComplete();
166150

167-
verify( tx, never() ).commitAsync();
151+
verify( tx ).closeAsync( true );
168152
}
169153

170154
@Test
171155
void shouldDelegateClose()
172156
{
173157
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
174-
when( tx.closeAsync() ).thenReturn( Futures.completedWithNull() );
158+
when( tx.closeAsync( false ) ).thenReturn( Futures.completedWithNull() );
175159

176160
InternalRxTransaction rxTx = new InternalRxTransaction( tx );
177161
Publisher<Void> publisher = rxTx.close();
178162
StepVerifier.create( publisher ).verifyComplete();
179163

180-
verify( tx ).closeAsync();
164+
verify( tx ).closeAsync( false );
181165
}
182166
}

0 commit comments

Comments
 (0)