52 std::unique_ptr<ComponentData>
data,
bool external =
true);
62 static void*
run(
void* arg);
66 const bool external()
const {
return _external; }
67 std::string
name()
const {
return _name; }
87 void handle_interest(
Message* msg);
88 bool should_process_response();
97 std::atomic<bool> _running;
99 std::unique_ptr<CSVLogger> _csv_logger;
104 std::atomic<bool> _interest_active;
105 std::atomic<bool> _is_consumer;
106 std::atomic<long long> _last_response_timestamp;
112 std::unique_ptr<ComponentData> _component_data;
126 struct ResponseInfo {
156 std::unique_ptr<ComponentData>
data,
bool external)
160 _can_observer(nullptr),
162 _periodic_thread(nullptr),
165 _csv_logger(nullptr),
166 _interest_thread(nullptr),
169 _interest_active(false),
170 _is_consumer(type ==
Type::RESPONSE),
171 _last_response_timestamp(0),
173 _component_data(std::move(
data)),
174 _data_producer(producer),
175 _response_handler(handler),
176 _external(external) {
178 db<Agent>(
INF) <<
"[Agent] " << _name <<
" created with address: " << _address.to_string() <<
"\n";
180 throw std::invalid_argument(
"Gateway cannot be null");
185 _can->
attach(_can_observer,
c);
187 if (_is_consumer && !
handler) {
188 throw std::invalid_argument(
"Consumer agents must have a response handler");
191 throw std::invalid_argument(
"Producer agents must have a data producer");
198 db<Agent>(
INF) <<
"[Agent] " << _name <<
" initialized as consumer, waiting for application to start periodic interest\n";
200 db<Agent>(
INF) <<
"[Agent] " << _name <<
" initialized as producer, ready to handle INTEREST messages\n";
207 throw std::runtime_error(
"Failed to create agent thread");
218 db<Agent>(
INF) <<
"[Agent] " << _name <<
" destruction started\n";
221 _running.store(
false, std::memory_order_release);
223 if (_interest_thread) {
228 if (_periodic_thread) {
229 _periodic_thread->
join();
230 delete _periodic_thread;
231 _periodic_thread =
nullptr;
242 _can->
detach(_can_observer, _c);
243 delete _can_observer;
244 _can_observer =
nullptr;
247 db<Agent>(
INF) <<
"[Agent] " << _name <<
" destroyed successfully\n";
262 if (_is_consumer || !_data_producer || !_component_data) {
265 return _data_producer(unit, _component_data.get());
278 if (!_is_consumer || !_response_handler || !_component_data || !
msg) {
281 db<Agent>(
INF) <<
"[Agent] " << _name <<
" handling response for unit: " <<
msg->unit() <<
"\n";
287 auto ptr = _value_cache.
get(key);
290 db<Agent>(
INF) <<
"[Agent] " << _name <<
" found cached values for key: " << key <<
"\n";
298 _response_handler(
msg, _component_data.get());
312 _response_handler(
msg, _component_data.get());
319 db<Agent>(
INF) <<
"[Agent] " << _name <<
" NO CACHE FOUND FOR KEY: " << key <<
"\n";
326 _response_handler(
msg, _component_data.get());
327 db<Agent>(
INF) <<
"[Agent] " << _name <<
" ADDED CACHE FOR KEY: " << key <<
"\n";
332 db<Agent>(
INF) <<
"[Agent] " << _name <<
" sending INTEREST for unit: " << unit <<
" with period: " << period.count() <<
" microseconds" <<
" external: " << _external <<
"\n";
333 if (period == Microseconds::zero())
337 _interest_period = period;
340 msg.external(_external);
354 db<Agent>(
INF) <<
"[Agent] " << _name <<
" waiting for messages...\n";
355 (*msg) = *(_can_observer->
updated());
356 db<Agent>(
INF) <<
"[Agent] " << _name <<
" messages received\n";
367 while (
agent->running()) {
372 db<Agent>(
WRN) <<
"[Agent] " <<
agent->name() <<
" received an empty (received=" <<
received <<
") or invalid message\n";
378 db<Agent>(
INF) <<
"[Agent] " <<
agent->name() <<
" received message of type: " <<
static_cast<int>(
msg->message_type())
379 <<
" for unit: " <<
msg->unit() <<
" with size: " <<
msg->value_size() <<
"\n";
383 if (
agent->should_process_response()) {
384 db<Agent>(
INF) <<
"[Agent] " <<
agent->name() <<
" processing RESPONSE message (period filter passed)\n";
387 db<Agent>(
INF) <<
"[Agent] " <<
agent->name() <<
" discarding RESPONSE message (period filter failed)\n";
400 return _running.load(std::memory_order_acquire);
403inline void Agent::handle_interest(
Message*
msg) {
404 db<Agent>(
INF) <<
"[Agent] " << _name <<
" received INTEREST for unit: " <<
msg->unit() <<
" with period: " <<
msg->period().count() <<
" microseconds\n";
408 if (_c.
type() != Type::INTEREST) {
409 db<Agent>(
WRN) <<
"[Agent] " << _name <<
" ignoring INTEREST message (not a producer)\n";
416 long int key = (
static_cast<long int>(
consumer_id) << 16) | (
msg->unit() & 0xFFFF);
420 info.period =
msg->period();
422 if (_active_interests.
contains(key)) {
423 *_active_interests.
get(key) = info;
425 _active_interests.
add(key, info);
431 if (!_periodic_thread) {
433 _periodic_thread->
start(_producer_gcd.count());
435 _periodic_thread->
set_period(_producer_gcd.count());
453 if (!_periodic_thread || !_periodic_thread->
running()) {
460 _active_interests.
for_each([&](
long int key, ResponseInfo& info){
461 if ((key & 0xFFFF) == unit && info.termination_time.count() > 0) {
462 has_active_entry = true;
463 info.termination_time -= _producer_gcd;
464 if (info.termination_time.count() <= 0) counters_updated = true;
473 db<Agent>(
INF) <<
"[Agent] " << _name <<
" sending RESPONSE for unit: " << unit <<
"\n";
489 std::string
header =
"timestamp_us,message_type,direction,origin,destination,unit,period_us,value_size,latency_us";
494 if (!_csv_logger || !_csv_logger->is_open())
return;
508 << (
direction ==
"SEND" ? _address.to_string() :
msg.origin().to_string()) <<
","
509 << (
direction ==
"SEND" ?
"BROADCAST" : _address.to_string()) <<
","
511 <<
msg.period().count() <<
","
512 <<
msg.value_size() <<
","
527 db<Agent>(
WRN) <<
"[Agent] " << _name <<
" is not a consumer, cannot start periodic interest\n";
531 if (_interest_active.load()) {
532 db<Agent>(
INF) <<
"[Agent] " << _name <<
" updating interest period from "
533 << _requested_period.count() <<
" to " << period.count() <<
" microseconds\n";
538 _requested_period = period;
539 _interest_period = period;
540 _interest_active.store(
true, std::memory_order_release);
542 if (!_interest_thread) {
544 _interest_thread->
start(period.count());
545 db<Agent>(
INF) <<
"[Agent] " << _name <<
" started periodic INTEREST for unit: "
546 << unit <<
" with period: " << period.count() <<
" microseconds\n";
556 if (_interest_active.load()) {
557 _interest_active.store(
false, std::memory_order_release);
559 if (_interest_thread) {
560 _interest_thread->
join();
561 delete _interest_thread;
562 _interest_thread =
nullptr;
565 db<Agent>(
INF) <<
"[Agent] " << _name <<
" stopped periodic INTEREST\n";
575 if (!_interest_active.load() || !
running()) {
579 db<Agent>(
TRC) <<
"[Agent] " << _name <<
" sending periodic INTEREST for unit: "
580 << unit <<
" with period: " << _requested_period.count() <<
" microseconds" <<
" external: " << _external <<
"\n";
583 msg.external(_external);
597 if (_interest_thread) {
606inline bool Agent::should_process_response() {
608 if (_interest_period == Microseconds::zero()) {
613 auto last_timestamp = _last_response_timestamp.load(std::memory_order_acquire);
628inline void Agent::recompute_gcd() {
630 _active_interests.
for_each([&](
long int , ResponseInfo& info){
631 if (info.termination_time.count() > 0) {
632 long long p = info.period.count();
633 gcd_us = gcd_us == 0 ? p : std::gcd(gcd_us, p);
643 if (_periodic_thread) {
657 return _periodic_thread && _periodic_thread->
running();
EPOS-inspired Agent implementation using composition over inheritance.
Definition agent.h:35
static constexpr int MAX_RESPONSES_PER_INTEREST
Definition agent.h:48
void update_interest_period(Microseconds new_period)
Update the period for periodic interest sending.
Definition agent.h:594
Message::Array Value
Definition agent.h:39
Message::Microseconds Microseconds
Definition agent.h:42
Message::Type Type
Definition agent.h:41
Message::Origin Address
Definition agent.h:38
Address address() const
Definition agent.h:648
static const unsigned int _units_per_vehicle
Definition agent.h:46
bool thread_running()
Definition agent.h:656
int send(Unit unit, Microseconds period)
Definition agent.h:331
CAN::Observer Observer
Definition agent.h:44
int receive(Message *msg)
Definition agent.h:353
void stop_periodic_interest()
Stop sending periodic INTEREST messages.
Definition agent.h:555
Periodic_Thread< Agent > Thread
Definition agent.h:43
void log_message(const Message &msg, const std::string &direction)
Definition agent.h:493
void send_interest(Unit unit)
Send a single INTEREST message (called by periodic thread)
Definition agent.h:574
bool running()
Definition agent.h:399
CAN::Message Message
Definition agent.h:37
~Agent()
Destructor with proper cleanup order.
Definition agent.h:217
const bool external() const
Definition agent.h:66
int can_send(Message *msg)
Definition agent.h:652
void handle_response(Message *msg)
Handle response using function pointer instead of virtual method.
Definition agent.h:276
int start_periodic_interest(Unit unit, Microseconds period)
Start sending periodic INTEREST messages for the specified unit and period.
Definition agent.h:525
static void * run(void *arg)
Definition agent.h:364
Message::Unit Unit
Definition agent.h:40
virtual void reply(Unit unit)
Reply method that calls function pointer instead of virtual method.
Definition agent.h:446
Agent(CAN *bus, const std::string &name, Unit unit, Type type, Address address, DataProducer producer, ResponseHandler handler, std::unique_ptr< ComponentData > data, bool external=true)
Constructor for EPOS-inspired Agent using composition.
Definition agent.h:154
void set_csv_logger(const std::string &log_dir)
Definition agent.h:487
Value get(Unit unit)
Get data using function pointer instead of virtual method.
Definition agent.h:260
std::string name() const
Definition agent.h:67
int send(Message *msg)
Definition bus.h:68
void detach(Observer *o, C c)
Definition observed.h:147
void attach(Observer *o, C c)
Definition observed.h:140
D * updated()
Definition observer.h:124
const Type type() const
Definition bus.h:40
Template class for network messages with Clock integration.
Definition message.h:31
std::vector< std::uint8_t > Array
Definition message.h:49
static Microseconds getSynchronizedTimestamp()
Definition message.h:490
std::chrono::microseconds Microseconds
Definition message.h:50
Channel::Address Origin
Definition message.h:45
Type
Definition message.h:35
std::uint32_t Unit
Definition message.h:48
Definition periodicThread.h:72
void start(std::int64_t period)
Definition periodicThread.h:130
void join()
Definition periodicThread.h:116
void set_period(std::int64_t period)
Definition periodicThread.h:160
bool running()
Definition periodicThread.h:231
A hash cache with a static size, using linear probing for collision resolution.
Definition static_size_hashed_cache.h:15
void for_each(F &&fn)
Iterate over all occupied entries and apply a functor. The functor receives (key, value&) as paramete...
Definition static_size_hashed_cache.h:101
void add(long int key, V value)
Adds a key-value pair to the cache. If the key already exists, its value is updated....
Definition static_size_hashed_cache.h:32
V * get(long int key)
Retrieves a pointer to the value associated with a given key. It uses linear probing to find the key.
Definition static_size_hashed_cache.h:55
bool contains(long int key) const
Determines if a key is already in the data structure. It uses linear probing to find the key.
Definition static_size_hashed_cache.h:78
std::vector< std::uint8_t >(* DataProducer)(std::uint32_t unit, ComponentData *data)
Function pointer type for data production.
Definition component_functions.hpp:22
void(* ResponseHandler)(void *msg, ComponentData *data)
Function pointer type for response handling.
Definition component_functions.hpp:34
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ TRC
Definition debug.h:231
@ WRN
Definition debug.h:185
T * data()
Definition protocol.h:24
Header * header()
Definition protocol.h:3