Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JobCancellationException after client closes connection #211

Open
Alexander-N opened this issue Feb 8, 2022 · 6 comments
Open

JobCancellationException after client closes connection #211

Alexander-N opened this issue Feb 8, 2022 · 6 comments
Labels
Milestone

Comments

@Alexander-N
Copy link

When using ktor with a requestStream handler like this

fun Application.module() {
    install(WebSockets)
    install(RSocketSupport) {
        server = RSocketServer()
    }

    routing {
        rSocket("stream") {
            RSocketRequestHandler {
                log.info("New subscriber")
                requestStream {
                    flow {
                        repeat(2) { i ->
                            log.info("Emit $i")
                            emit(buildPayload { data("data: $i") })
                        }
                    }
                }
            }
        }
    }
}

I get the following exception after the client closes the connection:

kotlinx.coroutines.JobCancellationException: StandaloneCoroutine is cancelling
Caused by: kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
        at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1141)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:938)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.DefaultWebSocketSessionImpl$runIncomingProcessor$1.invokeSuspend(DefaultWebSocketSessionImpl.kt:158)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:235)
        at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:191)
        at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:162)
        at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:406)
        at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:522)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveHasNext.resumeReceiveClosed(AbstractChannel.kt:989)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.RawWebSocket$1.invokeSuspend(RawWebSocket.kt:66)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda-1$lambda-0(NettyApplicationEngine.kt:251)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-02-08 13:16:07.904 [eventLoopGroupProxy-4-1] ERROR Application - Unhandled exception caught for CoroutineName(call-handler)
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
        at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1141)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:938)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.DefaultWebSocketSessionImpl$runIncomingProcessor$1.invokeSuspend(DefaultWebSocketSessionImpl.kt:158)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:235)
        at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:191)
        at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:162)
        at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:406)
        at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:522)
        at kotlinx.coroutines.channels.AbstractChannel$ReceiveHasNext.resumeReceiveClosed(AbstractChannel.kt:989)
        at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:343)
        at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:272)
        at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:94)
        at io.ktor.http.cio.websocket.RawWebSocket$1.invokeSuspend(RawWebSocket.kt:66)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda-1$lambda-0(NettyApplicationEngine.kt:251)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

I'm executing the following code on the client side:

runBlocking {
    val client = HttpClient(OkHttp) {
        install(WebSockets)
        install(RSocketSupport)
    }
    val stream = client
        .rSocket("ws://localhost:8080/stream")
        .requestStream(Payload.Empty)
    stream
        .take(2)
        .toList()
    client.close()
}

The full example project to reproduce the error: https://github.com/Alexander-N/ktor-rsocket-error

@ghost
Copy link

ghost commented Feb 10, 2022

Hey, if I understand correctly, you get an error on server side, right? And from stacktrace I see, that the error thrown from ktor-websockets implementation, and not from rsocket-kotlin. I see, that may be we can try at our (rsocket-kotlin) side to close websocket safely (via close frame) inside implementation, but looks like it's an issue in exception reporting on ktor side.
I will do some tests later to check if ktor do the same, if just using plain websockets without rsocket-kotlin over it.

@Alexander-N
Copy link
Author

Thanks for looking into this! Yes, the error is on server side. If the websocket is not closed then ktor-websockets seems correct in throwing this exception, so I'm not sure if it's worth reporting this issue upstream.

@ghost
Copy link

ghost commented Feb 10, 2022

The main question is, what happens on ktor side, when closing HttpClient. Will it even try to close open websocket session safely (via close frame) or it will just close underlying connection. And as so, what is expected behavior (by design) is in ktor. I will take a look later and will give an answer here, and if needed will do a fix, or will create an issue in ktor repository.

@Alexander-N
Copy link
Author

That would be great, are you still planning to look into it @olme04?

@ghost
Copy link

ghost commented Feb 22, 2022

Yes, I will look into it in coming days

@whyoleg whyoleg added this to the 0.17.0 milestone Oct 5, 2022
@whyoleg
Copy link
Member

whyoleg commented Oct 18, 2022

@whyoleg whyoleg modified the milestone: 0.16.0 - Transport API/Internals rework, QUIC, Netty Nov 24, 2022
@whyoleg whyoleg added the bug label May 3, 2023
@whyoleg whyoleg removed this from the 0.16.0 - Transport API/Internals rework, QUIC, Netty milestone May 3, 2023
@whyoleg whyoleg added this to the 0.17.0 milestone Nov 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants