RxSession.close on Flux.cancel produces an error #1019

Closed
simondaudin opened this issue Sep 20, 2021 · 3 comments · Fixed by #1057

@simondaudin

Hello ✋

Cancelling a Flux seems to produce an internal exception that is not propagated to the subscriber.

  • Neo4j version: Community Neo4j 4.3.3
  • Neo4j Mode: Single instance
  • Driver version: Java Driver 4.3.4
  • Operating system: Ubuntu 20

Steps to reproduce

  1. Populate the Neo4j database with some elements
  2. Execute a query as explained in https://neo4j.com/docs/java-manual/4.3/session-api/reactive/#java-driver-rx-result-consume
Flux.usingWhen(Mono.fromSupplier(driver::rxSession),
        rxSession -> rxSession.writeTransaction(tx -> {
            return Flux.from(tx.run("MATCH (n:Jedi) RETURN n.name LIMIT 1")
                    .records())
                .map(record -> record.get(0).asString());
        }),
        RxSession::close
    )
    .next()
    .block();
  3. The Flux ends without error
  4. Observe the following in the logs
org.neo4j.driver.exceptions.ClientException: Message 'COMMIT' cannot be handled by a session in the READY state.
        at org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:85)
        at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:108)
        at org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:83)
        at org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:59)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:83)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:35)
        at org.neo4j.driver.internal.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:47)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.neo4j.driver.internal.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:

The Flux.next() operator turns the Flux into a Mono and cancels the upstream subscription after the first emitted item.
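To illustrate the cancellation described above, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces, not Reactor or the Neo4j driver. The names NextCancelSketch, RecordingPublisher and FirstItemSubscriber are hypothetical, and the synchronous publisher is a simplified stand-in for the real asynchronous result stream. It models a subscriber that, like next(), keeps the first item and then cancels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class NextCancelSketch {

    /** Hypothetical synchronous source that records what happens to it. */
    static final class RecordingPublisher implements Flow.Publisher<String> {
        final List<String> events = new ArrayList<>();
        private final List<String> items;

        RecordingPublisher(List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index = 0;
                private boolean cancelled = false;
                private boolean completed = false;

                @Override
                public void request(long n) {
                    long remaining = n;
                    // Emit until demand is exhausted, items run out, or we are cancelled.
                    while (remaining-- > 0 && !cancelled && index < items.size()) {
                        String item = items.get(index++);
                        events.add("emit " + item);
                        subscriber.onNext(item);
                    }
                    // Completion is only signalled if nobody cancelled first.
                    if (!cancelled && !completed && index == items.size()) {
                        completed = true;
                        events.add("complete");
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                    events.add("cancel");
                }
            });
        }
    }

    /** Simplified model of next(): keep the first item, then cancel upstream. */
    static final class FirstItemSubscriber implements Flow.Subscriber<String> {
        String first;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) {
            if (first == null) {
                first = item;
                subscription.cancel();
            }
        }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() { }
    }

    /** Runs the sketch and returns the source's event log plus the captured item. */
    static List<String> run(List<String> items) {
        RecordingPublisher publisher = new RecordingPublisher(items);
        FirstItemSubscriber subscriber = new FirstItemSubscriber();
        publisher.subscribe(subscriber);
        List<String> out = new ArrayList<>(publisher.events);
        out.add("first=" + subscriber.first);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Yoda")));
    }
}
```

In this model, even a source holding a single item (comparable to LIMIT 1) sees a cancel rather than a complete, because the cancel is issued from inside onNext before the source can finish. That ordering is why the cleanup path for cancellation matters in the reproduction above.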

One of my questions: should we still call RxSession.close() in case of cancellation? I guess yes, as written at https://neo4j.com/docs/java-manual/4.3/session-api/reactive/#java-driver-rx-cancellation

@injectives
Contributor

Hi,

Thanks a lot for reporting this issue. The linked PR should fix it.

We recommend closing the session in any case. Cancelling an unconsumed transaction that has not been explicitly committed will result in a rollback.

@injectives
Contributor

Alternative code snippet that commits:

Function<RxSession,Mono<String>> txWork = session -> Mono.fromDirect( session.readTransaction(
        tx ->
        {
            RxResult rxResult = tx.run( "MATCH (n:Jedi) RETURN n.name AS value LIMIT 1" );
            return Mono.fromDirect( rxResult.records() )
                       .map( record -> record.get( "value" ).asString() );
        } ) );

var values = Mono.usingWhen(
        Mono.fromSupplier( driver::rxSession ),
        txWork,
        RxSession::close,
        ( session, error ) -> session.close(),
        RxSession::close
).block();

@simondaudin
Author

Hi, thanks for looking into it, making a fix, and providing an alternative code example, I appreciate it 👍

I'll try it out when I can.
