Communication Library for Autonomous Systems v1.0
Reliable and secure communication library for autonomous vehicle systems
Loading...
Searching...
No Matches
gateway.h
Go to the documentation of this file.
1#ifndef GATEWAY_H
2#define GATEWAY_H
3
4#include <unordered_map>
5#include <unordered_set>
6#include <pthread.h>
7#include <chrono>
8#include <sstream>
9
10#include "api/traits.h"
12#include "api/network/bus.h"
14#include "api/util/csv_logger.h"
15#include "api/network/message.h"
16#include "api/util/debug.h"
17
18class Gateway {
19 public:
26 typedef std::unordered_map<Unit, std::unordered_set<Observer*>> Map;
27
28 inline static const unsigned int MAX_MESSAGE_SIZE = Protocol::MTU - sizeof(Protocol::Header) - sizeof(Protocol::TimestampFields);
29 const unsigned int PORT = 0;
30
32 ~Gateway();
33
34 void start();
35
36 bool send(Message* message);
37 bool receive(Message* msg);
39
40 bool running() const;
41 static void* mainloop(void* arg);
42 static void* internalLoop(void* arg);
43
44 const Address& address();
45 CAN* bus() { return _can; }
46
47 // New method to access network for RSU manager setup
48 Network* network() { return _network; }
49
50 // CSV logging methods
51 void setup_csv_logging(const std::string& log_dir);
52 void log_message(const Message& msg, const std::string& direction);
53
54 private:
55 void handle(Message* msg);
56 void subscribe(Message* message);
57 void publish(Message* message);
58
59 unsigned int _id;
60 Network* _network;
61 Communicator* _comms;
62 CAN* _can;
63 Observer* _can_observer;
64 pthread_t _receive_thread;
65 pthread_t _internal_thread;
66 std::atomic<bool> _running;
67 std::unique_ptr<CSVLogger> _csv_logger;
68};
69
70/******** Gateway Implementation ********/
/**
 * @brief Builds the network stack for this gateway and hooks an observer onto the CAN bus.
 *
 * Allocates the Network and the Communicator (bound to the network's channel
 * at address {network address, PORT}), then creates an Observer and attaches
 * it to the network's CAN bus. No thread is started here; see start().
 *
 * @param id          Stored in _id and forwarded to the Network constructor.
 * @param entity_type Forwarded to the Network constructor.
 *
 * NOTE(review): `c`, used below to build and attach the observer, is not
 * declared in the visible lines (listing line 79 is absent from this view).
 */
