stream replay and subscription update

This commit is contained in:
Olof hagsand 2018-10-28 17:17:36 +01:00
parent 25c761202e
commit fb0d0606e3
5 changed files with 285 additions and 138 deletions

View file

@ -85,6 +85,7 @@
```
### Minor changes
* Allow new lines in CLI prompts
* uri_percent_encode() and xml_chardata_encode() changed to use stdarg parameters
* Added CLIXON_DEFAULT_CONFIG=/usr/local/etc/clixon.xml as option and in example (so you dont need to provide -f command-line option).
* Yang 1.1 action syntax added (but function is not supported)

View file

@ -84,8 +84,8 @@ ce_find_bypid(struct client_entry *ce_list,
/*! Stream callback for netconf stream notification (RFC 5277)
* @param[in] h Clicon handle
* @param[in] event Event as XML
* @param[in] arg Extra argument provided in stream_cb_add
* @see stream_cb_add
* @param[in] arg Extra argument provided in stream_ss_add
* @see stream_ss_add
*/
static int
ce_event_cb(clicon_handle h,
@ -116,7 +116,7 @@ ce_event_cb(clicon_handle h,
* Close down everything wrt clients (eg sockets, subscriptions)
* Finally actually remove client struct in handle
* @param[in] h Clicon handle
* @param[in] ce Client hadnle
* @param[in] ce Client handle
* @see backend_client_delete for actual deallocation of client entry struct
*/
int
@ -137,7 +137,7 @@ backend_client_rm(clicon_handle h,
ce->ce_s = 0;
}
/* for all streams */
stream_cb_delete(h, NULL, ce_event_cb, (void*)ce);
stream_ss_delete_all(h, ce_event_cb, (void*)ce);
break;
}
ce_prev = &c->ce_next;
@ -807,7 +807,6 @@ from_client_delete_config(clicon_handle h,
return retval;
}
/*! Internal message: Create subscription for notifications see RFC 5277
* @param[in] h Clicon handle
* @param[in] xe Netconf request xml tree
@ -840,7 +839,7 @@ from_client_create_subscription(clicon_handle h,
char *selector = NULL;
struct timeval start;
struct timeval stop;
if ((x = xpath_first(xe, "//stream")) != NULL)
stream = xml_find_value(x, "body");
if ((x = xpath_first(xe, "//stopTime")) != NULL){
@ -874,13 +873,20 @@ from_client_create_subscription(clicon_handle h,
goto done;
goto ok;
}
if (stream_cb_add(h, stream, selector, stoptime?&stop:NULL,
if (stream_ss_add(h, stream, selector,
starttime?&start:NULL, stoptime?&stop:NULL,
ce_event_cb, (void*)ce) < 0)
goto done;
/* Replay of this stream to specific subscription according to start and stop*/
if (starttime) /* XXX No this must be done _after_ this RPC completes */
if (stream_replay(h, stream, &start, stoptime?&stop:NULL) < 0)
/* Replay of this stream to specific subscription according to start and
* stop (if present).
* RFC 5277: If <startTime> is not present, this is not a replay
* subscription.
* Schedule the replay to occur right after this RPC completes, eg "now"
*/
if (starttime){
if (stream_replay_trigger(h, stream, ce_event_cb, (void*)ce) < 0)
goto done;
}
cprintf(cbret, "<rpc-reply><ok/></rpc-reply>");
ok:
retval = 0;
@ -1274,7 +1280,7 @@ from_client_msg(clicon_handle h,
}
else if (strcmp(name, "close-session") == 0){
xmldb_unlock_all(h, pid);
stream_cb_delete(h, NULL, ce_event_cb, (void*)ce);
stream_ss_delete_all(h, ce_event_cb, (void*)ce);
cprintf(cbret, "<rpc-reply><ok/></rpc-reply>");
}
else if (strcmp(name, "kill-session") == 0){

View file

@ -42,15 +42,16 @@
/* Subscription callback
* @param[in] h Clicon handle
* @param[in] event Event as XML
* @param[in] arg Extra argument provided in stream_cb_add
* @see stream_cb_add
* @param[in] arg Extra argument provided in stream_ss_add
* @see stream_ss_add
*/
typedef int (*stream_fn_t)(clicon_handle h, cxobj *event, void *arg);
struct stream_subscription{
struct stream_subscription *ss_next;
qelem_t ss_q; /* queue header */
char *ss_stream; /* Name of associated stream */
char *ss_xpath; /* Filter selector as xpath */
struct timeval ss_starttime; /* Replay starttime */
struct timeval ss_stoptime; /* Replay stoptime */
stream_fn_t ss_fn; /* Callback when event occurs */
void *ss_arg; /* Callback argument */
@ -82,19 +83,26 @@ event_stream_t *stream_find(clicon_handle h, const char *name);
int stream_register(clicon_handle h, const char *name, const char *description, int replay_enabled);
int stream_delete_all(event_stream_t *es);
int stream_get_xml(clicon_handle h, int access, cbuf *cb);
struct stream_subscription *stream_cb_add(clicon_handle h, char *stream,
char *xpath, struct timeval *stop, stream_fn_t fn, void *arg);
int stream_cb_delete(clicon_handle h, char *stream, stream_fn_t fn, void *arg);
int stream_notify_xml(clicon_handle h, event_stream_t *es, cxobj *xevent);
int stream_timer_setup(int fd, void *arg);
/* Subscriptions */
struct stream_subscription *stream_ss_add(clicon_handle h, char *stream,
char *xpath, struct timeval *start, struct timeval *stop,
stream_fn_t fn, void *arg);
int stream_ss_rm(event_stream_t *es, struct stream_subscription *ss);
struct stream_subscription *stream_ss_find(event_stream_t *es,
stream_fn_t fn, void *arg);
int stream_ss_delete_all(clicon_handle h, stream_fn_t fn, void *arg);
int stream_notify_xml(clicon_handle h, event_stream_t *es, struct timeval *tv, cxobj *xevent);
#if defined(__GNUC__) && __GNUC__ >= 3
int stream_notify(clicon_handle h, char *stream, const char *event, ...) __attribute__ ((format (printf, 3, 4)));
#else
int stream_notify(clicon_handle h, char *stream, const char *event, ...);
#endif
int stream_replay(clicon_handle h, char *stream, struct timeval *start, struct timeval *stop);
/* Replay */
int stream_replay_add(event_stream_t *es, struct timeval *tv, cxobj *xv);
int stream_replay_trigger(clicon_handle h, char *stream, stream_fn_t fn, void *arg);
/* Experimental publish streams using SSE. CLIXON_PUBLISH_STREAMS should be set */
int stream_publish(clicon_handle h, char *stream);

View file

@ -34,9 +34,17 @@
* Event notification streams according to RFC5277
* The stream implementation has three parts:
* 1) Base stream handling: stream_find/register/delete_all/get_xml
* 2) Callback and notification handling (stream_cb_add/delete/timeout, stream_notify, etc
* 2) Stream subscription handling (stream_ss_add/delete/timeout, stream_notify, etc
* 3) Stream replay: stream_replay/_add
* 4) nginx/nchan publish code (use --enable-publish config option)
*
* +---------------+ * +---------------+ * +---------------+
* | clicon_handle |--------->| event_stream |--------->| subscription |
* +---------------+ +---------------+ +---------------+
* \ * +---------------+
* +-->| replay |
* +---------------+
*/
#ifdef HAVE_CONFIG_H
@ -58,6 +66,7 @@
#include "clixon_queue.h"
#include "clixon_err.h"
#include "clixon_log.h"
#include "clixon_event.h"
#include "clixon_string.h"
#include "clixon_hash.h"
#include "clixon_handle.h"
@ -68,6 +77,9 @@
#include "clixon_xpath.h"
#include "clixon_stream.h"
/* Go through and timeout subscription timers [s] */
#define STREAM_TIMER_TIMEOUT_S 5
/*! Find an event notification stream given name
* @param[in] h Clicon handle
* @param[in] name Name of stream
@ -129,6 +141,7 @@ stream_delete_all(event_stream_t *es)
{
event_stream_t *e_next;
struct stream_replay *r;
struct stream_subscription *ss;
while (es){
e_next = es->es_next;
@ -136,6 +149,8 @@ stream_delete_all(event_stream_t *es)
free(es->es_name);
if (es->es_description)
free(es->es_description);
while ((ss = es->es_subscription) != NULL)
stream_ss_rm(es, ss);
while ((r = es->es_replay) != NULL){
if (r->r_xml)
xml_free(r->r_xml);
@ -186,6 +201,54 @@ stream_get_xml(clicon_handle h,
return 0;
}
/*! Check all stream subscription stop timers, set up new timer
* @param[in] fd No-op
* @param[in] arg Clicon handle
* @note format is given by event_reg_timeout callback function (fd not needed)
*/
int
stream_timer_setup(int fd,
void *arg)
{
int retval = -1;
clicon_handle h = (clicon_handle)arg;
struct timeval now;
struct timeval t;
struct timeval t1 = {STREAM_TIMER_TIMEOUT_S, 0};
event_stream_t *es;
struct stream_subscription *ss;
struct stream_subscription *ss1;
/* Go thru callbacks and see if any have timed out, if so remove them
* Could also be done by a separate timer.
*/
gettimeofday(&now, NULL);
/* for all event streams, remove subscription if past stop time */
for (es=clicon_stream(h); es; es=es->es_next){
if ((ss = es->es_subscription) != NULL)
do {
if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){
ss1 = NEXTQ(struct stream_subscription *, ss);
if (stream_ss_rm(es, ss) < 0)
goto done;
ss = ss1;
}
else
ss = NEXTQ(struct stream_subscription *, ss);
} while (ss && ss != es->es_subscription);
}
/* Initiate new timer */
timeradd(&now, &t1, &t);
if (event_reg_timeout(t,
stream_timer_setup, /* this function */
h, /* clicon handle */
"stream timer setup") < 0)
goto done;
retval = 0;
done:
return retval;
}
#ifdef NYI
/*! Delete single notification event stream
* XXX notused
@ -197,21 +260,22 @@ stream_del()
}
#endif
/*! Add an event notification callback to a stream given a callback function
* @param[in] h Clicon handle
* @param[in] stream Name of stream
* @param[in] xpath Filter selector - xpath
* @param[in] stoptime If set,
* @param[in] fn Callback when event occurs
* @param[in] arg Argument to use with callback. Also handle when deleting
* @retval 0 OK
* @retval -1 Error, ie no such stream
* @param[in] h Clicon handle
* @param[in] stream Name of stream
* @param[in] xpath Filter selector - xpath
* @param[in] startime If set, Make a replay
* @param[in] stoptime If set, dont continue past this time
* @param[in] fn Callback when event occurs
* @param[in] arg Argument to use with callback. Also handle when deleting
* @retval 0 OK
* @retval -1 Error, ie no such stream
*/
struct stream_subscription *
stream_cb_add(clicon_handle h,
stream_ss_add(clicon_handle h,
char *stream,
char *xpath,
struct timeval *starttime,
struct timeval *stoptime,
stream_fn_t fn,
void *arg)
@ -235,14 +299,15 @@ stream_cb_add(clicon_handle h,
}
if (stoptime)
ss->ss_stoptime = *stoptime;
if (starttime)
ss->ss_starttime = *starttime;
if (xpath && (ss->ss_xpath = strdup(xpath)) == NULL){
clicon_err(OE_CFG, errno, "strdup");
goto done;
}
ss->ss_fn = fn;
ss->ss_arg = arg;
ss->ss_next = es->es_subscription;
es->es_subscription = ss;
ADDQ(ss, es->es_subscription);
return ss;
done:
if (ss)
@ -250,7 +315,7 @@ stream_cb_add(clicon_handle h,
return NULL;
}
/*! Delete event notification callback to a stream given a callback and arg
/*! Delete event stream subscription to a stream given a callback and arg
* @param[in] h Clicon handle
* @param[in] stream Name of stream or NULL for all streams
* @param[in] fn Callback when event occurs
@ -258,77 +323,61 @@ stream_cb_add(clicon_handle h,
* @retval 0 OK
* @retval -1 Error
*/
static int
stream_cb_rm(event_stream_t *es,
struct stream_subscription *ssrm)
int
stream_ss_rm(event_stream_t *es,
struct stream_subscription *ss)
{
struct stream_subscription **ss_prev;
struct stream_subscription *ss;
struct stream_subscription *ss_next;
ss_prev = &es->es_subscription;
for (ss = *ss_prev; ss; ss = ss_next){
ss_next = ss->ss_next;
if (ss == ssrm){
*ss_prev = ss->ss_next;
if (ss->ss_stream)
free(ss->ss_stream);
if (ss->ss_xpath)
free(ss->ss_xpath);
free(ss);
continue;
// break; if more > 1
}
ss_prev = &ss->ss_next;
}
DELQ(ss, es->es_subscription, struct stream_subscription *);
if (ss->ss_stream)
free(ss->ss_stream);
if (ss->ss_xpath)
free(ss->ss_xpath);
free(ss);
return 0;
}
/*! Remove stream subscription identified with fn and arg
/*! Find stream callback given callback function and its (unique) argument
* @param[in] es Pointer to event stream
* @param[in] fn Stream callback
* @param[in] arg Argument - typically unique client handle
* @retval ss Event stream subscription structure
* @retval NULL Not found
*/
struct stream_subscription *
stream_ss_find(event_stream_t *es,
stream_fn_t fn,
void *arg)
{
struct stream_subscription *ss;
if ((ss = es->es_subscription) != NULL)
do {
if (fn == ss->ss_fn && arg == ss->ss_arg)
return ss;
ss = NEXTQ(struct stream_subscription *, ss);
} while (ss && ss != es->es_subscription);
return NULL;
}
/*! Remove stream subscription identified with fn and arg in all streams
* @param[in] h Clicon handle
* @param[in] stream Name of stream or NULL for all streams
* @param[in] fn Stream callback
* @param[in] arg Argument - typically unique client handle
*/
int
stream_cb_delete(clicon_handle h,
char *stream,
stream_fn_t fn,
void *arg)
stream_ss_delete_all(clicon_handle h,
stream_fn_t fn,
void *arg)
{
int retval = -1;
event_stream_t *es;
struct stream_subscription *ss;
for (es=clicon_stream(h); es; es=es->es_next){
if (stream && strcmp(stream, es->es_name)!=0)
continue;
for (ss = es->es_subscription; ss; ss = ss->ss_next){
if (fn == ss->ss_fn && arg == ss->ss_arg){
if (stream_cb_rm(es, ss) < 0)
goto done;
break; /* scb_rm removes element, must break - but only one match */
}
}
}
retval = 0;
done:
return retval;
}
/*! Check if any stream subscriptions have timed out, if so remove them
*/
int
stream_cb_timeout(event_stream_t *es,
struct timeval *tv)
{
int retval = -1;
struct stream_subscription *ss;
again: /* If we remove must start again */
for (ss = es->es_subscription; ss; ss = ss->ss_next){
if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, tv, <)){
if (stream_cb_rm(es, ss) < 0)
for (es=clicon_stream(h); es; es=es->es_next)
if ((ss = stream_ss_find(es, fn, arg)) != NULL)
if (stream_ss_rm(es, ss) < 0)
goto done;
goto again;
}
}
retval = 0;
done:
return retval;
@ -337,29 +386,37 @@ stream_cb_timeout(event_stream_t *es,
/*! Stream notify event and distribute to all registered callbacks
* @param[in] h Clicon handle
* @param[in] stream Name of event stream. CLICON is predefined as LOG stream
* @param[in] tv Timestamp. Dont notify if subscription has stoptime<tv
* @param[in] event Notification as xml tree
* @retval 0 OK
* @retval -1 Error with clicon_err called
* @see stream_notify
* @see stream_ss_timeout where subscriptions are removed if stoptime<now
*/
int
stream_notify_xml(clicon_handle h,
event_stream_t *es,
struct timeval *tv,
cxobj *xevent)
{
int retval = -1;
struct stream_subscription *ss;
clicon_debug(1, "%s", __FUNCTION__);
/* Go thru all global (handle) subscriptions and find matches */
for (ss = es->es_subscription; ss; ss = ss->ss_next){
/* Check if this is of interest, ie not larger than stoptime? */
if (ss->ss_xpath == NULL ||
strlen(ss->ss_xpath)==0 ||
xpath_first(xevent, "%s", ss->ss_xpath) != NULL)
if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0)
goto done;
}
/* Go thru all subscriptions and find matches */
if ((ss = es->es_subscription) != NULL)
do {
if (timerisset(&ss->ss_stoptime) && /* stoptime has passed */
timercmp(&ss->ss_stoptime, tv, <))
;
else /* xpath match */
if (ss->ss_xpath == NULL ||
strlen(ss->ss_xpath)==0 ||
xpath_first(xevent, "%s", ss->ss_xpath) != NULL)
if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0)
goto done;
ss = NEXTQ(struct stream_subscription *, ss);
} while (ss && ss != es->es_subscription);
retval = 0;
done:
return retval;
@ -411,18 +468,11 @@ stream_notify(clicon_handle h,
clicon_err(OE_YANG, 0, "No yang spec");
goto done;
}
gettimeofday(&tv, NULL);
#if 1
/* Go thru callbacks and see if any have timed out, if so remove them
* Could also be done by a separate timer.
*/
if (stream_cb_timeout(es, &tv) < 0)
goto done;
#endif
if ((cb = cbuf_new()) == NULL){
clicon_err(OE_UNIX, errno, "cbuf_new");
goto done;
}
gettimeofday(&tv, NULL);
if (time2str(tv, timestr, sizeof(timestr)) < 0){
clicon_err(OE_UNIX, errno, "time2str");
goto done;
@ -432,7 +482,7 @@ stream_notify(clicon_handle h,
goto done;
if (xml_rootchild(xev, 0, &xev) < 0)
goto done;
if (stream_notify_xml(h, es, xev) < 0)
if (stream_notify_xml(h, es, &tv, xev) < 0)
goto done;
if (es->es_replay_enabled){
if (stream_replay_add(es, &tv, xev) < 0)
@ -451,8 +501,8 @@ stream_notify(clicon_handle h,
return retval;
}
/*! Replay a stream
/*! Replay a stream by sending notification messages
* @see RFC5277 Sec 2.1.1:
* Start Time:
A parameter, <startTime>, used to trigger the replay feature
and indicate that the replay should start at the time
@ -474,42 +524,39 @@ stream_notify(clicon_handle h,
compliant to [RFC3339]. Implementations must support time
zones.
* Should we fork??
* Assume no future sample timestamps.
*/
int
stream_replay(clicon_handle h,
char *stream,
struct timeval *start,
struct timeval *stop)
static int
stream_replay_notify(clicon_handle h,
event_stream_t *es,
struct stream_subscription *ss)
{
int retval = -1;
event_stream_t *es;
struct stream_replay *r;
if ((es = stream_find(h, stream)) == NULL){
clicon_err(OE_CFG, ENOENT, "Stream %s not found", stream);
goto done;
}
/* If <startTime> is not present, this is not a replay */
if (!timerisset(&ss->ss_starttime))
goto ok;
if (!es->es_replay_enabled)
goto ok;
/* Get replay linked list */
if ((r = es->es_replay) == NULL)
goto ok;
/* First loop to skip until start */
if (start){
do {
if (timercmp(&r->r_tv, start, >=))
break;
r = NEXTQ(struct stream_replay *, r);
} while (r && r!=es->es_replay);
if (r == NULL)
goto ok;
}
do {
if (timercmp(&r->r_tv, &ss->ss_starttime, >=))
break;
r = NEXTQ(struct stream_replay *, r);
} while (r && r!=es->es_replay);
if (r == NULL)
goto ok; /* No samples to replay */
/* Then notify until stop */
do {
if (stop && timercmp(&r->r_tv, stop, >))
if (timerisset(&ss->ss_stoptime) &&
timercmp(&r->r_tv, &ss->ss_stoptime, >))
break;
if (stream_notify_xml(h, es, r->r_xml) < 0)
if ((*ss->ss_fn)(h, r->r_xml, ss->ss_arg) < 0)
goto done;
r = NEXTQ(struct stream_replay *, r);
} while (r && r!=es->es_replay);
@ -545,6 +592,88 @@ stream_replay_add(event_stream_t *es,
return retval;
}
/* tmp struct for timeout callback containing clicon handle,
* stream and subscription
*/
struct replay_arg{
clicon_handle ra_h;
char *ra_stream; /* Name of stream - malloced: free by cb */
stream_fn_t ra_fn; /* Stream callback */
void *ra_arg; /* Argument - typically unique client handle */
};
/*! Timeout callback for replaying stream
* @param[in] fd Ignore
* @param[in] arg tmp struct including clicon handle, stream and subscription
*/
static int
stream_replay_cb(int fd,
void *arg)
{
int retval = -1;
struct replay_arg *ra= (struct replay_arg*)arg;
event_stream_t *es;
struct stream_subscription *ss;
if (ra == NULL)
goto ok;
if (ra->ra_stream == NULL)
goto ok;
if ((es = stream_find(ra->ra_h, ra->ra_stream)) == NULL)
goto ok;
if ((ss = stream_ss_find(es, ra->ra_fn, ra->ra_arg)) == NULL)
goto ok;
if (stream_replay_notify(ra->ra_h, es, ss) < 0)
goto done;
ok:
retval = 0;
done:
if (ra){
if (ra->ra_stream)
free(ra->ra_stream);
free(ra);
}
return retval;
}
/*! Schedule stream replay to occur asap, eg "now"
*
* @param[in] h clicon handle
* @param[in] stream Name of stream
* @param[in] fn Stream callback
* @param[in] arg Argument - typically unique client handle
*/
int
stream_replay_trigger(clicon_handle h,
char *stream,
stream_fn_t fn,
void *arg)
{
int retval = -1;
struct timeval now;
struct replay_arg *ra;
if ((ra = malloc(sizeof(*ra))) == NULL){
clicon_err(OE_UNIX, errno, "malloc");
goto done;
}
memset(ra, 0, sizeof(*ra));
ra->ra_h = h;
if ((ra->ra_stream = strdup(stream)) == NULL){
clicon_err(OE_UNIX, errno, "strdup");
goto done;
}
ra->ra_fn = fn;
ra->ra_arg = arg;
gettimeofday(&now, NULL);
if (event_reg_timeout(now, stream_replay_cb, ra,
"create-subscribtion stream replay") < 0)
goto done;
retval = 0;
done:
return retval;
}
#ifdef CLIXON_PUBLISH_STREAMS
/* SSE support using Nginx Nchan. This code needs to be enabled at configure
* time using: --enable-publish configure option
@ -649,8 +778,8 @@ url_post(char *url,
* Push via curl_post to publish stream event
* @param[in] h Clicon handle
* @param[in] event Event as XML
* @param[in] arg Extra argument provided in stream_cb_add
* @see stream_cb_add
* @param[in] arg Extra argument provided in stream_ss_add
* @see stream_ss_add
*/
static int
stream_publish_cb(clicon_handle h,
@ -711,7 +840,7 @@ stream_publish(clicon_handle h,
#ifdef CLIXON_PUBLISH_STREAMS
int retval = -1;
if (stream_cb_add(h, stream, NULL, stream_publish_cb, (void*)stream) < 0)
if (stream_ss_add(h, stream, NULL, stream_publish_cb, (void*)stream) < 0)
goto done;
retval = 0;
done:

View file

@ -49,5 +49,8 @@ expecteof "$PROG" 0 '<x a="t"/>' '^<x a="t"/>$'
new "Single quotes for attributes (returns double quotes but at least parses right)"
expecteof "$PROG" 0 "<x a='t'/>" '^<x a="t"/>$'
new "Mixed quotes"
expecteof "$PROG" 0 "<x a='t' b=\"q\"/>" '^<x a="t" b="q"/>$'
rm -rf $dir