gwenhywfar  5.14.1
endpoint.c
Go to the documentation of this file.
1 /****************************************************************************
2  * This file is part of the project Gwenhywfar.
3  * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
4  *
5  * The license for this file can be found in the file COPYING which you
6  * should have received along with this file.
7  ****************************************************************************/
8 
9 #ifdef HAVE_CONFIG_H
10 # include <config.h>
11 #endif
12 
13 /*#define DISABLE_DEBUGLOG*/
14 
15 
16 #include "msgio/endpoint_p.h"
17 
18 #include <gwenhywfar/debug.h>
19 #include <gwenhywfar/text.h>
20 
21 
22 #define GWEN_MSG_ENDPOINT_DEFAULT_MSGSIZE 1024
23 
24 
25 
27 GWEN_TREE2_FUNCTIONS(GWEN_MSG_ENDPOINT, GWEN_MsgEndpoint)
28 
29 
30 
31 GWEN_MSG_ENDPOINT *GWEN_MsgEndpoint_new(const char *name, int groupId)
32 {
34 
36  ep->refCount=1;
38  GWEN_TREE2_INIT(GWEN_MSG_ENDPOINT, ep, GWEN_MsgEndpoint);
39 
40  ep->name=strdup(name?name:"<unnamed>");
41  ep->groupId=groupId;
42 
43  ep->receivedMessageList=GWEN_Msg_List_new();
44  ep->sendMessageList=GWEN_Msg_List_new();
45 
46  ep->defaultMessageSize=GWEN_MSG_ENDPOINT_DEFAULT_MSGSIZE;
47 
48  return ep;
49 }
50 
51 
52 
54 {
55  if (ep && ep->refCount)
56  ep->refCount++;
57 }
58 
59 
60 
62 {
63  if (ep) {
64  if (ep->refCount>1)
65  ep->refCount--;
66  else {
68  "Deleting endpoint \"%s\" (%d msgs in recv list, %d msgs in send list)",
69  (ep->name)?(ep->name):"<unnamed>",
70  GWEN_Msg_List_GetCount(ep->receivedMessageList),
71  GWEN_Msg_List_GetCount(ep->sendMessageList));
72  GWEN_TREE2_FINI(GWEN_MSG_ENDPOINT, ep, GWEN_MsgEndpoint);
74  if (ep->socket) {
75  GWEN_Socket_Close(ep->socket);
76  GWEN_Socket_free(ep->socket);
77  }
78  GWEN_Msg_free(ep->currentlyReceivedMsg);
79  GWEN_Msg_List_free(ep->receivedMessageList);
80  GWEN_Msg_List_free(ep->sendMessageList);
81  free(ep->name);
82  ep->refCount=0;
83  GWEN_FREE_OBJECT(ep);
84  }
85  }
86 }
87 
88 
89 
91 {
92  return (ep?ep->name:NULL);
93 }
94 
95 
96 
98 {
99  return (ep?ep->groupId:0);
100 }
101 
102 
103 
105 {
106  return (ep?ep->socket:NULL);
107 }
108 
109 
110 
112 {
113  if (ep) {
114  if (ep->socket) {
115  GWEN_Socket_Close(ep->socket);
116  GWEN_Socket_free(ep->socket);
117  }
118  ep->socket=sk;
119  }
120 }
121 
122 
123 
125 {
126  return (ep?ep->state:GWEN_MSG_ENDPOINT_STATE_UNCONNECTED);
127 }
128 
129 
130 
132 {
133  if (ep) {
134  if (ep->state!=m) {
135  ep->timeOfLastStateChange=time(NULL);
136  DBG_INFO(GWEN_LOGDOMAIN, "Changing status of endpoint %s to %d", GWEN_MsgEndpoint_GetName(ep), m);
137  ep->state=m;
138  }
139  }
140 }
141 
142 
143 
145 {
146  return (ep?ep->timeOfLastStateChange:0);
147 }
148 
149 
150 
151 
152 
153 
155 {
156  return (ep?ep->flags:0);
157 }
158 
159 
160 
162 {
163  if (ep)
164  ep->flags=f;
165 }
166 
167 
168 
170 {
171  if (ep)
172  ep->flags|=f;
173 }
174 
175 
176 
178 {
179  if (ep)
180  ep->flags&=~f;
181 }
182 
183 
184 
186 {
187  return ep?ep->defaultMessageSize:0;
188 }
189 
190 
191 
193 {
194  if (ep)
195  ep->defaultMessageSize=i;
196 }
197 
198 
199 
201 {
202  return ep?(++(ep->lastMsgId)):0;
203 }
204 
205 
206 
208 {
209  return (ep?ep->receivedMessageList:NULL);
210 }
211 
212 
213 
215 {
216  return (ep?ep->sendMessageList:NULL);
217 }
218 
219 
220 
222 {
223  if (ep && m) {
225  GWEN_Msg_List_Add(m, ep->receivedMessageList);
226  }
227 }
228 
229 
230 
232 {
233  return ep?GWEN_Msg_List_First(ep->receivedMessageList):NULL;
234 }
235 
236 
237 
239 {
240  GWEN_MSG *msg;
241 
243  if (msg)
244  GWEN_Msg_List_Del(msg);
245  return msg;
246 }
247 
248 
249 
251 {
252  if (ep && m) {
254  GWEN_Msg_List_Add(m, ep->sendMessageList);
255  }
256 }
257 
258 
259 
261 {
262  return (ep?GWEN_Msg_List_First(ep->sendMessageList):NULL);
263 }
264 
265 
266 
268 {
269  return (ep && GWEN_Msg_List_GetCount(ep->sendMessageList)>0)?1:0;
270 }
271 
272 
273 
275 {
276  return (ep?ep->currentlyReceivedMsg:NULL);
277 }
278 
279 
280 
282 {
283  if (ep)
284  ep->currentlyReceivedMsg=m;
285 }
286 
287 
288 
290 {
291  if (ep && ep->addSocketsFn)
292  ep->addSocketsFn(ep, readSet, writeSet, xSet);
293 }
294 
295 
296 
298 {
299  if (ep && ep->checkSocketsFn)
300  ep->checkSocketsFn(ep, readSet, writeSet, xSet);
301 }
302 
303 
304 
306  GWEN_SOCKETSET *readSet,
307  GWEN_SOCKETSET *writeSet,
308  GWEN_SOCKETSET *xSet)
309 {
310  GWEN_MSG_ENDPOINT *epChild;
311 
312  epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep);
313  while(epChild) {
314  GWEN_MsgEndpoint_AddSockets(epChild, readSet, writeSet, xSet);
315  epChild=GWEN_MsgEndpoint_Tree2_GetNext(epChild);
316  }
317 }
318 
319 
320 
322  GWEN_SOCKETSET *readSet,
323  GWEN_SOCKETSET *writeSet,
324  GWEN_SOCKETSET *xSet)
325 {
326  GWEN_MSG_ENDPOINT *epChild;
327 
328  epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep);
329  while(epChild) {
330  GWEN_MsgEndpoint_CheckSockets(epChild, readSet, writeSet, xSet);
331  epChild=GWEN_MsgEndpoint_Tree2_GetNext(epChild);
332  }
333 }
334 
335 
336 
338 {
339  if (ep) {
340  GWEN_MSG_ENDPOINT *epChild;
341 
342  epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep);
343  while(epChild) {
344  GWEN_MSG_ENDPOINT *epNext;
345 
346  epNext=GWEN_MsgEndpoint_Tree2_GetNext(epChild);
349  DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Disconnected and empty, removing", GWEN_MsgEndpoint_GetName(epChild));
350  GWEN_MsgEndpoint_Tree2_Unlink(epChild);
351  GWEN_MsgEndpoint_free(epChild);
352  }
353  epChild=epNext;
354  }
355  }
356 }
357 
358 
359 
361 {
362  GWEN_SOCKETSET *readSet;
363  GWEN_SOCKETSET *writeSet;
364  GWEN_SOCKETSET *xSet;
365  int rv;
366 
367  readSet=GWEN_SocketSet_new();
368  writeSet=GWEN_SocketSet_new();
369  xSet=GWEN_SocketSet_new();
370  GWEN_MsgEndpoint_AddSockets(ep, readSet, writeSet, xSet);
371 
372  do {
374  GWEN_SocketSet_GetSocketCount(writeSet)?writeSet:NULL,
376  timeout);
377  } while(rv==GWEN_ERROR_INTERRUPTED);
378  if (rv<0) {
379  if (rv!=GWEN_ERROR_TIMEOUT) {
380  DBG_INFO(GWEN_LOGDOMAIN, "Error on GWEN_Socket_Select: %d", rv);
381  }
382  }
383  else
384  GWEN_MsgEndpoint_CheckSockets(ep, readSet, writeSet, xSet);
385  GWEN_SocketSet_free(xSet);
386  GWEN_SocketSet_free(writeSet);
387  GWEN_SocketSet_free(readSet);
388 }
389 
390 
391 
393 {
394  GWEN_SOCKETSET *readSet;
395  GWEN_SOCKETSET *writeSet;
396  GWEN_SOCKETSET *xSet;
397  int rv;
398 
399  readSet=GWEN_SocketSet_new();
400  writeSet=GWEN_SocketSet_new();
401  xSet=GWEN_SocketSet_new();
402  GWEN_MsgEndpoint_ChildrenAddSockets(ep, readSet, writeSet, xSet);
403 
404  do {
406  GWEN_SocketSet_GetSocketCount(writeSet)?writeSet:NULL,
408  timeout);
409  } while(rv==GWEN_ERROR_INTERRUPTED);
410  if (rv<0) {
411  if (rv!=GWEN_ERROR_TIMEOUT) {
412  DBG_INFO(GWEN_LOGDOMAIN, "Error on GWEN_Socket_Select: %d", rv);
413  }
414  }
415  else
416  GWEN_MsgEndpoint_ChildrenCheckSockets(ep, readSet, writeSet, xSet);
417  GWEN_SocketSet_free(xSet);
418  GWEN_SocketSet_free(writeSet);
419  GWEN_SocketSet_free(readSet);
420 }
421 
422 
423 
424 int GWEN_MsgEndpoint_ReadFromSocket(GWEN_MSG_ENDPOINT *ep, uint8_t *bufferPtr, uint32_t bufferLen)
425 {
426  int len;
427  int rv;
428 
429  len=bufferLen;
430  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Reading from socket", GWEN_MsgEndpoint_GetName(ep));
431  do {
432  rv=GWEN_Socket_Read(ep->socket, (char*) bufferPtr, &len);
433  } while(rv==GWEN_ERROR_INTERRUPTED);
434  if (rv<0) {
435  DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: here (%d)", GWEN_MsgEndpoint_GetName(ep), rv);
436  return rv;
437  }
438  DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Read %d bytes from socket", GWEN_MsgEndpoint_GetName(ep), len);
439  return len;
440 }
441 
442 
443 
444 int GWEN_MsgEndpoint_WriteToSocket(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, uint32_t bufferLen)
445 {
446  int len;
447  int rv;
448 
449  len=bufferLen;
450  do {
451  rv=GWEN_Socket_Write(ep->socket, (const char*) bufferPtr, &len);
452  } while(rv==GWEN_ERROR_INTERRUPTED);
453  if (rv<0)
454  return rv;
455  return len;
456 }
457 
458 
459 
461 {
462  int rv;
463  uint8_t buffer[64];
464 
465  do {
466  rv=GWEN_MsgEndpoint_ReadFromSocket(ep, buffer, sizeof(buffer));
467  } while(rv>0);
468  if (rv<0 && rv!=GWEN_ERROR_TIMEOUT) {
469  DBG_INFO(GWEN_LOGDOMAIN, "Error on read(): %d", rv);
470  return rv;
471  }
472  else if (rv==0) {
473  DBG_INFO(GWEN_LOGDOMAIN, "EOF met on read()");
474 #if 0
475  return GWEN_ERROR_IO;
476 #endif
477  }
478  return 0;
479 }
480 
481 
482 
484 {
485  if (ep) {
486  DBG_INFO(GWEN_LOGDOMAIN, "Disconnecting endpoint");
487  if (ep->socket) {
488  DBG_INFO(GWEN_LOGDOMAIN, "Disconnecting socket");
489  GWEN_Socket_Close(ep->socket);
490  GWEN_Socket_free(ep->socket);
491  ep->socket=NULL;
492  }
494  }
495 }
496 
497 
498 
499 
500 
501 
502 
503 
505 {
506  if (ep) {
508 
509  o=ep->addSocketsFn;
510  ep->addSocketsFn=fn;
511  return o;
512  }
513  return NULL;
514 }
515 
516 
517 
519 {
520  if (ep) {
522 
523  o=ep->checkSocketsFn;
524  ep->checkSocketsFn=fn;
525  return o;
526  }
527  return NULL;
528 }
529 
530 
531 
#define GWEN_MSG_ENDPOINT_STATE_UNCONNECTED
Definition: endpoint.h:23
GWEN_SOCKET * GWEN_MsgEndpoint_GetSocket(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:104
void(* GWEN_MSG_ENDPOINT_ADDSOCKETS_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.h:183
int GWEN_MsgEndpoint_WriteToSocket(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, uint32_t bufferLen)
Definition: endpoint.c:444
GWENHYWFAR_API void GWEN_SocketSet_free(GWEN_SOCKETSET *ssp)
void GWEN_MsgEndpoint_SetFlags(GWEN_MSG_ENDPOINT *ep, uint32_t f)
Definition: endpoint.c:161
#define GWEN_INHERIT_FINI(t, element)
Definition: inherit.h:238
struct GWEN_MSG_ENDPOINT GWEN_MSG_ENDPOINT
Object which can send and receive messages (base class).
Definition: endpoint.h:37
GWENHYWFAR_API int GWEN_Socket_Write(GWEN_SOCKET *sp, const char *buffer, int *bsize)
#define DBG_DEBUG(dbg_logger, format,...)
Definition: debug.h:214
void GWEN_MsgEndpoint_AddReceivedMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:221
void GWEN_MsgEndpoint_AddSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:289
void GWEN_Msg_free(GWEN_MSG *msg)
Definition: msg.c:78
#define GWEN_FREE_OBJECT(varname)
Definition: memory.h:61
#define NULL
Definition: binreloc.c:300
GWEN_MSG * GWEN_MsgEndpoint_GetCurrentlyReceivedMsg(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:274
int GWEN_MsgEndpoint_DiscardInput(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:460
GWEN_MSG_LIST * GWEN_Msg_List_new()
uint32_t GWEN_MsgEndpoint_GetFlags(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:154
void GWEN_MsgEndpoint_Attach(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:53
int GWEN_MsgEndpoint_GetGroupId(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:97
struct GWEN_SOCKETSETSTRUCT GWEN_SOCKETSET
Definition: inetsocket.h:45
GWEN_MSG * GWEN_Msg_List_First(const GWEN_MSG_LIST *l)
#define GWEN_LOGDOMAIN
Definition: logger.h:32
int GWEN_Msg_RewindCurrentPos(GWEN_MSG *msg)
Definition: msg.c:482
void GWEN_MsgEndpoint_SetDefaultMessageSize(GWEN_MSG_ENDPOINT *ep, int i)
Definition: endpoint.c:192
void GWEN_MsgEndpoint_ChildrenIoLoop(GWEN_MSG_ENDPOINT *ep, int timeout)
Definition: endpoint.c:392
void GWEN_Msg_List_Del(GWEN_MSG *element)
GWENHYWFAR_API int GWEN_Socket_Read(GWEN_SOCKET *sp, char *buffer, int *bsize)
void GWEN_MsgEndpoint_SetCurrentlyReceivedMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:281
#define GWEN_ERROR_IO
Definition: error.h:123
#define GWEN_ERROR_INTERRUPTED
Definition: error.h:74
GWEN_MSG_ENDPOINT * GWEN_MsgEndpoint_new(const char *name, int groupId)
Definition: endpoint.c:31
GWENHYWFAR_API int GWEN_SocketSet_GetSocketCount(GWEN_SOCKETSET *ssp)
#define GWEN_TREE2_FINI(t, element, pr)
Definition: tree2.h:457
time_t GWEN_MsgEndpoint_GetTimeOfLastStateChange(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:144
#define GWEN_MSG_ENDPOINT_DEFAULT_MSGSIZE
Definition: endpoint.c:22
void GWEN_MsgEndpoint_SetSocket(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKET *sk)
Definition: endpoint.c:111
#define GWEN_NEW_OBJECT(typ, varname)
Definition: memory.h:55
GWEN_MSG_LIST * GWEN_MsgEndpoint_GetReceivedMessageList(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:207
void GWEN_MsgEndpoint_free(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:61
int GWEN_MsgEndpoint_HaveMessageToSend(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:267
void GWEN_MsgEndpoint_IoLoop(GWEN_MSG_ENDPOINT *ep, int timeout)
Definition: endpoint.c:360
int GWEN_MsgEndpoint_GetState(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:124
#define GWEN_INHERIT_INIT(t, element)
Definition: inherit.h:223
GWENHYWFAR_API GWEN_SOCKETSET * GWEN_SocketSet_new(void)
GWENHYWFAR_API int GWEN_Socket_Close(GWEN_SOCKET *sp)
GWEN_TREE2_FUNCTIONS(GWEN_JSON_ELEM, GWEN_JsonElement)
GWEN_MSG * GWEN_MsgEndpoint_GetFirstSendMessage(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:260
void GWEN_MsgEndpoint_AddSendMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition: endpoint.c:250
GWEN_MSG * GWEN_MsgEndpoint_TakeFirstReceivedMessage(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:238
void GWEN_MsgEndpoint_RemoveUnconnectedAndEmptyChildren(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:337
void GWEN_MsgEndpoint_DelFlags(GWEN_MSG_ENDPOINT *ep, uint32_t f)
Definition: endpoint.c:177
void GWEN_MsgEndpoint_Disconnect(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:483
struct GWEN_MSG GWEN_MSG
Definition: msg.h:24
GWENHYWFAR_API int GWEN_Socket_Select(GWEN_SOCKETSET *rs, GWEN_SOCKETSET *ws, GWEN_SOCKETSET *xs, int timeout)
int GWEN_MsgEndpoint_GetDefaultMessageSize(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:185
int GWEN_MsgEndpoint_ReadFromSocket(GWEN_MSG_ENDPOINT *ep, uint8_t *bufferPtr, uint32_t bufferLen)
Definition: endpoint.c:424
void(* GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.h:188
void GWEN_MsgEndpoint_CheckSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:297
void GWEN_Msg_List_free(GWEN_MSG_LIST *l)
#define GWEN_TREE2_INIT(t, element, pr)
Definition: tree2.h:445
void GWEN_MsgEndpoint_ChildrenCheckSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:321
struct GWEN_SOCKET GWEN_SOCKET
Definition: inetsocket.h:44
#define GWEN_ERROR_TIMEOUT
Definition: error.h:71
void GWEN_MsgEndpoint_SetState(GWEN_MSG_ENDPOINT *ep, int m)
Definition: endpoint.c:131
#define DBG_INFO(dbg_logger, format,...)
Definition: debug.h:181
const char * GWEN_MsgEndpoint_GetName(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:90
uint32_t GWEN_Msg_List_GetCount(const GWEN_MSG_LIST *l)
GWEN_MSG * GWEN_MsgEndpoint_GetFirstReceivedMessage(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:231
void GWEN_MsgEndpoint_ChildrenAddSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
Definition: endpoint.c:305
void GWEN_MsgEndpoint_AddFlags(GWEN_MSG_ENDPOINT *ep, uint32_t f)
Definition: endpoint.c:169
GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN GWEN_MsgEndpoint_SetCheckSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN fn)
Definition: endpoint.c:518
uint32_t GWEN_MsgEndpoint_GetNextMessageId(GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:200
#define GWEN_INHERIT_FUNCTIONS(t)
Definition: inherit.h:163
GWENHYWFAR_API void GWEN_Socket_free(GWEN_SOCKET *sp)
void GWEN_Msg_List_Add(GWEN_MSG *element, GWEN_MSG_LIST *list)
GWEN_MSG_LIST * GWEN_MsgEndpoint_GetSendMessageList(const GWEN_MSG_ENDPOINT *ep)
Definition: endpoint.c:214
GWEN_MSG_ENDPOINT_ADDSOCKETS_FN GWEN_MsgEndpoint_SetAddSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_ADDSOCKETS_FN fn)
Definition: endpoint.c:504