8#include <protocol/protocol_oob.h>
10#include <utils/logging.h>
11#include <utils/streaming_json.h>
// Handles a run state change: fills `envelope_out` with a "run_state_change"
// message and serializes it to `target`.
//
// `change` supplies the old and new state (`change.old`, `change.new_`); each is
// cast to size_t and used as an index into run::RunStateNames.
// `run` is not referenced by any statement visible here.
// `envelope_out` and `target` are not declared in this function - presumably
// members of the handler; confirm against the header.
// NOTE(review): the embedded line numbers skip 15, 18-19 and 23, so some lines of
// this function (including its closing brace) are not visible in this view.
13FLASHMEM
void client::RunStateChangeNotificationHandler::handle(
const run::RunStateChange change,
14 const run::Run &
run) {
// Set the envelope's "type" field.
16 envelope_out[
"type"] =
"run_state_change";
// Create the nested "msg" object that receives the fields below.
17 auto msg = envelope_out.createNestedObject(
"msg");
// Store the looked-up names of the previous and the new state.
20 msg[
"old"] = run::RunStateNames[
static_cast<size_t>(change.old)];
21 msg[
"new"] = run::RunStateNames[
static_cast<size_t>(change.new_)];
// Serialize the whole envelope to the output target.
22 serializeJson(envelope_out, target);
// Constructor of RunDataNotificationHandler; its first parameter is a reference
// to a carrier::Carrier.
// NOTE(review): the remaining parameters, the initializer list and the body are
// not visible in this view.
27FLASHMEM client::RunDataNotificationHandler::RunDataNotificationHandler(carrier::Carrier &
carrier,
// Writes a block of raw samples as text into `str_buffer` and sends
// `actual_buffer_length` bytes of it to `target`.
//
// data        - raw sample values, read as data[outer_i * inner_count + inner_i].
// outer_count - number of rows to convert.
// inner_count - stride between rows in `data`; only the first
//               run.daq_config.get_num_channels() values of each row are used.
// run         - supplies the desired channel count via run.daq_config.
//
// NOTE(review): the embedded line numbers have gaps (e.g. 57-60, 70-72, 76,
// 78-82), so closing braces and possibly further statements are not visible here.
31FLASHMEM
void client::RunDataNotificationHandler::handle(
volatile uint32_t *data,
size_t outer_count,
32 size_t inner_count,
const run::Run &
run) {
// Row stride in `data` versus number of values actually written per row.
35 size_t inner_count_actual = inner_count;
36 size_t inner_count_desired =
run.daq_config.get_num_channels();
// Total message length required for this call.
40 auto new_buffer_length = calculate_total_buffer_length(outer_count, inner_count_desired);
// Shorter than the current message: shrink it in place.
41 if (new_buffer_length < actual_buffer_length) {
43 auto new_end_of_data = calculate_outer_buffer_position(outer_count, inner_count_desired);
// Zero (actual_buffer_length - new_buffer_length) bytes starting at the new end of data.
45 memset(str_buffer + new_end_of_data,
'\0', actual_buffer_length - new_buffer_length);
// Write MESSAGE_END (without its terminating NUL) starting one byte before the
// new end of data - presumably over the trailing ',' of the last row.
47 memcpy(str_buffer + new_end_of_data -
sizeof(
',' ), MESSAGE_END,
48 sizeof(MESSAGE_END) - 1);
// Put '\n' directly after MESSAGE_END; this is the last byte of the new length.
50 *(str_buffer + new_end_of_data +
sizeof(MESSAGE_END) - 2) =
'\n';
52 actual_buffer_length = new_buffer_length;
53 }
else if (new_buffer_length > actual_buffer_length) {
// A longer message is reported as an error.
// NOTE(review): what follows the log call in this branch is not visible here.
56 LOG_ERROR(
"RunDataNotificationHandler::handle should not have to increase buffer size.")
// `buffer` points just past the first BUFFER_LENGTH_STATIC bytes of str_buffer.
61 auto buffer = str_buffer + BUFFER_LENGTH_STATIC;
// NOTE(review): same formula as calculate_inner_buffer_length(); consider calling
// the helper so the two cannot drift apart.
62 size_t inner_length = 3 + inner_count_desired * 7 - 1;
63 for (
size_t outer_i = 0; outer_i < outer_count; outer_i++) {
64 for (
size_t inner_i = 0; inner_i < inner_count_desired; inner_i++) {
// Rows in `data` are inner_count_actual apart.
65 const uint32_t number = data[outer_i * inner_count_actual + inner_i];
66 const char *float_repr = daq::raw_to_str(number);
// Destination: row offset, plus 1, plus 7 bytes per preceding value.
67 char *dst = buffer + outer_i * inner_length + 1 + inner_i * 7;
// Each value occupies exactly `len` characters in the buffer.
68 constexpr size_t len = 6;
69 char float_repr_fallback[len + 2];
// Fallback: format via raw_to_float into the local array, limited to len + 1
// bytes including the terminating NUL.
// NOTE(review): the condition guarding this fallback is not visible here; as
// shown it would always replace the raw_to_str() result - confirm.
73 snprintf(float_repr_fallback, len + 1,
"% 1.3f",
74 daq::raw_to_float(number, daq::details::raw_zero_offsets[inner_i]));
75 float_repr = float_repr_fallback;
// Copy exactly `len` characters; no terminator is written into the buffer.
77 memcpy(dst, float_repr, len);
// Send the message at its current length.
83 target.write(str_buffer, actual_buffer_length);
// stream(): takes a pointer to volatile 32-bit values and the run.
// NOTE(review): the body of this function is not visible in this view.
88FLASHMEM
void client::RunDataNotificationHandler::stream(
volatile uint32_t *buffer, run::Run &
run) {
// Pre-fills `str_buffer` for a run: writes MESSAGE_START, the entity id and the
// run id, fills the remainder with '-', appends MESSAGE_END and a final '\n',
// and records the resulting length in `actual_buffer_length`.
//
// run - supplies the channel counts (run.daq_config) and the run id (run.id).
//
// NOTE(review): the embedded line numbers skip 110 and 112-117, so the bodies of
// both loops and the closing braces are not visible in this view.
92FLASHMEM
void client::RunDataNotificationHandler::prepare(run::Run &
run) {
// Copy MESSAGE_START without its terminating NUL.
93 memcpy(str_buffer, MESSAGE_START, strlen(MESSAGE_START));
// Copy a fixed BUFFER_LENGTH_ENTITY_ID bytes of the entity id.
// NOTE(review): assumes the id string is at least that long - confirm.
94 memcpy(str_buffer + BUFFER_IDX_ENTITY_ID,
carrier.get_entity_id().c_str(), BUFFER_LENGTH_ENTITY_ID);
// Two channel counts: one from get_num_channels_min_power_of_two(), used to
// derive the row count, and one from get_num_channels(), used for sizing.
98 size_t inner_count_actual =
run.daq_config.get_num_channels_min_power_of_two();
99 size_t inner_count_desired =
run.daq_config.get_num_channels();
// NOTE(review): divides by inner_count_actual - assumes it is never zero.
103 size_t outer_count = daq::stream::details::BUFFER_SIZE / inner_count_actual / 2;
105 actual_buffer_length = calculate_total_buffer_length(outer_count, inner_count_desired);
// Fill everything after the first BUFFER_LENGTH_STATIC bytes with '-'.
106 memset(str_buffer + BUFFER_LENGTH_STATIC,
'-', actual_buffer_length - BUFFER_LENGTH_STATIC);
// Copy a fixed BUFFER_LENGTH_RUN_ID bytes of the run id.
// NOTE(review): assumes run.id is at least that long - confirm.
107 memcpy(str_buffer + BUFFER_IDX_RUN_ID,
run.id.c_str(), BUFFER_LENGTH_RUN_ID);
// Write cursor, starting one byte before offset BUFFER_LENGTH_STATIC.
108 auto buffer = str_buffer + BUFFER_LENGTH_STATIC - 1;
109 for (
size_t outer_i = 0; outer_i < outer_count; outer_i++) {
111 for (
size_t inner_i = 0; inner_i < inner_count_desired; inner_i++) {
// Write MESSAGE_END (without its NUL) at the cursor - presumably advanced by the
// loops above, whose bodies are not visible here.
118 memcpy(buffer, MESSAGE_END,
sizeof(MESSAGE_END) - 1);
// The last byte of the message is a newline.
119 str_buffer[actual_buffer_length - 1] =
'\n';
122FLASHMEM
size_t client::RunDataNotificationHandler::calculate_inner_buffer_length(
size_t inner_count) {
124 return 3 + inner_count * 7 - 1;
127FLASHMEM
size_t client::RunDataNotificationHandler::calculate_outer_buffer_position(
size_t outer_count,
128 size_t inner_count) {
130 auto inner_length = calculate_inner_buffer_length(inner_count);
131 return BUFFER_LENGTH_STATIC + outer_count * inner_length;
134FLASHMEM
size_t client::RunDataNotificationHandler::calculate_total_buffer_length(
size_t outer_count,
135 size_t inner_count) {
136 auto inner_length = calculate_inner_buffer_length(inner_count);
138 return BUFFER_LENGTH_STATIC + outer_count * inner_length -
sizeof(
',' ) +
139 sizeof(MESSAGE_END) -
sizeof(
'\0' ) +
sizeof(
'\n' );
142FLASHMEM
void client::StreamingRunDataNotificationHandler::handle(uint16_t *data,
size_t outer_count,
143 size_t inner_count,
const run::Run &
run) {
144 utils::StreamingJson doc(target);
146 doc.kv(
"type",
"run_data");
149 doc.kv(
"id",
run.id);
153 doc.val(
carrier.get_entity_id());
159 for (
size_t outer = 0; outer < outer_count; outer++) {
161 for (
size_t inner = 0; inner < inner_count; inner++) {
163 const uint32_t number = data[outer * inner_count + inner];
164 const char *float_repr =
nullptr;
166 doc.json(float_repr);
170 target.printf(
"% 1.3f", daq::raw_to_float(number, daq::details::raw_zero_offsets[inner]));
175 if (outer != outer_count - 1)