Modules, Data flow connections, and Configuration

Modules

The DAQling building block is the "Module", a plugin that can be loaded at run-time by the bare daqling executable, which then acquires its functionalities.

The bare daqling executable (or "Core") includes the plugin loading mechanism, connection manager, configuration, and logging.

A Module is a class, inheriting a set of default methods from the DAQProcess base class, such as the runner() or start(int run_num) / stop().

The Module developer can extend its functionalities by defining specific behavior for the default methods, add methods and register custom commands.

Default FSM

         --configure-->         --start-->
(booted)                (ready)            (running)
         <-unconfigure-         <--stop--- 

Configuration

A DAQling JSON configuration is composed of any number of "components".

A component is a daqling executable with one or more Modules loaded. An example of the standard JSON configuration for one component with a single module is:

{
  "name": "mymodule01",
  "host": "localhost",
  "port": 5555,
  "modules":[
    {
      "type": "MyModule",
      "settings": {
      "my_value": 42
      },
      "connections": {
        "senders": [ ... ],
        "receivers": [ ... ]
      }
    }
  ],
  "loglevel": {"core": "INFO", "module": "DEBUG"},
}
  • "host": defines the machine where the component is spawned.
  • "port": is used for the command server of the component. Configuration and commands are sent to this socket.
  • "modules": is an array of the modules to be loaded by the component.
  • "type": is the exact name (case sentitive) of the Module to be loaded.
  • "loglevel": allows the definition of different logging levels between the Core application and its Module part.
  • "settings": include all configuration parameters that are specific to the specified Module
  • "connections": contains the configuration for the sender and receiver sockets (more in Data flow)

The file configs/demo.json shipped with DAQling, describes a demo data acquisition system composed of two event data producers ("ReadoutInterface"), one "EventBuilder" and one "FileWriter" and can be used as an example.

FileWriter module

The FileWriter is a performant file writer with multi-channel and file rotation features.

The configurable parameters are: * number of receive channels in "connections": {"receivers": [...]}. Data from different channels is written to different files. * maximum file size for rotation with "max_filesize":. Defaults to 1 GB. * amount of data to buffer before flushing to disk "buffer_size":. Defaults to 4 kB. * filename pattern, including destination path. Mandatory field, e.g. "/tmp/test-%D.%r.%c.%n.bin". %D is the full date, %r is the run number, %c is the channel identifier, and %n is the n-th generated file with rotation.

Data flow connections

Modules are connected via socket-like connection objects. The supported DAQling transports are TCP/IP (in-host and inter-host), UDP (in-host and inter-host) and IPC via shared memory (in-host only). Furthermore if the Modules to communicate are loaded in the same component: ZMQ InProc and a producer-consumer queue.

Connections can be configured in two directions relative to the Module: "senders": and "receivers": and they should always be part of a sender/receiver group, between two (or more) modules.

Receiver and sender connections in a single Module must have unique channel identifiers ("chid":) respectively. Both receiver and sender "chid"s start from 0.

In order to send and receive Binary chunks of data using the configured channels, the following methods are available. Where uint64_t chn must be the unique channel identifier "chid": from the configuration file for sender and receiver respectively.

In the following template methods, T must be a subclass of the DataType class. (see the Datatypes section)

For non-blocking calls:

template <class T>
bool ConnectionSubManager::send(uint64_t chn, T &msgBin)
template <class T>
bool ConnectionSubManager::receive(uint64_t chn, T &bin)

These methods will immediately return whether the call succeeded.

For a timed blocking call the following methods are available:

template <class T>
bool ConnectionSubManager::sleep_send(uint64_t chn, T &msgBin)
template <class T>
bool ConnectionSubManager::sleep_receive(uint64_t chn, T &bin)

These methods will wait for ‘ms’ milliseconds, and return whether the method succeeded. These methods can be useful to prevent throttling, while also enabling optimal use of the underlying connection API’s.

‘ms’ is set by the methods:

void ConnectionSubManager::set_sender_sleep_duration(uint64_t chn, uint ms)
void ConnectionSubManager::set_receiver_sleep_duration(uint64_t chn, uint ms)