71inline Gateway::Gateway(const unsigned int id, Network::EntityType entity_type) : _id(id), _running(false) {
72 db<Gateway>(TRC) << "Gateway::Gateway(" << id << ", entity_type) called!\n";
// Raw owning pointer; released in the destructor.
73 _network = new Network(id, entity_type);
74
75 // Sets communicator
76 Address addr(_network->address(), PORT);
// Raw owning pointer; released in the destructor.
77 _comms = new Communicator(_network->channel(), addr);
// The bus pointer comes from the network and is never deleted by Gateway.
78 _can = _network->bus();
80 _can_observer = new Observer(c);
81
82 // CRITICAL FIX: Attach observer to CAN bus
83 _can->attach(_can_observer, c);
84
85 db<Gateway>(INF) << "[Gateway " << _id << "] created with address: " << addr.to_string() << "\n";
86}
87
88inline void Gateway::start() {
89 db<Gateway>(TRC) << "Gateway::start() called for ID " << _id << "!\n";
90 if (_running.load()) {
91 db<Gateway>(WRN) << "[Gateway " << _id << "] start() called but already running.\n";
92 return;
93 }
94 _running.store(true, std::memory_order_release);
95 pthread_create(&_receive_thread, nullptr, Gateway::mainloop, this);
96 pthread_create(&_internal_thread, nullptr, Gateway::internalLoop, this);
97 db<Gateway>(INF) << "[Gateway " << _id << "] threads started\n";
98}
99
// Body of the Gateway destructor. The signature (listing line 100) is absent
// from this view. Stops both worker threads, detaches the CAN observer and
// releases the observer, the network and the communicator.
101 db<Gateway>(TRC) << "Gateway::~Gateway() called for ID " << _id << "!\n";
// NOTE(review): when start() was never called this returns before any of the
// deletes below, so _network, _comms and _can_observer (all allocated in the
// constructor) are never released and the observer stays attached to the bus.
102 if (!_running.load()) {
103 return;
104 }
// Both loops test running() at the top of each iteration.
105 _running.store(false, std::memory_order_release);
106
107 _comms->release();
108
109 // CRITICAL FIX: Detach observer from CAN bus before deleting
110 if (_can_observer) {
// A placeholder message is pushed into the observer before detaching -
// presumably to wake internalLoop() if it is blocked in
// _can_observer->updated(); confirm against Observer. internalReceive()
// deletes whatever updated() returns, which would free dummy_msg.
// NOTE(review): `c` is not declared in the visible lines (listing line 112
// is absent from this view).
111 Message* dummy_msg = new Message();
113 _can_observer->update(c, dummy_msg);
114 _can->detach(_can_observer, c);
115 }
// The internal thread is joined before the observer it reads from is deleted.
116 pthread_join(_internal_thread, nullptr);
117 delete _can_observer;
118 _can_observer = nullptr;
119
// The network is stopped before the receive thread is joined.
120 _network->stop();
121 pthread_join(_receive_thread, nullptr);
122 delete _network;
123 db<Gateway>(TRC) << "[Gateway " << _id << "] threads joined\n";
124
// NOTE(review): _comms was built from _network->channel(), and the network has
// already been deleted here - confirm ~Communicator does not touch the channel.
125 delete _comms;
126
127 db<Gateway>(INF) << "[Gateway " << _id << "] destroyed successfully\n";
128}
129
131
132 if (message->size() > MAX_MESSAGE_SIZE) {
133 db<Gateway>(WRN) << "[Gateway " << _id << "] message too large: " << message->size() << " > " << MAX_MESSAGE_SIZE << "\n";
134 return false;
135 }
136
137 db<Gateway>(INF) << "[Gateway " << _id << "] sending external message of type " << static_cast<int>(message->message_type())
138 << " for unit " << message->unit() << "\n";
139
140 // Log sent message to CSV
141 log_message(*message, "SEND");
142
143 if (!running()) {
144 db<Gateway>(WRN) << "[Gateway " << _id << "] send called but gateway is not running\n";
145 return false;
146 }
147 bool result = _comms->send(message);
148
149 db<Gateway>(INF) << "[Gateway " << _id << "] external send result: " << (result ? "SUCCESS" : "FAILED") << "\n";
150
151 return result;
152}
153
155 if (!_running.load(std::memory_order_acquire)) {
156 db<Gateway>(WRN) << "[Gateway " << _id << "] receive called but gateway is not running\n";
157 return false;
158 }
159
160 bool result = _comms->receive(message);
161
162 // Log received message to CSV if successful
163 if (result) {
164 db<Gateway>(INF) << "[Gateway " << _id << "] received external message of type " << static_cast<int>(message->message_type())
165 << " for unit " << message->unit() << "\n";
166 log_message(*message, "RECEIVE");
167 }
168
169 return result;
170}
171
172// TODO - Edit origin in message
/**
 * @brief Processes one message received from the external network and
 *        forwards it onto the CAN bus.
 *
 * Messages whose origin equals this gateway's communicator address are
 * dropped, as are messages that fail the type validation. Everything else is
 * forwarded via _can->send() as `modified_message`.
 *
 * NOTE(review): several lines are absent from this view - the validation
 * condition (listing lines 183-187), the construction of `modified_message`
 * (199-200) and the `case` labels (204, 208, 212). The comments below cover
 * only what is visible.
 *
 * @param message Message obtained from receive(); not modified in the visible code.
 */
