6#include "proto/main.pb.h"
7#include <protocol/transport.h>
9#include <net/settings.h>
10#include <protocol/protocol.h>
11#include <protocol/protocol_oob.h>
12#include <protocol/registry.h>
13#include <run/run_manager.h>
14#include <utils/StringPrint.h>
15#include <utils/logging.h>
16#include <utils/serial_lines.h>
22#include <utils/streaming_json.h>
// Fragment of a routine that trims leading and trailing whitespace from the
// C string `str` in place.
// NOTE(review): the enclosing function header, the loop bodies and the closing
// braces are not visible in this excerpt; the comments below describe only the
// statements that are shown.
//
// `start` indexes from the front of the string; `end` begins at the index of
// the last character.
// NOTE(review): for an empty string, strlen(str) - 1 wraps around in unsigned
// arithmetic, so `end` would begin far past the buffer and str[end] below would
// be read out of bounds, unless a guard exists in lines not shown. Confirm.
25 unsigned int start = 0, end = strlen(str) - 1;
// Loops while the character at `start` is whitespace (body not shown;
// presumably it advances `start`).
// NOTE(review): if `str` is a plain char buffer, passing a negative char value
// to isspace is undefined behavior; verify the element type.
28 while (isspace(str[start])) {
// Loops while `end` is still beyond `start` and the character at `end` is
// whitespace (body not shown; presumably it decrements `end`).
33 while (end > start && isspace(str[end])) {
// Touch the buffer only when at least one index moved from its initial value.
38 if (start > 0 || end < (strlen(str) - 1)) {
// Shift the end - start + 1 bytes beginning at str + start to the front of the
// buffer; source and destination lie in the same buffer and may overlap.
39 memmove(str, str + start, end - start + 1);
// Write the NUL terminator directly after the bytes that were moved.
40 str[end - start + 1] =
'\0';
// Dispatches one envelope to the handler that the handler registry returns for
// its (envelope kind, message kind) pair. Failures are reported on the
// transport's control output stream.
//
// transport:   supplies the control output stream and is passed to the handler.
// in_envelope: the envelope to dispatch; its `which_kind` selects the branch.
//
// NOTE(review): several original lines (returns, an `else`, the handler null
// check, the declaration of `message_kind`, closing braces) are not visible in
// this excerpt, so the exact control flow between the statements below cannot
// be confirmed from here.
44void msg::Broker::handleMessage(transport::Transport &
transport, pb_Envelope &in_envelope) {
// Stream used for every error report in this function.
45 auto msg_stream =
transport.ctrl_output();
// Anything other than a message_v1 envelope is reported as an error.
// NOTE(review): if this branch returns (not visible), the `generic` branch
// further down can never be taken. Confirm whether that is intended.
47 if (in_envelope.which_kind != pb_Envelope_message_v1_tag) {
48 msg_stream->report_error(
"handler only supports v1 messages!");
52 int envelope_kind = in_envelope.which_kind;
// Read the nested message kind from whichever envelope variant is populated.
54 if (envelope_kind == pb_Envelope_generic_tag) {
55 message_kind = in_envelope.kind.generic.which_kind;
56 }
else if (envelope_kind == pb_Envelope_message_v1_tag) {
57 message_kind = in_envelope.kind.message_v1.which_kind;
// Reported for an envelope kind matching neither variant above (the `else`
// introducing this statement is presumably on a line not shown).
59 msg_stream->report_error(
"Unknown envelope kind.");
// Look up the handler registered for this envelope kind / message kind pair.
64 auto msg_handler = msg::handlers::Registry::get().lookup(envelope_kind, message_kind);
// Reported when no handler was found (the guarding condition is presumably on
// a line not shown).
66 msg_stream->report_error(
"Error unknown handler for message type.");
// Hand the envelope to the handler.
// NOTE(review): the meaning of handle()'s return value is not visible here;
// confirm which result leads to the error report below.
70 if (msg_handler->handle(
transport, in_envelope))
// The failure is reported to the peer and also written to the local log.
73 msg_stream->report_error(
"Error while handling streaming message.");
74 LOG_ALWAYS(
"Error while handling message.");
// Reads one envelope from the transport's control input. On the error path it
// reports the read error on the control output and logs it.
//
// transport: supplies the control input and control output streams.
//
// NOTE(review): the declaration of `envelope`, the branch conditions, the call
// that consumes the envelope and the returned values are on lines not visible
// in this excerpt; the meaning of the bool result cannot be stated from here.
79bool msg::Broker::process(transport::Transport &
transport) {
// Read into `envelope`; `result` carries an error value that is used below.
81 auto result =
transport.ctrl_input()->read(envelope);
// Release the envelope via pb_release (presumably nanopb's routine for freeing
// dynamically allocated fields; verify against the protobuf runtime in use).
84 pb_release(pb_Envelope_fields, &envelope);
// Error path: forward the read error to the peer, then log it locally.
88 transport.ctrl_output()->report_error(result.err_value());
// NOTE(review): the error text is passed as LOG_ALWAYS's first argument. If
// LOG_ALWAYS is printf-style, a '%' in the text would be read as a format
// specifier; confirm and consider passing it through "%s".
89 LOG_ALWAYS(result.err_value().c_str());
90 LOG_ALWAYS(
"Malformed input. Expecting protobuf.");
utils::SerialLineReader serial_line_reader