Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support topic instances #753

Open
wants to merge 13 commits into
base: rolling
Choose a base branch
from
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,20 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

bool get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastrtps::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * impl) const override;

TypeSupport();

protected:
void set_members(const message_type_support_callbacks_t * members);

private:
const message_type_support_callbacks_t * members_;
const message_type_support_key_callbacks_t * key_callbacks_;
bool has_data_;
};

Expand Down
9 changes: 9 additions & 0 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
info->topic_,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
Expand Down Expand Up @@ -381,6 +390,15 @@ rmw_create_client(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
info->request_topic_,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (request_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

// Creates DataReader
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
Expand Down Expand Up @@ -384,6 +393,15 @@ rmw_create_service(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (response_fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
writer_qos.history(),
writer_qos.resource_limits());
}

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
info->response_topic_,
Expand Down
18 changes: 18 additions & 0 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,15 @@ __create_dynamic_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down Expand Up @@ -659,6 +668,15 @@ __create_subscription(
return nullptr;
}

// Apply resource limits QoS if the type is keyed
if (fastdds_type->m_isGetKeyDefined &&
!participant_info->leave_middleware_default_qos)
{
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
reader_qos.history(),
reader_qos.resource_limits());
}

info->datareader_qos_ = reader_qos;

// create_datareader
Expand Down
64 changes: 64 additions & 0 deletions rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ TypeSupport::TypeSupport()
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
key_is_unbounded_ = false;
key_max_serialized_size_ = 0;
members_ = nullptr;
key_callbacks_ = nullptr;
has_data_ = false;
}

void TypeSupport::set_members(const message_type_support_callbacks_t * members)
Expand Down Expand Up @@ -57,6 +62,16 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)
m_typeSize = 4 + data_size;
// Account for RTPS submessage alignment
m_typeSize = (m_typeSize + 3) & ~3;

if (nullptr != members->key_callbacks) {
key_callbacks_ = members->key_callbacks;
m_isGetKeyDefined = true;

key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_);
if (!key_is_unbounded_) {
key_buffer_.reserve(key_max_serialized_size_);
}
}
}

size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
Expand Down Expand Up @@ -129,6 +144,55 @@ bool TypeSupport::deserializeROSmessage(
return true;
}

bool TypeSupport::get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastrtps::rtps::InstanceHandle_t * ihandle,
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
bool force_md5,
const void * impl) const
{
assert(ros_message);
(void)impl;

// retrieve estimated serialized size in case key is unbounded
if (key_is_unbounded_) {
key_max_serialized_size_ = (std::max) (
key_max_serialized_size_,
key_callbacks_->get_serialized_size_key(ros_message));
key_buffer_.reserve(key_max_serialized_size_);
}

eprosima::fastcdr::FastBuffer fast_buffer(
reinterpret_cast<char *>(key_buffer_.data()),
key_max_serialized_size_);

eprosima::fastcdr::Cdr ser(
fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);

key_callbacks_->cdr_serialize_key(ros_message, ser);

const size_t max_serialized_key_length = 16;

auto ser_length = ser.get_serialized_data_length();

// check for md5
if (force_md5 || key_max_serialized_size_ > max_serialized_key_length) {
md5_.init();
md5_.update(key_buffer_.data(), static_cast<unsigned int>(ser_length));
md5_.finalize();

for (uint8_t i = 0; i < max_serialized_key_length; ++i) {
ihandle->value[i] = md5_.digest[i];
}
} else {
memset(ihandle->value, 0, max_serialized_key_length);
for (uint8_t i = 0; i < ser_length; ++i) {
ihandle->value[i] = key_buffer_[i];
}
}

return true;
}

MessageTypeSupport::MessageTypeSupport(const message_type_support_callbacks_t * members)
{
assert(members);
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
} else {
this->m_typeSize++;
}

if (this->members_->has_any_key_member_) {
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(members);
this->m_isGetKeyDefined = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}
Expand Down
14 changes: 14 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
} else {
this->m_typeSize++;
}

if (this->members_->has_any_key_member_) {
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_);
this->m_isGetKeyDefined = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}
Expand Down Expand Up @@ -92,6 +99,13 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
} else {
this->m_typeSize++;
}

if (this->members_->has_any_key_member_) {
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_);
this->m_isGetKeyDefined = true;
this->key_buffer_.reserve(this->key_max_serialized_size_);
}

// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}
Expand Down
44 changes: 44 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ class TypeSupportProxy : public rmw_fastrtps_shared_cpp::TypeSupport

bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

bool get_key_hash_from_ros_message(
void * ros_message, eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, bool force_md5,
const void * impl) const override;
};

class BaseTypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
Expand Down Expand Up @@ -170,28 +174,68 @@ class TypeSupport : public BaseTypeSupport
bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;

bool get_key_hash_from_ros_message(
void * ros_message,
eprosima::fastrtps::rtps::InstanceHandle_t * ihandle,
bool force_md5,
const void * impl) const override;

protected:
explicit TypeSupport(const void * ros_type_support);

size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment);
size_t calculateMaxSerializedKeySize(const MembersType * members);

const MembersType * members_;

private:
size_t calculateMaxSerializedSize(
const MembersType * members,
size_t current_alignment,
bool compute_key,
bool & is_key_unbounded);

size_t getEstimatedSerializedSize(
const MembersType * members,
const void * ros_message,
size_t current_alignment) const;

size_t getEstimatedSerializedKeySize(
const MembersType * members,
const void * ros_message) const;

size_t getEstimatedSerializedSize(
const MembersType * members,
const void * ros_message,
size_t current_alignment,
bool compute_key) const;

bool serializeROSmessage(
eprosima::fastcdr::Cdr & ser,
const MembersType * members,
const void * ros_message) const;

bool serializeKeyROSmessage(
eprosima::fastcdr::Cdr & ser,
const MembersType * members,
const void * ros_message) const;

bool serializeROSmessage(
eprosima::fastcdr::Cdr & ser,
const MembersType * members,
const void * ros_message,
bool compute_key) const;

bool deserializeROSmessage(
eprosima::fastcdr::Cdr & deser,
const MembersType * members,
void * ros_message) const;

bool get_key_hash_from_ros_message(
const MembersType * members,
void * ros_message,
eprosima::fastrtps::rtps::InstanceHandle_t * ihandle,
bool force_md5) const;
};

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
Loading