Communication Library for Autonomous Systems v1.0
Reliable and secure communication library for autonomous vehicle systems
Loading...
Searching...
No Matches
communicator.h
Go to the documentation of this file.
1#ifndef COMMUNICATOR_H
2#define COMMUNICATOR_H
3
#include <atomic>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>

#include "api/traits.h"
#include "api/network/message.h"
#include "api/util/observer.h"
#include "api/util/debug.h"
13
14template <typename Channel>
15class Communicator: public Concurrent_Observer<typename Channel::Observer::Observed_Data, typename Channel::Observer::Observing_Condition>
16{
17
18 public:
20 typedef typename Channel::Buffer Buffer;
21 typedef typename Channel::Address Address;
22 typedef typename Channel::Port Port;
24 static constexpr const unsigned int MAX_MESSAGE_SIZE = Channel::MTU; // Maximum message size in bytes
25
26 // Constructor and Destructor
29
30 // Communication methods
31 bool send(const Message_T* message);
33
34 // Address getter
35 const Address& address() const;
36
37 // Release thread waiting for buffer
38 void release();
39
40 // Atomic variable running
41 std::atomic<bool> _running;
42
43 // Deleted copy constructor and assignment operator to prevent copying
44 Communicator(const Communicator&) = delete;
46
47 private:
48
49 using Observer::update;
50 // Update method for Observer pattern
51 void update(typename Channel::Observed* obs, typename Channel::Observer::Observing_Condition c, Buffer* buf);
52
53 private:
54 Channel* _channel;
55 Address _address;
56};
57
58/*************** Communicator Implementation *****************/
59template <typename Channel>
60Communicator<Channel>::Communicator(Channel* channel, Address address) : Observer(address.port()), _address(address) {
61 if (!channel)
62 throw std::invalid_argument("Channel cannot be null");
63
64 _channel = channel;
65 _channel->attach(this, address);
66 _running.store(true, std::memory_order_release);
67}
68
69template <typename Channel>
71 db<Communicator>(TRC) << "Communicator<Channel>::~Communicator() called for address: " << _address.to_string() << "\n";
72
73 _channel->detach(this, _address);
74 db<Communicator>(INF) << "[Communicator] Channel detached from address: " << _address.to_string() << "\n";
75}
76
77template <typename Channel>
79 db<Communicator>(TRC) << "Communicator<Channel>::send() called!\n";
80
81 if (!_running.load(std::memory_order_acquire)) {
82 db<Communicator>(WRN) << "[Communicator] Not running, skipping send!\n";
83 return false;
84 }
85
86 int result = _channel->send(_address, Address::BROADCAST, message->data(), message->size());
87 db<Communicator>(INF) << "[Communicator] Channel::send() return value " << std::to_string(result) << "\n";
88
89 return result;
90}
91
92template <typename Channel>
94 db<Communicator>(TRC) << "Communicator<Channel>::receive() called!\n";
95
96 if (!_running.load(std::memory_order_acquire)) {
97 db<Communicator>(WRN) << "[Communicator] Not running, skipping receive!\n";
98 return false;
99 }
100
101 Buffer* buf = Observer::updated();
102 if (!buf) {
103 db<Communicator>(WRN) << "[Communicator] No buffer available for receiving message!\n";
104 return false;
105 }
106
107 std::uint8_t temp_data[MAX_MESSAGE_SIZE];
108
109 int result = _channel->receive(buf, nullptr, temp_data, buf->size()); // Assuming Channel::receive fills 'from'
110 db<Communicator>(INF) << "[Communicator] Channel::receive() returned " << result << "\n";
111
112 if (result <= 0)
113 return false;
114
115 // Deserialize the raw data into the message
116 *message = Message_T::deserialize(temp_data, result);
117 db<Communicator>(INF) << "[Communicator] Received message from: " << message->origin().to_string() << "\n";
118
119 return true;
120}
121
122template <typename Channel>
124 _running.store(false, std::memory_order_release);
125 update(nullptr, this->rank(), nullptr);
126}
127
128template <typename Channel>
129void Communicator<Channel>::update(typename Channel::Observed* obs, typename Channel::Observer::Observing_Condition c, Buffer* buf) {
130 Observer::update(c, buf); // releases the thread waiting for data
131}
132
133template <typename Channel>
135 return _address;
136}
137
138#endif // COMMUNICATOR_H
Definition buffer.h:8
Definition communicator.h:16
Communicator(const Communicator &)=delete
bool receive(Message_T *message)
Definition communicator.h:93
Channel::Buffer Buffer
Definition communicator.h:20
~Communicator()
Definition communicator.h:70
const Address & address() const
Definition communicator.h:134
bool send(const Message_T *message)
Definition communicator.h:78
static constexpr const unsigned int MAX_MESSAGE_SIZE
Definition communicator.h:24
Channel::Port Port
Definition communicator.h:22
Message< Channel > Message_T
Definition communicator.h:23
Communicator(Channel *channel, Address address)
Definition communicator.h:60
void release()
Definition communicator.h:123
Concurrent_Observer< typename Channel::Observer::Observed_Data, typename Channel::Observer::Observing_Condition > Observer
Definition communicator.h:19
std::atomic< bool > _running
Definition communicator.h:41
Communicator & operator=(const Communicator &)=delete
Channel::Address Address
Definition communicator.h:21
Definition observer.h:79
void update(C c, D *d) override
Definition observer.h:116
Template class for network messages with Clock integration.
Definition message.h:31
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ TRC
Definition debug.h:231
@ WRN
Definition debug.h:185