Communication Library for Autonomous Systems v1.0
Reliable and secure communication library for autonomous vehicle systems
Loading...
Searching...
No Matches
statusManager.h
Go to the documentation of this file.
1#ifndef STATUS_MANAGER_H
2#define STATUS_MANAGER_H
3
4#include <map>
5#include <mutex>
6#include <chrono>
7#include <array>
8#include <atomic>
9#include <vector>
10#include <algorithm> // For std::max_element
11#include <stdexcept> // For std::invalid_argument
12
13// Assuming these paths are correct relative to your include directories
15#include "api/framework/periodicThread.h" // Your Periodic_Thread
16#include "api/util/debug.h" // Your debug logging
18
19// Forward declaration of Protocol to manage include order if necessary
20template <typename NIC_TYPE> class Protocol;
21
22// Define a type for the unique key of a vehicle
23using UniqueKeyValueType = std::array<uint8_t, 16>; // Example: 128-bit key
24
32template <typename NIC_TYPE>
34public:
35 using VehicleIdType = typename Protocol<NIC_TYPE>::Address; // MAC + Port
36 // Define the port used for STATUS messages
37 // This should ideally come from a central traits/configuration file.
39
// Record kept for each entry of the neighbor list.
// NOTE(review): this listing skips source lines 41 and 43; the page index attributes
// `VehicleIdType id` and `UniqueKeyValueType unique_key` to those lines, and the aggregate
// initialisers elsewhere in the file ({id, age, key, time_point}) are consistent with that order.
// NOTE(review): std::atomic is neither copyable nor movable, so a struct holding one has no
// usable assignment operator; statements of the form `_neighbor_list[k] = {...}` elsewhere in
// this file may therefore fail to compile once the template is instantiated -- confirm.
40 struct NeighborInfo {
42 std::atomic<uint32_t> age; // Made atomic for potential concurrent updates; primary criterion in leader election
44 std::chrono::steady_clock::time_point last_seen; // refreshed on every STATUS message; compared against the neighbor timeout when pruning
45 };
46
// Constructor (tail of the parameter list, initializer list and body).
// NOTE(review): the opening lines of the signature (source lines 52-56) and source lines 62
// and 83 are not part of this listing; only what is visible below is documented.
// Throws std::invalid_argument when the owner protocol pointer is null.
57 std::chrono::microseconds broadcast_interval = std::chrono::seconds(1), // period of the STATUS broadcast task
58 std::chrono::microseconds prune_interval = std::chrono::seconds(3), // period of the stale-neighbor scan
59 std::chrono::microseconds neighbor_timeout = std::chrono::seconds(5) // silence after which a neighbor is dropped
60 ) : _protocol_ptr(owner_protocol),
61 _self_mac_address(self_mac_address),
63 _self_age(self_age),
64 _self_unique_key(self_unique_key),
65 _broadcast_interval(broadcast_interval),
66 _prune_interval(prune_interval),
67 _neighbor_timeout(neighbor_timeout),
68 _running(true)
69 {
70 if (!_protocol_ptr) { // nothing to send through: log and refuse to construct
71 db<StatusManager>(ERR) << "StatusManager: Owner protocol cannot be null\n";
72 throw std::invalid_argument("StatusManager: Owner protocol cannot be null");
73 }
74
75 // Add self to neighbor list
76 { // inner scope so the lock is released before the threads are created
77 std::lock_guard<std::mutex> lock(_neighbor_list_mutex);
78 _neighbor_list[_self_id] = {_self_id, self_age, _self_unique_key, std::chrono::steady_clock::now()};
79 }
80
81 // Set self as initial leader
82 LeaderKeyStorage::getInstance().setLeaderId(_self_mac_address);
84
// One periodic worker per task; both are allocated with raw `new` and are joined and deleted
// by the destructor body later in this file.
85 _broadcast_thread = new Periodic_Thread<StatusManager>(this, &StatusManager::broadcast_status_message_task);
86 _prune_thread = new Periodic_Thread<StatusManager>(this, &StatusManager::prune_stale_neighbors_task);
87
// NOTE(review): the page index lists Periodic_Thread::start(std::int64_t period); a
// std::chrono::microseconds value does not convert implicitly to an integer -- confirm which
// overload is actually called here.
88 _broadcast_thread->start(_broadcast_interval);
89 _prune_thread->start(_prune_interval);
90
91 db<StatusManager>(INF) << "StatusManager initialized for " << _self_id.to_string()
92 << " (Age: " << _self_age << "). Broadcasting every "
93 << _broadcast_interval.count() << "us. Pruning every "
94 << _prune_interval.count() << "us.\n";
95 }
96
// Destructor body: stops both periodic tasks and releases their thread objects.
// NOTE(review): the signature line (source line 102 according to the page index) is not part
// of this listing.
103 _running.store(false, std::memory_order_release); // every task and process_incoming_status() return early once this is false
104 if (_broadcast_thread) {
105 _broadcast_thread->join(); // join before delete so the task is no longer using `this`
106 delete _broadcast_thread;
107 _broadcast_thread = nullptr;
108 }
109 if (_prune_thread) {
110 _prune_thread->join();
111 delete _prune_thread;
112 _prune_thread = nullptr;
113 }
114 db<StatusManager>(INF) << "StatusManager for " << _self_id.to_string() << " shut down.\n";
115 }
116
// process_incoming_status (tail of the signature and body): parses a received STATUS payload,
// inserts or refreshes the sender's neighbor entry and re-runs leader election when the entry
// is new or its age/key changed.
// NOTE(review): the first signature lines (source 122-123) and the declarations of
// `sender_age`, `sender_unique_key` (source 135-136) and `consistent_sender_id` (source 143)
// are missing from this listing.
124 const uint8_t* payload_data,
125 unsigned int payload_size)
126 {
127 if (!_running.load(std::memory_order_acquire)) return; // ignore traffic once shutdown has started
128
129 if (payload_size < (sizeof(uint32_t) + sizeof(UniqueKeyValueType))) { // must hold a 32-bit age plus the full unique key
130 db<StatusManager>(WAR) << "StatusManager: Received undersized STATUS payload from "
131 << sender_protocol_address.to_string() << ". Size: " << payload_size << "\n";
132 return;
133 }
134
137
// Payload layout mirrors what broadcast_status_message_task() writes: age bytes first, then
// the unique key bytes, both copied verbatim with memcpy (no byte-order conversion).
138 unsigned int offset = 0;
139 std::memcpy(&sender_age, payload_data + offset, sizeof(sender_age));
140 offset += sizeof(sender_age);
141 std::memcpy(sender_unique_key.data(), payload_data + offset, sender_unique_key.size());
142
144
145 std::lock_guard<std::mutex> lock(_neighbor_list_mutex); // held until return, including the election call below
146
147 auto it = _neighbor_list.find(consistent_sender_id);
148 bool list_changed = false; // set when the election input changed
149
150 if (it == _neighbor_list.end()) { // first message from this sender: create its entry
151 _neighbor_list[consistent_sender_id] = {consistent_sender_id, sender_age, sender_unique_key, std::chrono::steady_clock::now()};
152 list_changed = true;
153 db<StatusManager>(INF) << "StatusManager: New neighbor " << consistent_sender_id.to_string()
154 << " (Age: " << sender_age << ").\n";
155 } else {
156 if (it->second.age != sender_age || it->second.unique_key != sender_unique_key) { // only age/key changes trigger a re-election
157 list_changed = true;
158 db<StatusManager>(INF) << "StatusManager: Updated neighbor " << consistent_sender_id.to_string()
159 << " (Age: " << sender_age << ").\n";
160 }
// Stored values and the timestamp are refreshed on every message, changed or not.
161 it->second.age.store(sender_age, std::memory_order_release);
162 it->second.unique_key = sender_unique_key;
163 it->second.last_seen = std::chrono::steady_clock::now();
164 }
165
166 if (list_changed) {
167 perform_leader_election_and_update_storage_unsafe(); // "_unsafe": the helper takes no lock itself; we still hold it here
168 }
169 }
170
// Copy construction is disabled. The page index also lists a deleted copy-assignment
// operator, which would sit on source line 172 (not part of this listing).
171 StatusManager(const StatusManager&) = delete;
173
174private:
// Periodic task bound to the broadcast thread in the constructor: serialises this node's age
// and unique key and sends them through the owner protocol.
// NOTE(review): source line 191, which presumably declares `broadcast_dest_addr`, is missing
// from this listing -- the destination address cannot be confirmed from here.
180 void broadcast_status_message_task() {
181 if (!_running.load(std::memory_order_acquire)) return; // shutdown in progress: skip this period
182
183 std::vector<uint8_t> payload;
184 payload.resize(sizeof(_self_age) + _self_unique_key.size());
185
// Layout: age bytes first, then the unique key bytes -- the same order that
// process_incoming_status() reads back.
// NOTE(review): _self_age is declared std::atomic<uint32_t>; memcpy copies the atomic
// object's representation instead of performing an atomic load, and sizeof(_self_age) is the
// size of the atomic wrapper -- confirm this equals sizeof(uint32_t) on the target platform.
186 unsigned int offset = 0;
187 std::memcpy(payload.data() + offset, &_self_age, sizeof(_self_age));
188 offset += sizeof(_self_age);
189 std::memcpy(payload.data() + offset, _self_unique_key.data(), _self_unique_key.size());
190
192
193 { // scope limits how long _protocol_mutex is held: just the send call
194 std::lock_guard<std::mutex> lock(_protocol_mutex);
195 _protocol_ptr->send(_self_id, broadcast_dest_addr, payload.data(), payload.size()); // return value of send() is not checked
196 }
197
198 db<StatusManager>(TRC) << "StatusManager: Broadcasted STATUS from " << _self_id.to_string()
199 << " (Age: " << _self_age << ").\n";
200 }
201
// Periodic task bound to the prune thread in the constructor: removes every neighbor whose
// last_seen is older than _neighbor_timeout and re-runs leader election if anything was removed.
207 void prune_stale_neighbors_task() {
208 if (!_running.load(std::memory_order_acquire)) return; // shutdown in progress: skip this period
209
210 std::lock_guard<std::mutex> lock(_neighbor_list_mutex); // held for the whole scan and for the election call below
211 bool list_changed = false; // true once at least one entry has been erased
212 auto now = std::chrono::steady_clock::now(); // single timestamp so every entry is judged against the same instant
213
214 for (auto it = _neighbor_list.begin(); it != _neighbor_list.end(); ) { // no increment here: erase() supplies the next iterator
215 if (it->first.paddr() == _self_mac_address) { // any entry carrying our own MAC is kept alive rather than aged out
216 it->second.last_seen = now;
217 ++it;
218 continue;
219 }
220 if ((now - it->second.last_seen) > _neighbor_timeout) { // strictly older than the timeout
221 db<StatusManager>(INF) << "StatusManager: Pruning stale neighbor " << it->first.to_string() << ".\n";
222 it = _neighbor_list.erase(it); // erase returns the iterator following the removed element
223 list_changed = true;
224 } else {
225 ++it;
226 }
227 }
228
229 if (list_changed) { // membership shrank, so the current leader may be gone
230 perform_leader_election_and_update_storage_unsafe();
231 }
232 }
233
// Picks the leader among the current neighbor-list entries.
// "_unsafe": this function takes no lock itself; both visible callers invoke it while holding
// _neighbor_list_mutex.
// NOTE(review): source lines 243, 257, 261, 264 and 265 are missing from this listing, so how
// the elected leader is published after the selection below cannot be confirmed from here.
239 void perform_leader_election_and_update_storage_unsafe() {
240 if (_neighbor_list.empty()) { // fallback: with no candidates, publish our own MAC as leader
241 db<StatusManager>(WAR) << "StatusManager: Neighbor list became empty. Re-asserting self as leader.\n";
242 LeaderKeyStorage::getInstance().setLeaderId(_self_mac_address);
244 return;
245 }
246
// The lambda is the "less than" order for std::max_element: a lower age ranks lower; for equal
// ages the entry with the GREATER unique_key ranks lower. The selected maximum is therefore the
// entry with the highest age, ties going to the smallest unique_key (std::array comparison).
247 auto leader_it = std::max_element(_neighbor_list.begin(), _neighbor_list.end(),
248 [](const auto& a_pair, const auto& b_pair) {
249 const NeighborInfo& a = a_pair.second;
250 const NeighborInfo& b = b_pair.second;
251 if (a.age.load(std::memory_order_acquire) != b.age.load(std::memory_order_acquire)) {
252 return a.age.load(std::memory_order_acquire) < b.age.load(std::memory_order_acquire);
253 }
254 return b.unique_key < a.unique_key;
255 });
256
258 const UniqueKeyValueType& elected_leader_key = leader_it->second.unique_key; // its use falls on lines omitted from this listing
259
260 db<StatusManager>(INF) << "StatusManager: Leader election completed. New leader MAC: "
262 << " (Age: " << leader_it->second.age.load(std::memory_order_acquire) << ").\n";
263
266 }
267
// ---- Data members ----
268 Protocol<NIC_TYPE>* _protocol_ptr; // owner protocol used for send(); checked non-null in the constructor; never deleted in the visible code
269 mutable std::mutex _protocol_mutex; // Protect protocol access
270 const Ethernet::Address _self_mac_address; // own MAC: published as leader id and used to recognise own entries when pruning
271 const VehicleIdType _self_id; // own key in _neighbor_list and source address of broadcasts
272 std::atomic<uint32_t> _self_age; // Made atomic for potential concurrent updates
273 const UniqueKeyValueType _self_unique_key; // sent after the age in every STATUS payload
274
275 std::map<VehicleIdType, NeighborInfo> _neighbor_list; // known vehicles, including self; access is serialised by _neighbor_list_mutex
276 mutable std::mutex _neighbor_list_mutex; // also held by callers of perform_leader_election_and_update_storage_unsafe()
277
278 Periodic_Thread<StatusManager>* _broadcast_thread; // created in the constructor, joined and deleted in the destructor
279 Periodic_Thread<StatusManager>* _prune_thread; // created in the constructor, joined and deleted in the destructor
280
281 const std::chrono::microseconds _broadcast_interval; // period passed to _broadcast_thread->start()
282 const std::chrono::microseconds _prune_interval; // period passed to _prune_thread->start()
283 const std::chrono::microseconds _neighbor_timeout; // maximum silence before a neighbor is pruned
284
285 std::atomic<bool> _running; // true from construction; cleared first thing in the destructor so tasks return early
286};
287
288// Add traits for StatusManager
// Traits specialisation for every StatusManager instantiation; inherits the defaults of
// Traits<void> and overrides `debugged`.
// NOTE(review): the page index shows db<T>() selecting its output on Traits<T>::debugged, so
// setting it to true presumably enables the db<StatusManager>(...) log statements -- confirm
// against debug.h.
289template<typename NIC_TYPE>
290struct Traits<StatusManager<NIC_TYPE>> : public Traits<void> {
291 static const bool debugged = true;
292};
293
294#endif // STATUS_MANAGER_H
static std::string mac_to_string(Address addr)
Definition ethernet.h:36
static const Ethernet::Address BROADCAST
Definition ethernet.h:54
void setLeaderId(const Ethernet::Address &leader_id)
Set the current leader ID.
Definition leaderKeyStorage.h:65
static LeaderKeyStorage & getInstance()
Get the singleton instance.
Definition leaderKeyStorage.h:54
void setGroupMacKey(const MacKeyType &key)
Set the current group MAC key.
Definition leaderKeyStorage.h:93
Definition periodicThread.h:72
void start(std::int64_t period)
Definition periodicThread.h:130
void join()
Definition periodicThread.h:116
Definition protocol.h:134
Definition protocol.h:29
int send(Address from, Address to, const void *data, unsigned int size)
Definition protocol.h:302
std::uint16_t Port
Definition protocol.h:35
Thread-safe manager for vehicle status and leader election.
Definition statusManager.h:33
~StatusManager()
Destroy the Status Manager.
Definition statusManager.h:102
StatusManager & operator=(const StatusManager &)=delete
Protocol< NIC_TYPE >::Port STATUS_PORT
Definition statusManager.h:38
void process_incoming_status(const VehicleIdType &sender_protocol_address, const uint8_t *payload_data, unsigned int payload_size)
Process an incoming status message.
Definition statusManager.h:122
StatusManager(Protocol< NIC_TYPE > *owner_protocol, const Ethernet::Address &self_mac_address, uint32_t self_age, const UniqueKeyValueType &self_unique_key, std::chrono::microseconds broadcast_interval=std::chrono::seconds(1), std::chrono::microseconds prune_interval=std::chrono::seconds(3), std::chrono::microseconds neighbor_timeout=std::chrono::seconds(5))
Construct a new Status Manager.
Definition statusManager.h:52
StatusManager(const StatusManager &)=delete
typename Protocol< NIC_TYPE >::Address VehicleIdType
Definition statusManager.h:35
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ ERR
Definition debug.h:162
@ TRC
Definition debug.h:231
std::uint8_t payload[MTU]
Definition ethernet.h:3
std::array< uint8_t, 16 > UniqueKeyValueType
Definition statusManager.h:23
Definition ethernet.h:16
Definition statusManager.h:40
VehicleIdType id
Definition statusManager.h:41
std::chrono::steady_clock::time_point last_seen
Definition statusManager.h:44
UniqueKeyValueType unique_key
Definition statusManager.h:43
std::atomic< uint32_t > age
Definition statusManager.h:42
Definition traits.h:45
static const bool debugged
Definition traits.h:46