To access the assigned ConnectionSubManager from within a DAQling module, use the m_connections attribute derived from the DAQProcess class.

This is an instance of the ConnectionSubManager and can be used like:

m_connections.set_sender_sleep_duration(...)
m_connections.sleep_send(...)

ZMQ

The ZMQ DAQling connections support two communication patterns: "Pair" or "Publish/Subscribe". The main difference is that a Pair is exclusive between two sockets while the Publish/Subscribe can publish to single or multiple subscribers and subscribe to a single publisher.

ZMQ TCP/IP

Connection pairs must share the same "port": and "host":. Note that "senders": are binding to one or multiple interfaces relative to the host the component is running on. Therefore the "host" field can either be set to "*" (binding to all interfaces of the host) or the name or IP of a specific interface (e.g. "eth0" or "192.168.1.1").

An example of sender/receive pair configuration is:

{
  "name": "example01"
  ...
  "modules":[
    {
      ...
      "name":"example01",
      "connections": {
        "senders": [
          {
            "type": "ZMQPair",
            "chid": 0,
            "transport": "tcp",
            "host": "*",
            "port": 8101
          }
        ]
      }
    }
  ]
},
{
  "name": "example02"
  ...
  "modules":[
    {
      ...
      "connections": {
        "receivers": [
          {
            "type": "ZMQPair",
            "chid": 0,
            "transport": "tcp",
            "host": "localhost",
            "port": 8101
          }
        ]
      }
    }
  ]
}

An example of single publisher and multiple subscribers is:

{
  "name": "example01"
  ...
  "modules":[
    {
      ...
      "connections": {
        "senders": [
          {
            "type": "ZMQPubSub",
            "chid": 0,
            "transport": "tcp",
            "host": "*",
            "port": 8101
          }
        ]
      }
    }
  ]
},
{
  "name": "example02"
  ...
  "modules":[
    {
      "connections": {
        "receivers": [
          {
            "type": "ZMQPubSub",
            "chid": 0,
            "connections":[
              {
                "transport": "tcp",
                "host": "localhost",
                "port": 8101
              }
            ]
          }
        ]
      }
    }
  ]
},
{
  "name": "example03"
  ...
  "modules":[
    {
      "connections": {
        "receivers": [
          {
            "type": "ZMQPubSub",
            "chid": 0,
            "connections":[
              {
                "transport": "tcp",
                "host": "localhost",
                "port": 8101
              }
            ]
          }
        ]
      }
    }
  ]
}

Subscribers connections support filtering of published messages. The two optional configuration fields filter (integer) and filter_size (integer) are used to compare the first filter_size bytes of the message with filter itself. In case of equality, the message is passed to ConnectionSubManager::receive() and otherwise dropped.

Also note that the Publish/Subscribe pattern does not guarantee reliable delivery by design and therefore, in case of high throughput, sockets might silently drop messages.

ZMQ IPC via shared memory

Connection pairs must belong to components spawned on the same "host": and share the same "path":.

An example of sender/receive pair configuration is:

{
  "name": "example01"
  ...
  "modules":[
    {
      ...
      "connections": {
        "senders": [
          {
            "type": "ZMQPair",
            "chid": 0,
            "transport": "ipc",
            "path": "/tmp/feed0",
          }
        ]
      }
    }
  ]
},
{
  "name": "example02"
  ...
  "modules":[
    {
      ...
      "connections": {
        "receivers": [
          {
            "type": "ZMQPair",
            "chid": 0,
            "transport": "ipc",
            "path": "/tmp/feed0",
          }
        ]
      }
    }
  ]
}

Same applies to Publish/Subscribe pattern with multiple connections.

ZMQ InProc

Connection pairs must be loaded in the same component. There is no check of this on the framework level, so this is up to the user to assert.

Connection pairs must share the same "endpoint": which ZMQ uses to identiy which sockets should communicate.

Also the connection pairs must share the same "id:" since this is used by DAQling to load the same zmq_context for both the sender and receiver, which is required for InProc communication.

An example of InProc sender/receive pair configuration is:

{
  "name": "example01"
  ...
  "modules":[
    {
      "name": "examplemodule01",
      ...
      "connections": {
        "senders": [
        {
          "type": "ZMQPair",
          "chid": 0,
          "id":1,
          "transport": "inproc",
          "endpoint":"localEndpoint1"
        }
        ]
      }
    },
    {
      "name": "examplemodule02",
      ...
      "connections": {
        "receivers": [
        {
          "type": "ZMQPair",
          "chid": 0,
          "id":1,
          "transport": "inproc",
          "endpoint":"localEndpoint1"
        }
        ]
      }
    }
  ],
  "resources":[
    {
      "type": "zmq_context",
      "id": 1
    }
  ]
}

Same applies to Publish/Subscribe pattern with multiple connections.

Boost Asio UDP/IP

Connection pairs must share the same "dest_port": and "host":. The "host" field must be set to a specific IP (e.g. "127.0.0.1").

An example of UDP sender/receive pair configuration is:

{
  "name": "example01"
  ...
  "modules":[
    {
      ...
      "connections": {
        "senders": [
          {
            "type": "BoostAsioUdp",
            "chid": 0,
            "transport": "udp",
            "host": "127.0.0.1",
            "dest_port": 8101
          }
        ]
      }
    }
  ]
},
{
  "name": "example02"
  ...
  "modules":[
    {
      ...
      "connections": {
        "receivers": [
          {
            "type": "BoostAsioUdp",
            "chid": 0,
            "transport": "udp",
            "host": "127.0.0.1",
            "dest_port": 8101
          }
        ]
      }
    }
  ]
}

The BoostAsioUDP connection type also has the option to add a "src_port": field, which could potentially be used to filter between different sources on the receiver side. This would however require an extension of the BoostAsioUDPReceiver class.

In-process queue

Local connection pairs must be loaded in the same component. There is no check of this on the framework level, so this is up to the user to assert.

This connection uses an instance of a DAQling queue object.

An example of a local queue sender/receive pair configuration is:

{
  "name": "example01"
  ...
  "modules":[
    {
      "name": "examplemodule01",
      ...
      "connections": {
        "senders": [
        {
          "type": "local",
          "chid": 0,
          "id":1
        }
        ]
      }
    },
    {
      "name": "examplemodule02",
      ...
      "connections": {
        "receivers": [
        {
          "type": "local",
          "chid": 0,
          "id":1
        }
        ]
      }
    }
  ],
  "resources":[
    {
        "type":"queue",
        "id":1,
        "resource":{
            "type":"FollyProducerConsumer",
            "queue_size":1000
        }
    }
  ]
}

Queues

DAQling also provides support for adding a "Single-Producer/Single-Consumer" queue to a connection.

The queue will be plugged in-between the ConnectionSubManager and the actual connection. All send or receive calls will respectively write to or read from the queue. A separate thread will take binaries from the Queue, and pass to the send method of the connection for senders. For receivers, the thread will call receive on the connection, and put the resulting binaries into the queue.

In general, using a queue along with the connection tends to improve performance. The buffer functionality provided by the queue allows DAQling processes to run without being affected by fluctuations in the amount of instantaneous traffic.

Folly ProducerConsumer

The Folly queue requires a "queue_size": field.

An example of Folly queue configuration is:

{
  "name": "example01"
  ...
  "modules":[
    {
      ...
      "connections": {
        "senders": [
          {
            "type": "<some connection type>",
            "chid": 0,
            ...
            "queue":
            {
              "type": "FollyProducerConsumer",
              "queue_size": 1000
            }
          }
        ]
      }
    }
  ]
}

Moodycamel ReaderWriter

The Moodycamel queue requires a "queue_size": field.

An example of Moodycamel queue configuration is:

{
  "name": "example01"
  ...
  "modules":[
    {
      ...
      "connections": {
        "senders": [
          {
            "type": "<some connection type>",
            "chid": 0,
            ...
            "queue":
            {
              "type": "MoodyReaderWriter",
              "queue_size": 1000
            }
          }
        ]
      }
    }
  ]
}