9#include <proto/main.pb.h>
10#include <protocol/protocol_oob.h>
12#include <utils/logging.h>
13#include <utils/streaming_json.h>
19void client::RunProtobufDataHandler::handle(
const run::RunStateChange change,
20 const run::Run &
run,
const char *reason) {
23 auto& run_state_change =
msg.kind.run_state_change_message = pb_RunStateChangeMessage pb_RunStateChangeMessage_init_default;
24 run_state_change.has_run =
true;
25 std::strncpy(run_state_change.run.id,
run.id.c_str(),
sizeof(run_state_change.run.id));
26 run_state_change.run.chunk =
run.next_chunk();
28 run_state_change.has_time =
true;
29 auto& time = run_state_change.time;
30 time.prefix = pb_Prefix_NANO;
31 time.value = change.t;
33 run_state_change.old =
static_cast<pb_RunState
>(change.old);
34 run_state_change.new_ =
static_cast<pb_RunState
>(change.new_);
37 std::strncpy(run_state_change.reason, reason,
sizeof(run_state_change.reason));
40 stream->write(envelope);
48FASTRUN
bool encode_byte_array(pb_ostream_t *stream,
const pb_field_t *field,
void *
const *arg) {
49 if (!pb_encode_tag_for_field(stream, field))
52 auto ptr =
static_cast<SizedPtr*
>(*arg);
53 return pb_encode_string(stream,
static_cast<const uint8_t*
>(ptr->data), ptr->size);
56FASTRUN
void client::RunProtobufDataHandler::prepare(
const run::Run &
run,
size_t channel_stride) {
58 auto& run_data =
msg.kind.run_data_message;
59 run_data = pb_RunDataMessage pb_RunDataMessage_init_default;
60 run_data.has_entity =
true;
61 run_data.has_run =
true;
62 std::strncpy(run_data.run.id,
run.id.c_str(),
sizeof(run_data.run.id));
64 auto& entity = run_data.entity;
65 auto& entity_id = carrier::Carrier::get().get_entity_id();
69 ss.read(entity.path,
sizeof(entity.path));
71 run_data.has_data =
true;
72 auto& data = run_data.data;
75 .which_kind = pb_DataType_integer_tag,
78 .signess = pb_IntegerType_Signedness_Unsigned,
84 data.channel_stride = channel_stride;
87 auto&
carrier = carrier::Carrier::get();
88 auto& adc_channels =
carrier.get_adc_channels();
90 auto& channel_idx = data.channels_count = 0;
91 for (
size_t idx = 0; idx < adc_channels.size(); ++idx) {
92 auto& channel = adc_channels[idx];
93 if (channel.idx == carrier::ADCChannel::DISABLED)
continue;
95 constexpr double adc_discrete_gain = -0.00015265824;
96 constexpr double adc_discrete_offset = 1.25;
98 auto &pb_channel = data.channels[channel_idx];
100 pb_channel.gain = adc_discrete_gain * channel.gain;
101 pb_channel.offset = adc_discrete_offset * channel.gain + channel.offset;
102 pb_channel.probe = channel.probe;
109FASTRUN run::RunHandleResult client::RunProtobufDataHandler::handle(
const run::Run &
run, uint16_t *data,
110 size_t sample_count) {
111 if (sample_count == 0)
112 return run::RunHandleResult::ok({});
114 auto& message = envelope.kind.message_v1;
115 auto& run_data = message.kind.run_data_message;
117 auto& pb_data = run_data.data;
118 if (pb_data.channels_count == 0)
119 return run::RunHandleResult::ok({});
121 pb_data.sample_count = sample_count;
122 run_data.run.chunk =
run.next_chunk();
123 auto& values = pb_data.data;
126 .size = sample_count * pb_data.channel_stride *
sizeof(uint16_t),
127 .data =
reinterpret_cast<uint8_t*
>(data)
129 values.arg = &sized_ptr;
130 return stream->write(envelope);
133run::RunHandleResult client::RunProtobufDataHandler::handle_op_end(
const run::Run &
run,
134 std::array<std::array<float, 8>, 6> data) {
136 auto& run_data =
msg.kind.run_data_end_message;
137 run_data = pb_RunDataEndMessage pb_RunDataEndMessage_init_default;
138 run_data.has_run =
true;
139 std::strncpy(run_data.run.id,
run.id.c_str(),
sizeof(run_data.run.id));
140 run_data.run.chunk =
run.next_chunk();
142 run_data.has_entity =
true;
143 auto& entity = run_data.entity;
144 auto& entity_id = carrier::Carrier::get().get_entity_id();
145 entity_id.copy(entity.path,
sizeof(entity.path));
147 run_data.has_data =
true;
148 auto& data_pb = run_data.data;
149 data_pb.sample_count = 1;
150 data_pb.channel_stride = 48;
151 data_pb.has_type =
true;
153 .which_kind = pb_DataType_float__tag,
162 .size = 48 *
sizeof(float),
163 .data =
reinterpret_cast<void*
>(data.data())
167 auto& idx = data_pb.channels_count = 0;
168 for (; idx < 48; ++idx) {
169 auto& scaling = data_pb.channels[idx];
172 scaling.offset = 0.0;
176 data_pb.data.arg = &sized_ptr;
177 return stream->write(envelope);
pb_MessageV1 & init_v1_message(pb_Envelope &envelope, pb_size_t which_kind)
FASTRUN bool encode_byte_array(pb_ostream_t *stream, const pb_field_t *field, void *const *arg)