1#ifndef STATUS_MANAGER_H
2#define STATUS_MANAGER_H
20template <
typename NIC_TYPE>
class Protocol;
32template <
typename NIC_TYPE>
42 std::atomic<uint32_t>
age;
58 std::chrono::microseconds
prune_interval = std::chrono::seconds(3),
72 throw std::invalid_argument(
"StatusManager: Owner protocol cannot be null");
77 std::lock_guard<std::mutex>
lock(_neighbor_list_mutex);
78 _neighbor_list[_self_id] = {_self_id,
self_age, _self_unique_key, std::chrono::steady_clock::now()};
88 _broadcast_thread->
start(_broadcast_interval);
89 _prune_thread->
start(_prune_interval);
92 <<
" (Age: " << _self_age <<
"). Broadcasting every "
93 << _broadcast_interval.count() <<
"us. Pruning every "
94 << _prune_interval.count() <<
"us.\n";
103 _running.store(
false, std::memory_order_release);
104 if (_broadcast_thread) {
105 _broadcast_thread->
join();
106 delete _broadcast_thread;
107 _broadcast_thread =
nullptr;
110 _prune_thread->
join();
111 delete _prune_thread;
112 _prune_thread =
nullptr;
127 if (!_running.load(std::memory_order_acquire))
return;
145 std::lock_guard<std::mutex>
lock(_neighbor_list_mutex);
150 if (
it == _neighbor_list.end()) {
161 it->second.age.store(
sender_age, std::memory_order_release);
163 it->second.last_seen = std::chrono::steady_clock::now();
167 perform_leader_election_and_update_storage_unsafe();
180 void broadcast_status_message_task() {
181 if (!_running.load(std::memory_order_acquire))
return;
184 payload.resize(
sizeof(_self_age) + _self_unique_key.size());
187 std::memcpy(
payload.data() +
offset, &_self_age,
sizeof(_self_age));
188 offset +=
sizeof(_self_age);
189 std::memcpy(
payload.data() +
offset, _self_unique_key.data(), _self_unique_key.size());
194 std::lock_guard<std::mutex>
lock(_protocol_mutex);
199 <<
" (Age: " << _self_age <<
").\n";
207 void prune_stale_neighbors_task() {
208 if (!_running.load(std::memory_order_acquire))
return;
210 std::lock_guard<std::mutex>
lock(_neighbor_list_mutex);
212 auto now = std::chrono::steady_clock::now();
214 for (
auto it = _neighbor_list.begin();
it != _neighbor_list.end(); ) {
215 if (
it->first.paddr() == _self_mac_address) {
216 it->second.last_seen =
now;
220 if ((
now -
it->second.last_seen) > _neighbor_timeout) {
222 it = _neighbor_list.erase(
it);
230 perform_leader_election_and_update_storage_unsafe();
239 void perform_leader_election_and_update_storage_unsafe() {
240 if (_neighbor_list.empty()) {
241 db<StatusManager>(
WAR) <<
"StatusManager: Neighbor list became empty. Re-asserting self as leader.\n";
247 auto leader_it = std::max_element(_neighbor_list.begin(), _neighbor_list.end(),
249 const NeighborInfo& a = a_pair.second;
250 const NeighborInfo& b = b_pair.second;
251 if (a.age.load(std::memory_order_acquire) != b.age.load(std::memory_order_acquire)) {
252 return a.age.load(std::memory_order_acquire) < b.age.load(std::memory_order_acquire);
254 return b.unique_key <
a.unique_key;
262 <<
" (Age: " <<
leader_it->second.age.load(std::memory_order_acquire) <<
").\n";
269 mutable std::mutex _protocol_mutex;
271 const VehicleIdType _self_id;
272 std::atomic<uint32_t> _self_age;
275 std::map<VehicleIdType, NeighborInfo> _neighbor_list;
276 mutable std::mutex _neighbor_list_mutex;
281 const std::chrono::microseconds _broadcast_interval;
282 const std::chrono::microseconds _prune_interval;
283 const std::chrono::microseconds _neighbor_timeout;
285 std::atomic<bool> _running;
289template<
typename NIC_TYPE>
static std::string mac_to_string(Address addr)
Definition ethernet.h:36
static const Ethernet::Address BROADCAST
Definition ethernet.h:54
void setLeaderId(const Ethernet::Address &leader_id)
Set the current leader ID.
Definition leaderKeyStorage.h:65
static LeaderKeyStorage & getInstance()
Get the singleton instance.
Definition leaderKeyStorage.h:54
void setGroupMacKey(const MacKeyType &key)
Set the current group MAC key.
Definition leaderKeyStorage.h:93
Definition periodicThread.h:72
void start(std::int64_t period)
Definition periodicThread.h:130
void join()
Definition periodicThread.h:116
Definition protocol.h:134
int send(Address from, Address to, const void *data, unsigned int size)
Definition protocol.h:302
std::uint16_t Port
Definition protocol.h:35
Thread-safe manager for vehicle status and leader election.
Definition statusManager.h:33
~StatusManager()
Destroy the Status Manager.
Definition statusManager.h:102
StatusManager & operator=(const StatusManager &)=delete
Protocol< NIC_TYPE >::Port STATUS_PORT
Definition statusManager.h:38
void process_incoming_status(const VehicleIdType &sender_protocol_address, const uint8_t *payload_data, unsigned int payload_size)
Process an incoming status message.
Definition statusManager.h:122
StatusManager(Protocol< NIC_TYPE > *owner_protocol, const Ethernet::Address &self_mac_address, uint32_t self_age, const UniqueKeyValueType &self_unique_key, std::chrono::microseconds broadcast_interval=std::chrono::seconds(1), std::chrono::microseconds prune_interval=std::chrono::seconds(3), std::chrono::microseconds neighbor_timeout=std::chrono::seconds(5))
Construct a new Status Manager.
Definition statusManager.h:52
StatusManager(const StatusManager &)=delete
typename Protocol< NIC_TYPE >::Address VehicleIdType
Definition statusManager.h:35
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ ERR
Definition debug.h:162
@ TRC
Definition debug.h:231
std::uint8_t payload[MTU]
Definition ethernet.h:3
std::array< uint8_t, 16 > UniqueKeyValueType
Definition statusManager.h:23
Definition statusManager.h:40
VehicleIdType id
Definition statusManager.h:41
std::chrono::steady_clock::time_point last_seen
Definition statusManager.h:44
UniqueKeyValueType unique_key
Definition statusManager.h:43
std::atomic< uint32_t > age
Definition statusManager.h:42
static const bool debugged
Definition traits.h:46