173inline void Gateway::handle(Message* message) {
174 // CRITICAL FIX: Check if message originated from this gateway to prevent feedback loop
175 if (message->origin() == _comms->address()) {
176 db<Gateway>(INF) << "[Gateway " << _id << "] ignoring message from self (origin: "
177 << message->origin().to_string() << ", self: " << _comms->address().to_string() << ")\n";
178 return;
179 }
180
181 // Validate message type to detect corruption
182 auto msg_type = message->message_type();
// (the condition guarding this error branch is not visible here)
188 db<Gateway>(ERR) << "[Gateway " << _id << "] received corrupted message with invalid type "
189 << static_cast<int>(msg_type) << " from origin " << message->origin().to_string()
190 << ", unit=" << message->unit() << ", period=" << message->period().count()
191 << ", value_size=" << message->value_size() << " - DROPPING MESSAGE\n";
192 return;
193 }
194
195 db<Gateway>(INF) << "[Gateway " << _id << "] handling external message of type " << static_cast<int>(message->message_type())
196 << " for unit " << message->unit() << " from origin " << message->origin().to_string() << "\n";
197
198
201
// All three visible branches perform the same _can->send(); only the log text
// differs. Going by the log text they correspond to INTEREST, RESPONSE and
// STATUS, in that order.
202 switch (modified_message.message_type())
203 {
205 db<Gateway>(INF) << "[Gateway " << _id << "] forwarding INTEREST to CAN bus with modified origin\n";
206 _can->send(&modified_message);
207 break;
209 db<Gateway>(INF) << "[Gateway " << _id << "] forwarding RESPONSE to CAN bus with modified origin\n";
210 _can->send(&modified_message);
211 break;
213 db<Gateway>(INF) << "[Gateway " << _id << "] forwarding STATUS to CAN bus with modified origin\n";
214 _can->send(&modified_message);
215 break;
// Any other type is logged and not forwarded.
216 default:
217 db<Gateway>(WRN) << "[Gateway " << _id << "] unknown message type: " << static_cast<int>(modified_message.message_type()) << "\n";
218 break;
219 }
220}
221
222inline void* Gateway::mainloop(void* arg) {
223 Gateway* self = reinterpret_cast<Gateway*>(arg);
224
225 db<Gateway>(INF) << "[Gateway " << self->_id << "] external receive loop started\n";
226
227 while (self->running()) {
228 Message msg;
229 if (self->receive(&msg)) {
230 self->handle(&msg);
231 }
232 }
233
234 db<Gateway>(INF) << "[Gateway " << self->_id << "] external receive loop ended\n";
235 return nullptr;
236}
237
239 // CRITICAL FIX: Get message from observer and copy it to the parameter
240 Message* received_msg = _can_observer->updated();
241 if (!received_msg) {
242 db<Gateway>(WRN) << "[Gateway " << _id << "] no internal message received\n";
243 return false;
244 }
245
246 // Copy the received message to the output parameter
247 *msg = *received_msg;
248
249 db<Gateway>(INF) << "[Gateway " << _id << "] received internal message of type " << static_cast<int>(msg->message_type())
250 << " for unit " << msg->unit() << " external: " << msg->external() << "\n";
251
252 // Clean up the received message
253 delete received_msg;
254
255 return true;
256}
257
258inline void* Gateway::internalLoop(void* arg) {
259 Gateway* self = reinterpret_cast<Gateway*>(arg);
260
261 db<Gateway>(INF) << "[Gateway " << self->_id << "] internal receive loop started\n";
262
263 while (self->running()) {
264 Message msg;
265 if (self->internalReceive(&msg)) {
266 // CRITICAL FIX: Check if message originated from this gateway to prevent feedback loop
267 if (msg.origin() == self->_comms->address() || !msg.external()) {
268 db<Gateway>(INF) << "[Gateway " << self->_id << "] ignoring internal message from self (origin: "
269 << msg.origin().to_string() << ", self: " << self->_comms->address().to_string() << ")\n";
270 continue;
271 }
272
273 db<Gateway>(INF) << "[Gateway " << self->_id << "] forwarding internal message externally from origin "
274 << msg.origin().to_string() << "\n";
275 self->send(&msg);
276 }
277 }
278
279 db<Gateway>(INF) << "[Gateway " << self->_id << "] internal receive loop ended\n";
280 return nullptr;
281}
282
283inline bool Gateway::running() const {
284 return _running.load(std::memory_order_acquire);
285}
286
288 return _comms->address();
289}
290
291inline void Gateway::setup_csv_logging(const std::string& log_dir) {
292 std::string csv_file = log_dir + "/gateway_" + std::to_string(_id) + "_messages.csv";
293 std::string header = "timestamp_us,message_type,direction,origin,destination,unit,period_us,value_size,latency_us";
294 _csv_logger = std::make_unique<CSVLogger>(csv_file, header);
295}
296
/**
 * @brief Appends one row describing a message to the gateway's CSV log.
 *
 * Does nothing unless a logger has been installed by setup_csv_logging() and
 * reports itself open. The row's columns follow the header written there:
 * timestamp, message type, direction, origin, destination, unit, period,
 * value size, latency.
 *
 * @param msg       Message being logged.
 * @param direction "SEND" or "RECEIVE", as passed by send() and receive().
 *                  Any value other than "SEND" takes the receive-style
 *                  origin/destination columns; only "RECEIVE" gets a latency.
 *
 * NOTE(review): the definition of `timestamp_us` (listing line 300) and every
 * `case` label in the switch below are absent from this view.
 */
