From bac8375ab4548cd975923cef4e33ad958d6ffc84 Mon Sep 17 00:00:00 2001 From: Ryan Yeats Date: Fri, 6 Sep 2024 16:03:39 -0700 Subject: [PATCH] Add core federation and broker connections to the management console --- .../activemq/artemis/logs/AuditLogger.java | 15 ++ .../management/BrokerConnectionControl.java | 66 +++++ .../core/management/FederationControl.java | 36 +++ .../FederationRemoteConsumerControl.java | 72 ++++++ .../management/FederationStreamControl.java | 90 +++++++ .../core/management/ObjectNameBuilder.java | 42 ++++ .../api/core/management/ResourceNames.java | 8 + .../amqp/connect/AMQPBrokerConnection.java | 9 + .../connect/AMQPBrokerConnectionManager.java | 5 +- .../impl/ActiveMQServerControlImpl.java | 2 + .../impl/BrokerConnectionControlImpl.java | 177 ++++++++++++++ .../impl/FederationControlImpl.java | 110 +++++++++ .../FederationRemoteConsumerControlImpl.java | 154 ++++++++++++ .../impl/FederationStreamControlImpl.java | 229 ++++++++++++++++++ .../artemis/core/server/BrokerConnection.java | 5 + .../server/federation/FederatedAbstract.java | 14 ++ .../federation/FederatedQueueConsumer.java | 2 + .../FederatedQueueConsumerImpl.java | 8 + .../core/server/federation/Federation.java | 6 + .../server/federation/FederationManager.java | 6 + .../server/management/ManagementService.java | 24 ++ .../impl/ManagementServiceImpl.java | 102 ++++++++ .../group/impl/ClusteredResetMockTest.java | 48 ++++ .../management/ActiveMQServerControlTest.java | 6 + 24 files changed, 1235 insertions(+), 1 deletion(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 13d78f72c6ef..4c3a21d08966 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2744,4 +2744,19 @@ static void getAuthorizationFailureCount(Object source) { @LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO) void getAuthorizationFailureCount(String user, Object source); + + static void startFederation(Object source) { + BASE_LOGGER.startFederation(getCaller(), source); + } + + @LogMessage(id = 601782, value = "User {} is starting a federation on target resource: {}", level = LogMessage.Level.INFO) + void startFederation(String user, Object source); + + static void stopFederation(Object source) { + BASE_LOGGER.stopFederation(getCaller(), source); + } + + @LogMessage(id = 601783, value = "User {} is stopping a federation on target resource: {}", level = LogMessage.Level.INFO) + void stopFederation(String user, Object source); + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java new file mode 100644 index 000000000000..fe119bb61fd5 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerConnectionControl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BrokerConnectionControl is used to manage a BrokerConnections. + */ +public interface BrokerConnectionControl extends ActiveMQComponentControl { + + /** + * Returns {@code true} if this connection is open, {@code false} else. + */ + @Attribute(desc = "whether this connection is open") + boolean isOpen(); + + /** + * Returns the name of this broker connection + */ + @Attribute(desc = "name of this broker connection") + String getName(); + + /** + * Returns the connection uri for this broker connection. + */ + @Attribute(desc = "connection uri for this broker connection") + String getUri(); + + /** + * Returns the user this broker connection is using. + */ + @Attribute(desc = "the user this broker connection is using") + String getUser(); + + /** + * Returns the protocol this broker connection is using. + */ + @Attribute(desc = "protocol this broker connection is using") + String getProtocol(); + + /** + * Returns the retry interval used by this broker connection. + */ + @Attribute(desc = "retry interval used by this broker connection") + long getRetryInterval(); + + /** + * Returns the number of reconnection attempts used by this broker connection. + */ + @Attribute(desc = "number of reconnection attempts used by this broker connection") + int getReconnectAttempts(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java new file mode 100644 index 000000000000..64e732477ab1 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationControl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A federationControl is used to manage a federation. + */ +public interface FederationControl extends ActiveMQComponentControl { + + /** + * Returns the name of this federation + */ + @Attribute(desc = "name of this federation") + String getName(); + + /** + * Returns the name of the user the federation is associated with. + */ + @Attribute(desc = "name of the user the federation is associated with") + String getUser(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java new file mode 100644 index 000000000000..b254f5ca6ab4 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationRemoteConsumerControl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BridgeControl is used to manage a federation stream. + */ +public interface FederationRemoteConsumerControl { + + /** + * Returns the name of the queue that is being federated too + */ + @Attribute(desc = "name of the queue that is being federated too") + String getQueueName(); + + /** + * Returns the address this remote consumer will forward messages from. + */ + @Attribute(desc = "address this remote consumer will forward messages from") + String getAddress(); + + + /** + * Returns the priority of this remote consumer will consumer messages. + */ + @Attribute(desc = "address this remote consumer will consumer messages") + int getPriority(); + + /** + * Returns the routing type associated with this address. + */ + @Attribute(desc = "routing type for this address") + String getRoutingType(); + + /** + * Returns the filter string associated with this remote consumer. + */ + @Attribute(desc = "filter string associated with this remote consumer") + String getFilterString(); + + /** + * Returns the queue filter string associated with this remote consumer. + */ + @Attribute(desc = "queue filter string associated with this remote consumer") + String getQueueFilterString(); + + /** + * Returns the fully qualified queue name associated with this remote consumer. + */ + @Attribute(desc = "fully qualified queue name associated with this remote consumer") + String getFqqn(); + + /** + * Returns the number of messages that have been federated for this address. + */ + @Attribute(desc = "number of messages that have been federated for this address") + long getFederatedMessageCount(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java new file mode 100644 index 000000000000..ab2a0540f117 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/FederationStreamControl.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core.management; + +/** + * A BridgeControl is used to manage a federation stream. + */ +public interface FederationStreamControl extends ActiveMQComponentControl { + + /** + * Returns the name of this federation stream + */ + @Attribute(desc = "name of this federation stream") + String getName(); + + /** + * Returns any list of static connectors used by this federation stream + */ + @Attribute(desc = "list of static connectors used by this federation stream") + String[] getStaticConnectors() throws Exception; + + /** + * Returns the name of the discovery group used by this federation stream. + */ + @Attribute(desc = "name of the discovery group used by this federation stream") + String getDiscoveryGroupName(); + + /** + * Returns the retry interval used by this federation stream. + */ + @Attribute(desc = "retry interval used by this federation stream") + long getRetryInterval(); + + /** + * Returns the retry interval multiplier used by this federation stream. + */ + @Attribute(desc = "retry interval multiplier used by this federation stream") + double getRetryIntervalMultiplier(); + + /** + * Returns the max retry interval used by this federation stream. + */ + @Attribute(desc = "max retry interval used by this federation stream") + long getMaxRetryInterval(); + + /** + * Returns the number of reconnection attempts used by this federation stream. + */ + @Attribute(desc = "number of reconnection attempts used by this federation stream") + int getReconnectAttempts(); + + /** + * Returns {@code true} if steam allows a shared connection, {@code false} else. + */ + @Attribute(desc = "whether this stream will allow the connection to be shared") + boolean isSharedConnection(); + + /** + * Returns {@code true} if this connection is configured to pull, {@code false} else. + */ + @Attribute(desc = "whether this connection is configured to pull") + boolean isPull(); + + /** + * Returns {@code true} the connection is configured for HA, {@code false} else. + */ + @Attribute(desc = "whether this connection is configured for HA") + boolean isHA(); + + /** + * Returns the name of the user the federation is associated with + */ + @Attribute(desc = "name of the user the federation is associated with") + String getUser(); + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java index 2d103f591685..ab955b5ba198 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ObjectNameBuilder.java @@ -126,6 +126,15 @@ public ObjectName getBroadcastGroupObjectName(final String name) throws Exceptio return createObjectName("broadcast-group", name); } + /** + * Returns the ObjectName used by BrokerConnectionControl. + * + * @see BrokerConnectionControl + */ + public ObjectName getBrokerConnectionObjectName(String name) throws Exception { + return createObjectName("broker-connection", name); + } + /** * Returns the ObjectName used by BridgeControl. * @@ -135,6 +144,15 @@ public ObjectName getBridgeObjectName(final String name) throws Exception { return createObjectName("bridge", name); } + /** + * Returns the ObjectName used by FederationControl. + * + * @see FederationControl + */ + public ObjectName getFederationObjectName(String name) throws Exception { + return createObjectName("federation", name); + } + /** * Returns the ObjectName used by ClusterConnectionControl. * @@ -169,4 +187,28 @@ public ObjectName getManagementContextObjectName() throws Exception { public ObjectName getSecurityObjectName() throws Exception { return ObjectName.getInstance("hawtio:type=security,area=jmx,name=ArtemisJMXSecurity"); } + + /** + * Returns the ObjectName used by FederationStreamControl. + * + * @see FederationStreamControl + */ + public ObjectName getFederationStreamObjectName(SimpleString federationName, + SimpleString streamName) throws Exception { + return ObjectName.getInstance(String.format("%s,component=federations,name=%s,streamName=%s", getActiveMQServerName(), ObjectName.quote(federationName.toString()), ObjectName.quote(streamName.toString().toLowerCase()))); + } + + /** + * Returns the ObjectName used by FederationRemoteConsumerControl. + * + * @see FederationRemoteConsumerControl + */ + public ObjectName getFederationRemoteConsumerObjectName(final SimpleString federationName, + final SimpleString streamName, + final SimpleString address, + final SimpleString queueName, + RoutingType routingType) throws Exception { + return ObjectName.getInstance(String.format("%s,component=federations,name=%s,streamName=%s,address=%s,subcomponent=queues,routing-type=%s,queue=%s", getActiveMQServerName(), ObjectName.quote(federationName.toString()), ObjectName.quote(streamName.toString()), ObjectName.quote(address.toString()), ObjectName.quote(routingType.toString().toLowerCase()), ObjectName.quote(queueName.toString()))); + } + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java index e07f99ce4e72..c6f24d4c9b6a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ResourceNames.java @@ -34,8 +34,16 @@ public final class ResourceNames { public static final String ADDRESS = "address."; + public static final String BROKER_CONNECTION = "brokerconnection."; + public static final String BRIDGE = "bridge."; + public static final String FEDERATION = "federation."; + + public static final String FEDERATION_STREAM = "federationstream."; + + public static final String FEDERATION_REMOTE_CONSUMER = "federationremoteconsumer."; + public static final String ACCEPTOR = "acceptor."; public static final String DIVERT = "divert."; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index f47cf26dd367..9ed3edc028e8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -194,6 +194,15 @@ public AMQPBrokerConnectConfiguration getConfiguration() { return brokerConnectConfiguration; } + @Override + public boolean isOpen() { + if (connection != null) { + return connection.isOpen(); + } else { + return false; + } + } + @Override public boolean isStarted() { return started; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java index 73f77b9cf63e..e8c1d49e995e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java @@ -89,7 +89,7 @@ private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server); amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection); server.registerBrokerConnection(amqpBrokerConnection); - + server.getManagementService().registerBrokerConnection(amqpBrokerConnection); if (start) { amqpBrokerConnection.start(); } @@ -142,6 +142,7 @@ public void updateConfiguration(List configurati connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } @@ -183,6 +184,7 @@ public void updateConfiguration(List configurati connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } } @@ -211,6 +213,7 @@ public void stop() throws Exception { connection.stop(); } finally { server.unregisterBrokerConnection(connection); + server.getManagementService().unregisterBrokerConnection(connection.getName()); } } } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 7076a2273615..6776ce2d66de 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -4000,6 +4000,8 @@ public String listBrokerConnections() { obj.add("name", brokerConnection.getName()); obj.add("protocol", brokerConnection.getProtocol()); obj.add("started", brokerConnection.isStarted()); + obj.add("uri", brokerConnection.getConfiguration().getUri()); + obj.add("open", brokerConnection.isOpen()); connections.add(obj.build()); } return connections.build().toString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java new file mode 100644 index 000000000000..6b4d5fc4dd87 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerConnectionControlImpl.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.BrokerConnection; +import org.apache.activemq.artemis.logs.AuditLogger; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; +import javax.management.NotCompliantMBeanException; + +public class BrokerConnectionControlImpl extends AbstractControl implements BrokerConnectionControl { + + private final BrokerConnection brokerConnection; + + public BrokerConnectionControlImpl(BrokerConnection brokerConnection, + StorageManager storageManager) throws NotCompliantMBeanException { + super(BrokerConnectionControl.class, storageManager); + this.brokerConnection = brokerConnection; + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.isStarted(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isOpen() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.brokerConnection); //TODO + } + clearIO(); + try { + return brokerConnection.isOpen(); + } finally { + blockOnIO(); + } + } + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startBrokerConnection(this.brokerConnection.getName()); + } + clearIO(); + try { + brokerConnection.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startBrokerConnection(this.brokerConnection.getName()); + } + clearIO(); + try { + brokerConnection.stop(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getName(); + } finally { + blockOnIO(); + } + } + + @Override + public String getUri() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getStaticConnectors(this.brokerConnection); //TODO + } + clearIO(); + try { + return brokerConnection.getConfiguration().getUri(); + } finally { + blockOnIO(); + } + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getUser(); + } finally { + blockOnIO(); + } + } + + @Override + public String getProtocol() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getProducerWindowSize(this.brokerConnection);//TODO + } + clearIO(); + try { + return brokerConnection.getProtocol(); + } finally { + blockOnIO(); + } + } + + @Override + public long getRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryInterval(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public int getReconnectAttempts() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getReconnectAttempts(this.brokerConnection); + } + clearIO(); + try { + return brokerConnection.getConfiguration().getReconnectAttempts(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(BrokerConnectionControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(BrokerConnectionControl.class); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java new file mode 100644 index 000000000000..75f1d1fdeb56 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationControlImpl.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +import org.apache.activemq.artemis.api.core.management.FederationControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.logs.AuditLogger; + +public class FederationControlImpl extends AbstractControl implements FederationControl { + + private final Federation federation; + + public FederationControlImpl(final Federation federation, final StorageManager storageManager) throws Exception { + super(FederationControl.class, storageManager); + this.federation = federation; + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.federation); + } + clearIO(); + try { + return federation.getConfig().getCredentials().getUser(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.federation); + } + clearIO(); + try { + return federation.getName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.federation); + } + clearIO(); + try { + return federation.isStarted(); + } finally { + blockOnIO(); + } + } + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startFederation(this.federation); + } + clearIO(); + try { + federation.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.stopFederation(this.federation); + } + clearIO(); + try { + federation.stop(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationControl.class); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java new file mode 100644 index 000000000000..d469d3e5f9e5 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationRemoteConsumerControlImpl.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import org.apache.activemq.artemis.api.core.management.FederationRemoteConsumerControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.logs.AuditLogger; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +public class FederationRemoteConsumerControlImpl extends AbstractControl implements FederationRemoteConsumerControl { + + private final FederatedConsumerKey federatedConsumerKey; + private final FederatedQueueConsumer federatedConsumer; + + public FederationRemoteConsumerControlImpl(final FederatedQueueConsumer federatedConsumer, + final StorageManager storageManager) throws Exception { + super(FederationRemoteConsumerControl.class, storageManager); + this.federatedConsumerKey = federatedConsumer.getKey(); + this.federatedConsumer = federatedConsumer; + } + + @Override + public String getAddress() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getAddress(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getAddress().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public int getPriority() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMessageExpiryThreadPriority(federatedConsumer); //TODO + } + clearIO(); + try { + return federatedConsumerKey.getPriority(); + } finally { + blockOnIO(); + } + } + + @Override + public String getRoutingType() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRoutingType(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getRoutingType().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getFilterString() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getFilterString(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getFilterString().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getQueueName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getQueueName(federatedConsumer); + } + clearIO(); + try { + return federatedConsumerKey.getQueueName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getQueueFilterString() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getFilterString(federatedConsumer); //TODO + } + clearIO(); + try { + return federatedConsumerKey.getQueueFilterString().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String getFqqn() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(federatedConsumer); //TODO + } + clearIO(); + try { + return federatedConsumerKey.getFqqn().toString(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationRemoteConsumerControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationRemoteConsumerControl.class); + } + + @Override + public long getFederatedMessageCount() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMessageCount(federatedConsumer); + } + clearIO(); + try { + return federatedConsumer.federatedMessageCount(); + } finally { + blockOnIO(); + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java new file mode 100644 index 000000000000..2a1cd94ea11a --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/FederationStreamControlImpl.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl; + +import org.apache.activemq.artemis.api.core.management.FederationStreamControl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.federation.FederationStream; +import org.apache.activemq.artemis.logs.AuditLogger; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + +public class FederationStreamControlImpl extends AbstractControl implements FederationStreamControl { + + private final FederationStream federationStream; + + public FederationStreamControlImpl(final FederationStream federationStream, + final StorageManager storageManager) throws Exception { + super(FederationStreamControl.class, storageManager); + this.federationStream = federationStream; + } + + @Override + public String getUser() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getUser(this.federationStream); + } + clearIO(); + try { + return federationStream.getConfig().getConnectionConfiguration().getUsername(); + } finally { + blockOnIO(); + } + } + + @Override + public String getName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getName(this.federationStream); + } + clearIO(); + try { + return federationStream.getName().toString(); + } finally { + blockOnIO(); + } + } + + @Override + public String[] getStaticConnectors() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getStaticConnectors(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getStaticConnectors().toArray(new String[0]); + } finally { + blockOnIO(); + } + } + + @Override + public String getDiscoveryGroupName() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getDiscoveryGroupName(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getDiscoveryGroupName(); + } finally { + blockOnIO(); + } + } + + @Override + public long getRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryInterval(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public double getRetryIntervalMultiplier() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getRetryIntervalMultiplier(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getRetryIntervalMultiplier(); + } finally { + blockOnIO(); + } + } + + @Override + public long getMaxRetryInterval() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMaxRetryInterval(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getMaxRetryInterval(); + } finally { + blockOnIO(); + } + } + + @Override + public int getReconnectAttempts() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getReconnectAttempts(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().getReconnectAttempts(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isSharedConnection() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isSharedStore(this.federationStream); //TODO + } + clearIO(); + try { + return federationStream.getConnection().isSharedConnection(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isPull() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isHA(this.federationStream); //TODO + } + clearIO(); + try { + return federationStream.getConnection().isPull(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isHA() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isHA(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().getConfig().isHA(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isStarted() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.isStarted(this.federationStream); + } + clearIO(); + try { + return federationStream.getConnection().isStarted(); + } finally { + blockOnIO(); + } + } + + @Override + public void start() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.startFederation(this.federationStream); //TODO + } + clearIO(); + try { + federationStream.start(); + } finally { + blockOnIO(); + } + } + + @Override + public void stop() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.stopFederation(this.federationStream); //TODO + } + clearIO(); + try { + federationStream.stop(); + } finally { + blockOnIO(); + } + } + + @Override + protected MBeanOperationInfo[] fillMBeanOperationInfo() { + return MBeanInfoHelper.getMBeanOperationsInfo(FederationStreamControl.class); + } + + @Override + protected MBeanAttributeInfo[] fillMBeanAttributeInfo() { + return MBeanInfoHelper.getMBeanAttributesInfo(FederationStreamControl.class); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java index 7251eaad23eb..28c57a3ace21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java @@ -35,4 +35,9 @@ public interface BrokerConnection extends ActiveMQComponent { */ BrokerConnectConfiguration getConfiguration(); + /** + * @return if the connection is currently open. + */ + boolean isOpen(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java index 8cb020b87674..7770e6794a64 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java @@ -117,6 +117,15 @@ public synchronized void createRemoteConsumer(FederatedConsumerKey key, Transfor remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback); remoteQueueConsumer.start(); remoteQueueConsumers.put(key, remoteQueueConsumer); + try { + server.getManagementService().registerFederationRemoteConsumer(remoteQueueConsumer); + final FederatedQueueConsumer finalRemoteQueueConsumer = remoteQueueConsumer; + server.callBrokerFederationPlugins(plugin -> plugin.beforeCloseFederatedQueueConsumer(finalRemoteQueueConsumer)); + } catch (Exception t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError("beforeCloseFederatedQueueConsumer", t); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + if (server.hasBrokerFederationPlugins()) { try { @@ -147,6 +156,11 @@ public synchronized void removeRemoteConsumer(FederatedConsumerKey key) { if (remoteQueueConsumer.decrementCount() <= 0) { remoteQueueConsumer.close(); remoteQueueConsumers.remove(key); + try { + server.getManagementService().unregisterFederationRemoteConsumer(federation.getName(), upstream.getName(), key.getAddress(), key.getQueueName(), key.getRoutingType()); + } catch (Exception e) { + //TODO + } } if (server.hasBrokerFederationPlugins()) { try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java index d9d945a95e74..8580efbd0815 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumer.java @@ -52,4 +52,6 @@ static int getNextDelay(int delay, int delayMultiplier, int delayMax) { void start(); void close(); + + long federatedMessageCount(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index 2b854d9c5a1d..a1b7ad054578 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; @@ -65,6 +66,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi private ClientSessionFactoryInternal clientSessionFactory; private ClientSession clientSession; private ClientConsumerInternal clientConsumer; + private final AtomicLong federatedMessageCount = new AtomicLong(); private final AtomicInteger pendingPullCredit = new AtomicInteger(); private QueueHandle queueHandle; @@ -246,6 +248,11 @@ public synchronized void close() { } } + @Override + public long federatedMessageCount() { + return federatedMessageCount.get(); + } + private void scheduleDisconnect(int delay) { scheduledExecutorService.schedule(() -> { try { @@ -304,6 +311,7 @@ public void onMessage(ClientMessage clientMessage) { message = transformer == null ? message : transformer.transform(message); if (message != null) { server.getPostOffice().route(message, true); + federatedMessageCount.incrementAndGet(); } clientMessage.acknowledge(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java index e9b974ccca95..03c5d874515b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java @@ -80,9 +80,11 @@ public synchronized void stop() { for (FederationUpstream connection : upstreams.values()) { connection.stop(); + server.getManagementService().unregisterFederationStream(connection.getFederation().getName(), connection.getName()); //TODO unregister or just stop? } for (FederationDownstream connection : downstreams.values()) { connection.stop(); + server.getManagementService().unregisterFederationStream(connection.getFederation().getName(), connection.getName()); //TODO unregister or just stop? } upstreams.clear(); downstreams.clear(); @@ -109,11 +111,13 @@ public synchronized boolean undeploy(String name) { FederationUpstream federationConnection = upstreams.remove(name); if (federationConnection != null) { federationConnection.stop(); + server.getManagementService().unregisterFederationStream(federationConnection.getFederation().getName(), federationConnection.getName()); } FederationDownstream federationConnectionDownstream = downstreams.remove(name); if (federationConnectionDownstream != null) { federationConnectionDownstream.undeploy(); federationConnectionDownstream.stop(); + server.getManagementService().unregisterFederationStream(federationConnectionDownstream.getFederation().getName(), federationConnectionDownstream.getName()); } return true; } @@ -137,6 +141,7 @@ public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfi private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) { FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration); + server.getManagementService().registerFederationStream(upstream); upstreams.put(name, upstream); if (state == FederationManager.State.STARTED) { upstream.start(); @@ -180,6 +185,7 @@ private synchronized FederationDownstream deploy(String name, FederationDownstre if (state == FederationManager.State.STARTED) { downstream.start(); } + server.getManagementService().registerFederationStream(downstream); return downstream; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java index 9e213590cfb3..a455c1b06d53 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java @@ -23,11 +23,14 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.server.management.ManagementService; public class FederationManager implements ActiveMQComponent { private final ActiveMQServer server; + private final ManagementService managementService; + private Map federations = new HashMap<>(); private State state; @@ -47,6 +50,7 @@ enum State { public FederationManager(final ActiveMQServer server) { this.server = server; + this.managementService = server.getManagementService(); //TODO should this be passed in the constructor? } @Override @@ -55,6 +59,7 @@ public synchronized void start() throws ActiveMQException { deploy(); for (Federation federation : federations.values()) { federation.start(); + managementService.registerFederation(federation); } state = State.STARTED; } @@ -67,6 +72,7 @@ public synchronized void stop() { for (Federation federation : federations.values()) { federation.stop(); + managementService.unregisterFederation(federation.getConfig().getName()); } federations.clear(); state = State.STOPPED; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java index 3e543b5b257a..daac0fad6006 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java @@ -41,9 +41,13 @@ import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.routing.ConnectionRouter; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; @@ -124,6 +128,22 @@ void registerBroadcastGroup(BroadcastGroup broadcastGroup, void unregisterBridge(String name) throws Exception; + void registerFederation(Federation federation); + + void unregisterFederation(String name); + + void registerFederationStream(FederationStream federationStream); + + void unregisterFederationStream(SimpleString federationName, SimpleString streamName); + + void registerFederationRemoteConsumer(FederatedQueueConsumer federatedQueueConsumer); + + void unregisterFederationRemoteConsumer(SimpleString name, + SimpleString streamName, + SimpleString address, + SimpleString queueName, + RoutingType routingType); + void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception; void unregisterCluster(String name) throws Exception; @@ -145,4 +165,8 @@ void registerBroadcastGroup(BroadcastGroup broadcastGroup, Object getAttribute(String resourceName, String attribute, SecurityAuth auth); Object invokeOperation(String resourceName, String operation, Object[] params, SecurityAuth auth) throws Exception; + + void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception; + + void unregisterBrokerConnection(String name) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 113418a5cc8b..586b5de705e7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -52,9 +52,11 @@ import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl; import org.apache.activemq.artemis.api.core.management.BridgeControl; +import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl; import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl; import org.apache.activemq.artemis.api.core.management.DivertControl; +import org.apache.activemq.artemis.api.core.management.FederationControl; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; @@ -67,9 +69,13 @@ import org.apache.activemq.artemis.core.management.impl.BaseBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.BridgeControlImpl; import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImpl; +import org.apache.activemq.artemis.core.management.impl.BrokerConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.ConnectionRouterControlImpl; import org.apache.activemq.artemis.core.management.impl.DivertControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationRemoteConsumerControlImpl; +import org.apache.activemq.artemis.core.management.impl.FederationStreamControlImpl; import org.apache.activemq.artemis.core.management.impl.JGroupsChannelBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.JGroupsFileBroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; @@ -88,12 +94,16 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.core.server.management.GuardInvocationHandler; @@ -418,6 +428,19 @@ public void unregisterBroadcastGroup(final String name) throws Exception { unregisterFromRegistry(ResourceNames.BROADCAST_GROUP + name); } + @Override + public void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception { + BrokerConnectionControl control = new BrokerConnectionControlImpl(brokerConnection, storageManager); + registerInJMX(objectNameBuilder.getBrokerConnectionObjectName(brokerConnection.getName()), control); + registerInRegistry(ResourceNames.BROKER_CONNECTION + brokerConnection.getName(), control); + } + + @Override + public void unregisterBrokerConnection(String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getBrokerConnectionObjectName(name)); + unregisterFromRegistry(ResourceNames.BROKER_CONNECTION + name); + } + @Override public void registerBridge(final Bridge bridge) throws Exception { bridge.setNotificationService(this); @@ -432,6 +455,85 @@ public void unregisterBridge(final String name) throws Exception { unregisterFromRegistry(ResourceNames.BRIDGE + name); } + @Override + public void registerFederation(final Federation federation) { + try { + + FederationControl control = new FederationControlImpl(federation, storageManager); + registerInJMX(objectNameBuilder.getFederationObjectName(federation.getConfig().getName()), control); + registerInRegistry(ResourceNames.FEDERATION + federation.getName(), control); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void unregisterFederation(final String name) { + try { + + unregisterFromJMX(objectNameBuilder.getFederationObjectName(name)); + unregisterFromRegistry(ResourceNames.FEDERATION + name); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void registerFederationStream(final FederationStream federationStream) { + try { + + logger.info("Registering federationStream {} for {} with hashcode {}", federationStream.getName(), federationStream.getFederation().getName(), federationStream.hashCode()); + FederationStreamControlImpl control = new FederationStreamControlImpl(federationStream, storageManager); + registerInJMX(objectNameBuilder.getFederationStreamObjectName(federationStream.getFederation().getName(), federationStream.getName()), control); + registerInRegistry(ResourceNames.FEDERATION_STREAM + federationStream.getName(), control); + } catch (Exception e) { + throw new RuntimeException("Error registering: " + federationStream.getName() + " for " + federationStream.getFederation().getName(), e); + } + } + + @Override + public void unregisterFederationStream(final SimpleString federationName, final SimpleString streamName) { + try { + + unregisterFromJMX(objectNameBuilder.getFederationStreamObjectName(federationName, streamName)); + unregisterFromRegistry(ResourceNames.FEDERATION_STREAM + streamName); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void registerFederationRemoteConsumer(final FederatedQueueConsumer federatedQueueConsumer) { + try { + + FederationRemoteConsumerControlImpl control = new FederationRemoteConsumerControlImpl(federatedQueueConsumer, storageManager); + + registerInJMX(objectNameBuilder.getFederationRemoteConsumerObjectName(federatedQueueConsumer.getFederation().getName(), federatedQueueConsumer.getFederationUpstream().getName(), federatedQueueConsumer.getKey().getAddress(), federatedQueueConsumer.getKey().getQueueName(), federatedQueueConsumer.getKey().getRoutingType()), control); + registerInRegistry(ResourceNames.FEDERATION_REMOTE_CONSUMER + federatedQueueConsumer.getKey().getQueueName(), control); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + + @Override + public void unregisterFederationRemoteConsumer(final SimpleString name, + final SimpleString streamName, + final SimpleString address, + final SimpleString queueName, + RoutingType routingType) { + try { + unregisterFromJMX(objectNameBuilder.getFederationRemoteConsumerObjectName(name, streamName, address, queueName, routingType)); + unregisterFromRegistry(ResourceNames.FEDERATION_REMOTE_CONSUMER + name); + } catch (Exception e) { + //TODO + throw new RuntimeException(e); + } + } + @Override public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception { ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java index e2ffb3f23195..ccde63d13dfa 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java @@ -45,9 +45,13 @@ import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer; +import org.apache.activemq.artemis.core.server.federation.Federation; +import org.apache.activemq.artemis.core.server.federation.FederationStream; import org.apache.activemq.artemis.core.server.management.GuardInvocationHandler; import org.apache.activemq.artemis.core.server.routing.ConnectionRouter; import org.apache.activemq.artemis.core.server.cluster.Bridge; @@ -316,6 +320,40 @@ public void unregisterBridge(String name) throws Exception { } + @Override + public void registerFederation(Federation federation) { + + } + + @Override + public void unregisterFederation(String name) { + + } + + @Override + public void registerFederationStream(FederationStream federationStream) { + + } + + @Override + public void unregisterFederationStream(SimpleString federationName, SimpleString streamName) { + + } + + @Override + public void registerFederationRemoteConsumer(FederatedQueueConsumer federatedQueueConsumer) { + + } + + @Override + public void unregisterFederationRemoteConsumer(SimpleString name, + SimpleString streamame, + SimpleString address, + SimpleString queueName, + RoutingType routingType) { + + } + @Override public void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception { @@ -372,6 +410,16 @@ public Object invokeOperation(String resourceName, String operation, Object[] pa return null; } + @Override + public void registerBrokerConnection(BrokerConnection brokerConnection) { + + } + + @Override + public void unregisterBrokerConnection(String name) { + + } + @Override public void start() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index aaa50c78370e..7321c68a183e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -6092,6 +6092,12 @@ public boolean isStarted() { public BrokerConnectConfiguration getConfiguration() { return null; } + + @Override + public boolean isOpen() { + return false; + } + } Fake fake = new Fake("fake" + UUIDGenerator.getInstance().generateStringUUID()); server.registerBrokerConnection(fake);