Communication Library for Autonomous Systems v1.0
Reliable and secure communication library for autonomous vehicle systems
Loading...
Searching...
No Matches
agent.h
Go to the documentation of this file.
1#ifndef AGENT_HPP
2#define AGENT_HPP
3
4#include <string>
5#include <sstream>
6#include <memory>
7#include <atomic>
8#include <cassert>
9#include <stdexcept>
10#include <pthread.h>
11#include <numeric>
12
13#include "api/util/debug.h"
14#include "api/network/bus.h"
16#include "api/util/csv_logger.h"
17#include "api/traits.h"
21
35class Agent {
36 public:
45 // Number of units a vehicle could produce
46 static const unsigned int _units_per_vehicle = 5;
47 // Fixed-response feature: number of responses a producer sends per INTEREST
48 static constexpr int MAX_RESPONSES_PER_INTEREST = 5;
49
50 Agent(CAN* bus, const std::string& name, Unit unit, Type type, Address address,
51 DataProducer producer, ResponseHandler handler,
52 std::unique_ptr<ComponentData> data, bool external = true);
53 ~Agent();
54
55 // Non-virtual interface - eliminates race condition
56 Value get(Unit unit);
57 void handle_response(Message* msg);
58
59 int send(Unit unit, Microseconds period); // Send will always be INTEREST (this is exposed to application)
60 int receive(Message* msg);
61
62 static void* run(void* arg); // Run will always receive INTEREST messages, and set periodic thread
63 bool running();
64 void external(const bool external);
65 // External flag for message filtering
66 const bool external() const { return _external; }
67 std::string name() const { return _name; }
68
69 // CSV logging methods
70 void set_csv_logger(const std::string& log_dir);
71 void log_message(const Message& msg, const std::string& direction);
72
75 void send_interest(Unit unit);
76 void update_interest_period(Microseconds new_period);
77
78 protected:
79 virtual void reply(Unit unit);
80
81 bool thread_running();
82 int can_send(Message* msg);
83 Address address() const;
84
85 private:
86
87 void handle_interest(Message* msg);
88 bool should_process_response();
89 void recompute_gcd();
90
91 Address _address;
92 CAN* _can;
93 std::string _name;
94 Observer* _can_observer;
95 pthread_t _thread;
96 Thread* _periodic_thread;
97 std::atomic<bool> _running;
98 Condition _c;
99 std::unique_ptr<CSVLogger> _csv_logger;
100
101 Thread* _interest_thread; // Periodic thread for sending INTEREST
102 Microseconds _requested_period; // Period requested by this consumer
103 Microseconds _interest_period; // Current INTEREST period for response filtering
104 std::atomic<bool> _interest_active; // Control interest sending
105 std::atomic<bool> _is_consumer; // Track if this agent is a consumer
106 std::atomic<long long> _last_response_timestamp; // Timestamp of last processed RESPONSE (ยตs)
107
108 // Producer GCD period tracking (for fixed-response feature)
109 Microseconds _producer_gcd;
110
111 // EPOS-inspired composition members
112 std::unique_ptr<ComponentData> _component_data;
113 DataProducer _data_producer;
114 ResponseHandler _response_handler;
115
116 // Cache for storing the last value for each unit
117 struct ValueCache {
118 Unit unit;
119 Microseconds timestamp;
120 unsigned int size;
121 };
122
124
125 // === Fixed-Response feature data ===
126 struct ResponseInfo {
127 Microseconds termination_time{Microseconds::zero()};
128 Microseconds period{Microseconds::zero()};
129 };
130
131 // key = (consumerId โŠ• unit) โ€“ same 16-bit scheme used elsewhere
133
134 bool _external;
135};
136
137/****** Agent Implementation *****/
138
154inline Agent::Agent(CAN* bus, const std::string& name, Unit unit, Type type, Address address,
155 DataProducer producer, ResponseHandler handler,
156 std::unique_ptr<ComponentData> data, bool external)
157 : _address(address),
158 _can(bus),
159 _name(name),
160 _can_observer(nullptr),
161 _thread(),
162 _periodic_thread(nullptr),
163 _running(false),
164 _c(),
165 _csv_logger(nullptr),
166 _interest_thread(nullptr),
167 _requested_period(Microseconds::zero()),
168 _interest_period(Microseconds::zero()),
169 _interest_active(false),
170 _is_consumer(type == Type::RESPONSE),
171 _last_response_timestamp(0),
172 _producer_gcd(Microseconds::zero()),
173 _component_data(std::move(data)),
174 _data_producer(producer),
175 _response_handler(handler),
176 _external(external) {
177
178 db<Agent>(INF) << "[Agent] " << _name << " created with address: " << _address.to_string() << "\n";
179 if (!bus)
180 throw std::invalid_argument("Gateway cannot be null");
181
182 Condition c(unit, type);
183 _c = c;
184 _can_observer = new Observer(c);
185 _can->attach(_can_observer, c);
186
187 if (_is_consumer && !handler) {
188 throw std::invalid_argument("Consumer agents must have a response handler");
189 }
190 if (!_is_consumer && !producer) {
191 throw std::invalid_argument("Producer agents must have a data producer");
192 }
193
194 // Phase 1.2: Consumer initialization - No automatic INTEREST sending
195 if (_is_consumer) {
196 // Don't send initial INTEREST here anymore
197 // Application will call start_periodic_interest() when ready
198 db<Agent>(INF) << "[Agent] " << _name << " initialized as consumer, waiting for application to start periodic interest\n";
199 } else {
200 db<Agent>(INF) << "[Agent] " << _name << " initialized as producer, ready to handle INTEREST messages\n";
201 }
202
203 _running = true;
204 int result = pthread_create(&_thread, nullptr, Agent::run, this);
205 if (result != 0) {
206 _running = false;
207 throw std::runtime_error("Failed to create agent thread");
208 }
209}
210
218 db<Agent>(INF) << "[Agent] " << _name << " destruction started\n";
219
220 // First, stop the running flag to prevent new operations
221 _running.store(false, std::memory_order_release);
222
223 if (_interest_thread) {
225 }
226
227 // Stop periodic thread first if it exists - CRITICAL: Do this before anything else
228 if (_periodic_thread) {
229 _periodic_thread->join();
230 delete _periodic_thread;
231 _periodic_thread = nullptr;
232 }
233
234 // Cancel the main agent thread and wait for it to exit
235 if (_thread) {
236 pthread_cancel(_thread);
237 pthread_join(_thread, nullptr);
238 }
239
240 // Detach from CAN bus before deleting observer
241 if (_can_observer) {
242 _can->detach(_can_observer, _c);
243 delete _can_observer;
244 _can_observer = nullptr;
245 }
246
247 db<Agent>(INF) << "[Agent] " << _name << " destroyed successfully\n";
248}
249
261 // Only producers should generate data
262 if (_is_consumer || !_data_producer || !_component_data) {
263 return Value(); // Return empty vector if not a producer
264 }
265 return _data_producer(unit, _component_data.get());
266}
267
277 // Only consumers should handle responses
278 if (!_is_consumer || !_response_handler || !_component_data || !msg) {
279 return; // Silently ignore if not a consumer
280 }
281 db<Agent>(INF) << "[Agent] " << _name << " handling response for unit: " << msg->unit() << "\n";
282
283 auto origin_paddr = msg->origin().paddr();
284 uint16_t key = (static_cast<uint16_t>(origin_paddr.bytes[4]) << 8) | origin_paddr.bytes[5];
285
286 if (_value_cache.contains(key)) {
287 auto ptr = _value_cache.get(key);
288 auto cached_values = *ptr;
289 bool unit_found = false;
290 db<Agent>(INF) << "[Agent] " << _name << " found cached values for key: " << key << "\n";
291
292 for (unsigned int i = 0; i < _units_per_vehicle; ++i) {
293 if (cached_values[i].unit == msg->unit()) {
294 unit_found = true;
296 if ((current_timestamp.count() - cached_values[i].timestamp.count()) >= _interest_period.count()) {
297 cached_values[i].timestamp = current_timestamp;
298 _response_handler(msg, _component_data.get());
299 }
300 break;
301 }
302 }
303
304 db<Agent>(INF) << "[Agent] " << _name << "Unit" << (unit_found ? "FOUND" : " NOT FOUND") << "\n";
305
306 if (!unit_found) {
307 for (unsigned int i = 0; i < _units_per_vehicle; ++i) {
308 if (cached_values[i].timestamp.count() == 0) { // Assuming timestamp 0 means empty
309 cached_values[i].unit = msg->unit();
311 cached_values[i].size = msg->value_size();
312 _response_handler(msg, _component_data.get());
313 break;
314 }
315 }
316 }
317
318 } else {
319 db<Agent>(INF) << "[Agent] " << _name << " NO CACHE FOUND FOR KEY: " << key << "\n";
320 ValueCache new_values[_units_per_vehicle]; // Zero-initialize
321 new_values[0].unit = msg->unit();
323 new_values[0].size = msg->value_size();
324
325 _value_cache.add(key, new_values);
326 _response_handler(msg, _component_data.get());
327 db<Agent>(INF) << "[Agent] " << _name << " ADDED CACHE FOR KEY: " << key << "\n";
328 }
329}
330
331inline int Agent::send(Unit unit, Microseconds period) {
332 db<Agent>(INF) << "[Agent] " << _name << " sending INTEREST for unit: " << unit << " with period: " << period.count() << " microseconds" << " external: " << _external << "\n";
333 if (period == Microseconds::zero())
334 return 0;
335
336 // Store the INTEREST period for RESPONSE filtering
337 _interest_period = period;
338
339 Message msg(Message::Type::INTEREST, _address, unit, period);
340 msg.external(_external);
341
342 // Log sent message to CSV
343 log_message(msg, "SEND");
344
345 int result = _can->send(&msg);
346
347 if (!result)
348 return -1;
349
350 return result;
351}
352
354 db<Agent>(INF) << "[Agent] " << _name << " waiting for messages...\n";
355 (*msg) = *(_can_observer->updated());
356 db<Agent>(INF) << "[Agent] " << _name << " messages received\n";
357
358 // Log received message to CSV
359 log_message(*msg, "RECEIVE");
360
361 return msg->size();
362}
363
364inline void* Agent::run(void* arg) {
365 Agent* agent = reinterpret_cast<Agent*>(arg);
366
367 while (agent->running()) {
368 Message* msg = new Message();
369 int received = agent->receive(msg);
370
371 if (received <= 0 || !msg) {
372 db<Agent>(WRN) << "[Agent] " << agent->name() << " received an empty (received=" << received << ") or invalid message\n";
373 delete msg; // Clean up the message object
374
375 continue; // Skip processing if no valid message was received
376 }
377
378 db<Agent>(INF) << "[Agent] " << agent->name() << " received message of type: " << static_cast<int>(msg->message_type())
379 << " for unit: " << msg->unit() << " with size: " << msg->value_size() << "\n";
380
381 if (msg->message_type() == Message::Type::RESPONSE) {
382 // Filter RESPONSE messages based on INTEREST period
383 if (agent->should_process_response()) {
384 db<Agent>(INF) << "[Agent] " << agent->name() << " processing RESPONSE message (period filter passed)\n";
385 agent->handle_response(msg);
386 } else {
387 db<Agent>(INF) << "[Agent] " << agent->name() << " discarding RESPONSE message (period filter failed)\n";
388 }
389 } else if (msg->message_type() == Message::Type::INTEREST) {
390 agent->handle_interest(msg);
391 }
392
393 delete msg; // Clean up the message object after processing
394 }
395
396 return nullptr;
397}
398
399inline bool Agent::running() {
400 return _running.load(std::memory_order_acquire);
401}
402
403inline void Agent::handle_interest(Message* msg) {
404 db<Agent>(INF) << "[Agent] " << _name << " received INTEREST for unit: " << msg->unit() << " with period: " << msg->period().count() << " microseconds\n";
405
406 // Only respond to INTEREST if this agent is a producer (observing INTEREST messages)
407 // Consumers (observing RESPONSE messages) should not respond to INTEREST
408 if (_c.type() != Type::INTEREST) {
409 db<Agent>(WRN) << "[Agent] " << _name << " ignoring INTEREST message (not a producer)\n";
410 return;
411 }
412
413 // === Fixed-Response cache update ===
414 auto origin_paddr = msg->origin().paddr();
415 uint16_t consumer_id = (static_cast<uint16_t>(origin_paddr.bytes[4]) << 8) | origin_paddr.bytes[5];
416 long int key = (static_cast<long int>(consumer_id) << 16) | (msg->unit() & 0xFFFF);
417
418 ResponseInfo info;
419 info.termination_time = msg->period() * MAX_RESPONSES_PER_INTEREST;
420 info.period = msg->period();
421
422 if (_active_interests.contains(key)) {
423 *_active_interests.get(key) = info; // overwrite
424 } else {
425 _active_interests.add(key, info);
426 }
427
428 // Recompute GCD and adjust thread period if needed
429 recompute_gcd();
430
431 if (!_periodic_thread) {
432 _periodic_thread = new Thread(this, &Agent::reply, msg->unit());
433 _periodic_thread->start(_producer_gcd.count());
434 } else {
435 _periodic_thread->set_period(_producer_gcd.count());
436 }
437}
438
446inline void Agent::reply(Unit unit) {
447 // Safety check: don't reply if agent is being destroyed
448 if (!running()) {
449 return;
450 }
451
452 // Additional safety check: ensure periodic thread is still valid
453 if (!_periodic_thread || !_periodic_thread->running()) {
454 return;
455 }
456
457 // === Fixed-response counter handling ===
458 bool has_active_entry = false;
459 bool counters_updated = false;
460 _active_interests.for_each([&](long int key, ResponseInfo& info){
461 if ((key & 0xFFFF) == unit && info.termination_time.count() > 0) {
462 has_active_entry = true;
463 info.termination_time -= _producer_gcd;
464 if (info.termination_time.count() <= 0) counters_updated = true;
465 }
466 });
467
468 if (!has_active_entry) {
469 // No consumer still interested; nothing to send
470 return;
471 }
472
473 db<Agent>(INF) << "[Agent] " << _name << " sending RESPONSE for unit: " << unit << "\n";
474
475 // Produce data and send one RESPONSE (broadcast)
476 Value value = get(unit);
477 Message msg(Message::Type::RESPONSE, _address, unit, Microseconds::zero(), value.data(), value.size());
478
479 log_message(msg, "SEND");
480 _can->send(&msg);
481
482 if (counters_updated) {
483 recompute_gcd();
484 }
485}
486
487inline void Agent::set_csv_logger(const std::string& log_dir) {
488 std::string csv_file = log_dir + "/" + _name + "_messages.csv";
489 std::string header = "timestamp_us,message_type,direction,origin,destination,unit,period_us,value_size,latency_us";
490 _csv_logger = std::make_unique<CSVLogger>(csv_file, header);
491}
492
493inline void Agent::log_message(const Message& msg, const std::string& direction) {
494 if (!_csv_logger || !_csv_logger->is_open()) return;
495
497
498 // Calculate latency for received messages
499 auto latency_us = 0L;
500 if (direction == "RECEIVE") {
501 latency_us = timestamp_us - msg.timestamp().count();
502 }
503
504 std::ostringstream csv_line;
505 csv_line << timestamp_us << ","
506 << (msg.message_type() == Message::Type::INTEREST ? "INTEREST" : "RESPONSE") << ","
507 << direction << ","
508 << (direction == "SEND" ? _address.to_string() : msg.origin().to_string()) << ","
509 << (direction == "SEND" ? "BROADCAST" : _address.to_string()) << ","
510 << msg.unit() << ","
511 << msg.period().count() << ","
512 << msg.value_size() << ","
513 << latency_us;
514
515 _csv_logger->log(csv_line.str());
516}
517
526 if (!_is_consumer) {
527 db<Agent>(WRN) << "[Agent] " << _name << " is not a consumer, cannot start periodic interest\n";
528 return -1;
529 }
530
531 if (_interest_active.load()) {
532 db<Agent>(INF) << "[Agent] " << _name << " updating interest period from "
533 << _requested_period.count() << " to " << period.count() << " microseconds\n";
535 return 0;
536 }
537
538 _requested_period = period;
539 _interest_period = period; // Keep response filter in sync
540 _interest_active.store(true, std::memory_order_release);
541
542 if (!_interest_thread) {
543 _interest_thread = new Thread(this, &Agent::send_interest, unit);
544 _interest_thread->start(period.count());
545 db<Agent>(INF) << "[Agent] " << _name << " started periodic INTEREST for unit: "
546 << unit << " with period: " << period.count() << " microseconds\n";
547 }
548
549 return 0;
550}
551
556 if (_interest_active.load()) {
557 _interest_active.store(false, std::memory_order_release);
558
559 if (_interest_thread) {
560 _interest_thread->join();
561 delete _interest_thread;
562 _interest_thread = nullptr;
563 }
564
565 db<Agent>(INF) << "[Agent] " << _name << " stopped periodic INTEREST\n";
566 }
567}
568
574inline void Agent::send_interest(Unit unit) {
575 if (!_interest_active.load() || !running()) {
576 return;
577 }
578
579 db<Agent>(TRC) << "[Agent] " << _name << " sending periodic INTEREST for unit: "
580 << unit << " with period: " << _requested_period.count() << " microseconds" << " external: " << _external << "\n";
581
582 Message msg(Message::Type::INTEREST, _address, unit, _requested_period);
583 msg.external(_external);
584
585 log_message(msg, "SEND");
586 _can->send(&msg);
587}
588
595 _requested_period = new_period;
596 _interest_period = new_period; // Keep response filter in sync
597 if (_interest_thread) {
598 _interest_thread->set_period(new_period.count());
599 }
600}
601
602/*
603 * @brief Check if a RESPONSE message should be processed based on the INTEREST period
604 * @return true if enough time has passed since the last processed RESPONSE, false otherwise
605 */
606inline bool Agent::should_process_response() {
607 // If no INTEREST period is set, process all messages
608 if (_interest_period == Microseconds::zero()) {
609 return true;
610 }
611
613 auto last_timestamp = _last_response_timestamp.load(std::memory_order_acquire);
614
615 // If this is the first RESPONSE or enough time has passed
616 if (last_timestamp == 0 || (current_timestamp - last_timestamp) >= _interest_period.count()) {
617 _last_response_timestamp.store(current_timestamp, std::memory_order_release);
618 return true;
619 }
620
621 return false;
622}
623
624inline void Agent::external(const bool external) {
625 _external = external;
626}
627
628inline void Agent::recompute_gcd() {
629 long long gcd_us = 0;
630 _active_interests.for_each([&](long int /*k*/, ResponseInfo& info){
631 if (info.termination_time.count() > 0) {
632 long long p = info.period.count();
633 gcd_us = gcd_us == 0 ? p : std::gcd(gcd_us, p);
634 }
635 });
636
637 // If no active entries keep previous gcd (thread will idle)
638 if (gcd_us == 0)
639 return;
640
641 _producer_gcd = Microseconds(gcd_us); // Does this transform long long to int64_t?
642
643 if (_periodic_thread) {
644 _periodic_thread->set_period(gcd_us);
645 }
646}
647
649 return _address;
650}
651
653 return _can->send(msg);
654}
655
657 return _periodic_thread && _periodic_thread->running();
658}
659
660#endif // AGENT_H
EPOS-inspired Agent implementation using composition over inheritance.
Definition agent.h:35
static constexpr int MAX_RESPONSES_PER_INTEREST
Definition agent.h:48
void update_interest_period(Microseconds new_period)
Update the period for periodic interest sending.
Definition agent.h:594
Message::Array Value
Definition agent.h:39
Message::Microseconds Microseconds
Definition agent.h:42
Message::Type Type
Definition agent.h:41
Message::Origin Address
Definition agent.h:38
Address address() const
Definition agent.h:648
static const unsigned int _units_per_vehicle
Definition agent.h:46
bool thread_running()
Definition agent.h:656
int send(Unit unit, Microseconds period)
Definition agent.h:331
CAN::Observer Observer
Definition agent.h:44
int receive(Message *msg)
Definition agent.h:353
void stop_periodic_interest()
Stop sending periodic INTEREST messages.
Definition agent.h:555
Periodic_Thread< Agent > Thread
Definition agent.h:43
void log_message(const Message &msg, const std::string &direction)
Definition agent.h:493
void send_interest(Unit unit)
Send a single INTEREST message (called by periodic thread)
Definition agent.h:574
bool running()
Definition agent.h:399
CAN::Message Message
Definition agent.h:37
~Agent()
Destructor with proper cleanup order.
Definition agent.h:217
const bool external() const
Definition agent.h:66
int can_send(Message *msg)
Definition agent.h:652
void handle_response(Message *msg)
Handle response using function pointer instead of virtual method.
Definition agent.h:276
int start_periodic_interest(Unit unit, Microseconds period)
Start sending periodic INTEREST messages for the specified unit and period.
Definition agent.h:525
static void * run(void *arg)
Definition agent.h:364
Message::Unit Unit
Definition agent.h:40
virtual void reply(Unit unit)
Reply method that calls function pointer instead of virtual method.
Definition agent.h:446
Agent(CAN *bus, const std::string &name, Unit unit, Type type, Address address, DataProducer producer, ResponseHandler handler, std::unique_ptr< ComponentData > data, bool external=true)
Constructor for EPOS-inspired Agent using composition.
Definition agent.h:154
void set_csv_logger(const std::string &log_dir)
Definition agent.h:487
Value get(Unit unit)
Get data using function pointer instead of virtual method.
Definition agent.h:260
std::string name() const
Definition agent.h:67
Definition bus.h:52
int send(Message *msg)
Definition bus.h:68
void detach(Observer *o, C c)
Definition observed.h:147
void attach(Observer *o, C c)
Definition observed.h:140
Definition observer.h:79
D * updated()
Definition observer.h:124
Definition bus.h:10
const Type type() const
Definition bus.h:40
Template class for network messages with Clock integration.
Definition message.h:31
std::vector< std::uint8_t > Array
Definition message.h:49
static Microseconds getSynchronizedTimestamp()
Definition message.h:490
std::chrono::microseconds Microseconds
Definition message.h:50
Channel::Address Origin
Definition message.h:45
Type
Definition message.h:35
std::uint32_t Unit
Definition message.h:48
Definition periodicThread.h:72
void start(std::int64_t period)
Definition periodicThread.h:130
void join()
Definition periodicThread.h:116
void set_period(std::int64_t period)
Definition periodicThread.h:160
bool running()
Definition periodicThread.h:231
A hash cache with a static size, using linear probing for collision resolution.
Definition static_size_hashed_cache.h:15
void for_each(F &&fn)
Iterate over all occupied entries and apply a functor. The functor receives (key, value&) as paramete...
Definition static_size_hashed_cache.h:101
void add(long int key, V value)
Adds a key-value pair to the cache. If the key already exists, its value is updated....
Definition static_size_hashed_cache.h:32
V * get(long int key)
Retrieves a pointer to the value associated with a given key. It uses linear probing to find the key.
Definition static_size_hashed_cache.h:55
bool contains(long int key) const
Determines if a key is already in the data structure. It uses linear probing to find the key.
Definition static_size_hashed_cache.h:78
std::vector< std::uint8_t >(* DataProducer)(std::uint32_t unit, ComponentData *data)
Function pointer type for data production.
Definition component_functions.hpp:22
void(* ResponseHandler)(void *msg, ComponentData *data)
Function pointer type for response handling.
Definition component_functions.hpp:34
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ TRC
Definition debug.h:231
@ WRN
Definition debug.h:185
T * data()
Definition protocol.h:24
Header * header()
Definition protocol.h:3