Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT consumer silently stops consuming data #2764

Open
marc-batto opened this issue Sep 25, 2024 · 0 comments
Open

MQTT consumer silently stops consuming data #2764

marc-batto opened this issue Sep 25, 2024 · 0 comments
Labels

Comments

@marc-batto
Copy link

Hi,

We've been using the mqtt functionality for the last couple of months and are really happy with it so far. Yesterday however the integration stopped working all of a sudden and I am not able to figure out what is going wrong. Our usecase is to simply send whatever comes in from the MQTT stream into an eventhub on Azure to do some processing over there. The code itself looks like this:

import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MqttRepublisher {

    @Incoming("mqtt-topic")
    @Outgoing("eventhub-egress")
    public Uni<byte[]> republish(byte[] byteList) {
        return Uni.createFrom().item(byteList);
    }
}

The application.properties looks like this:

%prod.mp.messaging.incoming.mqtt-topic.ssl=true
%prod.mp.messaging.incoming.mqtt-topic.ssl.keystore.location=${CERT}
%prod.mp.messaging.incoming.mqtt-topic.ssl.keystore.password=${KEY}
mp.messaging.incoming.mqtt-topic.topic=${MQTT_TOPIC}
%prod.mp.messaging.incoming.mqtt-topic.ssl.keystore.type=PKCS12
%prod.mp.messaging.incoming.mqtt-topic.ssl.truststore.type=PKCS12
mp.messaging.incoming.mqtt-topic.type=smallrye-mqtt
mp.messaging.incoming.mqtt-topic.host=${MQTT_HOSTNAME}
mp.messaging.incoming.mqtt-topic.port=${MQTT_PORT}
mp.messaging.incoming.mqtt-topic.max-message-size=600000
%prod.mp.messaging.incoming.mqtt-topic.client-id=client-id
%prod.mp.messaging.incoming.mqtt-topic.username=client-id
mp.messaging.incoming.mqtt-topic.qos=1


mp.messaging.outgoing.eventhub-egress.topic=${MQTT_EVENTHUB_TOPIC}
mp.messaging.outgoing.eventhub-egress.connector=smallrye-kafka
mp.messaging.outgoing.eventhub-egress.key.serializer=org.apache.kafka.common.serialization.StringSerializer
%prod.mp.messaging.outgoing.eventhub-egress.bootstrap.servers=${EVENT_HUB_READINGS_BOOTSTRAP_SERVERS}
%prod.mp.messaging.outgoing.eventhub-egress.security.protocol=SASL_SSL
%prod.mp.messaging.outgoing.eventhub-egress.sasl.mechanism=PLAIN
%prod.mp.messaging.outgoing.eventhub-egress.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="${AZURE_EVENT_HUB_CONNECTION_STRING}";

In the logging I do not see anything happening at all, the MQTT broker sees there is an active connection. There are messages coming in into the broker on the topic.

When I restart the process it all works again as before, but it did cut off our data influx without any signs of going wrong. I would really like to know how this happened and how I could fix it.

Please let me know if I can supply you with any additional information that you need.

Java 21.0.3
maven 3.9.7
Running on a kubernetes cluster in Azure v 1.29.2
build container from -> ubi9/openjdk-21:1.20
Quarkus 3.11.3

Thanks!

@ozangunalp ozangunalp added the mqtt label Dec 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants