gwenhywfar  5.14.1
endpoint_msgio.c
Go to the documentation of this file.
1 /****************************************************************************
2  * This file is part of the project Gwenhywfar.
3  * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
4  *
5  * The license for this file can be found in the file COPYING which you
6  * should have received along with this file.
7  ****************************************************************************/
8 
9 #ifdef HAVE_CONFIG_H
10 # include <config.h>
11 #endif
12 
13 /*#define DISABLE_DEBUGLOG*/
14 
15 
16 #include "./endpoint_msgio_p.h"
17 
18 #include <gwenhywfar/debug.h>
19 
20 
21 #define GWEN_ENDPOINT_MSGIO_BUFFERSIZE 1024
22 
23 
24 
25 /* ------------------------------------------------------------------------------------------------
26  * forward declarations
27  * ------------------------------------------------------------------------------------------------
28  */
29 
30 static void GWENHYWFAR_CB _freeData(void *bp, void *p);
31 
32 static void _addSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_UNUSED GWEN_SOCKETSET *xSet);
33 static void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet);
34 static int _sendMsgStart(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg);
35 static void _sendMsgFinish(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg);
38 static int _distributeBufferContent(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen);
39 
40 
41 
42 /* ------------------------------------------------------------------------------------------------
43  * implementations
44  * ------------------------------------------------------------------------------------------------
45  */
46 
47 GWEN_INHERIT(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO)
48 
49 
50 
52 {
53  GWEN_ENDPOINT_MSGIO *xep;
54 
55  GWEN_NEW_OBJECT(GWEN_ENDPOINT_MSGIO, xep);
56  GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep, xep, _freeData);
57 
58  xep->addSocketsFn=GWEN_MsgEndpoint_SetAddSocketsFn(ep, _addSockets);
59  xep->checkSocketsFn=GWEN_MsgEndpoint_SetCheckSocketsFn(ep, _checkSockets);
60 }
61 
62 
63 
64 void GWENHYWFAR_CB _freeData(void *bp, void *p)
65 {
67  GWEN_ENDPOINT_MSGIO *xep;
68 
69  ep=(GWEN_MSG_ENDPOINT*) bp;
70  xep=(GWEN_ENDPOINT_MSGIO*) p;
71  GWEN_MsgEndpoint_SetCheckSocketsFn(ep, xep->checkSocketsFn);
72  GWEN_FREE_OBJECT(xep);
73 }
74 
75 
76 
78 {
79  if (ep) {
80  GWEN_ENDPOINT_MSGIO *xep;
81 
82  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
83  if (xep)
84  xep->getBytesNeededFn=f;
85  }
86 }
87 
88 
89 
91 {
92  if (ep) {
93  GWEN_ENDPOINT_MSGIO *xep;
94 
95  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
96  if (xep)
97  xep->sendMsgStartFn=f;
98  }
99 }
100 
101 
102 
104 {
105  if (ep) {
106  GWEN_ENDPOINT_MSGIO *xep;
107 
108  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
109  if (xep)
110  xep->sendMsgFinishFn=f;
111  }
112 }
113 
114 
115 
117 {
118  if (ep) {
119  GWEN_ENDPOINT_MSGIO *xep;
120 
121  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
122  if (xep && xep->sendMsgStartFn)
123  return xep->sendMsgStartFn(ep, msg);
124  }
125 
126  return 0;
127 }
128 
129 
130 
132 {
133  if (ep) {
134  GWEN_ENDPOINT_MSGIO *xep;
135 
136  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
137  if (xep && xep->sendMsgStartFn)
138  xep->sendMsgFinishFn(ep, msg);
139  }
140 }
141 
142 
143 
145 {
146  if (ep) {
147  GWEN_ENDPOINT_MSGIO *xep;
148 
149  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
150  if (xep) {
152  GWEN_SOCKET *sk;
153 
155  if (sk) {
156  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Adding socket %d to read set",
159  GWEN_SocketSet_AddSocket(readSet, sk);
161  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Adding socket %d to write set",
164  GWEN_SocketSet_AddSocket(writeSet, sk);
165  }
166  } /* if socket */
167  }
168  else if (xep->addSocketsFn) {
169  DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Not connected, calling base function", GWEN_MsgEndpoint_GetName(ep));
170  xep->addSocketsFn(ep, readSet, writeSet, xSet);
171  }
172  } /* if (xep) */
173  } /* if (ep) */
174 }
175 
176 
177 
179 {
180  if (ep) {
181  GWEN_ENDPOINT_MSGIO *xep;
182 
183  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
184  if (xep) {
185  int rv;
186 
188  GWEN_SOCKET *sk;
189 
191  if (sk) {
192  if (GWEN_SocketSet_HasSocket(writeSet, sk)) {
193  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Has socket in write set", GWEN_MsgEndpoint_GetName(ep));
194  rv=_writeCurrentMessage(ep);
195  if (rv<0 && rv!=GWEN_ERROR_TIMEOUT) {
197  "Endpoint %s: Error writing current message (%d), disconnecting",
199  rv);
201  return;
202  }
203  }
204 
205  if (GWEN_SocketSet_HasSocket(readSet, sk)) {
206  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Has socket in read set", GWEN_MsgEndpoint_GetName(ep));
207  rv=_readCurrentMessage(ep);
208  if (rv<0 && rv!=GWEN_ERROR_TIMEOUT) {
210  "Endpoint %s: Error reading current message (%d), disconnecting",
212  rv);
214  return;
215  }
216  }
217  }
218  } /* if connected */
219  else if (xep->checkSocketsFn) {
220  DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Not connected, calling base function", GWEN_MsgEndpoint_GetName(ep));
221  xep->checkSocketsFn(ep, readSet, writeSet, xSet);
222  }
223  }
224  }
225 }
226 
227 
228 
230 {
231  GWEN_MSG *msg;
232 
233  DBG_DEBUG(GWEN_LOGDOMAIN, "Writing to endpoint %s", GWEN_MsgEndpoint_GetName(ep));
235  if (msg) {
236  uint8_t pos;
237  int remaining;
238  int rv;
239 
240  pos=GWEN_Msg_GetCurrentPos(msg);
241  remaining=GWEN_Msg_GetRemainingBytes(msg);
242  if (pos==0 && remaining>0) {
243  DBG_DEBUG(GWEN_LOGDOMAIN, "Starting to write packet");
244  rv=_sendMsgStart(ep, msg);
245  if (rv<0) {
246  if (rv==GWEN_ERROR_TIMEOUT) {
247  DBG_INFO(GWEN_LOGDOMAIN, "Line busy");
248  return rv;
249  }
250  else {
251  DBG_INFO(GWEN_LOGDOMAIN, "Error starting message (%d)", rv);
252  return rv;
253  }
254  }
255  else {
256  DBG_DEBUG(GWEN_LOGDOMAIN, "Okay to write packet");
257  }
258  }
259  if (remaining>0) {
260  const uint8_t *buf;
261 
262  /* start new message */
263  buf=GWEN_Msg_GetBuffer(msg)+pos;
264  rv=GWEN_MsgEndpoint_WriteToSocket(ep, buf, remaining);
265  if (rv<0) {
266  if (rv==GWEN_ERROR_TIMEOUT)
267  return rv;
268  DBG_ERROR(GWEN_LOGDOMAIN, "Error on write() (%d)", rv);
269  return rv;
270  }
271  GWEN_Msg_IncCurrentPos(msg, rv);
272  if (rv==remaining) {
273  DBG_INFO(GWEN_LOGDOMAIN, "Message completely sent");
274  _sendMsgFinish(ep, msg);
275  /* end current message */
276  GWEN_Msg_List_Del(msg);
277  GWEN_Msg_free(msg);
278  }
279  }
280  }
281  else {
282  DBG_INFO(GWEN_LOGDOMAIN, "Nothing to send");
283  }
284  return 0;
285 }
286 
287 
288 
289 
291 {
292  int rv;
293  uint8_t buffer[GWEN_ENDPOINT_MSGIO_BUFFERSIZE];
294 
295  DBG_DEBUG(GWEN_LOGDOMAIN, "Reading from endpoint %s", GWEN_MsgEndpoint_GetName(ep));
296  rv=GWEN_MsgEndpoint_ReadFromSocket(ep, buffer, sizeof(buffer));
297  if (rv<0) {
298  if (rv==GWEN_ERROR_TIMEOUT) {
299  DBG_DEBUG(GWEN_LOGDOMAIN, "Timeout (%d)", rv);
300  }
301  else {
302  DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv);
303  }
304  return rv;
305  }
306  else if (rv==0) {
307  DBG_INFO(GWEN_LOGDOMAIN, "EOF met on read()");
308  return GWEN_ERROR_IO;
309  }
310 
311  rv=_distributeBufferContent(ep, buffer, rv);
312  if (rv<0) {
313  DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv);
314  return rv;
315  }
316 
317  return 0;
318 }
319 
320 
321 
322 int _distributeBufferContent(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen)
323 {
324  if (ep) {
325  GWEN_ENDPOINT_MSGIO *xep;
326 
327  xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
328  if (xep) {
329  if (xep->getBytesNeededFn) {
330  GWEN_MSG *msg;
331 
332  DBG_DEBUG(GWEN_LOGDOMAIN, "Distributing %d received bytes", bufferLen);
334  while(bufferLen) {
335  int bytesNeeded;
336 
337  DBG_DEBUG(GWEN_LOGDOMAIN, "%d remaining bytes in buffer", bufferLen);
338  if (msg==NULL) {
339  DBG_DEBUG(GWEN_LOGDOMAIN, "Creating new message");
343  }
344 
345  bytesNeeded=xep->getBytesNeededFn(ep, msg);
346  DBG_DEBUG(GWEN_LOGDOMAIN, "current message still needs %d bytes", bytesNeeded);
347  if (bytesNeeded==0) {
348  /* message finished already before adding bytes?? */
349  DBG_ERROR(GWEN_LOGDOMAIN, "Incoming message complete, SNH!");
350  }
351  else if (bytesNeeded<0) {
352  DBG_ERROR(GWEN_LOGDOMAIN, "Unknown how many bytes needed? SNH! (%d)", bytesNeeded);
353  return GWEN_ERROR_IO;
354  }
355  else {
356  int rv;
357 
358  /* add bytes to message */
359  if (bytesNeeded>bufferLen)
360  bytesNeeded=bufferLen;
361  DBG_DEBUG(GWEN_LOGDOMAIN, "adding %d bytes to current message", bytesNeeded);
362  rv=GWEN_Msg_AddBytes(msg, bufferPtr, bytesNeeded);
363  if (rv<0) {
364  DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv);
365  return rv;
366  }
367  if (xep->getBytesNeededFn(ep, msg)==0) {
368  /* message finished */
369  DBG_DEBUG(GWEN_LOGDOMAIN, "Incoming message complete");
372  msg=NULL;
373  }
374  bufferPtr+=bytesNeeded;
375  bufferLen-=bytesNeeded;
376  }
377  } /* while */
378 
379  return 0;
380  }
381  else {
382  DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Function \"getBytesNeeded\" not set", GWEN_MsgEndpoint_GetName(ep));
383  }
384  } /* if (xep) */
385  } /* if (ep) */
386  return GWEN_ERROR_GENERIC;
387 }
388 
389 
390 
GWEN_SOCKET * GWEN_MsgEndpoint_GetSocket(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:104
int GWEN_MsgEndpoint_WriteToSocket(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, uint32_t bufferLen)
Definition: endpoint.c:444
#define DBG_ERROR(dbg_logger, format,...)
Definition: debug.h:97
void(* GWEN_ENDPOINT_MSGIO_SENDMSGFINISH_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
static void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
int GWEN_Msg_IncCurrentPos(GWEN_MSG *msg, uint32_t i)
Definition: msg.c:468
struct GWEN_MSG_ENDPOINT GWEN_MSG_ENDPOINT
Object which can send and receive messages (base class).
Definition: endpoint.h:37
#define DBG_DEBUG(dbg_logger, format,...)
Definition: debug.h:214
int GWEN_Msg_GetRemainingBytes(const GWEN_MSG *msg)
Definition: msg.c:493
static void _addSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_UNUSED GWEN_SOCKETSET *xSet)
void GWEN_MsgEndpoint_AddReceivedMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:221
static int _writeCurrentMessage(GWEN_MSG_ENDPOINT *ep)
void GWEN_Msg_free(GWEN_MSG *msg)
Definition: msg.c:78
int GWEN_Msg_AddBytes(GWEN_MSG *msg, const uint8_t *bufferPtr, uint32_t bufferLen)
Definition: msg.c:193
#define GWEN_FREE_OBJECT(varname)
Definition: memory.h:61
#define NULL
Definition: binreloc.c:300
GWEN_MSG * GWEN_MsgEndpoint_GetCurrentlyReceivedMsg(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:274
void GWEN_Msg_SetGroupId(GWEN_MSG *msg, int groupId)
Definition: msg.c:126
int GWEN_MsgEndpoint_GetGroupId(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:97
#define GWEN_MSG_ENDPOINT_STATE_CONNECTED
Definition: endpoint.h:25
struct GWEN_SOCKETSETSTRUCT GWEN_SOCKETSET
Definition: inetsocket.h:45
#define GWEN_LOGDOMAIN
Definition: logger.h:32
int(* GWEN_ENDPOINT_MSGIO_GETBYTESNEEDED_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
void GWEN_Msg_List_Del(GWEN_MSG *element)
void GWEN_MsgEndpoint_SetCurrentlyReceivedMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:281
#define GWEN_ERROR_IO
Definition: error.h:123
static int _readCurrentMessage(GWEN_MSG_ENDPOINT *ep)
void GWEN_MsgIoEndpoint_SetSendMsgFinishFn(GWEN_MSG_ENDPOINT *ep, GWEN_ENDPOINT_MSGIO_SENDMSGFINISH_FN f)
GWEN_MSG * GWEN_Msg_new(uint32_t bufferSize)
Definition: msg.c:37
int(* GWEN_ENDPOINT_MSGIO_SENDMSGSTART_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
#define GWEN_NEW_OBJECT(typ, varname)
Definition: memory.h:55
GWENHYWFAR_API int GWEN_Socket_GetSocketInt(const GWEN_SOCKET *sp)
static void GWENHYWFAR_CB _freeData(void *bp, void *p)
#define GWENHYWFAR_CB
Definition: gwenhywfarapi.h:89
int GWEN_MsgEndpoint_HaveMessageToSend(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:267
int GWEN_MsgEndpoint_GetState(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:124
#define GWEN_ERROR_GENERIC
Definition: error.h:62
void GWEN_MsgIoEndpoint_SetGetNeededBytesFn(GWEN_MSG_ENDPOINT *ep, GWEN_ENDPOINT_MSGIO_GETBYTESNEEDED_FN f)
GWENHYWFAR_API int GWEN_SocketSet_AddSocket(GWEN_SOCKETSET *ssp, const GWEN_SOCKET *sp)
uint32_t GWEN_Msg_GetCurrentPos(const GWEN_MSG *msg)
Definition: msg.c:176
GWEN_MSG * GWEN_MsgEndpoint_GetFirstSendMessage(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:260
void GWEN_MsgIoEndpoint_SetSendMsgStartFn(GWEN_MSG_ENDPOINT *ep, GWEN_ENDPOINT_MSGIO_SENDMSGSTART_FN f)
void GWEN_MsgEndpoint_Disconnect(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:483
struct GWEN_MSG GWEN_MSG
Definition: msg.h:24
int GWEN_MsgEndpoint_GetDefaultMessageSize(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:185
int GWEN_MsgEndpoint_ReadFromSocket(GWEN_MSG_ENDPOINT *ep, uint8_t *bufferPtr, uint32_t bufferLen)
Definition: endpoint.c:424
#define GWEN_ENDPOINT_MSGIO_BUFFERSIZE
static int _distributeBufferContent(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen)
struct GWEN_SOCKET GWEN_SOCKET
Definition: inetsocket.h:44
#define GWEN_ERROR_TIMEOUT
Definition: error.h:71
GWENHYWFAR_API int GWEN_SocketSet_HasSocket(GWEN_SOCKETSET *ssp, const GWEN_SOCKET *sp)
#define GWEN_INHERIT(bt, t)
Definition: inherit.h:264
#define DBG_INFO(dbg_logger, format,...)
Definition: debug.h:181
const char * GWEN_MsgEndpoint_GetName(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:90
static void _sendMsgFinish(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN GWEN_MsgEndpoint_SetCheckSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN fn)
Definition: endpoint.c:518
#define GWEN_INHERIT_SETDATA(bt, t, element, data, fn)
Definition: inherit.h:300
uint8_t * GWEN_Msg_GetBuffer(GWEN_MSG *msg)
Definition: msg.c:133
#define GWEN_UNUSED
static int _sendMsgStart(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
GWEN_MSG_ENDPOINT_ADDSOCKETS_FN GWEN_MsgEndpoint_SetAddSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_ADDSOCKETS_FN fn)
Definition: endpoint.c:504
#define GWEN_INHERIT_GETDATA(bt, t, element)
Definition: inherit.h:279
void GWEN_MsgIoEndpoint_Extend(GWEN_MSG_ENDPOINT *ep)