CCF
Loading...
Searching...
No Matches
session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/node/session.h"
7#include "tcp/msg_types.h"
8
9#include <span>
10
11namespace ccf
12{
13 class ThreadedSession : public Session,
14 public std::enable_shared_from_this<ThreadedSession>
15 {
16 private:
17 size_t execution_thread;
18
19 struct SendRecvMsg
20 {
21 std::vector<uint8_t> data;
22 std::shared_ptr<ThreadedSession> self;
23 };
24
25 public:
26 ThreadedSession(int64_t thread_affinity)
27 {
28 execution_thread =
30 thread_affinity);
31 }
32
33 // Implement Session::handle_incoming_data by dispatching a thread message
34 // that eventually invokes the virtual handle_incoming_data_thread()
35 void handle_incoming_data(std::span<const uint8_t> data) override
36 {
37 auto [_, body] = ringbuffer::read_message<::tcp::tcp_inbound>(data);
38
39 auto msg = std::make_unique<::threading::Tmsg<SendRecvMsg>>(
41 msg->data.self = this->shared_from_this();
42 msg->data.data.assign(body.data, body.data + body.size);
43
45 execution_thread, std::move(msg));
46 }
47
49 std::unique_ptr<::threading::Tmsg<SendRecvMsg>> msg)
50 {
51 msg->data.self->handle_incoming_data_thread(std::move(msg->data.data));
52 }
53
54 virtual void handle_incoming_data_thread(std::vector<uint8_t>&& data) = 0;
55
56 // Implement Session::sent_data by dispatching a thread message
57 // that eventually invokes the virtual send_data_thread()
58 void send_data(std::span<const uint8_t> data) override
59 {
60 auto msg =
61 std::make_unique<::threading::Tmsg<SendRecvMsg>>(&send_data_cb);
62 msg->data.self = this->shared_from_this();
63 msg->data.data.assign(data.begin(), data.end());
64
66 execution_thread, std::move(msg));
67 }
68
69 static void send_data_cb(
70 std::unique_ptr<::threading::Tmsg<SendRecvMsg>> msg)
71 {
72 msg->data.self->send_data_thread(std::move(msg->data.data));
73 }
74
75 virtual void send_data_thread(std::vector<uint8_t>&& data) = 0;
76 };
77}
Definition session.h:11
Definition session.h:15
void handle_incoming_data(std::span< const uint8_t > data) override
Definition session.h:35
virtual void handle_incoming_data_thread(std::vector< uint8_t > &&data)=0
ThreadedSession(int64_t thread_affinity)
Definition session.h:26
void send_data(std::span< const uint8_t > data) override
Definition session.h:58
static void send_data_cb(std::unique_ptr<::threading::Tmsg< SendRecvMsg > > msg)
Definition session.h:69
static void handle_incoming_data_cb(std::unique_ptr<::threading::Tmsg< SendRecvMsg > > msg)
Definition session.h:48
virtual void send_data_thread(std::vector< uint8_t > &&data)=0
static ThreadMessaging & instance()
Definition thread_messaging.h:278
void add_task(uint16_t tid, std::unique_ptr< Tmsg< Payload > > msg)
Definition thread_messaging.h:312
uint16_t get_execution_thread(uint32_t i)
Definition thread_messaging.h:365
Definition app_interface.h:15
Definition thread_messaging.h:27