Moved restconf_stream.c -> restconf_stream_fcgi.c, made some generaizations and documented what was hardcoded to FCGI

This commit is contained in:
Olof hagsand 2020-06-23 15:00:41 +02:00
parent 73bbcded87
commit e2b3cdb3f6
8 changed files with 148 additions and 94 deletions

View file

@ -94,7 +94,8 @@ APPSRC += restconf_main_$(with_restconf).c
# Fcgi-specific source including main
ifeq ($(with_restconf),fcgi)
APPSRC += restconf_stream.c
# Streams notifications have some fcgi specific handling
APPSRC += restconf_stream_$(with_restconf).c
endif
APPOBJ = $(APPSRC:.c=.o)

View file

@ -206,6 +206,7 @@ restconf_reply_send(void *req0,
FCGX_FPrintF(req->out, "%s", cbuf_get(cb));
FCGX_FPrintF(req->out, "\r\n");
}
FCGX_FFlush(req->out); /* Is this only for notification ? */
retval = 0;
done:
return retval;

View file

@ -492,7 +492,17 @@ main(int argc,
}
}
else if (strncmp(path+1, stream_path, strlen(stream_path)) == 0) {
api_stream(h, req, stream_path, &finish);
char *query = NULL;
cvec *qvec = NULL;
query = restconf_param_get(h, "QUERY_STRING");
if (query != NULL && strlen(query))
if (str2cvec(query, '&', '=', &qvec) < 0)
goto done;
api_stream(h, req, qvec, stream_path, &finish);
if (qvec){
cvec_free(qvec);
qvec = NULL;
}
}
else if (strncmp(path, RESTCONF_WELL_KNOWN, strlen(RESTCONF_WELL_KNOWN)) == 0) {
api_well_known(h, req); /* */
@ -509,7 +519,7 @@ main(int argc,
if (finish)
FCGX_Finish_r(req);
else{ /* A handler is forked so we initiate a new request after instead
of finnishing the old */
of finishing the old */
if (FCGX_InitRequest(req, sock, 0) != 0){
clicon_err(OE_CFG, errno, "FCGX_InitRequest");
goto done;

View file

@ -42,6 +42,6 @@
*/
int stream_child_free(clicon_handle h, int pid);
int stream_child_freeall(clicon_handle h);
int api_stream(clicon_handle h, FCGX_Request *r, char *streampath, int *finish);
int api_stream(clicon_handle h, void *req, cvec *qvec, char *streampath, int *finish);
#endif /* _RESTCONF_STREAM_H_ */

View file

@ -56,6 +56,12 @@
query parameters, defined in Section 4.8. Refer to Appendix B.3.6
for filter parameter examples.
* Note that this implementation includes some hardcoded things for FCGI.
* These are:
* - req->listen_sock is used to register incoming fd events from (nginx) fcgi server
* - The stream_child struct copies the FCGX_Request by value, so FCGX_Free() can be called
* asynchronously
* - In the forked variant, FCGX_Finish_r() and FCGX_Free() are called (minor)
*/
#ifdef HAVE_CONFIG_H
@ -99,13 +105,13 @@
*/
#define STREAM_FORK 1
/* Keep track of children - whjen they exit - their FCGX handle needs to be
/* Keep track of children - when they exit - their FCGX handle needs to be
* freed with FCGX_Free(&rbk, 0);
*/
struct stream_child{
qelem_t sc_q; /* queue header */
int sc_pid; /* Child process id */
FCGX_Request sc_r; /* FCGI stream data */
qelem_t sc_q; /* queue header */
int sc_pid; /* Child process id */
FCGX_Request sc_r; /* FCGI stream data. XXX this is by value */
};
/* Linked list of children
* @note could hang STREAM_CHILD list on clicon handle instead.
@ -127,7 +133,7 @@ stream_child_free(clicon_handle h,
do {
if (pid == sc->sc_pid){
DELQ(sc, STREAM_CHILD, struct stream_child *);
FCGX_Free(&sc->sc_r, 0);
FCGX_Free(&sc->sc_r, 0); /* XXX pointer to actual copied struct */
free(sc);
goto done;
}
@ -145,13 +151,15 @@ stream_child_freeall(clicon_handle h)
while ((sc = STREAM_CHILD) != NULL){
DELQ(sc, STREAM_CHILD, struct stream_child *);
FCGX_Free(&sc->sc_r, 1);
FCGX_Free(&sc->sc_r, 1); /* XXX pointer to actual copied struct */
free(sc);
}
return 0;
}
/*! Callback when stream notifications arrive from backend
* @param[in] s Socket
* @param[in] req Generic Www handle (can be part of clixon handle)
*/
static int
restconf_stream_cb(int s,
@ -229,14 +237,14 @@ restconf_stream_cb(int s,
}
/*! Send subsctription to backend
* @param[in] h Clicon handle
* @param[in] r Fastcgi request handle
* @param[in] name Stream name
* @param[out] sp Socket -1 if not set
* @param[in] h Clicon handle
* @param[in] req Generic Www handle (can be part of clixon handle)
* @param[in] name Stream name
* @param[out] sp Socket -1 if not set
*/
static int
restconf_stream(clicon_handle h,
FCGX_Request *r,
void *req,
char *name,
cvec *qvec,
int pretty,
@ -279,19 +287,22 @@ restconf_stream(clicon_handle h,
if (clicon_rpc_netconf(h, cbuf_get(cb), &xret, &s) < 0)
goto done;
if ((xe = xpath_first(xret, NULL, "rpc-reply/rpc-error")) != NULL){
if (api_return_err(h, r, xe, pretty, media_out, 0) < 0)
if (api_return_err(h, req, xe, pretty, media_out, 0) < 0)
goto done;
goto ok;
}
/* Setting up stream */
FCGX_SetExitStatus(201, r->out); /* Created */
FCGX_FPrintF(r->out, "Status: 201 Created\r\n");
FCGX_FPrintF(r->out, "Content-Type: text/event-stream\r\n");
FCGX_FPrintF(r->out, "Cache-Control: no-cache\r\n");
FCGX_FPrintF(r->out, "Connection: keep-alive\r\n");
FCGX_FPrintF(r->out, "X-Accel-Buffering: no\r\n");
FCGX_FPrintF(r->out, "\r\n");
FCGX_FFlush(r->out);
if (restconf_reply_header(req, "Content-Type", "text/event-stream") < 0)
goto done;
if (restconf_reply_header(req, "Cache-Control", "no-cache") < 0)
goto done;
if (restconf_reply_header(req, "Connection", "keep-alive") < 0)
goto done;
if (restconf_reply_header(req, "X-Accel-Buffering", "no") < 0)
goto done;
if (restconf_reply_send(req, 201, NULL) < 0)
goto done;
*sp = s;
ok:
retval = 0;
@ -308,11 +319,16 @@ restconf_stream(clicon_handle h,
#include "restconf_lib.h"
#include "restconf_stream.h"
/*! Listen sock callback (from proxy?)
* @param[in] s Socket
* @param[in] req Generic Www handle (can be part of clixon handle)
*/
static int
stream_checkuplink(int s,
void *arg)
{
FCGX_Request *r = (FCGX_Request *)arg;
FCGX_Request *r = (FCGX_Request *)arg;
clicon_debug(1, "%s", __FUNCTION__);
if (FCGX_GetError(r->out) != 0){ /* break loop */
clicon_debug(1, "%s FCGX_GetError upstream", __FUNCTION__);
@ -343,78 +359,78 @@ stream_timeout(int s,
return 0;
}
/*! Process a FastCGI request
* @param[in] r Fastcgi request handle
/*! Process a stream request
* @param[in] h Clicon handle
* @param[in] req Generic Www handle (can be part of clixon handle)
* @param[in] qvec Query parameters, ie the ?<id>=<val>&<id>=<val> stuff
* @param[in] streampath URI path for streams, eg /streams, see CLICON_STREAM_PATH
* @param[out] finish Set to zero, if request should not be finnished by upper layer
*/
int
api_stream(clicon_handle h,
FCGX_Request *r,
void *req,
cvec *qvec,
char *streampath,
int *finish)
{
int retval = -1;
char *path;
char *query;
char *method;
char **pvec = NULL;
int pn;
cvec *qvec = NULL;
cvec *dvec = NULL;
cvec *pcvec = NULL; /* for rest api */
cbuf *cb = NULL;
char *data;
int authenticated = 0;
int pretty;
int retval = -1;
FCGX_Request *rfcgi = (FCGX_Request *)req; /* XXX */
char *path;
char *method;
char **pvec = NULL;
int pn;
cvec *pcvec = NULL; /* for rest api */
cbuf *cb = NULL;
char *indata;
int authenticated = 0;
int pretty;
restconf_media media_out = YANG_DATA_XML; /* XXX default */
cbuf *cbret = NULL;
cxobj *xret = NULL;
cxobj *xerr;
int s=-1;
cbuf *cbret = NULL;
cxobj *xret = NULL;
cxobj *xerr;
int s = -1;
#ifdef STREAM_FORK
int pid;
int pid;
struct stream_child *sc;
#endif
clicon_debug(1, "%s", __FUNCTION__);
path = restconf_uripath(h);
query = restconf_param_get(h, "QUERY_STRING");
pretty = clicon_option_bool(h, "CLICON_RESTCONF_PRETTY");
if ((pvec = clicon_strsep(path, "/", &pn)) == NULL)
goto done;
/* Sanity check of path. Should be /stream/<name> */
if (pn != 3){
restconf_notfound(h, r);
restconf_notfound(h, req);
goto ok;
}
if (strlen(pvec[0]) != 0){
retval = restconf_notfound(h, r);
retval = restconf_notfound(h, req);
goto done;
}
if (strcmp(pvec[1], streampath)){
retval = restconf_notfound(h, r);
retval = restconf_notfound(h, req);
goto done;
}
if ((method = pvec[2]) == NULL){
retval = restconf_notfound(h, r);
retval = restconf_notfound(h, req);
goto done;
}
clicon_debug(1, "%s: method=%s", __FUNCTION__, method);
if (str2cvec(query, '&', '=', &qvec) < 0)
goto done;
if (str2cvec(path, '/', '=', &pcvec) < 0) /* rest url eg /album=ricky/foo */
goto done;
/* data */
if ((cb = restconf_get_indata(r)) == NULL)
goto done;
data = cbuf_get(cb);
clicon_debug(1, "%s DATA=%s", __FUNCTION__, data);
if (str2cvec(data, '&', '=', &dvec) < 0)
if ((cb = restconf_get_indata(req)) == NULL)
goto done;
indata = cbuf_get(cb);
clicon_debug(1, "%s DATA=%s", __FUNCTION__, indata);
/* If present, check credentials. See "plugin_credentials" in plugin
* See RFC 8040 section 2.5
*/
if ((authenticated = clixon_plugin_auth_all(h, r)) < 0)
if ((authenticated = clixon_plugin_auth_all(h, req)) < 0)
goto done;
clicon_debug(1, "%s auth:%d %s", __FUNCTION__, authenticated, clicon_username_get(h));
@ -427,22 +443,20 @@ api_stream(clicon_handle h,
if (netconf_access_denied_xml(&xret, "protocol", "The requested URL was unauthorized") < 0)
goto done;
if ((xerr = xpath_first(xret, NULL, "//rpc-error")) != NULL){
if (api_return_err(h, r, xerr, pretty, media_out, 0) < 0)
if (api_return_err(h, req, xerr, pretty, media_out, 0) < 0)
goto done;
goto ok;
}
goto ok;
}
clicon_debug(1, "%s auth2:%d %s", __FUNCTION__, authenticated, clicon_username_get(h));
if (restconf_stream(h, r, method, qvec, pretty, media_out, &s) < 0)
if (restconf_stream(h, req, method, qvec, pretty, media_out, &s) < 0)
goto done;
if (s != -1){
#ifdef STREAM_FORK
if ((pid = fork()) == 0){ /* child */
if (pvec)
free(pvec);
if (dvec)
cvec_free(dvec);
if (qvec)
cvec_free(qvec);
if (pcvec)
@ -457,26 +471,27 @@ api_stream(clicon_handle h,
/* Listen to backend socket */
if (clixon_event_reg_fd(s,
restconf_stream_cb,
(void*)r,
req,
"stream socket") < 0)
goto done;
if (clixon_event_reg_fd(r->listen_sock,
stream_checkuplink,
(void*)r,
"stream socket") < 0)
if (clixon_event_reg_fd(rfcgi->listen_sock,
stream_checkuplink,
req,
"stream socket") < 0)
goto done;
/* Poll upstream errors */
stream_timeout(0, (void*)r);
stream_timeout(0, req);
/* Start loop */
clixon_event_loop();
close(s);
clixon_event_unreg_fd(s, restconf_stream_cb);
clixon_event_unreg_fd(r->listen_sock, restconf_stream_cb);
clixon_event_unreg_timeout(stream_timeout, (void*)r);
clixon_event_unreg_fd(rfcgi->listen_sock,
restconf_stream_cb);
clixon_event_unreg_timeout(stream_timeout, (void*)req);
clicon_exit_reset();
#ifdef STREAM_FORK
FCGX_Finish_r(r);
FCGX_Free(r, 0);
FCGX_Finish_r(rfcgi);
FCGX_Free(rfcgi, 0);
restconf_terminate(h);
exit(0);
}
@ -490,7 +505,8 @@ api_stream(clicon_handle h,
}
memset(sc, 0, sizeof(struct stream_child));
sc->sc_pid = pid;
sc->sc_r = *r;
sc->sc_r = *rfcgi; /* XXX by value */
ADDQ(sc, STREAM_CHILD);
*finish = 0; /* If spawn child, we should not finish this stream */
#endif /* STREAM_FORK */
@ -501,10 +517,6 @@ api_stream(clicon_handle h,
clicon_debug(1, "%s retval:%d", __FUNCTION__, retval);
if (pvec)
free(pvec);
if (dvec)
cvec_free(dvec);
if (qvec)
cvec_free(qvec);
if (pcvec)
cvec_free(pcvec);
if (cb)