52 return highest | (second << 8) | (third << 16) | (lowest << 24);
62 uint64_t upperLong = upper;
63 uint64_t lowerLong = lower;
65 return upperLong | (lowerLong << 32);
69WebsocketsEndpoint::WebsocketsEndpoint(std::shared_ptr<network::TcpClient> client,
70 FragmentsPolicy fragmentsPolicy)
71 : _client(client), _fragmentsPolicy(fragmentsPolicy), _recvMode(RecvMode_Normal),
72 _streamBuilder(fragmentsPolicy == FragmentsPolicy_Notify ? true : false),
73 _closeReason(CloseReason_None) {
77WebsocketsEndpoint::WebsocketsEndpoint(
const WebsocketsEndpoint &other)
78 : _client(other._client), _fragmentsPolicy(other._fragmentsPolicy), _recvMode(other._recvMode),
79 _streamBuilder(other._streamBuilder), _closeReason(other._closeReason), _useMasking(other._useMasking) {
81 const_cast<WebsocketsEndpoint &
>(other)._client =
nullptr;
84WebsocketsEndpoint::WebsocketsEndpoint(
const WebsocketsEndpoint &&other)
85 : _client(other._client), _fragmentsPolicy(other._fragmentsPolicy), _recvMode(other._recvMode),
86 _streamBuilder(other._streamBuilder), _closeReason(other._closeReason), _useMasking(other._useMasking) {
88 const_cast<WebsocketsEndpoint &
>(other)._client =
nullptr;
92WebsocketsEndpoint &WebsocketsEndpoint::operator=(
const WebsocketsEndpoint &other) {
93 this->_client = other._client;
94 this->_fragmentsPolicy = other._fragmentsPolicy;
95 this->_recvMode = other._recvMode;
96 this->_streamBuilder = other._streamBuilder;
97 this->_closeReason = other._closeReason;
98 this->_useMasking = other._useMasking;
100 const_cast<WebsocketsEndpoint &
>(other)._client =
nullptr;
106WebsocketsEndpoint &WebsocketsEndpoint::operator=(
const WebsocketsEndpoint &&other) {
107 this->_client = other._client;
108 this->_fragmentsPolicy = other._fragmentsPolicy;
109 this->_recvMode = other._recvMode;
110 this->_streamBuilder = other._streamBuilder;
111 this->_closeReason = other._closeReason;
112 this->_useMasking = other._useMasking;
114 const_cast<WebsocketsEndpoint &
>(other)._client =
nullptr;
120void WebsocketsEndpoint::setInternalSocket(std::shared_ptr<network::TcpClient> socket) {
121 this->_client = socket;
125bool WebsocketsEndpoint::poll() {
return this->_client->poll(); }
128 uint32_t numRead = socket.read(buffer, len);
129 while (numRead ==
static_cast<uint32_t>(-1) && socket.available()) {
130 numRead = socket.read(buffer, len);
144 uint64_t extendedPayload = header.payload;
146 if (header.payload == 126) {
150 tmp = (tmp << 8) | (tmp >> 8);
151 extendedPayload = tmp;
152 }
else if (header.payload == 127) {
158 return extendedPayload;
166std::string
readData(network::TcpClient &socket, uint64_t extendedPayload) {
167 const uint64_t BUFFER_SIZE = _WS_BUFFER_SIZE;
169 std::string data(extendedPayload,
'\0');
170 uint8_t buffer[BUFFER_SIZE];
171 uint64_t done_reading = 0;
172 while (done_reading < extendedPayload && socket.available()) {
174 extendedPayload - done_reading >= BUFFER_SIZE ? BUFFER_SIZE : extendedPayload - done_reading;
178 if (!socket.available())
181 for (uint64_t i = 0; i < numReceived; i++) {
182 data[done_reading + i] =
static_cast<char>(buffer[i]);
185 done_reading += numReceived;
190void remaskData(std::string &data,
const uint8_t *
const maskingKey, uint64_t payloadLength) {
191 for (uint64_t i = 0; i < payloadLength; i++) {
192 data[i] = data[i] ^ maskingKey[i % 4];
197WebsocketsFrame WebsocketsEndpoint::_recv() {
198 auto header = readHeaderFromSocket(*this->_client);
199 if (!_client->available())
200 return WebsocketsFrame();
202 uint64_t payloadLength = readExtendedPayloadLength(*this->_client, header);
203 if (!_client->available())
204 return WebsocketsFrame();
206#ifdef _WS_CONFIG_MAX_MESSAGE_SIZE
207 if (payloadLength > _WS_CONFIG_MAX_MESSAGE_SIZE) {
208 return WebsocketsFrame();
212 uint8_t maskingKey[4];
215 readMaskingKey(*this->_client, maskingKey);
216 if (!_client->available())
217 return WebsocketsFrame();
220 WebsocketsFrame frame;
222 frame.payload =
readData(*this->_client, payloadLength);
223 if (!_client->available())
224 return WebsocketsFrame();
228 remaskData(frame.payload, maskingKey, payloadLength);
232 frame.fin = header.fin;
233 frame.mask = header.mask;
235 frame.mask_buf[0] = maskingKey[0];
236 frame.mask_buf[1] = maskingKey[1];
237 frame.mask_buf[2] = maskingKey[2];
238 frame.mask_buf[3] = maskingKey[3];
240 frame.opcode = header.opcode;
241 frame.payload_length = payloadLength;
247WebsocketsMessage WebsocketsEndpoint::handleFrameInStreamingMode(WebsocketsFrame &frame) {
248 if (frame.isControlFrame()) {
249 auto msg = WebsocketsMessage::CreateFromFrame((frame));
250 this->handleMessageInternally(
msg);
252 }
else if (frame.isBeginningOfFragmentsStream()) {
253 this->_recvMode = RecvMode_Streaming;
255 if (this->_streamBuilder.isEmpty()) {
256 this->_streamBuilder.first(frame);
258 if (this->_fragmentsPolicy == FragmentsPolicy_Notify) {
259 return WebsocketsMessage(this->_streamBuilder.type(), std::move(frame.payload), MessageRole::First);
263 }
else if (frame.isContinuesFragment()) {
264 this->_streamBuilder.append(frame);
265 if (this->_streamBuilder.isOk()) {
267 if (this->_fragmentsPolicy == FragmentsPolicy_Notify) {
268 return WebsocketsMessage(this->_streamBuilder.type(), std::move(frame.payload),
269 MessageRole::Continuation);
273 }
else if (frame.isEndOfFragmentsStream()) {
274 this->_recvMode = RecvMode_Normal;
275 this->_streamBuilder.end(frame);
276 if (this->_streamBuilder.isOk()) {
278 if (this->_fragmentsPolicy == FragmentsPolicy_Aggregate) {
279 auto completeMessage = this->_streamBuilder.build();
280 this->_streamBuilder = WebsocketsMessage::StreamBuilder(
false);
281 this->handleMessageInternally(completeMessage);
282 return completeMessage;
284 auto messageType = this->_streamBuilder.type();
285 this->_streamBuilder = WebsocketsMessage::StreamBuilder(
true);
286 return WebsocketsMessage(messageType, std::move(frame.payload), MessageRole::Last);
292 close(CloseReason_ProtocolError);
297WebsocketsMessage WebsocketsEndpoint::handleFrameInStandardMode(WebsocketsFrame &frame) {
299 if (frame.isNormalUnfragmentedMessage() || frame.isControlFrame()) {
300 auto msg = WebsocketsMessage::CreateFromFrame(std::move(frame));
301 this->handleMessageInternally(
msg);
303 }
else if (frame.isBeginningOfFragmentsStream()) {
304 return handleFrameInStreamingMode(frame);
308 close(CloseReason_ProtocolError);
312WebsocketsMessage WebsocketsEndpoint::recv() {
313 auto frame = _recv();
314 if (frame.isEmpty()) {
318 if (this->_recvMode == RecvMode_Normal) {
319 return handleFrameInStandardMode(frame);
321 return handleFrameInStreamingMode(frame);
326void WebsocketsEndpoint::handleMessageInternally(WebsocketsMessage &
msg) {
328 pong(internals::fromInterfaceString(
msg.data()));
329 }
else if (
msg.isClose()) {
331 if (internals::fromInterfaceString(
msg.data()).size() >= 2) {
332 uint16_t reason = *(
reinterpret_cast<const uint16_t *
>(
msg.data().c_str()));
333 reason = reason >> 8 | reason << 8;
336 this->_closeReason = CloseReason_GoingAway;
338 close(this->_closeReason);
342bool WebsocketsEndpoint::send(
const char *data,
const size_t len,
const uint8_t opcode,
const bool fin) {
343 return this->send(data, len, opcode, fin, this->_useMasking);
346bool WebsocketsEndpoint::send(
const std::string &data,
const uint8_t opcode,
const bool fin) {
347 return this->send(data, opcode, fin, this->_useMasking);
350bool WebsocketsEndpoint::send(
const std::string &data,
const uint8_t opcode,
const bool fin,
const bool mask,
351 const char *maskingKey) {
352 return send(data.c_str(), data.size(), opcode, fin, mask, maskingKey);
356std::string WebsocketsEndpoint::getHeader(uint64_t len, uint8_t opcode,
bool fin,
bool mask) {
357 std::string header_data;
360 auto header = MakeHeader<Header>(len, opcode, fin, mask);
361 header_data = std::string(
reinterpret_cast<char *
>(&header), 2 + 0);
362 }
else if (len < 65536) {
363 auto header = MakeHeader<HeaderWithExtended16>(len, opcode, fin, mask);
364 header.extendedPayload = (len << 8) | (len >> 8);
365 header_data = std::string(
reinterpret_cast<char *
>(&header), 2 + 2);
367 auto header = MakeHeader<HeaderWithExtended64>(len, opcode, fin, mask);
371 header_data = std::string(
reinterpret_cast<char *
>(&header), 2);
372 header_data += std::string(
reinterpret_cast<char *
>(&header.extendedPayload), 8);
378void remaskData(std::string &data,
const char *
const maskingKey,
size_t first,
size_t len) {
379 for (
size_t i = first; i < first + len; i++) {
380 data[i] = data[i] ^ maskingKey[i % 4];
384bool WebsocketsEndpoint::send(
const char *data,
const size_t len,
const uint8_t opcode,
const bool fin,
385 const bool mask,
const char *maskingKey) {
387#ifdef _WS_CONFIG_MAX_MESSAGE_SIZE
388 if (len > _WS_CONFIG_MAX_MESSAGE_SIZE) {
393 std::string message_data = getHeader(len, opcode, fin, mask);
396 message_data += std::string(maskingKey, 4);
399 size_t data_start = message_data.size();
400 message_data += std::string(data, len);
402 if (mask && memcmp(maskingKey, __TINY_WS_INTERNAL_DEFAULT_MASK, 4) != 0) {
403 remaskData(message_data, maskingKey, data_start, len);
406 this->_client->send(message_data);
410void WebsocketsEndpoint::close(CloseReason reason) {
411 this->_closeReason = reason;
413 if (!this->_client->available())
416 if (reason == CloseReason_None) {
417 send(
nullptr, 0, internals::ContentType::Close,
true, this->_useMasking);
419 uint16_t reasonNum =
static_cast<uint16_t
>(reason);
420 reasonNum = (reasonNum >> 8) | (reasonNum << 8);
421 send(
reinterpret_cast<const char *
>(&reasonNum), 2, internals::ContentType::Close,
true,
424 this->_client->close();
427CloseReason WebsocketsEndpoint::getCloseReason()
const {
return _closeReason; }
429bool WebsocketsEndpoint::ping(
const std::string &
msg) {
431 if (
msg.size() > 125) {
434 return send(
msg, ContentType::Ping,
true, this->_useMasking);
438bool WebsocketsEndpoint::ping(
const std::string &&
msg) {
440 if (
msg.size() > 125) {
443 return send(
msg, ContentType::Ping,
true, this->_useMasking);
447bool WebsocketsEndpoint::pong(
const std::string &
msg) {
449 if (
msg.size() > 125) {
452 return this->send(
msg, ContentType::Pong,
true, this->_useMasking);
456bool WebsocketsEndpoint::pong(
const std::string &&
msg) {
458 if (
msg.size() > 125) {
461 return this->send(
msg, ContentType::Pong,
true, this->_useMasking);
465void WebsocketsEndpoint::setFragmentsPolicy(FragmentsPolicy newPolicy) { this->_fragmentsPolicy = newPolicy; }
467FragmentsPolicy WebsocketsEndpoint::getFragmentsPolicy()
const {
return this->_fragmentsPolicy; }
469WebsocketsEndpoint::~WebsocketsEndpoint() {}