4#include <unordered_map>
5#include <unordered_set>
26 typedef std::unordered_map<Unit, std::unordered_set<Observer*>>
Map;
29 const unsigned int PORT = 0;
66 std::atomic<bool> _running;
67 std::unique_ptr<CSVLogger> _csv_logger;
72 db<Gateway>(
TRC) <<
"Gateway::Gateway(" <<
id <<
", entity_type) called!\n";
78 _can = _network->
bus();
85 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] created with address: " <<
addr.to_string() <<
"\n";
89 db<Gateway>(
TRC) <<
"Gateway::start() called for ID " << _id <<
"!\n";
90 if (_running.load()) {
91 db<Gateway>(
WRN) <<
"[Gateway " << _id <<
"] start() called but already running.\n";
94 _running.store(
true, std::memory_order_release);
101 db<Gateway>(
TRC) <<
"Gateway::~Gateway() called for ID " << _id <<
"!\n";
102 if (!_running.load()) {
105 _running.store(
false, std::memory_order_release);
114 _can->
detach(_can_observer,
c);
117 delete _can_observer;
118 _can_observer =
nullptr;
127 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] destroyed successfully\n";
137 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] sending external message of type " <<
static_cast<int>(
message->message_type())
138 <<
" for unit " <<
message->unit() <<
"\n";
144 db<Gateway>(
WRN) <<
"[Gateway " << _id <<
"] send called but gateway is not running\n";
149 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] external send result: " << (
result ?
"SUCCESS" :
"FAILED") <<
"\n";
155 if (!_running.load(std::memory_order_acquire)) {
156 db<Gateway>(
WRN) <<
"[Gateway " << _id <<
"] receive called but gateway is not running\n";
164 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] received external message of type " <<
static_cast<int>(
message->message_type())
165 <<
" for unit " <<
message->unit() <<
"\n";
176 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] ignoring message from self (origin: "
188 db<Gateway>(
ERR) <<
"[Gateway " << _id <<
"] received corrupted message with invalid type "
189 <<
static_cast<int>(
msg_type) <<
" from origin " <<
message->origin().to_string()
190 <<
", unit=" <<
message->unit() <<
", period=" <<
message->period().count()
191 <<
", value_size=" <<
message->value_size() <<
" - DROPPING MESSAGE\n";
195 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] handling external message of type " <<
static_cast<int>(
message->message_type())
196 <<
" for unit " <<
message->unit() <<
" from origin " <<
message->origin().to_string() <<
"\n";
205 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] forwarding INTEREST to CAN bus with modified origin\n";
209 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] forwarding RESPONSE to CAN bus with modified origin\n";
213 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] forwarding STATUS to CAN bus with modified origin\n";
227 while (
self->running()) {
242 db<Gateway>(
WRN) <<
"[Gateway " << _id <<
"] no internal message received\n";
249 db<Gateway>(
INF) <<
"[Gateway " << _id <<
"] received internal message of type " <<
static_cast<int>(
msg->message_type())
250 <<
" for unit " <<
msg->unit() <<
" external: " <<
msg->external() <<
"\n";
263 while (
self->running()) {
265 if (
self->internalReceive(&
msg)) {
267 if (
msg.origin() ==
self->_comms->address() || !
msg.external()) {
268 db<Gateway>(
INF) <<
"[Gateway " <<
self->_id <<
"] ignoring internal message from self (origin: "
269 <<
msg.
origin().to_string() <<
", self: " <<
self->_comms->address().to_string() <<
")\n";
273 db<Gateway>(
INF) <<
"[Gateway " <<
self->_id <<
"] forwarding internal message externally from origin "
274 <<
msg.origin().to_string() <<
"\n";
284 return _running.load(std::memory_order_acquire);
292 std::string
csv_file =
log_dir +
"/gateway_" + std::to_string(_id) +
"_messages.csv";
293 std::string
header =
"timestamp_us,message_type,direction,origin,destination,unit,period_us,value_size,latency_us";
298 if (!_csv_logger || !_csv_logger->is_open())
return;
310 switch (
msg.message_type()) {
333 msg_type_str =
"CORRUPTED_TYPE_" + std::to_string(
static_cast<int>(
msg.message_type()));
344 <<
msg.period().count() <<
","
345 <<
msg.value_size() <<
","
int send(Message *msg)
Definition bus.h:68
Definition communicator.h:16
bool receive(Message_T *message)
Definition communicator.h:93
const Address & address() const
Definition communicator.h:134
bool send(const Message_T *message)
Definition communicator.h:78
void release()
Definition communicator.h:123
void detach(Observer *o, C c)
Definition observed.h:147
void attach(Observer *o, C c)
Definition observed.h:140
D * updated()
Definition observer.h:124
void update(C c, D *d) override
Definition observer.h:116
const Address & address()
Definition gateway.h:287
bool receive(Message *msg)
Definition gateway.h:154
void log_message(const Message &msg, const std::string &direction)
Definition gateway.h:297
const unsigned int PORT
Definition gateway.h:29
Network::Message Message
Definition gateway.h:23
Gateway(const unsigned int id, Network::EntityType entity_type=Network::EntityType::VEHICLE)
Definition gateway.h:71
static void * internalLoop(void *arg)
Definition gateway.h:258
Network * network()
Definition gateway.h:48
static void * mainloop(void *arg)
Definition gateway.h:222
void setup_csv_logging(const std::string &log_dir)
Definition gateway.h:291
Network::Communicator Communicator
Definition gateway.h:20
~Gateway()
Definition gateway.h:100
Protocol::Address Address
Definition gateway.h:22
bool running() const
Definition gateway.h:283
std::unordered_map< Unit, std::unordered_set< Observer * > > Map
Definition gateway.h:26
CAN::Observer Observer
Definition gateway.h:25
static const unsigned int MAX_MESSAGE_SIZE
Definition gateway.h:28
void start()
Definition gateway.h:88
Network::Protocol Protocol
Definition gateway.h:21
bool send(Message *message)
Definition gateway.h:130
CAN * bus()
Definition gateway.h:45
bool internalReceive(Message *msg)
Definition gateway.h:238
Message::Unit Unit
Definition gateway.h:24
Template class for network messages with Clock integration.
Definition message.h:31
static Microseconds getSynchronizedTimestamp()
Definition message.h:490
const Origin & origin() const
Definition message.h:205
std::uint32_t Unit
Definition message.h:48
void stop()
Definition network.h:19
EntityType
Definition network.h:10
CAN * bus()
Definition network.h:71
Protocol * channel()
Definition network.h:67
const NIC::Address address()
Definition network.h:75
Definition protocol.h:134
const std::string to_string() const
Definition protocol.h:277
static const unsigned int MTU
Definition protocol.h:82
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ ERR
Definition debug.h:162
@ TRC
Definition debug.h:231
@ WRN
Definition debug.h:185
Header * header()
Definition protocol.h:3