From af16760279467df2fe69b426da4458fbc5a17c7b Mon Sep 17 00:00:00 2001 From: Olof hagsand Date: Sat, 27 Oct 2018 11:12:01 +0200 Subject: [PATCH] * Stream replay support * RFC8040 Restconf replay support: start-time and stop-time query parameter support * This only applies to "native" restconf stream support, Nchan mode has different replay functionality * RFC5277 Netconf replay support using and * Replay support is only in-memory and not persistent. External time-series DB could be added. --- CHANGELOG.md | 9 +- apps/backend/backend_client.c | 30 ++++- apps/backend/backend_main.c | 3 + apps/cli/cli_main.c | 4 +- apps/netconf/netconf_main.c | 4 +- apps/restconf/restconf_main.c | 3 + example/example_backend.c | 2 +- lib/clixon/clixon_options.h | 5 + lib/clixon/clixon_queue.h | 53 ++++++-- lib/clixon/clixon_stream.h | 29 ++-- lib/src/clixon_options.c | 34 +++++ lib/src/clixon_stream.c | 240 ++++++++++++++++++++++++++++------ test/test_stream.sh | 4 + 13 files changed, 355 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 799fff1d..c0099111 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,11 @@ * Optional pub/sub support enabled by ./configure --enable-publish * Set publish URL base with: CLICON_STREAM_PUB (default http://localhost/pub) * Example: new stream "foo" will get pub URL: https://localhost/pub/foo +* Stream replay support + * RFC8040 Restconf replay support: start-time and stop-time query parameter support + * This only applies to "native" restconf stream support, Nchan mode has different replay functionality + * RFC5277 Netconf replay support using and + * Replay support is only in-memory and not persistent. External time-series DB could be added. ### API changes on existing features (you may need to change your code) * clixon-config YAML file has new revision: 2018-10-21. @@ -94,7 +99,9 @@ * Set dir /www-data with www-data as owner, see https://github.com/clicon/clixon/issues/37 ### Known issues -* Bug: Top-level Yang symbol cannot be called "config" in any imported yang file. +* netconf rpc input is not sanity checked for wrong symbols (just ignored). +* Yang sub-command order and cardinality not checked. +* Top-level Yang symbol cannot be called "config" in any imported yang file. * datastore uses "config" as reserved keyword for storing any XML whoich collides with code for detecting Yang sanity. * Namespace name relabeling is not supported. * Eg: if "des" is defined as prefix for an imported module, then a relabeling using xmlfns is not supported, such as: diff --git a/apps/backend/backend_client.c b/apps/backend/backend_client.c index 96a372e2..4f1692cd 100644 --- a/apps/backend/backend_client.c +++ b/apps/backend/backend_client.c @@ -815,12 +815,13 @@ from_client_delete_config(clicon_handle h, * @param[out] cbret Return xml value cligen buffer * @retval 0 OK * @retval -1 Error. Send error message back to client. + * @see RFC5277 2.1 * @example: * * RESULT # If not present, events in the default NETCONF stream will be sent. * - * # only for replay (NYI) - * # only for replay (NYI) + * + * * */ static int @@ -834,10 +835,28 @@ from_client_create_subscription(clicon_handle h, cxobj *x; /* Generic xml tree */ cxobj *xfilter; /* Filter xml tree */ char *ftype; + char *starttime = NULL; + char *stoptime = NULL; char *selector = NULL; + struct timeval start; + struct timeval stop; if ((x = xpath_first(xe, "//stream")) != NULL) stream = xml_find_value(x, "body"); + if ((x = xpath_first(xe, "//stopTime")) != NULL){ + stoptime = xml_find_value(x, "body"); + if (str2time(stoptime, &stop) < 0){ + clicon_err(OE_PROTO, errno, "str2time"); + goto done; + } + } + if ((x = xpath_first(xe, "//startTime")) != NULL){ + starttime = xml_find_value(x, "body"); + if (str2time(starttime, &start) < 0){ + clicon_err(OE_PROTO, errno, "str2time"); + goto done; + } + } if ((xfilter = xpath_first(xe, "//filter")) != NULL){ if ((ftype = xml_find_value(xfilter, "type")) != NULL){ /* Only accept xpath as filter type */ @@ -855,8 +874,13 @@ from_client_create_subscription(clicon_handle h, goto done; goto ok; } - if (stream_cb_add(h, stream, selector, ce_event_cb, (void*)ce) < 0) + if (stream_cb_add(h, stream, selector, stoptime?&stop:NULL, + ce_event_cb, (void*)ce) < 0) goto done; + /* Replay of this stream to specific subscription according to start and stop*/ + if (starttime) /* XXX No this must be done _after_ this RPC completes */ + if (stream_replay(h, stream, &start, stoptime?&stop:NULL) < 0) + goto done; cprintf(cbret, ""); ok: retval = 0; diff --git a/apps/backend/backend_main.c b/apps/backend/backend_main.c index 83004ad3..fcfa3570 100644 --- a/apps/backend/backend_main.c +++ b/apps/backend/backend_main.c @@ -89,6 +89,8 @@ backend_terminate(clicon_handle h) clicon_debug(1, "%s", __FUNCTION__); if ((yspec = clicon_dbspec_yang(h)) != NULL) yspec_free(yspec); + if ((yspec = clicon_config_yang(h)) != NULL) + yspec_free(yspec); if ((x = clicon_conf_xml(h)) != NULL) xml_free(x); stream_publish_exit(); @@ -590,6 +592,7 @@ main(int argc, usage(h, argv[0]); return -1; } + clicon_config_yang_set(h, yspecfg); /* External NACM file? */ nacm_mode = clicon_option_str(h, "CLICON_NACM_MODE"); if (nacm_mode && strcmp(nacm_mode, "external") == 0) diff --git a/apps/cli/cli_main.c b/apps/cli/cli_main.c index d5ff3efd..c651127e 100644 --- a/apps/cli/cli_main.c +++ b/apps/cli/cli_main.c @@ -85,6 +85,8 @@ cli_terminate(clicon_handle h) clicon_rpc_close_session(h); if ((yspec = clicon_dbspec_yang(h)) != NULL) yspec_free(yspec); + if ((yspec = clicon_config_yang(h)) != NULL) + yspec_free(yspec); if ((x = clicon_conf_xml(h)) != NULL) xml_free(x); cli_plugin_finish(h); @@ -338,7 +340,7 @@ main(int argc, char **argv) usage(h, argv[0]); return -1; } - + clicon_config_yang_set(h, yspecfg); /* Now rest of options */ opterr = 0; optind = 1; diff --git a/apps/netconf/netconf_main.c b/apps/netconf/netconf_main.c index 6765a7eb..6f3f30bc 100644 --- a/apps/netconf/netconf_main.c +++ b/apps/netconf/netconf_main.c @@ -280,6 +280,8 @@ netconf_terminate(clicon_handle h) clicon_rpc_close_session(h); if ((yspec = clicon_dbspec_yang(h)) != NULL) yspec_free(yspec); + if ((yspec = clicon_config_yang(h)) != NULL) + yspec_free(yspec); if ((x = clicon_conf_xml(h)) != NULL) xml_free(x); event_exit(); @@ -390,7 +392,7 @@ main(int argc, /* Find and read configfile */ if (clicon_options_main(h, yspecfg) < 0) return -1; - + clicon_config_yang_set(h, yspecfg); /* Now rest of options */ optind = 1; opterr = 0; diff --git a/apps/restconf/restconf_main.c b/apps/restconf/restconf_main.c index 2b3949de..b931c63e 100644 --- a/apps/restconf/restconf_main.c +++ b/apps/restconf/restconf_main.c @@ -454,6 +454,8 @@ restconf_terminate(clicon_handle h) clicon_rpc_close_session(h); if ((yspec = clicon_dbspec_yang(h)) != NULL) yspec_free(yspec); + if ((yspec = clicon_config_yang(h)) != NULL) + yspec_free(yspec); if ((x = clicon_conf_xml(h)) != NULL) xml_free(x); clicon_handle_exit(h); @@ -583,6 +585,7 @@ main(int argc, /* Find and read configfile */ if (clicon_options_main(h, yspecfg) < 0) goto done; + clicon_config_yang_set(h, yspecfg); stream_path = clicon_option_str(h, "CLICON_STREAM_PATH"); /* Now rest of options, some overwrite option file */ optind = 1; diff --git a/example/example_backend.c b/example/example_backend.c index 1eb6bedb..d4415e34 100644 --- a/example/example_backend.c +++ b/example/example_backend.c @@ -301,7 +301,7 @@ clixon_plugin_init(clicon_handle h) * 2) setup timer for notifications, so something happens on stream * 3) setup stream callbacks for notification to push channel */ - if (stream_register(h, "EXAMPLE", "Example event stream") < 0) + if (stream_register(h, "EXAMPLE", "Example event stream", 1) < 0) goto done; /* assumes: CLIXON_PUBLISH_STREAMS, eg configure --enable-publish */ diff --git a/lib/clixon/clixon_options.h b/lib/clixon/clixon_options.h index 94f6788a..f06660f0 100644 --- a/lib/clixon/clixon_options.h +++ b/lib/clixon/clixon_options.h @@ -165,6 +165,11 @@ int clicon_quiet_mode_set(clicon_handle h, int val); yang_spec * clicon_dbspec_yang(clicon_handle h); int clicon_dbspec_yang_set(clicon_handle h, struct yang_spec *ys); +#if 1 /* Temporary function until "Top-level Yang symbol cannot be called "config"" is fixed */ +yang_spec * clicon_config_yang(clicon_handle h); +int clicon_config_yang_set(clicon_handle h, struct yang_spec *ys); +#endif + cxobj *clicon_conf_xml(clicon_handle h); int clicon_conf_xml_set(clicon_handle h, cxobj *x); diff --git a/lib/clixon/clixon_queue.h b/lib/clixon/clixon_queue.h index 2997c655..70127756 100644 --- a/lib/clixon/clixon_queue.h +++ b/lib/clixon/clixon_queue.h @@ -37,17 +37,30 @@ #ifndef _CLIXON_QUEUE_H_ #define _CLIXON_QUEUE_H_ -/* - * Circular queue structure for use as first entry in a parent structure. +/*! Circular queue structure for use as first entry in a parent structure. + * Add qelem_t as first element in struct + * @code + * struct a{ + * qelem_t a_q; # this must be there + * int a_b; # other elements + * ... + * }; + * @endcode */ typedef struct _qelem_t { struct _qelem_t *q_next; struct _qelem_t *q_prev; } qelem_t; - /* - * Append element 'elem' to queue. - */ +/*! Append element 'elem' to queue. + * @param[in] elem Element to be added + * @param[in,out] pred Add element after this + * @code + * struct a *list; # existing list + * struct a *new = malloc(...); + * ADDQ(new, list); + * @endcode + */ #define ADDQ(elem, pred) { \ register qelem_t *Xe = (qelem_t *) (elem); \ register qelem_t *Xp = (qelem_t *) (pred); \ @@ -62,8 +75,14 @@ typedef struct _qelem_t { } \ } -/* - * Insert element 'elem' in queue after 'pred' +/*! Insert element 'elem' in queue after 'pred' + * @param[in] elem Element to be added + * @param[in,out] pred Add element after this + * @code + * struct a *list; # existing list + * struct a *new = malloc(...); + * INSQ(new, list); + * @endcode */ #define INSQ(elem, pred) { \ register qelem_t *Xe = (qelem_t *) (elem); \ @@ -79,9 +98,16 @@ typedef struct _qelem_t { pred = elem; \ } -/* - * Remove element 'elem' from queue. 'head' is the pointer to the queue and +/*! Remove element 'elem' from queue. 'head' is the pointer to the queue and * is of 'type'. + * @param[in] elem + * @param[in] head + * @param[in] type XXX needed? + * @code + * struct a *list; # existing list + * struct a *el; # remove this + * DELQ(el, list, struct a*); + * @endcode */ #define DELQ(elem, head, type) { \ register qelem_t *Xe = (qelem_t *) elem; \ @@ -92,10 +118,13 @@ typedef struct _qelem_t { head = (type)Xe->q_next; \ } -/* - * Get next entry in list +/*! Get next entry in list + * @param[in] type Type of element + * @param[in] el Return next element after elem. + * @code + * struct a *list; # existing element (or list) + * NEXTQ(struct a*, el); */ #define NEXTQ(type, elem) ((type)((elem)?((qelem_t *)(elem))->q_next:NULL)) - #endif /* _CLIXON_QUEUE_H_ */ diff --git a/lib/clixon/clixon_stream.h b/lib/clixon/clixon_stream.h index 94450cd9..d301aa36 100644 --- a/lib/clixon/clixon_stream.h +++ b/lib/clixon/clixon_stream.h @@ -51,17 +51,27 @@ struct stream_subscription{ struct stream_subscription *ss_next; char *ss_stream; /* Name of associated stream */ char *ss_xpath; /* Filter selector as xpath */ + struct timeval ss_stoptime; /* Replay stoptime */ stream_fn_t ss_fn; /* Callback when event occurs */ void *ss_arg; /* Callback argument */ }; +/* Replay time-series */ +struct stream_replay{ + qelem_t r_q; /* queue header */ + struct timeval r_tv; /* time index */ + cxobj *r_xml; /* event in xml form */ +}; + /* See RFC8040 9.3, stream list, no replay support for now */ struct event_stream{ struct event_stream *es_next; char *es_name; /* name of notification event stream */ - char *es_description; + char *es_description; struct stream_subscription *es_subscription; + int es_replay_enabled; /* set if replay is enables */ + struct stream_replay *es_replay; }; typedef struct event_stream event_stream_t; @@ -69,25 +79,26 @@ typedef struct event_stream event_stream_t; * Prototypes */ event_stream_t *stream_find(clicon_handle h, const char *name); -int stream_register(clicon_handle h, const char *name, const char *description); +int stream_register(clicon_handle h, const char *name, const char *description, int replay_enabled); int stream_delete_all(event_stream_t *es); int stream_get_xml(clicon_handle h, int access, cbuf *cb); -int stream_cb_add(clicon_handle h, char *stream, char *xpath, stream_fn_t fn, void *arg); +struct stream_subscription *stream_cb_add(clicon_handle h, char *stream, + char *xpath, struct timeval *stop, stream_fn_t fn, void *arg); int stream_cb_delete(clicon_handle h, char *stream, stream_fn_t fn, void *arg); -int stream_notify_xml(clicon_handle h, char *stream, cxobj *xevent); +int stream_notify_xml(clicon_handle h, event_stream_t *es, cxobj *xevent); #if defined(__GNUC__) && __GNUC__ >= 3 int stream_notify(clicon_handle h, char *stream, const char *event, ...) __attribute__ ((format (printf, 3, 4))); #else int stream_notify(clicon_handle h, char *stream, const char *event, ...); #endif -/* Experimental publish streams using SSE */ + +int stream_replay(clicon_handle h, char *stream, struct timeval *start, struct timeval *stop); +int stream_replay_add(event_stream_t *es, struct timeval *tv, cxobj *xv); + +/* Experimental publish streams using SSE. CLIXON_PUBLISH_STREAMS should be set */ int stream_publish(clicon_handle h, char *stream); int stream_publish_init(); int stream_publish_exit(); -/* Backward compatible macro for <1.8 */ -#define backend_notify_xml(h, stream, level, x) stream_notify_xml(h, stream, x) -#define backend_notify(h, stream, level, event) stream_notify(h, stream, event) - #endif /* _CLIXON_STREAM_H_ */ diff --git a/lib/src/clixon_options.c b/lib/src/clixon_options.c index 016ca668..62202def 100644 --- a/lib/src/clixon_options.c +++ b/lib/src/clixon_options.c @@ -588,6 +588,40 @@ clicon_dbspec_yang_set(clicon_handle h, return 0; } +#if 1 /* Temporary function until "Top-level Yang symbol cannot be called "config"" is fixed */ +/*! Get YANG specification for clixon config + * Must use hash functions directly since they are not strings. + */ +yang_spec * +clicon_config_yang(clicon_handle h) +{ + clicon_hash_t *cdat = clicon_data(h); + size_t len; + void *p; + + if ((p = hash_value(cdat, "control_yang", &len)) != NULL) + return *(yang_spec **)p; + return NULL; +} + +/*! Set yang specification for control + * ys must be a malloced pointer + */ +int +clicon_config_yang_set(clicon_handle h, + struct yang_spec *ys) +{ + clicon_hash_t *cdat = clicon_data(h); + + /* It is the pointer to ys that should be copied by hash, + so we send a ptr to the ptr to indicate what to copy. + */ + if (hash_add(cdat, "control_yang", &ys, sizeof(ys)) == NULL) + return -1; + return 0; +} +#endif + /*! Get YANG specification for Clixon system options and features * Must use hash functions directly since they are not strings. * Example: features are typically accessed directly in the config tree. diff --git a/lib/src/clixon_stream.c b/lib/src/clixon_stream.c index 784a5556..3d1cc985 100644 --- a/lib/src/clixon_stream.c +++ b/lib/src/clixon_stream.c @@ -32,7 +32,11 @@ ***** END LICENSE BLOCK ***** * Event notification streams according to RFC5277 - * See (old) subscription code in clixon_backend_handle.c and backend_client.c + * The stream implementation has three parts: + * 1) Base stream handling: stream_find/register/delete_all/get_xml + * 2) Callback and notification handling (stream_cb_add/delete/timeout, stream_notify, etc + * 3) Stream replay: stream_replay/_add + * 4) nginx/nchan publish code (use --enable-publish config option) */ #ifdef HAVE_CONFIG_H @@ -88,7 +92,8 @@ stream_find(clicon_handle h, int stream_register(clicon_handle h, const char *name, - const char *description) + const char *description, + const int replay_enabled) { int retval = -1; event_stream_t *es; @@ -108,6 +113,7 @@ stream_register(clicon_handle h, clicon_err(OE_XML, errno, "strdup"); goto done; } + es->es_replay_enabled = replay_enabled; clicon_stream_append(h, es); ok: retval = 0; @@ -121,21 +127,28 @@ stream_register(clicon_handle h, int stream_delete_all(event_stream_t *es) { - event_stream_t *e_next; - + event_stream_t *e_next; + struct stream_replay *r; + while (es){ e_next = es->es_next; if (es->es_name) free(es->es_name); if (es->es_description) free(es->es_description); + while ((r = es->es_replay) != NULL){ + if (r->r_xml) + xml_free(r->r_xml); + DELQ(r, es->es_replay, struct stream_replay *); + free(r); + } free(es); es = e_next; } return 0; } -/*! Return stream definition +/*! Return stream definition state in XML supporting RFC 8040 and RFC5277 * @param[in] h Clicon handle * @param[in] access If set, include access/location * @param[out] cb Output buffer containing XML on exit @@ -189,21 +202,22 @@ stream_del() * @param[in] h Clicon handle * @param[in] stream Name of stream * @param[in] xpath Filter selector - xpath + * @param[in] stoptime If set, * @param[in] fn Callback when event occurs * @param[in] arg Argument to use with callback. Also handle when deleting * @retval 0 OK * @retval -1 Error, ie no such stream */ -int +struct stream_subscription * stream_cb_add(clicon_handle h, char *stream, char *xpath, + struct timeval *stoptime, stream_fn_t fn, void *arg) { - int retval = -1; event_stream_t *es; - struct stream_subscription *ss; + struct stream_subscription *ss = NULL; clicon_debug(1, "%s", __FUNCTION__); if ((es = stream_find(h, stream)) == NULL){ @@ -219,6 +233,8 @@ stream_cb_add(clicon_handle h, clicon_err(OE_CFG, errno, "strdup"); goto done; } + if (stoptime) + ss->ss_stoptime = *stoptime; if (xpath && (ss->ss_xpath = strdup(xpath)) == NULL){ clicon_err(OE_CFG, errno, "strdup"); goto done; @@ -227,9 +243,11 @@ stream_cb_add(clicon_handle h, ss->ss_arg = arg; ss->ss_next = es->es_subscription; es->es_subscription = ss; - retval = 0; + return ss; done: - return retval; + if (ss) + free(ss); + return NULL; } /*! Delete event notification callback to a stream given a callback and arg @@ -240,6 +258,34 @@ stream_cb_add(clicon_handle h, * @retval 0 OK * @retval -1 Error */ +static int +stream_cb_rm(event_stream_t *es, + struct stream_subscription *ssrm) +{ + struct stream_subscription **ss_prev; + struct stream_subscription *ss; + struct stream_subscription *ss_next; + + ss_prev = &es->es_subscription; + for (ss = *ss_prev; ss; ss = ss_next){ + ss_next = ss->ss_next; + if (ss == ssrm){ + *ss_prev = ss->ss_next; + if (ss->ss_stream) + free(ss->ss_stream); + if (ss->ss_xpath) + free(ss->ss_xpath); + free(ss); + continue; + // break; if more > 1 + } + ss_prev = &ss->ss_next; + } + return 0; +} + +/*! Remove stream subscription identified with fn and arg + */ int stream_cb_delete(clicon_handle h, char *stream, @@ -248,30 +294,43 @@ stream_cb_delete(clicon_handle h, { int retval = -1; event_stream_t *es; - struct stream_subscription **ss_prev; struct stream_subscription *ss; - struct stream_subscription *ss_next; for (es=clicon_stream(h); es; es=es->es_next){ if (stream && strcmp(stream, es->es_name)!=0) continue; - ss_prev = &es->es_subscription; - for (ss = *ss_prev; ss; ss = ss_next){ - ss_next = ss->ss_next; + for (ss = es->es_subscription; ss; ss = ss->ss_next){ if (fn == ss->ss_fn && arg == ss->ss_arg){ - *ss_prev = ss->ss_next; - if (ss->ss_stream) - free(ss->ss_stream); - if (ss->ss_xpath) - free(ss->ss_xpath); - free(ss); - continue; - // break; if more > 1 + if (stream_cb_rm(es, ss) < 0) + goto done; + break; /* scb_rm removes element, must break - but only one match */ } - ss_prev = &ss->ss_next; } } retval = 0; + done: + return retval; +} + +/*! Check if any stream subscriptions have timed out, if so remove them + */ +int +stream_cb_timeout(event_stream_t *es, + struct timeval *tv) +{ + int retval = -1; + struct stream_subscription *ss; + + again: /* If we remove must start again */ + for (ss = es->es_subscription; ss; ss = ss->ss_next){ + if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, tv, <)){ + if (stream_cb_rm(es, ss) < 0) + goto done; + goto again; + } + } + retval = 0; + done: return retval; } @@ -284,26 +343,23 @@ stream_cb_delete(clicon_handle h, * @see stream_notify */ int -stream_notify_xml(clicon_handle h, - char *stream, - cxobj *xevent) +stream_notify_xml(clicon_handle h, + event_stream_t *es, + cxobj *xevent) { int retval = -1; - event_stream_t *es; struct stream_subscription *ss; clicon_debug(1, "%s", __FUNCTION__); - if ((es = stream_find(h, stream)) == NULL) - goto ok; /* Go thru all global (handle) subscriptions and find matches */ for (ss = es->es_subscription; ss; ss = ss->ss_next){ + /* Check if this is of interest, ie not larger than stoptime? */ if (ss->ss_xpath == NULL || strlen(ss->ss_xpath)==0 || xpath_first(xevent, "%s", ss->ss_xpath) != NULL) if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0) goto done; } - ok: retval = 0; done: return retval; @@ -335,8 +391,11 @@ stream_notify(clicon_handle h, cbuf *cb = NULL; char timestr[27]; struct timeval tv; + event_stream_t *es; clicon_debug(1, "%s", __FUNCTION__); + if ((es = stream_find(h, stream)) == NULL) + goto ok; va_start(args, event); len = vsnprintf(NULL, 0, event, args) + 1; va_end(args); @@ -352,12 +411,18 @@ stream_notify(clicon_handle h, clicon_err(OE_YANG, 0, "No yang spec"); goto done; } + gettimeofday(&tv, NULL); +#if 1 + /* Go thru callbacks and see if any have timed out, if so remove them + * Could also be done by a separate timer. + */ + if (stream_cb_timeout(es, &tv) < 0) + goto done; +#endif if ((cb = cbuf_new()) == NULL){ clicon_err(OE_UNIX, errno, "cbuf_new"); goto done; } - - gettimeofday(&tv, NULL); if (time2str(tv, timestr, sizeof(timestr)) < 0){ clicon_err(OE_UNIX, errno, "time2str"); goto done; @@ -367,8 +432,14 @@ stream_notify(clicon_handle h, goto done; if (xml_rootchild(xev, 0, &xev) < 0) goto done; - if (stream_notify_xml(h, stream, xev) < 0) + if (stream_notify_xml(h, es, xev) < 0) goto done; + if (es->es_replay_enabled){ + if (stream_replay_add(es, &tv, xev) < 0) + goto done; + xev = NULL; /* xml stored in replay_add and should not be freed */ + } + ok: retval = 0; done: if (cb) @@ -380,7 +451,105 @@ stream_notify(clicon_handle h, return retval; } + +/*! Replay a stream + * Start Time: + A parameter, , used to trigger the replay feature + and indicate that the replay should start at the time + specified. If is not present, this is not a replay + subscription. It is not valid to specify start times that are + later than the current time. If the specified is + earlier than the log can support, the replay will begin with + the earliest available notification. This parameter is of type + dateTime and compliant to [RFC3339]. Implementations must + support time zones. + + Stop Time: + An optional parameter, , used with the optional + replay feature to indicate the newest notifications of + interest. If is not present, the notifications will + continue until the subscription is terminated. Must be used + with and be later than . Values of in + the future are valid. This parameter is of type dateTime and + compliant to [RFC3339]. Implementations must support time + zones. + + * Should we fork?? + * Assume no future sample timestamps. + */ +int +stream_replay(clicon_handle h, + char *stream, + struct timeval *start, + struct timeval *stop) +{ + int retval = -1; + event_stream_t *es; + struct stream_replay *r; + + if ((es = stream_find(h, stream)) == NULL){ + clicon_err(OE_CFG, ENOENT, "Stream %s not found", stream); + goto done; + } + if (!es->es_replay_enabled) + goto ok; + if ((r = es->es_replay) == NULL) + goto ok; + /* First loop to skip until start */ + if (start){ + do { + if (timercmp(&r->r_tv, start, >=)) + break; + r = NEXTQ(struct stream_replay *, r); + } while (r && r!=es->es_replay); + if (r == NULL) + goto ok; + } + /* Then notify until stop */ + do { + if (stop && timercmp(&r->r_tv, stop, >)) + break; + if (stream_notify_xml(h, es, r->r_xml) < 0) + goto done; + r = NEXTQ(struct stream_replay *, r); + } while (r && r!=es->es_replay); + ok: + retval = 0; + done: + return retval; +} + +/*! Add replay sample to stream with timestamp + * @param[in] es Stream + * @param[in] tv Timestamp + * @param[in] xv XML + */ +int +stream_replay_add(event_stream_t *es, + struct timeval *tv, + cxobj *xv) +{ + int retval = -1; + struct stream_replay *new; + + if ((new = malloc(sizeof *new)) == NULL){ + clicon_err(OE_UNIX, errno, "malloc"); + goto done; + } + memset(new, 0, (sizeof *new)); + new->r_tv = *tv; + new->r_xml = xv; + ADDQ(new, es->es_replay); + retval = 0; + done: + return retval; +} + #ifdef CLIXON_PUBLISH_STREAMS +/* SSE support using Nginx Nchan. This code needs to be enabled at configure + * time using: --enable-publish configure option + * It uses CURL and autoconf needs to set that dependency + */ #include @@ -412,7 +581,6 @@ curl_get_cb(void *ptr, memcpy(buf->b_buf+buf->b_len, ptr, len); buf->b_len += len; buf->b_buf[buf->b_len] = '\0'; - fprintf(stderr, "%s: %s\n", __FUNCTION__, buf->b_buf); return len; } @@ -581,5 +749,3 @@ stream_publish_exit() #endif return 0; } - - diff --git a/test/test_stream.sh b/test/test_stream.sh index 3225bd2f..c06afaf4 100755 --- a/test/test_stream.sh +++ b/test/test_stream.sh @@ -139,6 +139,7 @@ fi # Netconf stream subscription # Switch here since subscriptions takes time if true; then + new "netconf EXAMPLE subscription" expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLE]]>]]>' '^]]>]]>20' 5 @@ -152,6 +153,9 @@ new "netconf NONEXIST subscription" expectwait "$clixon_netconf -qf $cfg -y $fyang" 'NONEXIST]]>]]>' '^invalid-valueapplicationerrorNo such stream]]>]]>$' 5 fi +#new "netconf EXAMPLE subscription with replay" +#expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLE2018-10-21T19:22:162018-10-21T19:25:00]]>]]>' '^]]>]]>20' 5 + new "Kill restconf daemon" sudo pkill -u www-data clixon_restconf