Communication Library for Autonomous Systems v1.0
Reliable and secure communication library for autonomous vehicle systems
Loading...
Searching...
No Matches
nic.h
Go to the documentation of this file.
1#ifndef NIC_H
2#define NIC_H
3
4#include <semaphore.h>
5#include <pthread.h>
6#include <queue>
7#include <atomic>
8#include <time.h>
9#include <cstdint>
10#include <string>
11#include <cstring>
12#include <fstream>
13
14#include "api/traits.h"
16#include "api/util/debug.h"
17#include "api/util/observer.h"
18#include "api/util/observed.h"
19#include "api/util/buffer.h"
20#include "api/framework/clock.h" // Include Clock for timestamping
21
22// Foward Declaration
23class Initializer;
24
25// Network Interface Card implementation
26template <typename Engine>
27class NIC: public Ethernet, public Conditionally_Data_Observed<Buffer<Ethernet::Frame>, Ethernet::Protocol>, private Engine
28{
29 friend class Initializer;
30
31 public:
32 static const unsigned int N_BUFFERS = Traits<NIC<Engine>>::SEND_BUFFERS + Traits<NIC<Engine>>::RECEIVE_BUFFERS;
33 static constexpr unsigned int MAX_FRAME_SIZE = sizeof(Ethernet::Frame);
34
40
41 // Statistics for network operations
42 struct Statistics {
43 std::atomic<unsigned int> packets_sent;
44 std::atomic<unsigned int> packets_received;
45 std::atomic<unsigned int> bytes_sent;
46 std::atomic<unsigned int> bytes_received;
47 std::atomic<unsigned int> tx_drops;
48 std::atomic<unsigned int> rx_drops;
49
54 };
55
56 protected:
57 NIC();
58
59 public:
60 ~NIC();
61
62 // Send a pre-allocated buffer with packet size for timestamp offset calculation
63 int send(DataBuffer* buf, unsigned int packet_size);
64 // Legacy send method for backward compatibility
65 int send(DataBuffer* buf);
66
67 // Process a received buffer
68 int receive(DataBuffer* buf, Address* src, Address* dst, void* data, unsigned int size);
69
70 // Allocate a buffer for sending
71 DataBuffer* alloc(Address dst, Protocol_Number prot, unsigned int size);
72
73 // Release a buffer after use
74 void free(DataBuffer* buf);
75
76 // Get the local address
77 const Address& address();
78
79 // Set the local address
81
82 // Get network statistics
83 const Statistics& statistics();
84
85 void stop();
86
87 double radius();
88 void setRadius(double radius);
89 // Attach/detach observers
90 // void attach(Observer* obs, Protocol_Number prot); // inherited
91 // void detach(Observer* obs, Protocol_Number prot); // inherited
92
93 private:
94 void handle(Ethernet::Frame* frame, unsigned int size) override;
95
96 // Helper method to fill TX timestamp in packet
97 void fillTxTimestamp(DataBuffer* buf, unsigned int packet_size);
98
99 // Helper method to log latency to CSV file
100 void logLatency(std::int64_t latency_us);
101
102 private:
103
104 Address _address;
105 Statistics _statistics;
106 DataBuffer _buffer[N_BUFFERS];
107 std::queue<DataBuffer*> _free_buffers;
108 std::atomic<bool> _running;
109 sem_t _buffer_sem;
110 sem_t _binary_sem;
111 double _radius;
112 std::ofstream _latency_csv_file;
113
114 bool running() { return _running.load(std::memory_order_acquire); }
115};
116
117/*********** NIC Implementation ************/
118template <typename Engine>
119NIC<Engine>::NIC() : _running(true), _radius(1000.0) { // Default 1000m transmission radius
120 // Initialize buffers FIRST - before starting Engine
121 db<NIC>(INF) << "[NIC] [constructor] initializing buffers and semaphores\n";
122
123 for (unsigned int i = 0; i < N_BUFFERS; ++i) {
124 _buffer[i] = DataBuffer();
125 _free_buffers.push(&_buffer[i]);
126 }
127
128 sem_init(&_buffer_sem, 0, N_BUFFERS);
129
130 sem_init(&_binary_sem, 0, 1);
131
132 // Setting default address - must be done before starting Engine
133 _address = Engine::mac_address();
134
135 // Initialize CSV file for latency logging
136 _latency_csv_file.open("nic_latency.csv", std::ios::out | std::ios::app);
137 if (_latency_csv_file.is_open()) {
138 // Check if file is empty (new file) to write header
139 std::ifstream test_file("nic_latency.csv");
140 test_file.seekg(0, std::ios::end);
141 if (test_file.tellg() == 0) {
142 _latency_csv_file << "latency_us\n";
143 }
144 test_file.close();
145 db<NIC>(INF) << "[NIC] [constructor] CSV latency log file opened\n";
146 } else {
147 db<NIC>(WRN) << "[NIC] [constructor] Failed to open CSV latency log file\n";
148 }
149
150 // NOW it's safe to start the Engine - all NIC infrastructure is ready
151 Engine::start();
152 db<NIC>(INF) << "[NIC] [constructor] NIC fully initialized and Engine started with default radius " << _radius << "m\n";
153}
154
155template <typename Engine>
157 // Destroy engine first
158 stop();
159
160 // Close CSV file
161 if (_latency_csv_file.is_open()) {
162 _latency_csv_file.close();
163 db<NIC>(INF) << "[NIC] [destructor] CSV latency log file closed\n";
164 }
165
166 sem_destroy(&_buffer_sem);
167 sem_destroy(&_binary_sem);
168 db<NIC>(INF) << "[NIC] [destructor] semaphores destroyed\n";
169 // Engine stops itself on its own
170}
171
172template <typename Engine>
174 db<NIC>(TRC) << "[NIC] [stop()] called!\n";
175 _running.store(false, std::memory_order_release);
176 int sem_value;
177 sem_getvalue(&_buffer_sem, &sem_value);
178 for (unsigned int i = 0; i < N_BUFFERS - sem_value; ++i) {
179 sem_post(&_buffer_sem); // Release the semaphore for each buffer
180 }
181 Engine::stop();
182}
183
184template <typename Engine>
186 db<NIC>(TRC) << "NIC<Engine>::send() called!\n";
187 if (!running()) {
188 db<NIC>(TRC) << "[NIC] send called when NIC is not running \n";
189 return 0;
190 }
191
192 if (!buf) {
193 db<NIC>(WRN) << "[NIC] send() called with a null buffer\n";
194 _statistics.tx_drops++;
195 return 0;
196 }
197
198 // Fill TX timestamp before sending
199 fillTxTimestamp(buf, packet_size);
200
201 int result = Engine::send(buf->data(), buf->size());
202 db<NIC>(INF) << "[NIC] Engine::send returned " << result << "\n";
203
204 if (result <= 0) {
205 _statistics.tx_drops++;
206 result = 0;
207 } else {
208 _statistics.packets_sent++;
209 _statistics.bytes_sent += result;
210 }
211
212 return result;
213}
214
215template <typename Engine>
217 db<NIC>(TRC) << "NIC<Engine>::send() called!\n";
218
219 if (!running()) {
220 db<NIC>(ERR) << "[NIC] send() called when NIC is inactive\n";
221 return -1;
222 }
223
224 if (!buf) {
225 db<NIC>(WRN) << "[NIC] send() called with a null buffer\n";
226 _statistics.tx_drops++;
227 return -1;
228 }
229
230 int result = Engine::send(buf->data(), buf->size());
231 db<NIC>(INF) << "[NIC] Engine::send returned " << result << "\n";
232
233 if (result <= 0) {
234 _statistics.tx_drops++;
235 result = 0;
236 } else {
237 _statistics.packets_sent++;
238 _statistics.bytes_sent += result;
239 }
240
241 return result;
242}
243
244template <typename Engine>
245int NIC<Engine>::receive(DataBuffer* buf, Address* src, Address* dst, void* data, unsigned int size) {
246 db<NIC>(TRC) << "NIC<Engine>::receive() called!\n";
247
248 if (!running()) {
249 db<NIC>(ERR) << "[NIC] receive() called when NIC is inactive\n";
250 return -1;
251 }
252
253 if (!buf) {
254 db<NIC>(WRN) << "[NIC] receive() called with a null buffer\n";
255 _statistics.rx_drops++;
256 return 0;
257 }
258
259 Ethernet::Frame* frame = buf->data();
260 db<NIC>(INF) << "[NIC] frame extracted from buffer: {src = " << Ethernet::mac_to_string(frame->src) << ", dst = " << Ethernet::mac_to_string(frame->dst) << ", prot = " << std::to_string(frame->prot) << ", size = " << buf->size() << "}\n";
261
262 // 1. Filling src and dst addresses
263 if (src) *src = frame->src;
264 if (dst) *dst = frame->dst;
265
266 // 2. Payload size
267 unsigned int payload_size = buf->size() - Ethernet::HEADER_SIZE;
268
269 // 3. Copies payload to data pointer
270 std::memcpy(data, frame->payload, payload_size);
271
272 // 4. Releases the buffer
273 free(buf);
274
275 // 5. Return size of copied bytes
276 return payload_size;
277}
278
279template <typename Engine>
280void NIC<Engine>::handle(Ethernet::Frame* frame, unsigned int size) {
281 db<NIC>(TRC) << "[NIC] [handle()] called!\n";
282
283 // Additional safety check - ensure we're fully initialized
284 if (!running()) {
285 db<NIC>(WRN) << "[NIC] [handle()] called but NIC is not running - ignoring packet\n";
286 return;
287 }
288
289 // Filter check (optional, engine might do this): ignore packets sent by self
290 if (frame->src == _address) {
291 db<NIC>(INF) << "[NIC] [handle()] ignoring frame from self: {src=" << Ethernet::mac_to_string(frame->src) << "}\n";
292 return;
293 }
294
295 // 1. Extracting frame header
298
299 // 2. Allocate buffer
300 unsigned int packet_size = size - Ethernet::HEADER_SIZE;
301 if (packet_size == 0) {
302 db<NIC>(INF) << "[NIC] [handle()] dropping empty frame\n";
303 _statistics.rx_drops++;
304 return;
305 }
306
307 db<NIC>(TRC) << "[NIC] [handle()] allocating buffer\n";
308 DataBuffer * buf = alloc(dst, proto, packet_size);
309 db<NIC>(TRC) << "[NIC] [handle()] buffer allocated\n";
310
311 if (!buf) {
312 db<NIC>(ERR) << "[NIC] [handle()] alloc called, but NIC is not running\n";
313 return;
314 }
315
316 // 3. Fill RX timestamp in the buffer
317 db<NIC>(TRC) << "[NIC] [handle()] filling RX timestamp in the buffer\n";
318 auto& clock = Clock::getInstance();
319 TimestampType rx_time = clock.getLocalSystemTime();
320 std::int64_t timestamp = rx_time.time_since_epoch().count();
321 buf->setRX(timestamp);
322 db<NIC>(TRC) << "[NIC] [handle()] RX timestamp filled in the buffer: " << timestamp << "\n";
323
324 // 4. Copy frame to buffer
325 db<NIC>(TRC) << "[NIC] [handle()] copying frame to buffer\n";
326 buf->setData(frame, size);
327 db<NIC>(TRC) << "[NIC] [handle()] frame copied to buffer\n";
328
329 // 5. Extract TX timestamp and calculate latency for logging
330 db<NIC>(TRC) << "[NIC] [handle()] extracting TX timestamp for latency calculation\n";
331 const unsigned int header_size = sizeof(std::uint16_t) * 2 + sizeof(std::uint32_t); // 8 bytes
332 const unsigned int tx_timestamp_offset = header_size + 8; // Header + offsetof(TimestampFields, tx_timestamp)
333
334 if (packet_size > tx_timestamp_offset + sizeof(TimestampType)) {
335 const TimestampType* tx_timestamp_ptr = reinterpret_cast<const TimestampType*>(frame->payload + tx_timestamp_offset);
337
338 // Calculate latency in microseconds
339 std::int64_t latency_us = (rx_time - tx_time).count();
340
341 db<NIC>(INF) << "[NIC] [handle()] Latency calculated: TX=" << tx_time.time_since_epoch().count()
342 << "us, RX=" << rx_time.time_since_epoch().count() << "us, Latency=" << latency_us << "us\n";
343
344 // Log latency to CSV file
345 logLatency(latency_us);
346 } else {
347 db<NIC>(WRN) << "[NIC] [handle()] Packet too small for TX timestamp extraction. Size: " << packet_size
348 << ", required: " << (tx_timestamp_offset + sizeof(TimestampType)) << "\n";
349 }
350
351 if (!running()) {
352 db<NIC>(ERR) << "[NIC] [handle()] trying to notify protocol when NIC is inactive\n";
353 return;
354 }
355
356 // 6. Notify Observers
357 if (!notify(buf, proto)) {
358 db<NIC>(INF) << "[NIC] [handle()] data received, but no one was notified (" << proto << ")\n";
359 free(buf); // if no one is listening, release buffer
360 }
361}
362
363template <typename Engine>
365 db<NIC>(TRC) << "[NIC] [alloc()] called!\n";
366
367 if (!running()) {
368 db<NIC>(ERR) << "[NIC] [alloc()] called when NIC is inactive\n";
369 return nullptr;
370 }
371
372 // Acquire free buffers counter semaphore
373 db<NIC>(TRC) << "[NIC] [alloc()] acquiring free buffers counter semaphore\n";
374 sem_wait(&_buffer_sem);
375 db<NIC>(TRC) << "[NIC] [alloc()] free buffers counter semaphore acquired\n";
376
377 // Remove first buffer of the free buffers queue
378 db<NIC>(TRC) << "[NIC] [alloc()] acquiring binary semaphore\n";
379 sem_wait(&_binary_sem);
380 db<NIC>(TRC) << "[NIC] [alloc()] binary semaphore acquired\n";
381 DataBuffer* buf = _free_buffers.front();
382 _free_buffers.pop();
383 db<NIC>(TRC) << "[NIC] [alloc()] buffer removed from free buffers queue\n";
384 sem_post(&_binary_sem);
385 db<NIC>(TRC) << "[NIC] [alloc()] binary semaphore released\n";
386
387 // Set Frame
389 frame.src = address();
390 frame.dst = dst;
391 frame.prot = prot;
392 unsigned int frame_size = size + Ethernet::HEADER_SIZE;
393
394 // Set buffer data
395 buf->setData(&frame, frame_size);
396
397 db<NIC>(INF) << "[NIC] [alloc()] buffer allocated for frame: {src = " << Ethernet::mac_to_string(frame.src) << ", dst = " << Ethernet::mac_to_string(frame.dst) << ", prot = " << std::to_string(frame.prot) << ", size = " << buf->size() << "}\n";
398
399 return buf;
400}
401
402template <typename Engine>
404 db<NIC>(TRC) << "NIC<Engine>::free() called!\n";
405
406 if (buf == nullptr) {
407 db<NIC>(WRN) << "[NIC] free() called with a null buffer\n";
408 return;
409 }
410
411 if (!running()) {
412 db<NIC>(ERR) << "[NIC] free() called when NIC is inactive\n";
413 return;
414 }
415
416 // Debug: Check semaphore state before freeing
417 int sem_value;
418 sem_getvalue(&_buffer_sem, &sem_value);
419 db<NIC>(INF) << "[NIC] [free()] freeing buffer, current semaphore value: " << sem_value << "\n";
420
421 // Clear buffer
422 buf->clear();
423
424 // Add to free buffers queue
425 sem_wait(&_binary_sem);
426 _free_buffers.push(buf);
427 size_t queue_size = _free_buffers.size();
428 sem_post(&_binary_sem);
429
430 // Increase free buffers counter
431 sem_post(&_buffer_sem);
432
433 // Debug: Check semaphore state after freeing
434 sem_getvalue(&_buffer_sem, &sem_value);
435 db<NIC>(INF) << "[NIC] buffer released, new semaphore value: " << sem_value << ", queue size: " << queue_size << "\n";
436}
437
438template <typename Engine>
440 return _address;
441}
442
443template <typename Engine>
445 _address = address;
446 db<NIC>(INF) << "[NIC] address setted: " << Ethernet::mac_to_string(address) << "\n";
447}
448
449template <typename Engine>
451 return _statistics;
452}
453
454template <typename Engine>
455void NIC<Engine>::fillTxTimestamp(DataBuffer* buf, unsigned int packet_size) {
456 db<NIC>(TRC) << "NIC<Engine>::fillTxTimestamp() called!\n";
457
458 // Get current synchronized time from Clock
459 auto& clock = Clock::getInstance();
460 bool is_sync = true;
461 TimestampType tx_time = clock.getLocalSystemTime();
462
463 // Calculate correct offset for TX timestamp accounting for structure alignment
464 // Header: 8 bytes (2Γ—uint16_t + uint32_t)
465 // TimestampFields: bool at offset 0, tx_timestamp at offset 8 (due to alignment)
466 const unsigned int header_size = sizeof(std::uint16_t) * 2 + sizeof(std::uint32_t); // 8 bytes
467 const unsigned int tx_timestamp_offset = header_size + 8; // Header + offsetof(TimestampFields, tx_timestamp)
468
469 // Get pointer to the packet within the Ethernet frame payload
470 Ethernet::Frame* frame = buf->data();
472
473 // Fill TX timestamp at the calculated offset
474 if (packet_size > tx_timestamp_offset + sizeof(TimestampType)) {
475 TimestampType* tx_timestamp_ptr = reinterpret_cast<TimestampType*>(packet_start + tx_timestamp_offset);
477
478 db<NIC>(INF) << "[NIC] Filled TX timestamp at offset " << tx_timestamp_offset
479 << ": " << tx_time.time_since_epoch().count() << "us\n";
480 } else {
481 db<NIC>(WRN) << "[NIC] Packet too small for TX timestamp. Size: " << packet_size
482 << ", required: " << (tx_timestamp_offset + sizeof(TimestampType)) << "\n";
483 }
484}
485
486template <typename Engine>
488 return _radius;
489}
490
491template <typename Engine>
492void NIC<Engine>::setRadius(double radius) {
493 _radius = radius;
494}
495
501template <typename Engine>
502void NIC<Engine>::logLatency(std::int64_t latency_us) {
503 if (_latency_csv_file.is_open()) {
504 _latency_csv_file << latency_us << "\n";
505 _latency_csv_file.flush(); // Ensure data is written immediately
506 db<NIC>(TRC) << "[NIC] [logLatency] Logged latency: " << latency_us << " us\n";
507 } else {
508 db<NIC>(WRN) << "[NIC] [logLatency] CSV file not open, cannot log latency\n";
509 }
510}
511
512#endif // NIC_H
Definition buffer.h:8
static Clock & getInstance()
Get the singleton instance.
Definition clock.h:145
Definition observer.h:13
Definition observed.h:13
Definition ethernet.h:9
static std::string mac_to_string(Address addr)
Definition ethernet.h:36
std::uint16_t Protocol
Definition ethernet.h:24
static constexpr unsigned int HEADER_SIZE
Definition ethernet.h:26
This class initializes the API.
Definition initializer.h:20
Definition nic.h:28
const Address & address()
Definition nic.h:439
void setRadius(double radius)
Definition nic.h:492
static const unsigned int N_BUFFERS
Definition nic.h:32
Buffer< Ethernet::Frame > DataBuffer
Definition nic.h:37
int send(DataBuffer *buf, unsigned int packet_size)
Definition nic.h:185
double radius()
Definition nic.h:487
Conditionally_Data_Observed< DataBuffer, Protocol_Number > Observed
Definition nic.h:39
Conditional_Data_Observer< DataBuffer, Protocol_Number > Observer
Definition nic.h:38
~NIC()
Definition nic.h:156
void free(DataBuffer *buf)
Definition nic.h:403
DataBuffer * alloc(Address dst, Protocol_Number prot, unsigned int size)
Definition nic.h:364
Ethernet::Protocol Protocol_Number
Definition nic.h:36
const Statistics & statistics()
Definition nic.h:450
Ethernet::Address Address
Definition nic.h:35
int receive(DataBuffer *buf, Address *src, Address *dst, void *data, unsigned int size)
Definition nic.h:245
void setAddress(Address address)
Definition nic.h:444
static constexpr unsigned int MAX_FRAME_SIZE
Definition nic.h:33
void stop()
Definition nic.h:173
NIC()
Definition nic.h:119
std::chrono::time_point< std::chrono::steady_clock, std::chrono::microseconds > TimestampType
Definition clock.h:16
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ ERR
Definition debug.h:162
@ TRC
Definition debug.h:231
@ WRN
Definition debug.h:185
Protocol prot
Definition ethernet.h:2
Address src
Definition ethernet.h:1
Address dst
Definition ethernet.h:0
T * data()
Definition protocol.h:24
Definition ethernet.h:16
Definition ethernet.h:29
Address src
Definition ethernet.h:31
std::uint8_t payload[MTU]
Definition ethernet.h:33
Definition nic.h:42
std::atomic< unsigned int > bytes_sent
Definition nic.h:45
std::atomic< unsigned int > rx_drops
Definition nic.h:48
std::atomic< unsigned int > tx_drops
Definition nic.h:47
std::atomic< unsigned int > packets_received
Definition nic.h:44
Statistics()
Definition nic.h:50
std::atomic< unsigned int > bytes_received
Definition nic.h:46
std::atomic< unsigned int > packets_sent
Definition nic.h:43
Definition traits.h:45