* Stream replay support

* RFC8040 Restconf replay support: start-time and stop-time query parameter support
    * This only applies to "native" restconf stream support, Nchan mode has different replay functionality
  * RFC5277 Netconf replay support using <startTime> and
  * Replay support is only in-memory and not persistent. External time-series DB could be added.
This commit is contained in:
Olof hagsand 2018-10-27 11:12:01 +02:00
parent 77e56868d4
commit af16760279
13 changed files with 355 additions and 65 deletions

View file

@ -588,6 +588,40 @@ clicon_dbspec_yang_set(clicon_handle h,
return 0;
}
#if 1 /* Temporary function until "Top-level Yang symbol cannot be called "config"" is fixed */
/*! Get YANG specification for clixon config
* Must use hash functions directly since they are not strings.
*/
yang_spec *
clicon_config_yang(clicon_handle h)
{
clicon_hash_t *cdat = clicon_data(h);
size_t len;
void *p;
if ((p = hash_value(cdat, "control_yang", &len)) != NULL)
return *(yang_spec **)p;
return NULL;
}
/*! Set yang specification for control
* ys must be a malloced pointer
*/
int
clicon_config_yang_set(clicon_handle h,
struct yang_spec *ys)
{
clicon_hash_t *cdat = clicon_data(h);
/* It is the pointer to ys that should be copied by hash,
so we send a ptr to the ptr to indicate what to copy.
*/
if (hash_add(cdat, "control_yang", &ys, sizeof(ys)) == NULL)
return -1;
return 0;
}
#endif
/*! Get YANG specification for Clixon system options and features
* Must use hash functions directly since they are not strings.
* Example: features are typically accessed directly in the config tree.

View file

@ -32,7 +32,11 @@
***** END LICENSE BLOCK *****
* Event notification streams according to RFC5277
* See (old) subscription code in clixon_backend_handle.c and backend_client.c
* The stream implementation has three parts:
* 1) Base stream handling: stream_find/register/delete_all/get_xml
* 2) Callback and notification handling (stream_cb_add/delete/timeout, stream_notify, etc
* 3) Stream replay: stream_replay/_add
* 4) nginx/nchan publish code (use --enable-publish config option)
*/
#ifdef HAVE_CONFIG_H
@ -88,7 +92,8 @@ stream_find(clicon_handle h,
int
stream_register(clicon_handle h,
const char *name,
const char *description)
const char *description,
const int replay_enabled)
{
int retval = -1;
event_stream_t *es;
@ -108,6 +113,7 @@ stream_register(clicon_handle h,
clicon_err(OE_XML, errno, "strdup");
goto done;
}
es->es_replay_enabled = replay_enabled;
clicon_stream_append(h, es);
ok:
retval = 0;
@ -121,21 +127,28 @@ stream_register(clicon_handle h,
int
stream_delete_all(event_stream_t *es)
{
event_stream_t *e_next;
event_stream_t *e_next;
struct stream_replay *r;
while (es){
e_next = es->es_next;
if (es->es_name)
free(es->es_name);
if (es->es_description)
free(es->es_description);
while ((r = es->es_replay) != NULL){
if (r->r_xml)
xml_free(r->r_xml);
DELQ(r, es->es_replay, struct stream_replay *);
free(r);
}
free(es);
es = e_next;
}
return 0;
}
/*! Return stream definition
/*! Return stream definition state in XML supporting RFC 8040 and RFC5277
* @param[in] h Clicon handle
* @param[in] access If set, include access/location
* @param[out] cb Output buffer containing XML on exit
@ -189,21 +202,22 @@ stream_del()
* @param[in] h Clicon handle
* @param[in] stream Name of stream
* @param[in] xpath Filter selector - xpath
* @param[in] stoptime If set,
* @param[in] fn Callback when event occurs
* @param[in] arg Argument to use with callback. Also handle when deleting
* @retval 0 OK
* @retval -1 Error, ie no such stream
*/
int
struct stream_subscription *
stream_cb_add(clicon_handle h,
char *stream,
char *xpath,
struct timeval *stoptime,
stream_fn_t fn,
void *arg)
{
int retval = -1;
event_stream_t *es;
struct stream_subscription *ss;
struct stream_subscription *ss = NULL;
clicon_debug(1, "%s", __FUNCTION__);
if ((es = stream_find(h, stream)) == NULL){
@ -219,6 +233,8 @@ stream_cb_add(clicon_handle h,
clicon_err(OE_CFG, errno, "strdup");
goto done;
}
if (stoptime)
ss->ss_stoptime = *stoptime;
if (xpath && (ss->ss_xpath = strdup(xpath)) == NULL){
clicon_err(OE_CFG, errno, "strdup");
goto done;
@ -227,9 +243,11 @@ stream_cb_add(clicon_handle h,
ss->ss_arg = arg;
ss->ss_next = es->es_subscription;
es->es_subscription = ss;
retval = 0;
return ss;
done:
return retval;
if (ss)
free(ss);
return NULL;
}
/*! Delete event notification callback to a stream given a callback and arg
@ -240,6 +258,34 @@ stream_cb_add(clicon_handle h,
* @retval 0 OK
* @retval -1 Error
*/
static int
stream_cb_rm(event_stream_t *es,
struct stream_subscription *ssrm)
{
struct stream_subscription **ss_prev;
struct stream_subscription *ss;
struct stream_subscription *ss_next;
ss_prev = &es->es_subscription;
for (ss = *ss_prev; ss; ss = ss_next){
ss_next = ss->ss_next;
if (ss == ssrm){
*ss_prev = ss->ss_next;
if (ss->ss_stream)
free(ss->ss_stream);
if (ss->ss_xpath)
free(ss->ss_xpath);
free(ss);
continue;
// break; if more > 1
}
ss_prev = &ss->ss_next;
}
return 0;
}
/*! Remove stream subscription identified with fn and arg
*/
int
stream_cb_delete(clicon_handle h,
char *stream,
@ -248,30 +294,43 @@ stream_cb_delete(clicon_handle h,
{
int retval = -1;
event_stream_t *es;
struct stream_subscription **ss_prev;
struct stream_subscription *ss;
struct stream_subscription *ss_next;
for (es=clicon_stream(h); es; es=es->es_next){
if (stream && strcmp(stream, es->es_name)!=0)
continue;
ss_prev = &es->es_subscription;
for (ss = *ss_prev; ss; ss = ss_next){
ss_next = ss->ss_next;
for (ss = es->es_subscription; ss; ss = ss->ss_next){
if (fn == ss->ss_fn && arg == ss->ss_arg){
*ss_prev = ss->ss_next;
if (ss->ss_stream)
free(ss->ss_stream);
if (ss->ss_xpath)
free(ss->ss_xpath);
free(ss);
continue;
// break; if more > 1
if (stream_cb_rm(es, ss) < 0)
goto done;
break; /* scb_rm removes element, must break - but only one match */
}
ss_prev = &ss->ss_next;
}
}
retval = 0;
done:
return retval;
}
/*! Check if any stream subscriptions have timed out, if so remove them
*/
int
stream_cb_timeout(event_stream_t *es,
struct timeval *tv)
{
int retval = -1;
struct stream_subscription *ss;
again: /* If we remove must start again */
for (ss = es->es_subscription; ss; ss = ss->ss_next){
if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, tv, <)){
if (stream_cb_rm(es, ss) < 0)
goto done;
goto again;
}
}
retval = 0;
done:
return retval;
}
@ -284,26 +343,23 @@ stream_cb_delete(clicon_handle h,
* @see stream_notify
*/
int
stream_notify_xml(clicon_handle h,
char *stream,
cxobj *xevent)
stream_notify_xml(clicon_handle h,
event_stream_t *es,
cxobj *xevent)
{
int retval = -1;
event_stream_t *es;
struct stream_subscription *ss;
clicon_debug(1, "%s", __FUNCTION__);
if ((es = stream_find(h, stream)) == NULL)
goto ok;
/* Go thru all global (handle) subscriptions and find matches */
for (ss = es->es_subscription; ss; ss = ss->ss_next){
/* Check if this is of interest, ie not larger than stoptime? */
if (ss->ss_xpath == NULL ||
strlen(ss->ss_xpath)==0 ||
xpath_first(xevent, "%s", ss->ss_xpath) != NULL)
if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0)
goto done;
}
ok:
retval = 0;
done:
return retval;
@ -335,8 +391,11 @@ stream_notify(clicon_handle h,
cbuf *cb = NULL;
char timestr[27];
struct timeval tv;
event_stream_t *es;
clicon_debug(1, "%s", __FUNCTION__);
if ((es = stream_find(h, stream)) == NULL)
goto ok;
va_start(args, event);
len = vsnprintf(NULL, 0, event, args) + 1;
va_end(args);
@ -352,12 +411,18 @@ stream_notify(clicon_handle h,
clicon_err(OE_YANG, 0, "No yang spec");
goto done;
}
gettimeofday(&tv, NULL);
#if 1
/* Go thru callbacks and see if any have timed out, if so remove them
* Could also be done by a separate timer.
*/
if (stream_cb_timeout(es, &tv) < 0)
goto done;
#endif
if ((cb = cbuf_new()) == NULL){
clicon_err(OE_UNIX, errno, "cbuf_new");
goto done;
}
gettimeofday(&tv, NULL);
if (time2str(tv, timestr, sizeof(timestr)) < 0){
clicon_err(OE_UNIX, errno, "time2str");
goto done;
@ -367,8 +432,14 @@ stream_notify(clicon_handle h,
goto done;
if (xml_rootchild(xev, 0, &xev) < 0)
goto done;
if (stream_notify_xml(h, stream, xev) < 0)
if (stream_notify_xml(h, es, xev) < 0)
goto done;
if (es->es_replay_enabled){
if (stream_replay_add(es, &tv, xev) < 0)
goto done;
xev = NULL; /* xml stored in replay_add and should not be freed */
}
ok:
retval = 0;
done:
if (cb)
@ -380,7 +451,105 @@ stream_notify(clicon_handle h,
return retval;
}
/*! Replay a stream
* Start Time:
A parameter, <startTime>, used to trigger the replay feature
and indicate that the replay should start at the time
specified. If <startTime> is not present, this is not a replay
subscription. It is not valid to specify start times that are
later than the current time. If the <startTime> specified is
earlier than the log can support, the replay will begin with
the earliest available notification. This parameter is of type
dateTime and compliant to [RFC3339]. Implementations must
support time zones.
Stop Time:
An optional parameter, <stopTime>, used with the optional
replay feature to indicate the newest notifications of
interest. If <stopTime> is not present, the notifications will
continue until the subscription is terminated. Must be used
with and be later than <startTime>. Values of <stopTime> in
the future are valid. This parameter is of type dateTime and
compliant to [RFC3339]. Implementations must support time
zones.
* Should we fork??
* Assume no future sample timestamps.
*/
int
stream_replay(clicon_handle h,
char *stream,
struct timeval *start,
struct timeval *stop)
{
int retval = -1;
event_stream_t *es;
struct stream_replay *r;
if ((es = stream_find(h, stream)) == NULL){
clicon_err(OE_CFG, ENOENT, "Stream %s not found", stream);
goto done;
}
if (!es->es_replay_enabled)
goto ok;
if ((r = es->es_replay) == NULL)
goto ok;
/* First loop to skip until start */
if (start){
do {
if (timercmp(&r->r_tv, start, >=))
break;
r = NEXTQ(struct stream_replay *, r);
} while (r && r!=es->es_replay);
if (r == NULL)
goto ok;
}
/* Then notify until stop */
do {
if (stop && timercmp(&r->r_tv, stop, >))
break;
if (stream_notify_xml(h, es, r->r_xml) < 0)
goto done;
r = NEXTQ(struct stream_replay *, r);
} while (r && r!=es->es_replay);
ok:
retval = 0;
done:
return retval;
}
/*! Add replay sample to stream with timestamp
* @param[in] es Stream
* @param[in] tv Timestamp
* @param[in] xv XML
*/
int
stream_replay_add(event_stream_t *es,
struct timeval *tv,
cxobj *xv)
{
int retval = -1;
struct stream_replay *new;
if ((new = malloc(sizeof *new)) == NULL){
clicon_err(OE_UNIX, errno, "malloc");
goto done;
}
memset(new, 0, (sizeof *new));
new->r_tv = *tv;
new->r_xml = xv;
ADDQ(new, es->es_replay);
retval = 0;
done:
return retval;
}
#ifdef CLIXON_PUBLISH_STREAMS
/* SSE support using Nginx Nchan. This code needs to be enabled at configure
* time using: --enable-publish configure option
* It uses CURL and autoconf needs to set that dependency
*/
#include <curl/curl.h>
@ -412,7 +581,6 @@ curl_get_cb(void *ptr,
memcpy(buf->b_buf+buf->b_len, ptr, len);
buf->b_len += len;
buf->b_buf[buf->b_len] = '\0';
fprintf(stderr, "%s: %s\n", __FUNCTION__, buf->b_buf);
return len;
}
@ -581,5 +749,3 @@ stream_publish_exit()
#endif
return 0;
}