Restconf native+http/1 + tls

Added command-line timeout -t <sec> to restconf
Example: Added programmable timeout to backend example
Test: updated for fcgi and native using internal timeouts
This commit is contained in:
Olof hagsand 2024-05-06 15:57:11 +02:00
parent 2b2a2ec1ad
commit 62a4b5feff
17 changed files with 577 additions and 318 deletions

View file

@ -5,6 +5,7 @@ on:
branches: branches:
- master - master
- test-actions - test-actions
- restconf-native-stream
pull_request: pull_request:
branches: [ master ] branches: [ master ]

View file

@ -67,6 +67,9 @@
#include "clixon_http1_parse.h" #include "clixon_http1_parse.h"
#include "restconf_http1.h" #include "restconf_http1.h"
#include "clixon_http_data.h" #include "clixon_http_data.h"
#ifdef RESTCONF_NATIVE_STREAM
#include "restconf_stream.h"
#endif
/* Size of xml read buffer */ /* Size of xml read buffer */
#define BUFLEN 1024 #define BUFLEN 1024
@ -310,7 +313,7 @@ restconf_http1_reply(restconf_conn *rc,
* (Successful) response to a CONNECT request (Section 4.3.6 of * (Successful) response to a CONNECT request (Section 4.3.6 of
* [RFC7231]). * [RFC7231]).
*/ */
if (sd->sd_code != 204 && sd->sd_code > 199) if (sd->sd_code != 204 && sd->sd_code > 199 && !rc->rc_event_stream)
if (restconf_reply_header(sd, "Content-Length", "%zu", sd->sd_body_len) < 0) if (restconf_reply_header(sd, "Content-Length", "%zu", sd->sd_body_len) < 0)
goto done; goto done;
/* Create reply and write headers */ /* Create reply and write headers */
@ -450,6 +453,13 @@ restconf_http1_path_root(clixon_handle h,
if (api_http_data(h, sd, sd->sd_qvec) < 0) if (api_http_data(h, sd, sd->sd_qvec) < 0)
goto done; goto done;
} }
#ifdef RESTCONF_NATIVE_STREAM
else if (api_path_is_stream(h)){
restconf_socket *rs = rc->rc_socket;
if (api_stream(h, sd, sd->sd_qvec, rs->rs_stream_timeout, NULL) < 0)
goto done;
}
#endif
else else
sd->sd_code = 404; /* catch all without body/media */ sd->sd_code = 404; /* catch all without body/media */
fail: fail:

View file

