Communication Library for Autonomous Systems v1.0
Reliable and secure communication library for autonomous vehicle systems
Loading...
Searching...
No Matches
socketEngine.h
Go to the documentation of this file.
1#ifndef SOCKETENGINE_H
2#define SOCKETENGINE_H
3
4#include <cstring>
5#include <cerrno>
6#include <cstdint>
7#include <unistd.h>
8#include <sys/types.h>
9#include <sys/socket.h>
10#include <sys/epoll.h>
11#include <sys/eventfd.h>
12#include <netinet/in.h>
13#include <linux/if_packet.h>
14#include <net/ethernet.h>
15#include <net/if.h>
16#include <fcntl.h>
17#include <sys/ioctl.h>
18#include <pthread.h>
19#include <atomic>
20#include <stdio.h>
21
22#include "ethernet.h"
23#include "api/traits.h"
24#include "api/util/debug.h"
25
26
28
29 public:
30 static const char* INTERFACE() { return Traits<SocketEngine>::INTERFACE_NAME(); }
31
32 public:
34
35 virtual ~SocketEngine();
36
37 void start();
38
39 void stop();
40
41 const bool running();
42
43 int send(Ethernet::Frame* frame, unsigned int size);
44
45 static void* run(void* arg);
46
48
49 private:
50 void setUpSocket();
51
52 void setUpEpoll();
53
54 // This is the new epoll signal handler
55 void receive();
56
57 // Signal handler
58 virtual void handle(Ethernet::Frame* frame, unsigned int size) = 0;
59
60 protected:
62 int _ep_fd;
65
66 private:
67 const int _stop_ev;
68 pthread_t _receive_thread;
69 std::atomic<bool> _running;
70};
71
72
73/********** SocketEngine Implementation **********/
74
76 // Do NOT auto-start - let NIC control when to start
77 // Initialize socket and epoll setup only
78 setUpSocket();
79 setUpEpoll();
80 db<SocketEngine>(INF) << "[SocketEngine] constructor completed - ready to start\n";
81};
82
84 db<SocketEngine>(TRC) << "SocketEngine::~SocketEngine() called!\n";
85
86 stop();
87
90 close(_stop_ev); // Also close the eventfd
91};
92
94 db<SocketEngine>(TRC) << "SocketEngine::start() called!\n";
95
96 if (_running.load()) {
97 db<SocketEngine>(WRN) << "[SocketEngine] Already running, ignoring start() call\n";
98 return;
99 }
100
101 // Socket and epoll are already set up in constructor
102 // Just start the receive thread
103 _running.store(true, std::memory_order_release);
104 pthread_create(&_receive_thread, nullptr, SocketEngine::run, this);
105
106 db<SocketEngine>(INF) << "[SocketEngine] receive thread started\n";
107}
108
110 db<SocketEngine>(TRC) << "SocketEngine::stop() called!\n";
111
112 if (!running()) return;
113
114 _running.store(false, std::memory_order_release);
115
116 std::uint64_t u = 1;
118 db<SocketEngine>(TRC) << "[SocketEngine] sending stop signal to receive thread\n";
119 do {
120 bytes_written = write(_stop_ev, &u, sizeof(u));
121 } while (bytes_written == -1 && errno == EINTR); // Retry only on EINTR
122 db<SocketEngine>(TRC) << "[SocketEngine] stop signal sent to receive thread\n";
123
124 // Join the receive thread if it exists
125 if (_receive_thread != 0) {
126 int ret = pthread_join(_receive_thread, nullptr);
127 if (ret == 0) {
128 db<SocketEngine>(INF) << "[SocketEngine] successfully stopped!\n";
129 } else {
130 db<SocketEngine>(ERR) << "[SocketEngine] failed to join thread with error: " << ret << "\n";
131 }
132 } else {
133 db<SocketEngine>(ERR) << "[SocketEngine] receive thread is not running!\n";
134 }
135
136 db<SocketEngine>(INF) << "[SocketEngine] sucessfully stopped!\n";
137}
138
139void SocketEngine::setUpSocket() {
140 db<SocketEngine>(TRC) << "SocketEngine::setUpSocket() called!\n";
141
142 // 1. Creating socket
144 if (_sock_fd < 0) {
145 perror("socket");
146 throw std::runtime_error("Failed to create SocketEngine::_sock_fd!");
147 }
148
149 // 2. Making it non-blocking
150 int flags = fcntl(_sock_fd, F_GETFL, 0);
152
153 // 3. Getting interface index
154 struct ifreq ifr;
155 std::memset(&ifr, 0, sizeof(ifr));
156 std::strncpy(ifr.ifr_name, INTERFACE(), IFNAMSIZ);
157
158
159 if (ioctl(_sock_fd, SIOCGIFINDEX, &ifr) < 0) {
160 perror("ioctl SIOCGIFINDEX");
161 throw std::runtime_error("Failed to retrieve interface index!");
162 }
163
164 _if_index = ifr.ifr_ifindex;
165 db<SocketEngine>(INF) << "[SocketEngine] if_index setted: " << _if_index << "\n";
166
167 // 4. Getting MAC address
168 std::memset(&ifr, 0, sizeof(ifr));
169 std::strncpy(ifr.ifr_name, INTERFACE(), IFNAMSIZ);
170
171 if (ioctl(_sock_fd, SIOCGIFHWADDR, &ifr) < 0) {
172 perror("ioctl SIOCGIFHWADDR");
173 throw std::runtime_error("Failed to retrieve MAC address!");
174 }
175
176 std::memcpy(_mac_address.bytes, ifr.ifr_hwaddr.sa_data, Ethernet::MAC_SIZE);
177 db<SocketEngine>(INF) << "[SocketEngine] MAC address setted: " << Ethernet::mac_to_string(_mac_address) << "\n";
178
179 // 5. Bind socket to interface
180 struct sockaddr_ll sll;
181 std::memset(&sll, 0, sizeof(sll));
182 sll.sll_family = AF_PACKET;
183 sll.sll_protocol = htons(ETH_P_ALL);
184 sll.sll_ifindex = _if_index;
185
186 if (bind(_sock_fd, reinterpret_cast<struct sockaddr*>(&sll), sizeof(sll)) < 0) {
187 perror("bind");
188 throw std::runtime_error("Failed to bind SocketEngine::_sock_fd to interface!");
189 }
190
191 db<SocketEngine>(INF) << "[SocketEngine] socket setted\n";
192}
193
194void SocketEngine::setUpEpoll() {
195 db<SocketEngine>(TRC) << "SocketEngine::setUpEpoll() called!\n";
196
197 // 1. Creating epoll
199 if (_ep_fd < 0) {
200 perror("epoll_create1");
201 throw std::runtime_error("Failed to create SocketEngine::_ep_fd!");
202 }
203
204 // 2. Binding socket on epoll
205 struct epoll_event ev;
206 ev.events = EPOLLIN;
207 ev.data.fd = _sock_fd;
208
210 perror("epoll_ctl");
211 throw std::runtime_error("Failed to bind SocketEngine::_sock_fd to epoll!");
212 }
213
214 // 3. Binding stop event on epoll
215 struct epoll_event stop_ev = {};
216 stop_ev.events = EPOLLIN;
217 stop_ev.data.fd = _stop_ev;
218 if (epoll_ctl(_ep_fd, EPOLL_CTL_ADD, _stop_ev, &stop_ev) < 0) {
219 perror("epoll_ctl stop_ev");
220 throw std::runtime_error("Failed to bind SocketEngine::_stop_ev to epoll!");
221 }
222
223 db<SocketEngine>(INF) << "[SocketEngine] epoll setted\n";
224}
225
226int SocketEngine::send(Ethernet::Frame* frame, unsigned int size) {
227 db<SocketEngine>(TRC) << "SocketEngine::send() called!\n";
228
229 // Check for null frame pointer
230 if (frame == nullptr) {
231 db<SocketEngine>(ERR) << "[SocketEngine] Attempted to send null frame pointer\n";
232 return -1;
233 }
234
235 // Check for invalid size
236 if (size == 0 || size < Ethernet::HEADER_SIZE) {
237 db<SocketEngine>(ERR) << "[SocketEngine] Attempted to send frame with invalid size: " << size << "\n";
238 return -1;
239 }
240
241 // Check if engine is running before sending
242 if (!running()) {
243 db<SocketEngine>(ERR) << "[SocketEngine] Attempted to send while engine is stopping/stopped\n";
244 return -1;
245 }
246
247 sockaddr_ll addr = {};
248 addr.sll_family = AF_PACKET;
249 addr.sll_protocol = htons(frame->prot);
250 addr.sll_ifindex = _if_index;
251 addr.sll_halen = Ethernet::MAC_SIZE;
252 std::memcpy(addr.sll_addr, _mac_address.bytes, Ethernet::MAC_SIZE);
253
254 // Make sure protocol field is in network byte order before sending
255 frame->prot = htons(frame->prot);
256
257 int result = sendto(_sock_fd, frame, size, 0, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
258 db<SocketEngine>(INF) << "[SocketEngine] sendto() returned value " << std::to_string(result) << "\n";
259
260 return result;
261}
262
263void* SocketEngine::run(void* arg) {
264 db<SocketEngine>(TRC) << "[SocketEngine] [run()] called!\n";
265
266 SocketEngine* engine = static_cast<SocketEngine*>(arg);
267
268 struct epoll_event events[1024];
269
270 while (engine->running()) {
271 db<SocketEngine>(TRC) << "[SocketEngine] [run()] epoll_wait() called\n";
272 int n = epoll_wait(engine->_ep_fd, events, 1024, -1);
273
274 db<SocketEngine>(TRC) << "[SocketEngine] [run()] epoll event detected\n";
275
276 if (n < 0) {
277 db<SocketEngine>(TRC) << "[SocketEngine] [run()] epoll_wait() returned error: " << errno << "\n";
278 if (errno == EINTR) continue;
279 db<SocketEngine>(TRC) << "[SocketEngine] [run()] epoll_wait() returned error: " << errno << "\n";
280 perror("epoll_wait");
281 break;
282 }
283
284 // Iterates over all events detected by epoll
285 for (int i = 0; i < n; ++i) {
286 db<SocketEngine>(TRC) << "[SocketEngine] [run()] epoll event " << i << " detected\n";
287 int fd = events[i].data.fd;
288
289 if (fd == engine->_sock_fd) {
290 db<SocketEngine>(INF) << "[SocketEngine] [run()] epoll socket event detected\n";
291 engine->receive();
292 db<SocketEngine>(TRC) << "[SocketEngine] [run()] receive() called\n";
293 } else if (fd == engine->_stop_ev) {
294 db<SocketEngine>(INF) << "[SocketEngine] [run()] epoll stop event detected\n";
295 uint64_t u;
296 read(engine->_stop_ev, &u, sizeof(u)); // clears eventfd
297 db<SocketEngine>(TRC) << "[SocketEngine] [run()] stop event cleared\n";
298 break; // Next loop, thread will finish
299 }
300 }
301 }
302
303 db<SocketEngine>(INF) << "[SocketEngine] [run()] receive thread terminated!\n";
304 return nullptr;
305};
306
307void SocketEngine::receive() {
308 db<SocketEngine>(TRC) << "[SocketEngine] [receive()] called!\n";
309
310 // Checks weather engine is still active
311 if (!running()) {
312 db<SocketEngine>(ERR) << "[SocketEngine] [receive()] called when engine is inactive\n";
313 return;
314 }
315
317 struct sockaddr_ll src_addr;
318 socklen_t addr_len = sizeof(src_addr);
319
320 int bytes_received = recvfrom(this->_sock_fd, &frame, sizeof(frame), 0, reinterpret_cast<sockaddr*>(&src_addr), &addr_len);
321
322 // Checks weather receive was sucessful
323 if (bytes_received < 0) {
324 db<SocketEngine>(INF) << "[SocketEngine] [receive()] no data received\n";
325 if (errno != EAGAIN && errno != EWOULDBLOCK) {
326 perror("recvfrom");
327 }
328 return;
329 }
330
331 // Checks for valid Ethernet frame size (at least header size)
332 if (static_cast<unsigned int>(bytes_received) < Ethernet::HEADER_SIZE) {
333 db<SocketEngine>(ERR) << "[SocketEngine] [receive()] Received undersized frame (" << bytes_received << " bytes)\n";
334 return;
335 }
336
337 // Convert protocol from network to host byte order
338 frame.prot = ntohs(frame.prot);
339 db<SocketEngine>(INF) << "[SocketEngine] [receive()] received frame: {src = " << Ethernet::mac_to_string(frame.src) << ", dst = " << Ethernet::mac_to_string(frame.dst) << ", prot = " << frame.prot << ", size = " << bytes_received << "}\n";
340
341 this->handle(&frame, static_cast<unsigned int>(bytes_received));
342}
343
345 return _running.load(std::memory_order_acquire);
346}
347
351
352#endif // SOCKETENGINE_H
static std::string mac_to_string(Address addr)
Definition ethernet.h:36
static constexpr unsigned int MAC_SIZE
Definition ethernet.h:13
static constexpr unsigned int HEADER_SIZE
Definition ethernet.h:26
Definition socketEngine.h:27
int _sock_fd
Definition socketEngine.h:61
virtual ~SocketEngine()
Definition socketEngine.h:83
Ethernet::Address _mac_address
Definition socketEngine.h:64
int send(Ethernet::Frame *frame, unsigned int size)
Definition socketEngine.h:226
const bool running()
Definition socketEngine.h:344
int _ep_fd
Definition socketEngine.h:62
int _if_index
Definition socketEngine.h:63
SocketEngine()
Definition socketEngine.h:75
static void * run(void *arg)
Definition socketEngine.h:263
const Ethernet::Address & mac_address()
Definition socketEngine.h:348
static const char * INTERFACE()
Definition socketEngine.h:30
void stop()
Definition socketEngine.h:109
void start()
Definition socketEngine.h:93
@ INF
Definition debug.h:208
Select_Debug<(Traits< T >::debugged &&Traits< Debug >::error)> db(Debug_Error l)
Definition debug.h:166
@ ERR
Definition debug.h:162
@ TRC
Definition debug.h:231
@ WRN
Definition debug.h:185
Definition ethernet.h:16
std::uint8_t bytes[MAC_SIZE]
Definition ethernet.h:17
Definition ethernet.h:29
Definition traits.h:45