// Copyright(c) 2022 to 2023 ZettaScale Technology and others // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License v. 2.0 which is available at // http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License // v. 1.0 which is available at // http://www.eclipse.org/org/documents/edl-v10.php. // // SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause #include #include #include #include #include "dds/dds.h" #include "dynsub.h" // Interpreting the data of an arbitrary topic requires interpreting the type object that describes the data. // The type object type is defined by the XTypes specification (https://www.omg.org/spec/DDS-XTypes/) and it // comes in two forms: MinimalTypeObject and CompleteTypeObject. Only the latter includes field names, so // that's what need. // // Cyclone DDS includes a copy of the IDL as well as the corresponding type definitions in C as generated by // IDLC. So instead of including yet another copy, we simply refer to those. These files are not (yet?) // part of the stable API of Cyclone DDS and so the updates to Cyclone may change the locations or the names // of the relevant header files. // // The API uses `dds_typeobj_t` and `dds_typeinfo_t` that are opaque types but really amount to the // corresponding XTypes objects: DDS_XTypes_TypeObject and DDS_XTypes_TypeInformation. Rather than casting // pointers like we do here, they should be defined in a slightly different way so that they are not really // opaque. For now, this'll have to do. #include "dds/ddsi/ddsi_xt_typeinfo.h" // For convenience, the DDS participant is global static dds_entity_t participant; // Helper function to wait for a DCPSPublication to show up with the desired topic name, then calls // dds_find_topic to create a topic for that data writer's type up the retrieves the type object. static dds_return_t get_topic_and_typeobj (const char *topic_name, dds_duration_t timeout, dds_entity_t *topic, DDS_XTypes_TypeObject **xtypeobj) { const dds_entity_t waitset = dds_create_waitset (participant); const dds_entity_t dcpspublication_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL); const dds_entity_t dcpspublication_readcond = dds_create_readcondition (dcpspublication_reader, DDS_ANY_STATE); (void) dds_waitset_attach (waitset, dcpspublication_readcond, 0); const dds_time_t abstimeout = (timeout == DDS_INFINITY) ? DDS_NEVER : dds_time () + timeout; dds_return_t ret = DDS_RETCODE_OK; *xtypeobj = NULL; while (*xtypeobj == NULL && dds_waitset_wait_until (waitset, NULL, 0, abstimeout) > 0) { void *epraw = NULL; dds_sample_info_t si; if (dds_take (dcpspublication_reader, &epraw, &si, 1, 1) <= 0) continue; dds_builtintopic_endpoint_t *ep = epraw; const dds_typeinfo_t *typeinfo = NULL; // We are only interested in DCPSPublications where the topic name matches and that carry type information // (a non-XTypes capable DDS would not provide type information) because without that information there is // no way we can do anything interesting with it. if (strcmp (ep->topic_name, topic_name) == 0 && dds_builtintopic_get_endpoint_type_info (ep, &typeinfo) == 0 && typeinfo) { // Using dds_find_topic allows us to "clone" the topic definition including the topic QoS, but it does // require that topic discovery is enabled in the configuration. The advantage of using dds_find_topic // is that it creates a topic with the same name, type *and QoS*. That distinction only matters if // topic is discovery is enabled and/or if the topic has a durability kind of of transient or persistent: // - using a different topic QoS might result in an incompatible QoS notification if topic discovery is // enabled (everything would still work). // - transient/persistent data behaviour is defined in terms of the topic QoS actually really matters // // So we try to use dds_find_topic, and if that fails, try to go the other route using the writer's QoS // as an approximation of the topic QoS. if ((*topic = dds_find_topic (DDS_FIND_SCOPE_GLOBAL, participant, ep->topic_name, typeinfo, DDS_SECS (2))) < 0) { fprintf (stderr, "dds_find_topic: %s ... continuing on the assumptions that topic discovery is disabled\n", dds_strretcode (*topic)); dds_topic_descriptor_t *descriptor; if ((ret = dds_create_topic_descriptor(DDS_FIND_SCOPE_GLOBAL, participant, typeinfo, DDS_SECS (10), &descriptor)) < 0) { fprintf (stderr, "dds_create_topic_descriptor: %s\n", dds_strretcode (ret)); dds_return_loan (dcpspublication_reader, &epraw, 1); goto error; } if ((*topic = dds_create_topic (participant, descriptor, ep->topic_name, ep->qos, NULL)) < 0) { fprintf (stderr, "dds_create_topic_descriptor: %s (be sure to enable topic discovery in the configuration)\n", dds_strretcode (*topic)); dds_delete_topic_descriptor (descriptor); dds_return_loan (dcpspublication_reader, &epraw, 1); goto error; } dds_delete_topic_descriptor (descriptor); } // The topic suffices for creating a reader, but we also need the TypeObject to make sense of the data if ((*xtypeobj = load_type_with_deps (participant, typeinfo)) == NULL) { fprintf (stderr, "loading type with all dependencies failed\n"); dds_return_loan (dcpspublication_reader, &epraw, 1); goto error; } } dds_return_loan (dcpspublication_reader, &epraw, 1); } if (*xtypeobj) { { struct ppc ppc; ppc_init (&ppc); ppc_print_to (&ppc, &(*xtypeobj)->_u.complete); } // If we got the type object, populate the type cache size_t align, size; build_typecache_to (&(*xtypeobj)->_u.complete, &align, &size); fflush (stdout); struct typeinfo templ = { .key = { .key = (uintptr_t) *xtypeobj } } , *info; if ((info = type_cache_lookup (&templ)) != NULL) { assert (info->release == NULL); info->release = *xtypeobj; } else { // not sure whether this is at all possible info = malloc (sizeof (*info)); *info = (struct typeinfo){ .key = { .key = (uintptr_t) *xtypeobj }, .typeobj = &(*xtypeobj)->_u.complete, .release = *xtypeobj, .align = align, .size = size }; type_cache_add (info); } } error: dds_delete (dcpspublication_reader); dds_delete (waitset); return (*xtypeobj != NULL) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; } int main (int argc, char **argv) { dds_return_t ret = 0; dds_entity_t topic = 0; if (argc != 2) { fprintf (stderr, "usage: %s topicname\n", argv[0]); return 2; } participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL); if (participant < 0) { fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant)); return 1; } // The one magic step: get a topic and type object ... DDS_XTypes_TypeObject *xtypeobj; type_cache_init (); if ((ret = get_topic_and_typeobj (argv[1], DDS_SECS (10), &topic, &xtypeobj)) < 0) { fprintf (stderr, "get_topic_and_typeobj: %s\n", dds_strretcode (ret)); goto error; } // ... given those, we can create a reader just like we do normally ... const dds_entity_t reader = dds_create_reader (participant, topic, NULL, NULL); // ... and create a waitset that allows us to wait for any incoming data ... const dds_entity_t waitset = dds_create_waitset (participant); const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE); (void) dds_waitset_attach (waitset, readcond, 0); while (1) { (void) dds_waitset_wait (waitset, NULL, 0, DDS_INFINITY); void *raw = NULL; dds_sample_info_t si; if ((ret = dds_take (reader, &raw, &si, 1, 1)) < 0) goto error; else if (ret != 0) { // ... that we then print print_sample (si.valid_data, raw, &xtypeobj->_u.complete); if ((ret = dds_return_loan (reader, &raw, 1)) < 0) goto error; } } error: type_cache_free (); dds_delete (participant); return ret < 0; }