@ -88,7 +88,7 @@
#include "restconf_stream.h" #include "restconf_stream.h"
/* Command line options to be passed to getopt(3) */ /* Command line options to be passed to getopt(3) */
#define RESTCONF_OPTS "hVD:f:E:l:C:p:d:y:a:u:rW:R:o:" #define RESTCONF_OPTS "hVD:f:E:l:C:p:d:y:a:u:rW:R:t:o:"
/*! Convert FCGI parameters to clixon runtime data /*! Convert FCGI parameters to clixon runtime data
* *
@ -285,6 +285,7 @@ usage(clixon_handle h,
"\t-r \t\t Do not drop privileges if run as root\n" "\t-r \t\t Do not drop privileges if run as root\n"
"\t-W <user>\t Run restconf daemon as this user, drop according to CLICON_RESTCONF_PRIVILEGES\n" "\t-W <user>\t Run restconf daemon as this user, drop according to CLICON_RESTCONF_PRIVILEGES\n"
"\t-R <xml> \t Restconf configuration in-line overriding config file\n" "\t-R <xml> \t Restconf configuration in-line overriding config file\n"
"\t-t <sec>\t Notification stream timeout in: quit after <sec>. For debug\n"
"\t-o \"<option>=<value>\" Give configuration option overriding config file (see clixon-config.yang)\n", "\t-o \"<option>=<value>\" Give configuration option overriding config file (see clixon-config.yang)\n",
argv0 argv0
); );
@ -327,6 +328,7 @@ main(int argc,
int config_dump = 0; int config_dump = 0;
enum format_enum config_dump_format = FORMAT_XML; enum format_enum config_dump_format = FORMAT_XML;
int print_version = 0; int print_version = 0;
int stream_timeout = 0;
/* Create handle */ /* Create handle */
if ((h = restconf_handle_init()) == NULL) if ((h = restconf_handle_init()) == NULL)
@ -445,6 +447,9 @@ main(int argc,
case 'R': /* Restconf on-line config */ case 'R': /* Restconf on-line config */
inline_config = optarg; inline_config = optarg;
break; break;
case 't': /* Stream timeout */
stream_timeout = atoi(optarg);
break;
case 'o':{ /* Configuration option */ case 'o':{ /* Configuration option */
char *val; char *val;
if ((val = index(optarg, '=')) == NULL) if ((val = index(optarg, '=')) == NULL)
@ -675,7 +680,7 @@ main(int argc,
if (uri_str2cvec(query, '&', '=', 1, &qvec) < 0) if (uri_str2cvec(query, '&', '=', 1, &qvec) < 0)
goto done; goto done;
/* XXX doing goto done on error causes test errors */ /* XXX doing goto done on error causes test errors */
(void)api_stream(h, req, qvec, &finish); (void)api_stream(h, req, qvec, stream_timeout, &finish);
} }
else{ else{
clixon_debug(CLIXON_DBG_RESTCONF, "top-level %s not found", path); clixon_debug(CLIXON_DBG_RESTCONF, "top-level %s not found", path);

View file

@ -159,7 +159,7 @@
#endif #endif
/* Command line options to be passed to getopt(3) */ /* Command line options to be passed to getopt(3) */
#define RESTCONF_OPTS "hVD:f:E:l:C:p:y:a:u:rW:R:o:" #define RESTCONF_OPTS "hVD:f:E:l:C:p:y:a:u:rW:R:t:o:"
/* If set, open outwards socket non-blocking, as opposed to blocking /* If set, open outwards socket non-blocking, as opposed to blocking
* Should work both ways, but in the ninblocking case, * Should work both ways, but in the ninblocking case,
@ -713,13 +713,15 @@ restconf_clixon_backend(clixon_handle h,
* @param[in] h Clixon handle * @param[in] h Clixon handle
* @param[in] xs XML config of single restconf socket * @param[in] xs XML config of single restconf socket
* @param[in] nsc Namespace context * @param[in] nsc Namespace context
* @param[in] timeout Terminate notification event stream after number of seconds
* @retval 0 OK * @retval 0 OK
* @retval -1 Error * @retval -1 Error
*/ */
static int static int
openssl_init_socket(clixon_handle h, openssl_init_socket(clixon_handle h,
cxobj *xs, cxobj *xs,
cvec *nsc) cvec *nsc,
int timeout)
{ {
int retval = -1; int retval = -1;
char *netns = NULL; char *netns = NULL;
@ -742,6 +744,7 @@ openssl_init_socket(clixon_handle h,
} }
memset(rsock, 0, sizeof *rsock); memset(rsock, 0, sizeof *rsock);
rsock->rs_h = h; rsock->rs_h = h;
rsock->rs_stream_timeout = timeout;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
rsock->rs_start = now.tv_sec; rsock->rs_start = now.tv_sec;
/* Extract socket parameters from single socket config: ns, addr, port, ssl */ /* Extract socket parameters from single socket config: ns, addr, port, ssl */
@ -808,13 +811,15 @@ openssl_init_socket(clixon_handle h,
* @param[in] h Clixon handle * @param[in] h Clixon handle
* @param[in] dbg0 Manually set debug flag, if set overrides configuration setting * @param[in] dbg0 Manually set debug flag, if set overrides configuration setting
* @param[in] xrestconf XML tree containing restconf config * @param[in] xrestconf XML tree containing restconf config
* @param[in] timeout Terminate notification stream after number of seconds
* @retval 0 OK * @retval 0 OK
* @retval -1 Error * @retval -1 Error
*/ */
int int
restconf_openssl_init(clixon_handle h, restconf_openssl_init(clixon_handle h,
int dbg0, int dbg0,
cxobj *xrestconf) cxobj *xrestconf,
int timeout)
{ {
int retval = -1; int retval = -1;
SSL_CTX *ctx; /* SSL context */ SSL_CTX *ctx; /* SSL context */
@ -851,6 +856,8 @@ restconf_openssl_init(clixon_handle h,
if ((x = xpath_first(xrestconf, nsc, "enable-core-dump")) != NULL) { if ((x = xpath_first(xrestconf, nsc, "enable-core-dump")) != NULL) {
/* core dump is enabled on RESTCONF process */ /* core dump is enabled on RESTCONF process */
struct rlimit rlp; struct rlimit rlp;
int status;
if (strcmp(xml_body(x), "true") == 0) { if (strcmp(xml_body(x), "true") == 0) {
rlp.rlim_cur = RLIM_INFINITY; rlp.rlim_cur = RLIM_INFINITY;
rlp.rlim_max = RLIM_INFINITY; rlp.rlim_max = RLIM_INFINITY;
@ -858,12 +865,11 @@ restconf_openssl_init(clixon_handle h,
rlp.rlim_cur = 0; rlp.rlim_cur = 0;
rlp.rlim_max = 0; rlp.rlim_max = 0;
} }
int status = setrlimit(RLIMIT_CORE, &rlp); status = setrlimit(RLIMIT_CORE, &rlp);
if (status != 0) { if (status != 0) {
clixon_log(h, LOG_INFO, "%s: setrlimit() failed, %s", __FUNCTION__, strerror(errno)); clixon_log(h, LOG_INFO, "%s: setrlimit() failed, %s", __FUNCTION__, strerror(errno));
} }
} }
if (init_openssl() < 0) if (init_openssl() < 0)
goto done; goto done;
if ((ctx = restconf_ssl_context_create(h)) == NULL) if ((ctx = restconf_ssl_context_create(h)) == NULL)
@ -886,7 +892,7 @@ restconf_openssl_init(clixon_handle h,
if (xpath_vec(xrestconf, nsc, "socket", &vec, &veclen) < 0) if (xpath_vec(xrestconf, nsc, "socket", &vec, &veclen) < 0)
goto done; goto done;
for (i=0; i<veclen; i++){ for (i=0; i<veclen; i++){
if (openssl_init_socket(h, vec[i], nsc) < 0){ if (openssl_init_socket(h, vec[i], nsc, timeout) < 0){
/* Bind errors are ignored, proceed with next after log */ /* Bind errors are ignored, proceed with next after log */
if (clixon_err_category() == OE_UNIX && clixon_err_subnr() == EADDRNOTAVAIL) if (clixon_err_category() == OE_UNIX && clixon_err_subnr() == EADDRNOTAVAIL)
clixon_err_reset(); clixon_err_reset();
@ -1131,7 +1137,8 @@ usage(clixon_handle h,
"\t-r \t\t Do not drop privileges if run as root\n" "\t-r \t\t Do not drop privileges if run as root\n"
"\t-W <user>\t Run restconf daemon as this user, drop according to CLICON_RESTCONF_PRIVILEGES\n" "\t-W <user>\t Run restconf daemon as this user, drop according to CLICON_RESTCONF_PRIVILEGES\n"
"\t-R <xml>\t Restconf configuration in-line overriding config file\n" "\t-R <xml>\t Restconf configuration in-line overriding config file\n"
"\t-o <option>=<value> Set configuration option overriding config file (see clixon-config.yang)\n" "\t-t <sec>\t Notification stream timeout in: quit after <sec>. For debug\n"
"\t-o <op>=<value>\t Set configuration option overriding config file (see clixon-config.yang)\n"
, ,
argv0 argv0
); );
@ -1160,6 +1167,7 @@ main(int argc,
int config_dump = 0; int config_dump = 0;
enum format_enum config_dump_format = FORMAT_XML; enum format_enum config_dump_format = FORMAT_XML;
int print_version = 0; int print_version = 0;
int stream_timeout = 0;
/* Create handle */ /* Create handle */
if ((h = restconf_handle_init()) == NULL) if ((h = restconf_handle_init()) == NULL)
@ -1297,6 +1305,9 @@ main(int argc,
case 'R': /* Restconf on-line config */ case 'R': /* Restconf on-line config */
inline_config = optarg; inline_config = optarg;
break; break;
case 't': /* Stream timeout */
stream_timeout = atoi(optarg);
break;
case 'o':{ /* Configuration option */ case 'o':{ /* Configuration option */
char *val; char *val;
if ((val = index(optarg, '=')) == NULL) if ((val = index(optarg, '=')) == NULL)
@ -1352,7 +1363,7 @@ main(int argc,
if (restconf_native_handle_set(h, rn) < 0) if (restconf_native_handle_set(h, rn) < 0)
goto done; goto done;
/* Openssl inits */ /* Openssl inits */
if (restconf_openssl_init(h, dbg, xrestconf) < 0) if (restconf_openssl_init(h, dbg, xrestconf, stream_timeout) < 0)
goto done; goto done;
/* Drop privileges if started as root to CLICON_RESTCONF_USER /* Drop privileges if started as root to CLICON_RESTCONF_USER
* and use drop mode: CLICON_RESTCONF_PRIVILEGES * and use drop mode: CLICON_RESTCONF_PRIVILEGES

View file

@ -76,6 +76,9 @@
#ifdef HAVE_HTTP1 #ifdef HAVE_HTTP1
#include "restconf_http1.h" #include "restconf_http1.h"
#endif #endif
#ifdef RESTCONF_NATIVE_STREAM
#include "restconf_stream.h"
#endif
/* Forward */ /* Forward */
static int restconf_idle_cb(int fd, void *arg); static int restconf_idle_cb(int fd, void *arg);
@ -1041,6 +1044,12 @@ restconf_connection(int s,
/*----------------------------- Close socket ------------------------------*/ /*----------------------------- Close socket ------------------------------*/
static int
restconf_idle_timer_unreg(restconf_conn *rc)
{
return clixon_event_unreg_timeout(restconf_idle_cb, rc);
}
/*! Close Restconf native connection socket and unregister callback /*! Close Restconf native connection socket and unregister callback
* *
* For callhome also start reconnect timer * For callhome also start reconnect timer
@ -1072,6 +1081,11 @@ restconf_connection_close1(restconf_conn *rc)
if (restconf_callhome_timer(rsock, 1) < 0) if (restconf_callhome_timer(rsock, 1) < 0)
goto done; goto done;
} }
#ifdef RESTCONF_NATIVE_STREAM
if (rc->rc_event_stream){
stream_close(rc->rc_h, rc);
}
#endif
retval = 0; retval = 0;
done: done:
clixon_debug(CLIXON_DBG_RESTCONF, "retval:%d", retval); clixon_debug(CLIXON_DBG_RESTCONF, "retval:%d", retval);
@ -1556,12 +1570,6 @@ restconf_idle_cb(int fd,
return retval; return retval;
} }
int
restconf_idle_timer_unreg(restconf_conn *rc)
{
return clixon_event_unreg_timeout(restconf_idle_cb, rc);
}
/*! Set callhome periodic idle-timeout /*! Set callhome periodic idle-timeout
* *
* 1) If callhome and periodic, set timer for t0+idle-timeout(ti) * 1) If callhome and periodic, set timer for t0+idle-timeout(ti)
@ -1573,7 +1581,7 @@ restconf_idle_timer_unreg(restconf_conn *rc)
* XXX: now just timeout dont keep track of data (td) * XXX: now just timeout dont keep track of data (td)
* @see restconf_idle_timer_unreg * @see restconf_idle_timer_unreg
*/ */
int static int
restconf_idle_timer(restconf_conn *rc) restconf_idle_timer(restconf_conn *rc)
{ {
int retval = -1; int retval = -1;

View file

@ -32,7 +32,7 @@
***** END LICENSE BLOCK ***** ***** END LICENSE BLOCK *****
* *
* Data structures: * Data structures: (NOT UPDATED)
* 1 1 * 1 1
* +--------------------+ restconf_handle_get +--------------------+ * +--------------------+ restconf_handle_get +--------------------+
* | rn restconf_native | <--------------------- | h clixon_handle | * | rn restconf_native | <--------------------- | h clixon_handle |
@ -71,7 +71,7 @@ extern "C" {
/* Forward */ /* Forward */
struct restconf_conn; struct restconf_conn;
/* session stream struct, mainly for http/2 but http/1 has a single pseudo-stream with id=0 /* Session stream struct, mainly for http/2 but http/1 has a single pseudo-stream with id=0
*/ */
typedef struct { typedef struct {
qelem_t sd_qelem; /* List header */ qelem_t sd_qelem; /* List header */
@ -121,6 +121,9 @@ typedef struct restconf_conn {
restconf_socket *rc_socket; /* Backpointer to restconf_socket needed for callhome */ restconf_socket *rc_socket; /* Backpointer to restconf_socket needed for callhome */
struct timeval rc_t; /* Timestamp of last read/write activity, used by callhome struct timeval rc_t; /* Timestamp of last read/write activity, used by callhome
idle-timeout algorithm */ idle-timeout algorithm */
#ifdef RESTCONF_NATIVE_STREAM
int rc_event_stream; /* Event notification stream socket (maybe in sd?) */
#endif
} restconf_conn; } restconf_conn;
/* Restconf per socket handle /* Restconf per socket handle
@ -156,7 +159,7 @@ typedef struct restconf_socket{
*/ */
restconf_conn *rs_conns; /* List of transient connect sockets */ restconf_conn *rs_conns; /* List of transient connect sockets */
char *rs_from_addr; /* From IP address as seen by accept (mv to rc?) */ char *rs_from_addr; /* From IP address as seen by accept (mv to rc?) */
int rs_stream_timeout; /* Close stream after <s> (debug) */
} restconf_socket; } restconf_socket;
/* Restconf handle /* Restconf handle
@ -182,8 +185,6 @@ int restconf_connection_sanity(clixon_handle h, restconf_conn *rc,
restconf_native_handle *restconf_native_handle_get(clixon_handle h); restconf_native_handle *restconf_native_handle_get(clixon_handle h);
int restconf_connection(int s, void *arg); int restconf_connection(int s, void *arg);
int restconf_ssl_accept_client(clixon_handle h, int s, restconf_socket *rsock, restconf_conn **rcp); int restconf_ssl_accept_client(clixon_handle h, int s, restconf_socket *rsock, restconf_conn **rcp);
int restconf_idle_timer_unreg(restconf_conn *rc);
int restconf_idle_timer(restconf_conn *rc);
int restconf_callhome_timer_unreg(restconf_socket *rsock); int restconf_callhome_timer_unreg(restconf_socket *rsock);
int restconf_callhome_timer(restconf_socket *rsock, int status); int restconf_callhome_timer(restconf_socket *rsock, int status);
int restconf_socket_extract(clixon_handle h, cxobj *xs, cvec *nsc, restconf_socket *rsock, int restconf_socket_extract(clixon_handle h, cxobj *xs, cvec *nsc, restconf_socket *rsock,

View file

@ -343,7 +343,8 @@ restconf_nghttp2_path(restconf_stream_data *sd)
} }
#ifdef RESTCONF_NATIVE_STREAM #ifdef RESTCONF_NATIVE_STREAM
else if (api_path_is_stream(h)){ else if (api_path_is_stream(h)){
if (api_stream(h, sd, sd->sd_qvec, NULL) < 0) restconf_socket *rs = rc->rc_socket;
if (api_stream(h, sd, sd->sd_qvec, rs->rs_stream_timeout, NULL) < 0)
goto done; goto done;
} }
#endif #endif

View file

@ -179,14 +179,18 @@ restconf_subscription(clixon_handle h,
goto ok; goto ok;
} }
/* Setting up stream */ /* Setting up stream */
if (restconf_reply_header(req, "Server", "clixon") < 0)
goto done;
if (restconf_reply_header(req, "Content-Type", "text/event-stream") < 0) if (restconf_reply_header(req, "Content-Type", "text/event-stream") < 0)
goto done; goto done;
if (restconf_reply_header(req, "Cache-Control", "no-cache") < 0) if (restconf_reply_header(req, "Cache-Control", "no-cache") < 0)
goto done; goto done;
if (restconf_reply_header(req, "Connection", "keep-alive") < 0) if (restconf_reply_header(req, "Connection", "keep-alive") < 0)
goto done; goto done;
#ifndef RESTCONF_NATIVE_STREAM
if (restconf_reply_header(req, "X-Accel-Buffering", "no") < 0) if (restconf_reply_header(req, "X-Accel-Buffering", "no") < 0)
goto done; goto done;
#endif
if (restconf_reply_send(req, 201, NULL, 0) < 0) if (restconf_reply_send(req, 201, NULL, 0) < 0)
goto done; goto done;
*sp = s; *sp = s;

View file

@ -44,6 +44,7 @@ int api_path_is_stream(clixon_handle h);
int restconf_subscription(clixon_handle h, void *req, char *name, cvec *qvec, int pretty, restconf_media media_out, int *sp); int restconf_subscription(clixon_handle h, void *req, char *name, cvec *qvec, int pretty, restconf_media media_out, int *sp);
int stream_child_free(clixon_handle h, int pid); int stream_child_free(clixon_handle h, int pid);
int stream_child_freeall(clixon_handle h); int stream_child_freeall(clixon_handle h);
int api_stream(clixon_handle h, void *req, cvec *qvec, int *finish); int stream_close(clixon_handle h, void *req);
int api_stream(clixon_handle h, void *req, cvec *qvec, int timeout, int *finish);
#endif /* _RESTCONF_STREAM_H_ */ #endif /* _RESTCONF_STREAM_H_ */

View file

@ -96,6 +96,8 @@
#include "restconf_api.h" #include "restconf_api.h"
#include "restconf_err.h" #include "restconf_err.h"
#include "restconf_stream.h" #include "restconf_stream.h"
#include "restconf_lib.h"
#include "restconf_stream.h"
/* /*
* Constants * Constants
@ -185,9 +187,10 @@ restconf_stream_cb(int s,
int pretty = 0; /* XXX should be via arg */ int pretty = 0; /* XXX should be via arg */
int ret; int ret;
clixon_debug(CLIXON_DBG_STREAM, ""); clixon_debug(CLIXON_DBG_STREAM|CLIXON_DBG_DETAIL, "");
if (clixon_msg_rcv11(s, NULL, 0, &cbmsg, &eof) < 0) if (clixon_msg_rcv11(s, NULL, 0, &cbmsg, &eof) < 0)
goto done; goto done;
clixon_debug(CLIXON_DBG_STREAM, "%s", cbuf_get(cbmsg)); // Also MSG
/* handle close from remote end: this will exit the client */ /* handle close from remote end: this will exit the client */
if (eof){ if (eof){
clixon_debug(CLIXON_DBG_STREAM, "eof"); clixon_debug(CLIXON_DBG_STREAM, "eof");
@ -232,7 +235,7 @@ restconf_stream_cb(int s,
ok: ok:
retval = 0; retval = 0;
done: done:
clixon_debug(CLIXON_DBG_STREAM, "retval: %d", retval); clixon_debug(CLIXON_DBG_STREAM|CLIXON_DBG_DETAIL, "retval: %d", retval);
if (xtop != NULL) if (xtop != NULL)
xml_free(xtop); xml_free(xtop);
if (cbmsg) if (cbmsg)
@ -242,10 +245,6 @@ restconf_stream_cb(int s,
return retval; return retval;
} }
/* restconf */
#include "restconf_lib.h"
#include "restconf_stream.h"
/*! Listen sock callback (from proxy?) /*! Listen sock callback (from proxy?)
* *
* @param[in] s Socket * @param[in] s Socket
@ -265,33 +264,45 @@ stream_checkuplink(int s,
return 0; return 0;
} }
int /*! Timeout of notification stream, check fcgi socket
stream_timeout(int s, */
static int
fcgi_stream_timeout(int s,
void *arg) void *arg)
{ {
struct timeval t; struct timeval t;
struct timeval t1;
FCGX_Request *r = (FCGX_Request *)arg; FCGX_Request *r = (FCGX_Request *)arg;
clixon_debug(CLIXON_DBG_STREAM, ""); clixon_debug(CLIXON_DBG_STREAM|CLIXON_DBG_DETAIL, "");
if (FCGX_GetError(r->out) != 0){ /* break loop */ if (FCGX_GetError(r->out) != 0){ /* break loop */
clixon_debug(CLIXON_DBG_STREAM, "FCGX_GetError upstream"); clixon_debug(CLIXON_DBG_STREAM, "FCGX_GetError upstream");
clixon_exit_set(1); clixon_exit_set(1);
} }
else{ else{
gettimeofday(&t, NULL); gettimeofday(&t, NULL);
t1.tv_sec = 1; t1.tv_usec = 0; t.tv_sec++;
timeradd(&t, &t1, &t); clixon_event_reg_timeout(t, fcgi_stream_timeout, arg, "Stream timeout");
clixon_event_reg_timeout(t, stream_timeout, arg, "Stream timeout");
} }
return 0; return 0;
} }
/*! Timeout of notification stream, limit lifetime, for debug
*/
static int
fcgi_stream_timeout2(int s,
void *arg)
{
clixon_debug(CLIXON_DBG_STREAM, "Terminate stream");
clixon_exit_set(1); // XXX This is local eventloop see below, not global
return 0;
}
/*! Process a stream request /*! Process a stream request
* *
* @param[in] h Clixon handle * @param[in] h Clixon handle
* @param[in] req Generic Www handle (can be part of clixon handle) * @param[in] req Generic Www handle (can be part of clixon handle)
* @param[in] qvec Query parameters, ie the ?<id>=<val>&<id>=<val> stuff * @param[in] qvec Query parameters, ie the ?<id>=<val>&<id>=<val> stuff
* @param[in] timeout Stream timeout
* @param[out] finish Set to zero, if request should not be finnished by upper layer * @param[out] finish Set to zero, if request should not be finnished by upper layer
* @retval 0 OK * @retval 0 OK
* @retval -1 Error * @retval -1 Error
@ -300,6 +311,7 @@ int
api_stream(clixon_handle h, api_stream(clixon_handle h,
void *req, void *req,
cvec *qvec, cvec *qvec,
int timeout,
int *finish) int *finish)
{ {
int retval = -1; int retval = -1;
@ -403,9 +415,15 @@ api_stream(clixon_handle h,
req, req,
"stream socket") < 0) "stream socket") < 0)
goto done; goto done;
clixon_debug(CLIXON_DBG_STREAM, "before loop"); /* Timeout of notification stream, close after limited lifetime, for debug */
if (timeout){
struct timeval t;
gettimeofday(&t, NULL);
t.tv_sec += timeout;
clixon_event_reg_timeout(t, fcgi_stream_timeout2, req, "Stream timeout");
}
/* Poll upstream errors */ /* Poll upstream errors */
stream_timeout(0, req); fcgi_stream_timeout(0, req);
/* Start loop */ /* Start loop */
clixon_event_loop(h); clixon_event_loop(h);
clixon_debug(CLIXON_DBG_STREAM, "after loop"); clixon_debug(CLIXON_DBG_STREAM, "after loop");
@ -414,7 +432,8 @@ api_stream(clixon_handle h,
close(s); close(s);
clixon_event_unreg_fd(rfcgi->listen_sock, clixon_event_unreg_fd(rfcgi->listen_sock,
restconf_stream_cb); restconf_stream_cb);
clixon_event_unreg_timeout(stream_timeout, (void*)req); clixon_event_unreg_timeout(fcgi_stream_timeout, (void*)req);
clixon_event_unreg_timeout(fcgi_stream_timeout2, (void*)req);
clixon_exit_set(0); /* reset */ clixon_exit_set(0); /* reset */
#ifdef STREAM_FORK #ifdef STREAM_FORK
#if 0 /* Seems to be a global resource, but there is till some timing error here */ #if 0 /* Seems to be a global resource, but there is till some timing error here */

View file

@ -86,8 +86,100 @@
#include "restconf_native.h" /* Restconf-openssl mode specific headers*/ #include "restconf_native.h" /* Restconf-openssl mode specific headers*/
#include "restconf_stream.h" #include "restconf_stream.h"
#ifdef HAVE_LIBNGHTTP2 /* Ends at end-of-file */ // XXX: copy from restconf_native.c
#include "restconf_nghttp2.h" /* Restconf-openssl mode specific headers*/ static int
native_buf_write_xxx(clixon_handle h,
char *buf,
size_t buflen,
restconf_conn *rc,
const char *callfn)
{
int retval = -1;
ssize_t len;
ssize_t totlen = 0;
int er;
SSL *ssl;
if (rc == NULL){
clixon_err(OE_RESTCONF, EINVAL, "rc is NULL");
goto done;
}
ssl = rc->rc_ssl;
/* Two problems with debugging buffers that this fixes:
* 1. they are not "strings" in the sense they are not NULL-terminated
* 2. they are often very long
*/
if (clixon_debug_get()) {
char *dbgstr = NULL;
size_t sz;
sz = buflen>256?256:buflen; /* Truncate to 256 */
if ((dbgstr = malloc(sz+1)) == NULL){
clixon_err(OE_UNIX, errno, "malloc");
goto done;
}
memcpy(dbgstr, buf, sz);
dbgstr[sz] = '\0';
clixon_debug(CLIXON_DBG_RESTCONF, "%s buflen:%zu buf:\n%s", callfn, buflen, dbgstr);
free(dbgstr);
}
while (totlen < buflen){
if (ssl){
if ((len = SSL_write(ssl, buf+totlen, buflen-totlen)) <= 0){
er = errno;
switch (SSL_get_error(ssl, len)){
case SSL_ERROR_SYSCALL: /* 5 */
if (er == ECONNRESET || /* Connection reset by peer */
er == EPIPE) { /* Reading end of socket is closed */
goto closed; /* Close socket and ssl */
}
else if (er == EAGAIN){
clixon_debug(CLIXON_DBG_RESTCONF, "write EAGAIN");
usleep(10000);
continue;
}
else{
clixon_err(OE_RESTCONF, er, "SSL_write %d", er);
goto done;
}
break;
default:
clixon_err(OE_SSL, 0, "SSL_write");
goto done;
break;
}
goto done;
}
}
else{
if ((len = write(rc->rc_s, buf+totlen, buflen-totlen)) < 0){
switch (errno){
case EAGAIN: /* Operation would block */
clixon_debug(CLIXON_DBG_RESTCONF, "write EAGAIN");
usleep(10000);
continue;
break;
// case EBADF: // XXX if this happens there is some larger error
case ECONNRESET: /* Connection reset by peer */
case EPIPE: /* Broken pipe */
goto closed; /* Close socket and ssl */
break;
default:
clixon_err(OE_UNIX, errno, "write %d", errno);
goto done;
break;
}
}
}
totlen += len;
} /* while */
retval = 1;
done:
clixon_debug(CLIXON_DBG_RESTCONF, "retval:%d", retval);
return retval;
closed:
retval = 0;
goto done;
}
/*! Callback when stream notifications arrive from backend /*! Callback when stream notifications arrive from backend
* *
@ -102,25 +194,28 @@ restconf_native_stream_cb(int s,
void *arg) void *arg)
{ {
int retval = -1; int retval = -1;
restconf_stream_data *sd = (restconf_stream_data *)arg;
int eof; int eof;
cxobj *xtop = NULL; /* top xml */ cxobj *xtop = NULL; /* top xml */
cxobj *xn; /* notification xml */ cxobj *xn; /* notification xml */
cbuf *cbx = NULL;
cbuf *cb = NULL; cbuf *cb = NULL;
cbuf *cbmsg = NULL; cbuf *cbmsg = NULL;
int pretty = 0; /* XXX should be via arg */ int pretty = 0;
int ret; int ret;
restconf_conn *rc = sd->sd_conn;
clixon_handle h = rc->rc_h;
clixon_debug(CLIXON_DBG_STREAM, ""); clixon_debug(CLIXON_DBG_STREAM|CLIXON_DBG_DETAIL, "");
pretty = restconf_pretty_get(h);
if (clixon_msg_rcv11(s, NULL, 0, &cbmsg, &eof) < 0) if (clixon_msg_rcv11(s, NULL, 0, &cbmsg, &eof) < 0)
goto done; goto done;
clixon_debug(CLIXON_DBG_STREAM, "%s", cbuf_get(cbmsg)); clixon_debug(CLIXON_DBG_STREAM, "%s", cbuf_get(cbmsg));
/* handle close from remote end: this will exit the client */ /* handle close from remote end: this will exit the client */
if (eof){ if (eof){
clixon_debug(CLIXON_DBG_STREAM, "eof"); clixon_debug(CLIXON_DBG_STREAM, "eof");
clixon_err(OE_PROTO, ESHUTDOWN, "Socket unexpected close"); restconf_close_ssl_socket(rc, __FUNCTION__, 0);
errno = ESHUTDOWN; goto ok;
clixon_exit_set(1);
goto done;
} }
if ((ret = clixon_xml_parse_string(cbuf_get(cbmsg), YB_NONE, NULL, &xtop, NULL)) < 0) if ((ret = clixon_xml_parse_string(cbuf_get(cbmsg), YB_NONE, NULL, &xtop, NULL)) < 0)
goto done; goto done;
@ -133,20 +228,220 @@ restconf_native_stream_cb(int s,
clixon_err(OE_PLUGIN, errno, "cbuf_new"); clixon_err(OE_PLUGIN, errno, "cbuf_new");
goto done; goto done;
} }
if ((cbx = cbuf_new()) == NULL){
clixon_err(OE_PLUGIN, errno, "cbuf_new");
goto done;
}
if ((xn = xpath_first(xtop, NULL, "notification")) == NULL) if ((xn = xpath_first(xtop, NULL, "notification")) == NULL)
goto ok; goto ok;
#if 0 // Cant get CHUNKED to work
{
size_t len;
cprintf(cbx, "data: ");
if (clixon_xml2cbuf(cbx, xn, 0, pretty, NULL, -1, 0) < 0)
goto done;
len = cbuf_len(cbx);
len +=2;
cprintf(cb, "%x", (int16_t)len&0xffff);
cprintf(cb, "\r\n");
cprintf(cb, "%s", cbuf_get(cbx));
cprintf(cb, "\r\n");
cprintf(cb, "\r\n");
cprintf(cb, "0\r\n");
cprintf(cb, "\r\n");
// XXX This terminates stream, but want it to continue / hang
}
#else
cprintf(cb, "data: ");
if (clixon_xml2cbuf(cb, xn, 0, pretty, NULL, -1, 0) < 0) if (clixon_xml2cbuf(cb, xn, 0, pretty, NULL, -1, 0) < 0)
goto done; goto done;
cprintf(cb, "\r\n");
cprintf(cb, "\r\n");
#endif
if ((ret = native_buf_write_xxx(h, cbuf_get(cb), cbuf_len(cb), rc, "native stream")) < 0)
goto done;
ok: ok:
retval = 0; retval = 0;
done: done:
clixon_debug(CLIXON_DBG_STREAM, "retval: %d", retval); clixon_debug(CLIXON_DBG_STREAM|CLIXON_DBG_DETAIL, "retval: %d", retval);
if (xtop != NULL) if (xtop != NULL)
xml_free(xtop); xml_free(xtop);
if (cbmsg) if (cbmsg)
cbuf_free(cbmsg); cbuf_free(cbmsg);
if (cb) if (cb)
cbuf_free(cb); cbuf_free(cb);
if (cbx)
cbuf_free(cbx);
return retval;
}
/*! Timeout of notification stream, limit lifetime, for debug
*/
static int
native_stream_timeout(int s,
void *arg)
{
restconf_conn *rc = (restconf_conn *)arg;
clixon_debug(CLIXON_DBG_STREAM, "");
return restconf_close_ssl_socket(rc, __FUNCTION__, 0);
}
/*! Close notification stream
*
* Only stream aspects, to close full socket, call eg restconf_close_ssl_socket
*/
int
stream_close(clixon_handle h,
void *req)
{
restconf_conn *rc = (restconf_conn *)req;
clicon_rpc_close_session(h);
clixon_event_unreg_fd(rc->rc_event_stream, restconf_native_stream_cb);
clixon_event_unreg_timeout(native_stream_timeout, req);
close(rc->rc_event_stream);
rc->rc_event_stream = 0;
return 0;
}
/*! Process a stream request, native variant
*
* @param[in] h Clixon handle
* @param[in] req Generic Www handle (can be part of clixon handle)
* @param[in] qvec Query parameters, ie the ?<id>=<val>&<id>=<val> stuff
* @param[in] timeout Stream timeout
* @param[out] finish Not used in native?
* @retval 0 OK
* @retval -1 Error
* @see api_stream fcgi implementation
* @note According to RFC8040 Sec 6 accept-stream is text/event-stream, but stream data
* is XML according to RFC5277. But what is error return? assume XML here
*/
static int
api_native_stream(clixon_handle h,
void *req,
cvec *qvec,
int timeout,
int *finish)
{
int retval = -1;
restconf_stream_data *sd = (restconf_stream_data *)req;
restconf_conn *rc;
char *path = NULL;
char *request_method = NULL; /* GET,.. */
char *streampath;
int pretty;
char **pvec = NULL;
int pn;
cvec *pcvec = NULL; /* for rest api */
cxobj *xerr = NULL;
char *media_str = NULL;
char *stream_name;
restconf_media media_reply = YANG_DATA_XML;
int ret;
int backend_socket = -1;
clixon_debug(CLIXON_DBG_STREAM, "");
if (req == NULL){
clixon_err(OE_RESTCONF, EINVAL, "req is NULL");
goto done;
}
rc = sd->sd_conn;
streampath = clicon_option_str(h, "CLICON_STREAM_PATH");
if ((path = restconf_uripath(h)) == NULL)
goto done;
clixon_debug(CLIXON_DBG_STREAM, "path:%s", path);
request_method = restconf_param_get(h, "REQUEST_METHOD");
clixon_debug(CLIXON_DBG_STREAM, "method:%s", request_method);
pretty = restconf_pretty_get(h);
clixon_debug(CLIXON_DBG_STREAM, "pretty:%d", pretty);
/* Get media for output (proactive negotiation) RFC7231 by using
* Accept:. This is for methods that have output, such as GET,
* operation POST, etc
* If accept is * default is yang-json
*/
media_str = restconf_param_get(h, "HTTP_ACCEPT");
clixon_debug(CLIXON_DBG_STREAM, "accept(media):%s", media_str);
if (media_str == NULL){
if (restconf_not_acceptable(h, sd, pretty, media_reply) < 0)
goto done;
goto ok;
}
/* Accept only text_event-stream or */
if (strcmp(media_str, "*/*") != 0 &&
strcmp(media_str, "text/event-stream") != 0){
if (restconf_not_acceptable(h, req, pretty, media_reply) < 0)
goto done;
goto ok;
}
if ((pvec = clicon_strsep(path, "/", &pn)) == NULL)
goto done;
if (strlen(pvec[0]) != 0){
if (netconf_invalid_value_xml(&xerr, "protocol", "Invalid path, /stream/<name> expected") < 0)
goto done;
if (api_return_err0(h, req, xerr, pretty, media_reply, 0) < 0)
goto done;
goto ok;
}
else if (strcmp(pvec[1], streampath)){
if (netconf_invalid_value_xml(&xerr, "protocol", "Invalid path, /stream/<name> expected") < 0)
goto done;
if (api_return_err0(h, req, xerr, pretty, media_reply, 0) < 0)
goto done;
goto ok;
}
else if ((stream_name = pvec[2]) == NULL ||
strlen(stream_name) == 0){
if (netconf_invalid_value_xml(&xerr, "protocol", "Invalid path, /stream/<name> expected") < 0)
goto done;
if (api_return_err0(h, req, xerr, pretty, media_reply, 0) < 0)
goto done;
goto ok;
}
clixon_debug(CLIXON_DBG_STREAM, "stream-name: %s", stream_name);
if (uri_str2cvec(path, '/', '=', 1, &pcvec) < 0) /* rest url eg /album=ricky/foo */
goto done;
/* If present, check credentials. See "plugin_credentials" in plugin
* See RFC 8040 section 2.5
*/
if ((ret = restconf_authentication_cb(h, req, pretty, media_reply)) < 0)
goto done;
if (ret == 0)
goto ok;
clixon_debug(CLIXON_DBG_STREAM, "passed auth");
if (restconf_subscription(h, req, stream_name, qvec, pretty, media_reply, &backend_socket) < 0)
goto done;
if (backend_socket != -1){
// XXX Could add forking here eventurally
/* Listen to backend socket */
if (clixon_event_reg_fd(backend_socket,
restconf_native_stream_cb,
sd,
"stream socket") < 0)
goto done;
rc->rc_event_stream = backend_socket;
/* Timeout of notification stream, close after limited lifetime, for debug */
if (timeout){
struct timeval t;
gettimeofday(&t, NULL);
t.tv_sec += timeout;
clixon_event_reg_timeout(t, native_stream_timeout, rc, "Stream timeout");
}
}
ok:
retval = 0;
done:
clixon_debug(CLIXON_DBG_STREAM, "retval:%d", retval);
if (xerr)
xml_free(xerr);
if (path)
free(path);
if (pvec)
free(pvec);
if (pcvec)
cvec_free(pcvec);
return retval; return retval;
} }
@ -166,131 +461,8 @@ int
api_stream(clixon_handle h, api_stream(clixon_handle h,
void *req, void *req,
cvec *qvec, cvec *qvec,
int timeout,
int *finish) int *finish)
{ {
int retval = -1; return api_native_stream(h, req, qvec, timeout, finish);
char *path = NULL;
char *request_method = NULL; /* GET,.. */
char *streampath;
int pretty;
char **pvec = NULL;
int pn;
cvec *pcvec = NULL; /* for rest api */
cxobj *xerr = NULL;
char *media_str = NULL;
char *stream_name;
restconf_media media_stream = TEXT_EVENT_STREAM; /* text/event-stream, see RFC8040 sec 6 */
restconf_media media_out = YANG_DATA_XML;
int ret;
int backend_socket = -1;
fprintf(stderr, "%s\n", __FUNCTION__);
clixon_debug(CLIXON_DBG_STREAM, "");
if (req == NULL){
errno = EINVAL;
goto done;
} }
streampath = clicon_option_str(h, "CLICON_STREAM_PATH");
if ((path = restconf_uripath(h)) == NULL)
goto done;
clixon_debug(CLIXON_DBG_STREAM, "path:%s", path);
request_method = restconf_param_get(h, "REQUEST_METHOD");
clixon_debug(CLIXON_DBG_STREAM, "method:%s", request_method);
pretty = restconf_pretty_get(h);
clixon_debug(CLIXON_DBG_STREAM, "pretty:%d", pretty);
/* Get media for output (proactive negotiation) RFC7231 by using
* Accept:. This is for methods that have output, such as GET,
* operation POST, etc
* If accept is * default is yang-json
*/
media_str = restconf_param_get(h, "HTTP_ACCEPT");
clixon_debug(CLIXON_DBG_STREAM, "accept(media):%s", media_str);
if (media_str == NULL){
if (restconf_not_acceptable(h, req, pretty, media_out) < 0)
goto done;
goto ok;
}
media_stream = restconf_media_str2int(media_str);
clixon_debug(CLIXON_DBG_STREAM, "media_out:%s", restconf_media_int2str(media_stream));
switch ((int)media_stream){
case -1:
if (strcmp(media_str, "*/*") == 0){ /* catch-all */
media_out = TEXT_EVENT_STREAM;
}
else{
if (restconf_not_acceptable(h, req, pretty, media_out) < 0)
goto done;
goto ok;
}
break;
case TEXT_EVENT_STREAM:
break;
default:
if (restconf_not_acceptable(h, req, pretty, media_out) < 0)
goto done;
goto ok;
break;
}
if ((pvec = clicon_strsep(path, "/", &pn)) == NULL)
goto done;
if (strlen(pvec[0]) != 0){
if (netconf_invalid_value_xml(&xerr, "protocol", "Invalid path, /stream/<name> expected") < 0)
goto done;
if (api_return_err0(h, req, xerr, pretty, media_out, 0) < 0)
goto done;
goto ok;
}
else if (strcmp(pvec[1], streampath)){
if (netconf_invalid_value_xml(&xerr, "protocol", "Invalid path, /stream/<name> expected") < 0)
goto done;
if (api_return_err0(h, req, xerr, pretty, media_out, 0) < 0)
goto done;
goto ok;
}
else if ((stream_name = pvec[2]) == NULL ||
strlen(stream_name) == 0){
if (netconf_invalid_value_xml(&xerr, "protocol", "Invalid path, /stream/<name> expected") < 0)
goto done;
if (api_return_err0(h, req, xerr, pretty, media_out, 0) < 0)
goto done;
goto ok;
}
clixon_debug(CLIXON_DBG_STREAM, "stream-name: %s", stream_name);
if (uri_str2cvec(path, '/', '=', 1, &pcvec) < 0) /* rest url eg /album=ricky/foo */
goto done;
/* If present, check credentials. See "plugin_credentials" in plugin
* See RFC 8040 section 2.5
*/
if ((ret = restconf_authentication_cb(h, req, pretty, media_out)) < 0)
goto done;
if (ret == 0)
goto ok;
clixon_debug(CLIXON_DBG_STREAM, "passed auth");
if (restconf_subscription(h, req, stream_name, qvec, pretty, media_out, &backend_socket) < 0)
goto done;
if (backend_socket != -1){
// XXX Could add forking here eventurally
/* Listen to backend socket */
if (clixon_event_reg_fd(backend_socket,
restconf_native_stream_cb,
req,
"stream socket") < 0)
goto done;
}
ok:
retval = 0;
done:
fprintf(stderr, "%s retval %d\n", __FUNCTION__, retval);
clixon_debug(CLIXON_DBG_STREAM, "retval:%d", retval);
if (xerr)
xml_free(xerr);
if (path)
free(path);
if (pvec)
free(pvec);
if (pcvec)
cvec_free(pcvec);
return retval;
}
#endif /* HAVE_LIBNGHTTP2 */

View file

@ -70,7 +70,7 @@
#include <clixon/clixon_backend.h> #include <clixon/clixon_backend.h>
/* Command line options to be passed to getopt(3) */ /* Command line options to be passed to getopt(3) */
#define BACKEND_EXAMPLE_OPTS "a:m:M:nrsS:x:iuUtV:" #define BACKEND_EXAMPLE_OPTS "a:m:M:n:rsS:x:iuUtV:"
/* Enabling this improves performance in tests, but there may trigger the "double XPath" /* Enabling this improves performance in tests, but there may trigger the "double XPath"
* problem. * problem.
@ -97,9 +97,10 @@ static char *_mount_namespace = NULL;
/*! Notification stream /*! Notification stream
* *
* Enable notification streams for netconf/restconf * Enable notification streams for netconf/restconf
* Start backend with -- -n * Start backend with -- -n <sec>
* where <sec> is period of stream
*/ */
static int _notification_stream = 0; static int _notification_stream_s = 0;
/*! Variable to control if reset code is run. /*! Variable to control if reset code is run.
* *
@ -182,7 +183,7 @@ static char *_validate_fail_xpath = NULL;
static int _validate_fail_toggle = 0; /* fail at validate and commit */ static int _validate_fail_toggle = 0; /* fail at validate and commit */
/* forward */ /* forward */
static int example_stream_timer_setup(clixon_handle h); static int example_stream_timer_setup(clixon_handle h, int sec);
int int
main_begin(clixon_handle h, main_begin(clixon_handle h,
@ -310,7 +311,7 @@ example_stream_timer(int fd,
/* XXX Change to actual netconf notifications and namespace */ /* XXX Change to actual netconf notifications and namespace */
if (stream_notify(h, "EXAMPLE", "<event xmlns=\"urn:example:clixon\"><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>") < 0) if (stream_notify(h, "EXAMPLE", "<event xmlns=\"urn:example:clixon\"><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>") < 0)
goto done; goto done;
if (example_stream_timer_setup(h) < 0) if (example_stream_timer_setup(h, _notification_stream_s) < 0)
goto done; goto done;
retval = 0; retval = 0;
done: done:
@ -318,15 +319,18 @@ example_stream_timer(int fd,
} }
/*! Set up example stream notification timer /*! Set up example stream notification timer
*
* @param[in] h Clixon handle
* @param[in] s Timeout period in seconds
*/ */
static int static int
example_stream_timer_setup(clixon_handle h) example_stream_timer_setup(clixon_handle h,
int sec)
{ {
struct timeval t, t1; struct timeval t;
gettimeofday(&t, NULL); gettimeofday(&t, NULL);
t1.tv_sec = 5; t1.tv_usec = 0; t.tv_sec += sec;
timeradd(&t, &t1, &t);
return clixon_event_reg_timeout(t, example_stream_timer, h, "example stream timer"); return clixon_event_reg_timeout(t, example_stream_timer, h, "example stream timer");
} }
@ -1456,7 +1460,7 @@ clixon_plugin_init(clixon_handle h)
_mount_namespace = optarg; _mount_namespace = optarg;
break; break;
case 'n': case 'n':
_notification_stream = 1; _notification_stream_s = atoi(optarg);
break; break;
case 'r': case 'r':
_reset = 1; _reset = 1;
@ -1502,7 +1506,7 @@ clixon_plugin_init(clixon_handle h)
} }
} }
if (_notification_stream){ if (_notification_stream_s){
/* Example stream initialization: /* Example stream initialization:
* 1) Register EXAMPLE stream * 1) Register EXAMPLE stream
* 2) setup timer for notifications, so something happens on stream * 2) setup timer for notifications, so something happens on stream
@ -1518,7 +1522,7 @@ clixon_plugin_init(clixon_handle h)
if (clicon_option_exists(h, "CLICON_STREAM_PUB") && if (clicon_option_exists(h, "CLICON_STREAM_PUB") &&
stream_publish(h, "EXAMPLE") < 0) stream_publish(h, "EXAMPLE") < 0)
goto done; goto done;
if (example_stream_timer_setup(h) < 0) if (example_stream_timer_setup(h, _notification_stream_s) < 0)
goto done; goto done;
} }
/* Register callback for routing rpc calls /* Register callback for routing rpc calls

View file

@ -1097,7 +1097,7 @@ function expecteof_file(){
fi fi
} }
# test script with timeout, used for notificatin streams # test script for NETCONF with timeout, used for notification streams
# - (not-evaluated) expression # - (not-evaluated) expression
# - expected command return value (0 if OK) (NOOP for now) # - expected command return value (0 if OK) (NOOP for now)
# - stdin input1 This is NOT encoded, eg preamble/hello # - stdin input1 This is NOT encoded, eg preamble/hello

View file

@ -98,8 +98,8 @@ if [ $BE -ne 0 ]; then
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
err err
fi fi
new "start backend -s init -f $cfg -- -n" new "start backend -s init -f $cfg -- -n 5"
start_backend -s init -f $cfg -- -n # create example notification stream start_backend -s init -f $cfg -- -n 5 # create example notification stream
fi fi
new "wait backend" new "wait backend"

View file

@ -21,19 +21,10 @@
# 2d) start sub 8s - replay from start -8s to stop +4s - expect 3 notifications # 2d) start sub 8s - replay from start -8s to stop +4s - expect 3 notifications
# 2e) start sub 8s - replay from -90s w retention 60s - expect 10 notifications # 2e) start sub 8s - replay from -90s w retention 60s - expect 10 notifications
# Note the sleeps are mainly for valgrind usage # Note the sleeps are mainly for valgrind usage
#
# XXX There is some state/timing issue introduced in 5.7, see test-pause and parallell
# Magic line must be first in script (see README.md) # Magic line must be first in script (see README.md)
s="$_" ; . ./lib.sh || if [ "$s" = $0 ]; then exit 0; else return 0; fi s="$_" ; . ./lib.sh || if [ "$s" = $0 ]; then exit 0; else return 0; fi
# Skip it other than fcgi and http
#if [ "${WITH_RESTCONF}" != "fcgi" -o "$RCPROTO" = https ]; then
if false; then
rm -rf $dir
if [ "$s" = $0 ]; then exit 0; else return 0; fi # skip
fi
# Dont run this test with valgrind # Dont run this test with valgrind
if [ $valgrindtest -ne 0 ]; then if [ $valgrindtest -ne 0 ]; then
echo "...skipped " echo "...skipped "
@ -41,15 +32,18 @@ if [ $valgrindtest -ne 0 ]; then
return 0 # skip return 0 # skip
fi fi
# Degraded does not work at all
#rm -rf $dir
#if [ "$s" = $0 ]; then exit 0; else return 0; fi # skip
: ${SLEEP2:=1} : ${SLEEP2:=1}
SLEEP5=.5 SLEEP5=.5
APPNAME=example APPNAME=example
: ${clixon_util_stream:=clixon_util_stream} : ${clixon_util_stream:=clixon_util_stream}
: ${TIMEOUT:=10}
: ${PERIOD:=2}
# Lower and upper bound on number of intervals
LBOUND=$((${TIMEOUT}/${PERIOD} - 1))
UBOUND=$((${TIMEOUT}/${PERIOD} + 1))
# Ensure UTC # Ensure UTC
DATE=$(date -u +"%Y-%m-%d") DATE=$(date -u +"%Y-%m-%d")
@ -128,13 +122,55 @@ module example {
} }
EOF EOF
# Temporary pause between tests to make state timeout # Run stream test
# XXX This should not really be here, there is some state/timing issue introduced in 5.7 # Args:
function test-pause() # 1: extra curlopt
function runtest()
{ {
sleep 5 extra=$1
# -m 1 means 1 sec timeout
curl -Ssik --http1.1 -X GET -m 1 -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" "http://localhost/streams/EXAMPLE" 2>&1 > /dev/null expect="data: <notification xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\"><eventTime>${DATE}T[0-9:.]*Z</eventTime><event xmlns=\"urn:example:clixon\"><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>"
new "2a) start $extra timeout:${TIMEOUT}s - expect ${LBOUND}-${UBOUND} notifications"
ret=$(curl $CURLOPTS $extra -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" $RCPROTO://localhost/streams/EXAMPLE)
match=$(echo "$ret" | grep -Eo "$expect")
if [ -z "$match" ]; then
err "$expect" "$ret"
fi
nr=$(echo "$ret" | grep -c "data:")
if [ $nr -lt ${LBOUND} -o $nr -gt ${UBOUND} ]; then
err "[${LBOUND},$[UBOUND]]" "$nr"
fi
LB=$((5/${PERIOD} - 1))
UB=$((5/${PERIOD} + 1))
time1=$(date -u -d"5 second now" +'%Y-%m-%dT%H:%M:%SZ')
new "2b) start $extra timeout:${TIMEOUT} stop after 5s - expect ${LB}-${UB} notifications"
ret=$(curl $CURLOPTS $extra -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" $RCPROTO://localhost/streams/EXAMPLE?stop-time=${time1})
match=$(echo "$ret" | grep -Eo "$expect")
if [ -z "$match" ]; then
err "$expect" "$ret"
fi
nr=$(echo "$ret" | grep -c "data:")
if [ $nr -lt ${LB} -o $nr -gt ${UB} ]; then
err "[${LB},$[UB]]" "$nr"
fi
if false; then # Does not work yet
time1=$(date -u -d"-5 second now" +'%Y-%m-%dT%H:%M:%SZ')
LB=$(((5+${TIMEOUT})/${PERIOD} - 1))
UB=$(((5+${TIMEOUT})/${PERIOD} + 1))
new "2c) start sub 8s - replay from start -8s - expect 3-4 notifications"
echo "curl $CURLOPTS $extra -X GET -H \"Accept: text/event-stream\" -H \"Cache-Control: no-cache\" -H \"Connection: keep-alive\" $RCPROTO://localhost/streams/EXAMPLE?start-time=${time1}"
ret=$(curl $CURLOPTS $extra -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" $RCPROTO://localhost/streams/EXAMPLE?start-time=${time1})
fi
unset LB
unset UB
unset time1
} }
new "test params: -f $cfg" new "test params: -f $cfg"
@ -145,8 +181,9 @@ if [ $BE -ne 0 ]; then
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
err err
fi fi
new "start backend -s init -f $cfg -- -n" new "start backend -s init -f $cfg -- -n ${PERIOD}"
start_backend -s init -f $cfg -- -n # create example notification stream # create example notification stream with periodic timeout ${PERIOD} seconds
start_backend -s init -f $cfg -- -n ${PERIOD}
fi fi
new "wait backend" new "wait backend"
@ -156,15 +193,15 @@ if [ $RC -ne 0 ]; then
new "kill old restconf daemon" new "kill old restconf daemon"
stop_restconf_pre stop_restconf_pre
new "start restconf daemon" new "start restconf daemon -f $cfg -t ${TIMEOUT}"
start_restconf -f $cfg start_restconf -f $cfg -t ${TIMEOUT}
fi fi
new "wait restconf" new "wait restconf"
wait_restconf wait_restconf
new "netconf event stream discovery RFC8040 Sec 6.2" new "netconf event stream discovery RFC8040 Sec 6.2"
expecteof_netconf "$clixon_netconf -D $DBG -qf $cfg" 0 "$DEFAULTHELLO" "<rpc $DEFAULTNS><get><filter type=\"xpath\" select=\"r:restconf-state/r:streams\" xmlns:r=\"urn:ietf:params:xml:ns:yang:ietf-restconf-monitoring\"/></get></rpc>" "" "<rpc-reply $DEFAULTNS><data><restconf-state xmlns=\"urn:ietf:params:xml:ns:yang:ietf-restconf-monitoring\"><streams><stream><name>EXAMPLE</name><description>Example event stream</description><replay-support>true</replay-support><access><encoding>xml</encoding><location>https://localhost/streams/EXAMPLE</location></access></stream></streams></restconf-state></data></rpc-reply>" expecteof_netconf "$clixon_netconf -D $DBG -qf $cfg" 0 "$DEFAULTHELLO" "<rpc $DEFAULTNS><get><filter type=\"xpath\" select=\"r:restconf-state/r:streams\" xmlns:r=\"urn:ietf:params:xml:ns:yang:ietf-restconf-monitoring\"/></get></rpc>" "" "<rpc-reply $DEFAULTNS><data><restconf-state xmlns=\"urn:ietf:params:xml:ns:yang:ietf-restconf-monitoring\"><streams><stream><name>EXAMPLE</name><description>Example event stream</description><replay-support>true</replay-support><access><encoding>xml</encoding><location>$RCPROTO://localhost/streams/EXAMPLE</location></access></stream></streams></restconf-state></data></rpc-reply>"
# 1.2 Netconf stream subscription # 1.2 Netconf stream subscription
@ -172,40 +209,30 @@ expecteof_netconf "$clixon_netconf -D $DBG -qf $cfg" 0 "$DEFAULTHELLO" "<rpc $DE
new "2. Restconf RFC8040 stream testing" new "2. Restconf RFC8040 stream testing"
# 2.1 Stream discovery # 2.1 Stream discovery
new "restconf event stream discovery RFC8040 Sec 6.2" new "restconf event stream discovery RFC8040 Sec 6.2"
expectpart "$(curl $CURLOPTS -X GET $RCPROTO://localhost/restconf/data/ietf-restconf-monitoring:restconf-state/streams)" 0 "HTTP/$HVER 200" '{"ietf-restconf-monitoring:streams":{"stream":\[{"name":"EXAMPLE","description":"Example event stream","replay-support":true,"access":\[{"encoding":"xml","location":"https://localhost/streams/EXAMPLE"}\]}\]}' expectpart "$(curl $CURLOPTS -X GET $RCPROTO://localhost/restconf/data/ietf-restconf-monitoring:restconf-state/streams)" 0 "HTTP/$HVER 200" "{\"ietf-restconf-monitoring:streams\":{\"stream\":\[{\"name\":\"EXAMPLE\",\"description\":\"Example event stream\",\"replay-support\":true,\"access\":\[{\"encoding\":\"xml\",\"location\":\"$RCPROTO://localhost/streams/EXAMPLE\"}\]}\]}"
sleep $SLEEP2
new "restconf subscribe RFC8040 Sec 6.3, get location" new "restconf subscribe RFC8040 Sec 6.3, get location"
expectpart "$(curl $CURLOPTS -X GET $RCPROTO://localhost/restconf/data/ietf-restconf-monitoring:restconf-state/streams/stream=EXAMPLE/access=xml/location)" 0 "HTTP/$HVER 200" '{"ietf-restconf-monitoring:location":"https://localhost/streams/EXAMPLE"}' expectpart "$(curl $CURLOPTS -X GET $RCPROTO://localhost/restconf/data/ietf-restconf-monitoring:restconf-state/streams/stream=EXAMPLE/access=xml/location)" 0 "HTTP/$HVER 200" "{\"ietf-restconf-monitoring:location\":\"$RCPROTO://localhost/streams/EXAMPLE\"}"
sleep $SLEEP2
# Restconf stream subscription RFC8040 Sec 6.3 # Restconf stream subscription RFC8040 Sec 6.3
# Start Subscription w error # Start Subscription w error
new "restconf monitor event nonexist stream" new "Try nonexist stream"
# Note cant use -S or -i here, the former dont know, latter because expectwait cant take expectpart "$(curl $CURLOPTS -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" $RCPROTO://localhost/streams/NOTEXIST)" 0 "HTTP/$HVER 400" "<errors xmlns=\"urn:ietf:params:xml:ns:yang:ietf-restconf\"><error><error-type>application</error-type><error-tag>invalid-value</error-tag><error-severity>error</error-severity><error-message>No such stream</error-message></error></errors>"
# partial returns like expectpart can
expectpart "$(curl $CURLOPTS -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" $RCPROTO://localhost/streams/NOTEXIST)" 0 "HTTP/$HVER 400" '<errors xmlns=\"urn:ietf:params:xml:ns:yang:ietf-restconf\"><error><error-type>application</error-type><error-tag>invalid-value</error-tag><error-severity>error</error-severity><error-message>No such stream</error-message></error></errors>'
# 2a) start subscription 8s - expect 1-2 notifications if ${HAVE_HTTP1}; then
runtest --http1.1
new "2a) start subscriptions 8s - expect 1-2 notifications"
echo "curl $CURLOPTS --no-buffer -X GET -H \"Accept: text/event-stream\" -H \"Cache-Control: no-cache\" -H \"Connection: keep-alive\" $RCPROTO://localhost/streams/EXAMPLE"
#curl $CURLOPTS --no-buffer -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" $RCPROTO://localhost/streams/EXAMPLE
exit
ret=$($clixon_util_stream -u $RCPROTO://localhost/streams/EXAMPLE -t 8)
expect="data: <notification xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\"><eventTime>${DATE}T[0-9:.]*Z</eventTime><event xmlns=\"urn:example:clixon\"><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>"
match=$(echo "$ret" | grep -Eo "$expect")
if [ -z "$match" ]; then
err "$expect" "$ret"
fi fi
nr=$(echo "$ret" | grep -c "data:") if [ "${WITH_RESTCONF}" = "fcgi" ]; then
if [ $nr -lt 1 -o $nr -gt 2 ]; then runtest ""
err 2 "$nr"
fi fi
exit if [ "${WITH_RESTCONF}" = "native" ]; then
if false; then # XXX native + http/2 dont work yet
# if ${HAVE_LIBNGHTTP2}; then
runtest --http2
fi
fi
if false; then # NYI
test-pause test-pause
# 2b) start subscription 8s - stoptime after 5s - expect 1-2 notifications # 2b) start subscription 8s - stoptime after 5s - expect 1-2 notifications
@ -274,7 +301,6 @@ fi
test-pause test-pause
sleep 5 sleep 5
# Try parallell # Try parallell
# start background job # start background job
curl $CURLOPTS -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" "$RCPROTO://localhost/streams/EXAMPLE" & # > /dev/null & curl $CURLOPTS -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" "$RCPROTO://localhost/streams/EXAMPLE" & # > /dev/null &
@ -297,14 +323,11 @@ fi # XXX
kill $PID kill $PID
#--------------------------------------------------------------------
# NCHAN Need manual testing
echo "Nchan streams requires manual testing"
echo "Add <CLICON_STREAM_PUB>http://localhost/pub</CLICON_STREAM_PUB> to config"
echo "Eg: curl $CURLOPTS -H \"Accept: text/event-stream\" -s -X GET $RCPROTO://localhost/sub/EXAMPLE"
#----------------- #-----------------
sleep $SLEEP5 sleep $SLEEP5
fi # XXX
if [ $RC -ne 0 ]; then if [ $RC -ne 0 ]; then
new "Kill restconf daemon" new "Kill restconf daemon"
stop_restconf stop_restconf

View file

@ -1302,7 +1302,6 @@ module clixon-config {
units s; units s;
description "Retention for stream replay buffers in seconds, ie how much description "Retention for stream replay buffers in seconds, ie how much
data to store before dropping. 0 means no retention"; data to store before dropping. 0 means no retention";
} }
leaf CLICON_LOG_STRING_LIMIT { leaf CLICON_LOG_STRING_LIMIT {
type uint32; type uint32;