Custom connections & queues

DAQling supports custom connection and queue types. Custom connections must derive from the Sender and Receiver classes, and custom queues must inherit from the Queue class. All three base classes live in the daqling::core namespace.

The DataTypeWrapper class

Queues and connections in DAQling use the DataTypeWrapper class to pass data around. DataTypeWrapper wraps an arbitrary subclass of DataType. Its purpose is to erase the subtype of DataType while remaining a concrete class, unlike the abstract DataType class. This allows queues to store DataTypeWrapper objects.

The DataTypeWrapper functions intended for use by connections are summarized below. For further examples of how to use DataTypeWrapper, see the existing DAQling connection and queue implementations.

void reconstruct_or_store(void *data, size_t size)

The reconstruct_or_store method either reconstructs the internal DataType object, if one exists, or stores the raw data until the DataType subtype is known. When a queue is used together with a receiver, the DataType is not known on the 'writing side' of the queue. Once the data has been transferred to the 'receiving side', where the type is known, the DataType object is reconstructed.

DataType *getDataTypePtr()

The getDataTypePtr method returns a pointer to the internal DataType object. It can be used to retrieve the data to be sent, and to call detach or any other DataType method that the communication protocol requires.

DataTypeWrapper also has conversion constructors, a move constructor, and assignment operators, which allow DataTypeWrapper objects to be passed between connections and queues. For these, refer to Common/DataType.hpp in DAQling.
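The store-then-reconstruct behaviour described above can be sketched with stand-in types. This is a minimal illustration, not DAQling's implementation: DataType here is a simplified stand-in, and TextPayload, WrapperSketch, adopt and pendingBytes are hypothetical names introduced only for this example. The sketch also takes a const pointer for convenience.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for the abstract DataType class (assumption).
struct DataType {
  virtual ~DataType() = default;
  virtual void reconstruct(const void *data, std::size_t size) = 0;
  virtual const void *data() const = 0;
  virtual std::size_t size() const = 0;
};

// Hypothetical concrete payload type used only in this sketch.
struct TextPayload : DataType {
  std::string text;
  void reconstruct(const void *data, std::size_t size) override {
    text.assign(static_cast<const char *>(data), size);
  }
  const void *data() const override { return text.data(); }
  std::size_t size() const override { return text.size(); }
};

// Concrete, movable wrapper that does not expose the payload subtype.
class WrapperSketch {
public:
  WrapperSketch() = default;
  explicit WrapperSketch(std::unique_ptr<DataType> obj) : m_obj(std::move(obj)) {}

  // Rebuild the typed object if one exists, otherwise keep the raw bytes.
  void reconstruct_or_store(const void *data, std::size_t size) {
    if (m_obj) {
      m_obj->reconstruct(data, size);
    } else {
      const auto *bytes = static_cast<const unsigned char *>(data);
      m_raw.assign(bytes, bytes + size);
    }
  }

  // Hypothetical helper: attach the subtype once known and replay stored bytes.
  void adopt(std::unique_ptr<DataType> obj) {
    m_obj = std::move(obj);
    if (!m_raw.empty()) {
      m_obj->reconstruct(m_raw.data(), m_raw.size());
      m_raw.clear();
    }
  }

  DataType *getDataTypePtr() { return m_obj.get(); }
  std::size_t pendingBytes() const { return m_raw.size(); }

private:
  std::unique_ptr<DataType> m_obj;   // typed object, empty until the subtype is known
  std::vector<unsigned char> m_raw;  // raw bytes held while the subtype is unknown
};
```

In this sketch a wrapper without a typed object simply buffers the bytes, which mirrors the 'writing side' of a queue. The typed object appears only after adopt is called on the 'receiving side'.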

Connections

Methods

Custom senders and receivers must respectively override:

bool send(DataTypeWrapper &bin);
bool receive(DataTypeWrapper &bin);

These methods are responsible for implementing the transport protocol.

Optionally, they can also override a sleeping version of each:

bool sleep_send(DataTypeWrapper &bin);
bool sleep_receive(DataTypeWrapper &bin);

Furthermore, the following methods can be overridden for both classes:

virtual void set_sleep_duration(uint ms);
virtual bool start();
virtual bool stop();

The start and stop methods are called when the corresponding DAQling command is issued. The set_sleep_duration method sets the maximum time that the sleeping send and receive may block before returning.
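To illustrate how the plain and sleeping variants relate, here is a minimal sketch of a sender that writes into a bounded in-memory buffer. The Sender and DataTypeWrapper types below are simplified stand-ins rather than the daqling::core classes, and BufferSender, sentCount and the retry-with-deadline strategy are assumptions made for the example.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Simplified stand-ins; the real classes live in daqling::core (assumption).
struct DataTypeWrapper {
  std::string payload;
};

class Sender {
public:
  virtual ~Sender() = default;
  virtual bool send(DataTypeWrapper &bin) = 0;
  virtual bool sleep_send(DataTypeWrapper &bin) { return send(bin); }
  virtual void set_sleep_duration(unsigned ms) { m_sleep_ms = ms; }
  virtual bool start() { return true; }
  virtual bool stop() { return true; }

protected:
  unsigned m_sleep_ms{0};
};

// Hypothetical toy transport: "sends" into a bounded in-memory buffer.
class BufferSender : public Sender {
public:
  explicit BufferSender(std::size_t limit) : m_limit(limit) {}

  // Non-blocking: fails immediately when the buffer is full.
  bool send(DataTypeWrapper &bin) override {
    if (m_sent.size() >= m_limit) return false;
    m_sent.push_back(bin.payload);
    return true;
  }

  // Sleeping variant: retry until success or until the configured
  // duration has elapsed, whichever comes first.
  bool sleep_send(DataTypeWrapper &bin) override {
    const auto deadline =
        std::chrono::steady_clock::now() + std::chrono::milliseconds(m_sleep_ms);
    do {
      if (send(bin)) return true;
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    } while (std::chrono::steady_clock::now() < deadline);
    return false;
  }

  std::size_t sentCount() const { return m_sent.size(); }

private:
  std::size_t m_limit;
  std::vector<std::string> m_sent;
};
```

A real transport would normally block on the underlying socket or queue with a timeout instead of polling. The polling loop is used here only to keep the sketch free of external dependencies.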

Registration of connections

The custom connections are dynamically loaded by DAQling. This loading is based on the name of the connection, hence the following folder structure/naming convention is required:

/Daqling/src/Connections/<connection_name>/<connection_name>Sender.cpp
/Daqling/src/Connections/<connection_name>/<connection_name>Sender.hpp
/Daqling/src/Connections/<connection_name>/<connection_name>Receiver.cpp
/Daqling/src/Connections/<connection_name>/<connection_name>Receiver.hpp
/Daqling/src/Connections/<connection_name>/CMakeLists.txt

Add a subfolder in the Connections folder. Give it the name of your connection.

In the subfolder, add header and source files for the connection. Their names must be the connection name followed by Sender or Receiver.

In the source files you must use the REGISTER_RECEIVER or REGISTER_SENDER macros. These must be used in the following way:

REGISTER_RECEIVER(<class_name>)
REGISTER_SENDER(<class_name>)
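For readers unfamiliar with registration macros, the general self-registering factory pattern can be sketched as follows. This is not the definition of REGISTER_RECEIVER; DAQling's macro may be implemented differently. The names receiverRegistry, ReceiverRegistrar, SKETCH_REGISTER_RECEIVER and makeReceiver are hypothetical, and Receiver is a simplified stand-in.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Simplified stand-in for the Receiver base class (assumption).
struct Receiver {
  virtual ~Receiver() = default;
  virtual std::string name() const = 0;
};

using ReceiverFactory = std::function<std::unique_ptr<Receiver>()>;

// Name-to-factory table. A function-local static avoids
// static-initialization-order problems between translation units.
inline std::map<std::string, ReceiverFactory> &receiverRegistry() {
  static std::map<std::string, ReceiverFactory> registry;
  return registry;
}

// Constructing a registrar object stores the factory as a side effect.
struct ReceiverRegistrar {
  ReceiverRegistrar(const std::string &name, ReceiverFactory factory) {
    receiverRegistry()[name] = std::move(factory);
  }
};

// Illustrative macro only: defines a static registrar keyed by the class name.
#define SKETCH_REGISTER_RECEIVER(CLASS)                                \
  static ReceiverRegistrar CLASS##_registrar(                          \
      #CLASS, [] { return std::unique_ptr<Receiver>(new CLASS()); });

struct DummyReceiver : Receiver {
  std::string name() const override { return "DummyReceiver"; }
};
SKETCH_REGISTER_RECEIVER(DummyReceiver)

// Look up a factory by class name, as a loader might do after the
// library containing the class has been opened.
inline std::unique_ptr<Receiver> makeReceiver(const std::string &name) {
  auto it = receiverRegistry().find(name);
  if (it == receiverRegistry().end()) return nullptr;
  return it->second();
}
```

Because the registrar is a static object, the class becomes available by name as soon as its translation unit is loaded, which is why a name-based folder and file convention pairs naturally with this kind of macro.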

Example

Below is a snippet of the DAQling Dummy connection, which is a simplified example of a connection. To find inspiration from the entire Dummy connection implementation, or other connection implementations, view the Connections/ folder.

DummyReceiver.hpp:

class DummyReceiver : public daqling::core::Receiver {
public:
  DummyReceiver(uint chid, const nlohmann::json &j = NULL);

protected:
  bool receive(DataTypeWrapper &bin) override;
};

DummyReceiver.cpp:

using namespace daqling::connection;

REGISTER_RECEIVER(DummyReceiver)
DummyReceiver::DummyReceiver(uint chid, const nlohmann::json &j) : daqling::core::Receiver(chid) {}
bool DummyReceiver::receive(DataTypeWrapper &bin) {
  bin.getDataTypePtr()->detach();
  ERS_DEBUG(0, "Hello from DummyReceiver protocolHandler");
  return true;
}

Queues

Methods

Custom queue implementations must override the following functions from the Queue base class:

bool read(DataTypeWrapper &);
bool write(DataTypeWrapper &);
uint sizeGuess();
uint capacity();

read should peek the front item in the queue, copy/move it into the DataTypeWrapper argument, and pop the front item. The return value indicates success.

write should push the DataTypeWrapper argument to the queue. The return value indicates success.

sizeGuess should return the estimated number of items currently in the queue.

capacity should return the maximum number of items the queue can hold.

Furthermore, the following methods can optionally be overridden:

bool sleep_read(DataTypeWrapper &);
bool sleep_write(DataTypeWrapper &);
void set_sleep_duration(uint ms);

The sleep_read and sleep_write methods should be blocking variants of read and write, with the blocking time limited to the duration set through set_sleep_duration.
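The contract described above can be sketched with a mutex-protected bounded queue. The Queue and DataTypeWrapper types are simplified stand-ins for the DAQling classes, and LockedQueue is a hypothetical name. Only sleep_read is shown; a sleep_write would follow the same pattern with a second condition variable.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Simplified stand-ins for the DAQling types (assumption).
struct DataTypeWrapper {
  std::string payload;
};

class Queue {
public:
  virtual ~Queue() = default;
  virtual bool read(DataTypeWrapper &) = 0;
  virtual bool write(DataTypeWrapper &) = 0;
  virtual unsigned sizeGuess() = 0;
  virtual unsigned capacity() = 0;
  virtual bool sleep_read(DataTypeWrapper &bin) { return read(bin); }
  virtual void set_sleep_duration(unsigned ms) { m_sleep_ms = ms; }

protected:
  unsigned m_sleep_ms{0};
};

// Hypothetical bounded queue guarded by a mutex.
class LockedQueue : public Queue {
public:
  explicit LockedQueue(unsigned cap) : m_capacity(cap) {}

  // Move the front item into bin and pop it; false if the queue is empty.
  bool read(DataTypeWrapper &bin) override {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_items.empty()) return false;
    bin = std::move(m_items.front());
    m_items.pop_front();
    return true;
  }

  // Push bin to the back; false if the queue is full.
  bool write(DataTypeWrapper &bin) override {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      if (m_items.size() >= m_capacity) return false;
      m_items.push_back(std::move(bin));
    }
    m_not_empty.notify_one();
    return true;
  }

  // Blocking read, bounded by the configured sleep duration.
  bool sleep_read(DataTypeWrapper &bin) override {
    std::unique_lock<std::mutex> lock(m_mutex);
    const bool ready = m_not_empty.wait_for(
        lock, std::chrono::milliseconds(m_sleep_ms),
        [this] { return !m_items.empty(); });
    if (!ready) return false;
    bin = std::move(m_items.front());
    m_items.pop_front();
    return true;
  }

  unsigned sizeGuess() override {
    std::lock_guard<std::mutex> lock(m_mutex);
    return static_cast<unsigned>(m_items.size());
  }

  unsigned capacity() override { return m_capacity; }

private:
  const unsigned m_capacity;
  std::mutex m_mutex;
  std::condition_variable m_not_empty;
  std::deque<DataTypeWrapper> m_items;
};
```

A lock-based queue is chosen here for clarity. Lock-free queues trade this simplicity for throughput and typically restrict the number of producers and consumers.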

Registration of queues

Queues are registered in a similar way to connections.

Custom queues should be put in the Queues/ folder in a subfolder with the name of the queue.

The folder should contain the following:

/Daqling/src/Queues/<queue_name>/<queue_name>.cpp
/Daqling/src/Queues/<queue_name>/<queue_name>.hpp
/Daqling/src/Queues/<queue_name>/CMakeLists.txt

The CMakeLists.txt file can be copied from an existing queue implementation. Remember to update the source file names it lists.

In the source files you must use the REGISTER_QUEUE macro. This must be used in the following way:

REGISTER_QUEUE(<class_name>)

Example

Below is a snippet of the DAQling "FollyProducerConsumer" queue implementation. To find inspiration from the entire Folly queue implementation, or other queue implementations, view the Queues/ folder.

FollyProducerConsumer.hpp:

namespace daqling {
namespace queue {
class FollyProducerConsumer : public daqling::core::Queue {
public:
  FollyProducerConsumer(const nlohmann::json &j);
  bool read(DataTypeWrapper & /*bin*/) override;
  bool write(DataTypeWrapper & /*bin*/) override;
  uint sizeGuess() override;
  uint capacity() override;

protected:
private:
  folly::ProducerConsumerQueue<DataTypeWrapper> m_queue;
};
} // namespace queue
} // namespace daqling

FollyProducerConsumer.cpp:

#include "FollyProducerConsumer.hpp"

using namespace daqling::queue;
REGISTER_QUEUE(FollyProducerConsumer)
FollyProducerConsumer::FollyProducerConsumer(const nlohmann::json &j)
    : m_queue(j.at("queue_size").get<unsigned int>()) {}
bool FollyProducerConsumer::read(DataTypeWrapper &bin) {
  return m_queue.read(bin);
}
bool FollyProducerConsumer::write(DataTypeWrapper &bin) {
  return m_queue.write(std::move(bin));
}
uint FollyProducerConsumer::sizeGuess() { return m_queue.sizeGuess(); }
uint FollyProducerConsumer::capacity() { return m_queue.capacity(); }

JSON Schema

To be able to use custom connections and queues, they must be registered in the json validation schema.

To do this, an entry should be made in the configs/schemas/refs/connection-schema.json file.

The JSON schema for the connection or queue should be added to definitions. Afterwards, a reference to it should be added to definitions/<type>: definitions/queues for queues, and definitions/senders and definitions/receivers for connections.

A sender and receiver type can often have the same required fields, and the connection schema can thus be a single schema, which is added to both definition arrays.

For examples, refer to the existing schemas in configs/schemas/refs/connection-schema.json.

Implementation tips

Using json in setup

Custom implementations of Queue, Receiver and Sender must provide a constructor taking a const nlohmann::json & as argument. This argument contains the JSON object described in the configuration file used to run DAQling.

Example for Sender/Receiver.

JSON config file:

{
    "name": "example01",
    ...
    "connections": {
        "senders": [
            {
                "type": "ZMQPair",
                "chid": 0,
                "transport": "ipc",
                "path": "/tmp/feed0"
            }
        ]
    }
}

ZMQPairSender constructor:

try {
    std::string connStr;
    // Build the endpoint string from the fields of the connection object.
    if (j.at("transport") == "ipc") {
        connStr = "ipc://" + j.at("path").get<std::string>();
    } else if (j.at("transport") == "tcp") {
        connStr = "tcp://" + j.at("host").get<std::string>() + ":" +
                  std::to_string(j.at("port").get<uint>());
    } else {
        throw InvalidTransportType(ERS_HERE, j.at("transport").get<std::string>().c_str());
    }
}
// (the matching catch clause is not shown in this excerpt)

Notice how the constructor accesses the fields of the json connection object that it is passed.

Resources

A resource in DAQling is any class inheriting from the blank daqling::utilities::Resource interface.

The reason for the blank interface is to allow modules to share instances of an arbitrary type, which provides a more flexible and possibly more efficient means of sharing resources than the DAQling connections.

However, to allow for the sharing of resources between modules, the modules must live in the same process, meaning they should be created by the same daqling executable.

Resource factory

DAQling introduces the singleton class daqling::core::ResourceFactory. The purpose of this class is to create and manage resources to be accessed by modules in a daqling executable.

The creation of a resource is a three-step process.

First, the resource factory loads the library containing the resource. This is done using the type name of the resource. (If the library has already been loaded, this step is skipped.)

Second, the resource factory constructs the object, passing a json object with any configuration settings to its constructor.

Third, the resource factory puts a shared pointer to the created instance into a resource map, identifying each created resource by an ID.

From the resource map, modules can request a shared pointer to a certain resource in the map, by specifying its ID. This is done using the getResource method:

std::shared_ptr<daqling::utilities::Resource> getResource(unsigned id);
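The construct-and-store steps above can be sketched with a small singleton that keeps shared pointers in a map. This is a simplified illustration under stated assumptions, not the ResourceFactory implementation: library loading is left out, a plain string stands in for the json settings, and ResourceFactorySketch, createResource and CounterResource are hypothetical names. Returning an empty pointer for an unknown ID is also an assumption of this sketch.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Stand-in for the blank Resource interface (assumption).
struct Resource {
  virtual ~Resource() = default;
};

// Hypothetical resource type; a string stands in for the json settings object.
struct CounterResource : Resource {
  explicit CounterResource(std::string cfg) : settings(std::move(cfg)) {}
  std::string settings;
  int count{0};
};

class ResourceFactorySketch {
public:
  // Single shared instance, created on first use.
  static ResourceFactorySketch &instance() {
    static ResourceFactorySketch factory;
    return factory;
  }

  // Steps two and three: construct the object, then store it under its ID.
  // Step one (loading the library by type name) is omitted in this sketch.
  template <typename T, typename... Args>
  void createResource(unsigned id, Args &&...args) {
    if (m_resources.count(id) != 0) {
      throw std::invalid_argument("duplicate resource id");
    }
    m_resources[id] = std::make_shared<T>(std::forward<Args>(args)...);
  }

  // Returns an empty pointer when the ID is unknown (sketch behaviour).
  std::shared_ptr<Resource> getResource(unsigned id) {
    auto it = m_resources.find(id);
    if (it == m_resources.end()) return nullptr;
    return it->second;
  }

private:
  ResourceFactorySketch() = default;
  std::map<unsigned, std::shared_ptr<Resource>> m_resources;
};
```

Two lookups with the same ID return pointers to the same object, which is what makes the map suitable for sharing state between modules in one process.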

Registering custom resources

To register a custom resource, create a class inheriting from the daqling::utilities::Resource class, as mentioned earlier.

This class must be put in a folder with a specific structure and CMakeLists.txt file. Refer to the daqling Resources/Dummy folder for an example.

The folder must have the name of the resource type and contain the .cpp and .hpp files for the resource class.

/Resources/<ResourceType>/<ResourceType>.hpp
/Resources/<ResourceType>/<ResourceType>.cpp

The last step is to invoke the REGISTER_RESOURCE macro within the source file of the resource to be registered:

REGISTER_RESOURCE(<resource_class_name>)

Specifying creation of resources

To have a resource created within a DAQling instance, it must be specified in the "resources" field of the json configuration file:

{
    "name": "daqling_instance_name",
    ...
    "modules": [ ...
    ],
    "resources": [
        {
            "type": "zmq_context",
            "id": 1
        },
        {
            "type": "queue",
            "id": 2,
            "resource": {
                "type": "FollyProducerConsumer",
                "data_type": "DataFragment<daqling::utilities::Binary>",
                "queue_size": 1000
            }
        },
        {
            "type": "my_custom_resource",
            "id": 3,
            "resource": {
                "settings1": "some_value",
                "settings2": "some_other_value"
            }
        }
    ]
}

The "id" field must specify a unique ID, which the modules must supply to gain access to the resource.

The "type" field is the class to be instantiated.

The "resource" field is a json object, to be passed to the constructor when creating the resource.

Accessing resources from a module or connection

To access a resource from a module or connection, the resource must be registered, and created from specifications in the configuration. If these two steps have not been performed, the resource will not be available!

To access a resource, get a reference to the singleton ResourceFactory:

auto &Resource_factory = daqling::core::ResourceFactory::instance();

Then invoke the getResource method, with the ID of the requested resource:

auto my_resource = Resource_factory.getResource(<id>);

To gain access to the methods of a custom resource, the pointer must be downcast to the specific type:

std::shared_ptr<MyResourceType> my_downcasted_resource = std::dynamic_pointer_cast<MyResourceType>(my_resource);

The above statements can be combined into one or two statements, but are separated here for clarity.

If two or more modules/connections call getResource with the same ID, they will all have a shared pointer to the same instance.

Thus concurrent access to the resource needs to be considered.
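As a sketch of both points, the example below checks the result of the downcast and guards the shared state with a mutex. The Resource type is a stand-in for the DAQling interface, and SharedCounter, OtherResource and requireResource are hypothetical names. Throwing on a failed cast is one possible policy, not something DAQling prescribes.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <stdexcept>

// Stand-in for the blank Resource interface (assumption).
struct Resource {
  virtual ~Resource() = default;
};

// Hypothetical shared resource whose state is protected by its own mutex,
// so several modules can call increment() concurrently.
class SharedCounter : public Resource {
public:
  int increment() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return ++m_value;
  }

private:
  std::mutex m_mutex;
  int m_value{0};
};

// A second, unrelated resource type used to show a failed downcast.
struct OtherResource : Resource {};

// Checked downcast: std::dynamic_pointer_cast yields an empty pointer when
// the stored object is not of type T, so the result is tested before use.
template <typename T>
std::shared_ptr<T> requireResource(const std::shared_ptr<Resource> &base) {
  auto typed = std::dynamic_pointer_cast<T>(base);
  if (!typed) {
    throw std::runtime_error("resource missing or of unexpected type");
  }
  return typed;
}
```

Keeping the mutex inside the resource means that callers do not need to coordinate locking among themselves, which suits the case where independent modules hold pointers to the same instance.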