
Consumer/channel does not reauthenticate based on SaslAuthenticateResponse #1080

Open
moh-incom opened this issue Nov 22, 2024 · 1 comment


moh-incom commented Nov 22, 2024

Describe the bug
When a consumer's connection outlives its SASL authentication (in this case an OAuth token), the consumer will fail to fetch new records because the broker no longer accepts any requests other than SaslHandshakeRequest and SaslAuthenticateRequest. See KIP-368 for more details.

As an example, if tokens are valid for 5 minutes, the consumer will run fine for 5 minutes, after which its new requests begin to fail. For instance, this HeartbeatRequest to the group coordinator fails because the channel's session has expired:

ERROR:aiokafka.consumer.group_coordinator:Error sending HeartbeatRequest_v1 to node 2 [KafkaConnectionError: Connection at my.redacted-kafka.broker closed] -- marking coordinator dead

I suspect this is also an issue for admin and producer clients.

Expected behaviour
I expect the consumer to reauthenticate before its session expires.
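A minimal sketch of the expected behaviour: KIP-368 has the broker return a session lifetime in the SaslAuthenticateResponse, so the client could schedule a reauthentication slightly before that lifetime elapses. The class and callback names below are hypothetical, not aiokafka API:

```python
import asyncio


class ReauthScheduler:
    """Hypothetical helper: schedules SASL reauthentication before the
    broker-reported session lifetime (KIP-368) elapses."""

    def __init__(self, reauthenticate, safety_factor=0.8):
        # `reauthenticate` is an async callable that would perform the
        # SaslHandshake/SaslAuthenticate round trip on a connection.
        self._reauthenticate = reauthenticate
        self._safety_factor = safety_factor  # reauth at 80% of lifetime
        self._task = None

    def on_authenticated(self, session_lifetime_ms):
        # Would be called with session_lifetime_ms taken from the
        # SaslAuthenticateResponse after each successful authentication.
        delay = session_lifetime_ms / 1000 * self._safety_factor
        if self._task is not None:
            self._task.cancel()
        self._task = asyncio.create_task(self._reauth_later(delay))

    async def _reauth_later(self, delay):
        await asyncio.sleep(delay)
        await self._reauthenticate()
```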

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.12.0
  • Kafka Broker version (kafka-topics.sh --version): 3.7.0
  • Other information (Confluent Cloud version, etc.): Self-hosted Kafka with Strimzi OAuth authentication plugin

Reproducible example

The problem can be reproduced with a simple consumer such as the one below, together with a Kafka broker configured to require OAuth SASL authentication, for example using Strimzi. If it would be of value, I can try to create a Dockerfile to set up such a broker.

import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from azure.identity import DefaultAzureCredential


class KafkaOAuthBearerTokenProvider(AbstractTokenProvider):
    async def token(self):
        # logic for fetching token, e.g. via DefaultAzureCredential
        ...


async def consume():
    # Kafka consumer configuration with SASL/OAUTHBEARER authentication
    consumer = AIOKafkaConsumer(
        'sometopic',
        bootstrap_servers='my.redacted-kafka.broker,my.redacted-kafka.broker2,my.redacted-kafka.broker3',
        auto_offset_reset='earliest',
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=KafkaOAuthBearerTokenProvider(),
        group_id='someconsumergroup',
    )
    # Start the consumer
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print(f"Consumed message: {msg.value.decode('utf-8')} from partition: {msg.partition} and offset: {msg.offset} and header: {msg.headers}")
    finally:
        # Ensure the consumer is stopped gracefully
        await consumer.stop()


asyncio.run(consume())
moh-incom (Author) commented:
I was able to hack reauthentication together by accessing the consumer's internal AIOKafkaConnection objects and running _do_sasl_handshake() periodically, combined with some rather tricky commit locking, heartbeat stopping/starting, and an additional lock to ensure that the client does not fetch more records while the handshake is ongoing. If any request is made during the handshake, the broker terminates the connection.
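A rough sketch of that workaround shape: a background task that periodically takes a shared lock (which the fetch/commit/heartbeat paths would also have to hold) and re-runs the handshake on each connection. Note that `_do_sasl_handshake()` is private aiokafka API and the surrounding names here are hypothetical:

```python
import asyncio


class PeriodicReauth:
    """Hypothetical workaround: periodically quiesce traffic and re-run
    the SASL handshake on each live connection. `fetch_lock` must also be
    held by the consumer's fetch/commit/heartbeat paths, since the broker
    closes the connection if any request arrives mid-handshake."""

    def __init__(self, get_connections, fetch_lock, interval):
        self._get_connections = get_connections  # callable returning live connections
        self._fetch_lock = fetch_lock            # asyncio.Lock shared with request paths
        self._interval = interval                # seconds between reauth rounds
        self._task = None

    def start(self):
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def _run(self):
        while True:
            await asyncio.sleep(self._interval)
            # Hold the lock so no fetch/commit/heartbeat request is in
            # flight while the handshake runs.
            async with self._fetch_lock:
                for conn in self._get_connections():
                    # On a real AIOKafkaConnection this is the private
                    # _do_sasl_handshake() coroutine.
                    await conn._do_sasl_handshake()
```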