297inline void Gateway::log_message(const Message& msg, const std::string& direction) {
298 if (!_csv_logger || !_csv_logger->is_open()) return;
299
301
302 // Calculate latency for received messages
// Stays 0 for anything that is not a "RECEIVE" row.
303 auto latency_us = 0L;
304 if (direction == "RECEIVE") {
305 latency_us = timestamp_us - msg.timestamp().count();
306 }
307
308 // Properly map message types instead of assuming unknown types are STATUS
309 std::string msg_type_str;
310 switch (msg.message_type()) {
312 msg_type_str = "INTEREST";
313 break;
315 msg_type_str = "RESPONSE";
316 break;
318 msg_type_str = "STATUS";
319 break;
321 msg_type_str = "REQ";
322 break;
324 msg_type_str = "KEY_RESPONSE";
325 break;
327 msg_type_str = "UNKNOWN";
328 break;
330 msg_type_str = "INVALID";
331 break;
// Values outside the handled set are recorded with their numeric value.
332 default:
333 msg_type_str = "CORRUPTED_TYPE_" + std::to_string(static_cast<int>(msg.message_type()));
334 break;
335 }
336
// For "SEND" rows the origin is this gateway and the destination is the
// literal "NETWORK"; otherwise the origin is the message's and the
// destination is this gateway.
337 std::ostringstream csv_line;
338 csv_line << timestamp_us << ","
339 << msg_type_str << ","
340 << direction << ","
341 << (direction == "SEND" ? address().to_string() : msg.origin().to_string()) << ","
342 << (direction == "SEND" ? "NETWORK" : address().to_string()) << ","
343 << msg.unit() << ","
344 << msg.period().count() << ","
345 << msg.value_size() << ","
346 << latency_us;
347
348 _csv_logger->log(csv_line.str());
349}
350
351#endif // GATEWAY_H
Definition bus.h:52
int send(Message *msg)
Definition bus.h:68
Definition communicator.h:16
bool receive(Message_T *message)
Definition communicator.h:93
const Address & address() const
Definition communicator.h:134
bool send(const Message_T *message)
Definition communicator.h:78
void release()
Definition communicator.h:123
void detach(Observer *o, C c)
Definition observed.h:147
void attach(Observer *o, C c)
Definition observed.h:140
Definition observer.h:79
D * updated()
Definition observer.h:124
void update(C c, D *d) override
Definition observer.h:116
Definition bus.h:10
Definition gateway.h:18
const Address & address()
Definition gateway.h:287
bool receive(Message *msg)
Definition gateway.h:154
void log_message(const Message &msg, const std::string &direction)
Definition gateway.h:297
const unsigned int PORT
Definition gateway.h:29
Network::Message Message
Definition gateway.h:23
Gateway(const unsigned int id, Network::EntityType entity_type=Network::EntityType::VEHICLE)
Definition gateway.h:71
static void * internalLoop(void *arg)
Definition gateway.h:258
Network * network()
Definition gateway.h:48
static void * mainloop(void *arg)
Definition gateway.h:222
void setup_csv_logging(const std::string &log_dir)
Definition gateway.h:291
Network::Communicator Communicator
Definition gateway.h:20
~Gateway()
Definition gateway.h:100
Protocol::Address Address
Definition gateway.h:22
bool running() const
Definition gateway.h:283
std::unordered_map< Unit, std::unordered_set< Observer * > > Map
Definition gateway.h:26
CAN::Observer Observer
Definition gateway.h:25
static const unsigned int MAX_MESSAGE_SIZE
Definition gateway.h:28
void start()
Definition gateway.h:88
Network::Protocol Protocol
Definition gateway.h:21
bool send(Message *message)
Definition gateway.h:130
CAN * bus()
Definition gateway.h:45
bool internalReceive(Message *msg)
Definition gateway.h:238
Message::Unit Unit
Definition gateway.h:24
Template class for network messages with Clock integration.
Definition message.h:31
static Microseconds getSynchronizedTimestamp()
Definition message.h:490
const Origin & origin() const
Definition message.h:205
std::uint32_t Unit
Definition message.h:48
Definition network.h:8
void stop()
Definition network.h:19
EntityType
Definition network.h:10
CAN * bus()
Definition network.h:71
Protocol * channel()
Definition network.h:67
const NIC::Address address()
Definition network.h:75
Definition protocol.h:134
const std::string to_string() const
Definition protocol.h:277
Definition protocol.h:43
Definition protocol.h:29
static const unsigned int MTU
Definition protocol.h:82
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ ERR
Definition debug.h:162
@ TRC
Definition debug.h:231
@ WRN
Definition debug.h:185
Header * header()
Definition protocol.h:3
Definition protocol.h:63