diff --git a/CHANGELOG.md b/CHANGELOG.md index 96008d20..5dd3e2d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ ``` ### Minor changes +* Allow new lines in CLI prompts * uri_percent_encode() and xml_chardata_encode() changed to use stdarg parameters * Added CLIXON_DEFAULT_CONFIG=/usr/local/etc/clixon.xml as option and in example (so you dont need to provide -f command-line option). * Yang 1.1 action syntax added (but function is not supported) diff --git a/apps/backend/backend_client.c b/apps/backend/backend_client.c index 4f1692cd..4a5fb622 100644 --- a/apps/backend/backend_client.c +++ b/apps/backend/backend_client.c @@ -84,8 +84,8 @@ ce_find_bypid(struct client_entry *ce_list, /*! Stream callback for netconf stream notification (RFC 5277) * @param[in] h Clicon handle * @param[in] event Event as XML - * @param[in] arg Extra argument provided in stream_cb_add - * @see stream_cb_add + * @param[in] arg Extra argument provided in stream_ss_add + * @see stream_ss_add */ static int ce_event_cb(clicon_handle h, @@ -116,7 +116,7 @@ ce_event_cb(clicon_handle h, * Close down everything wrt clients (eg sockets, subscriptions) * Finally actually remove client struct in handle * @param[in] h Clicon handle - * @param[in] ce Client hadnle + * @param[in] ce Client handle * @see backend_client_delete for actual deallocation of client entry struct */ int @@ -137,7 +137,7 @@ backend_client_rm(clicon_handle h, ce->ce_s = 0; } /* for all streams */ - stream_cb_delete(h, NULL, ce_event_cb, (void*)ce); + stream_ss_delete_all(h, ce_event_cb, (void*)ce); break; } ce_prev = &c->ce_next; @@ -807,7 +807,6 @@ from_client_delete_config(clicon_handle h, return retval; } - /*! Internal message: Create subscription for notifications see RFC 5277 * @param[in] h Clicon handle * @param[in] xe Netconf request xml tree @@ -840,7 +839,7 @@ from_client_create_subscription(clicon_handle h, char *selector = NULL; struct timeval start; struct timeval stop; - + if ((x = xpath_first(xe, "//stream")) != NULL) stream = xml_find_value(x, "body"); if ((x = xpath_first(xe, "//stopTime")) != NULL){ @@ -874,13 +873,20 @@ from_client_create_subscription(clicon_handle h, goto done; goto ok; } - if (stream_cb_add(h, stream, selector, stoptime?&stop:NULL, + if (stream_ss_add(h, stream, selector, + starttime?&start:NULL, stoptime?&stop:NULL, ce_event_cb, (void*)ce) < 0) goto done; - /* Replay of this stream to specific subscription according to start and stop*/ - if (starttime) /* XXX No this must be done _after_ this RPC completes */ - if (stream_replay(h, stream, &start, stoptime?&stop:NULL) < 0) + /* Replay of this stream to specific subscription according to start and + * stop (if present). + * RFC 5277: If is not present, this is not a replay + * subscription. + * Schedule the replay to occur right after this RPC completes, eg "now" + */ + if (starttime){ + if (stream_replay_trigger(h, stream, ce_event_cb, (void*)ce) < 0) goto done; + } cprintf(cbret, ""); ok: retval = 0; @@ -1274,7 +1280,7 @@ from_client_msg(clicon_handle h, } else if (strcmp(name, "close-session") == 0){ xmldb_unlock_all(h, pid); - stream_cb_delete(h, NULL, ce_event_cb, (void*)ce); + stream_ss_delete_all(h, ce_event_cb, (void*)ce); cprintf(cbret, ""); } else if (strcmp(name, "kill-session") == 0){ diff --git a/lib/clixon/clixon_stream.h b/lib/clixon/clixon_stream.h index d301aa36..b4dbf42b 100644 --- a/lib/clixon/clixon_stream.h +++ b/lib/clixon/clixon_stream.h @@ -42,15 +42,16 @@ /* Subscription callback * @param[in] h Clicon handle * @param[in] event Event as XML - * @param[in] arg Extra argument provided in stream_cb_add - * @see stream_cb_add + * @param[in] arg Extra argument provided in stream_ss_add + * @see stream_ss_add */ typedef int (*stream_fn_t)(clicon_handle h, cxobj *event, void *arg); struct stream_subscription{ - struct stream_subscription *ss_next; + qelem_t ss_q; /* queue header */ char *ss_stream; /* Name of associated stream */ char *ss_xpath; /* Filter selector as xpath */ + struct timeval ss_starttime; /* Replay starttime */ struct timeval ss_stoptime; /* Replay stoptime */ stream_fn_t ss_fn; /* Callback when event occurs */ void *ss_arg; /* Callback argument */ @@ -82,19 +83,26 @@ event_stream_t *stream_find(clicon_handle h, const char *name); int stream_register(clicon_handle h, const char *name, const char *description, int replay_enabled); int stream_delete_all(event_stream_t *es); int stream_get_xml(clicon_handle h, int access, cbuf *cb); -struct stream_subscription *stream_cb_add(clicon_handle h, char *stream, - char *xpath, struct timeval *stop, stream_fn_t fn, void *arg); -int stream_cb_delete(clicon_handle h, char *stream, stream_fn_t fn, void *arg); -int stream_notify_xml(clicon_handle h, event_stream_t *es, cxobj *xevent); +int stream_timer_setup(int fd, void *arg); +/* Subscriptions */ +struct stream_subscription *stream_ss_add(clicon_handle h, char *stream, + char *xpath, struct timeval *start, struct timeval *stop, + stream_fn_t fn, void *arg); +int stream_ss_rm(event_stream_t *es, struct stream_subscription *ss); +struct stream_subscription *stream_ss_find(event_stream_t *es, + stream_fn_t fn, void *arg); +int stream_ss_delete_all(clicon_handle h, stream_fn_t fn, void *arg); + +int stream_notify_xml(clicon_handle h, event_stream_t *es, struct timeval *tv, cxobj *xevent); #if defined(__GNUC__) && __GNUC__ >= 3 int stream_notify(clicon_handle h, char *stream, const char *event, ...) __attribute__ ((format (printf, 3, 4))); #else int stream_notify(clicon_handle h, char *stream, const char *event, ...); #endif - -int stream_replay(clicon_handle h, char *stream, struct timeval *start, struct timeval *stop); +/* Replay */ int stream_replay_add(event_stream_t *es, struct timeval *tv, cxobj *xv); +int stream_replay_trigger(clicon_handle h, char *stream, stream_fn_t fn, void *arg); /* Experimental publish streams using SSE. CLIXON_PUBLISH_STREAMS should be set */ int stream_publish(clicon_handle h, char *stream); diff --git a/lib/src/clixon_stream.c b/lib/src/clixon_stream.c index 3d1cc985..65d0d08a 100644 --- a/lib/src/clixon_stream.c +++ b/lib/src/clixon_stream.c @@ -34,9 +34,17 @@ * Event notification streams according to RFC5277 * The stream implementation has three parts: * 1) Base stream handling: stream_find/register/delete_all/get_xml - * 2) Callback and notification handling (stream_cb_add/delete/timeout, stream_notify, etc + * 2) Stream subscription handling (stream_ss_add/delete/timeout, stream_notify, etc * 3) Stream replay: stream_replay/_add * 4) nginx/nchan publish code (use --enable-publish config option) + * + * +---------------+ * +---------------+ * +---------------+ + * | clicon_handle |--------->| event_stream |--------->| subscription | + * +---------------+ +---------------+ +---------------+ + * \ * +---------------+ + * +-->| replay | + * +---------------+ + */ #ifdef HAVE_CONFIG_H @@ -58,6 +66,7 @@ #include "clixon_queue.h" #include "clixon_err.h" #include "clixon_log.h" +#include "clixon_event.h" #include "clixon_string.h" #include "clixon_hash.h" #include "clixon_handle.h" @@ -68,6 +77,9 @@ #include "clixon_xpath.h" #include "clixon_stream.h" +/* Go through and timeout subscription timers [s] */ +#define STREAM_TIMER_TIMEOUT_S 5 + /*! Find an event notification stream given name * @param[in] h Clicon handle * @param[in] name Name of stream @@ -129,6 +141,7 @@ stream_delete_all(event_stream_t *es) { event_stream_t *e_next; struct stream_replay *r; + struct stream_subscription *ss; while (es){ e_next = es->es_next; @@ -136,6 +149,8 @@ stream_delete_all(event_stream_t *es) free(es->es_name); if (es->es_description) free(es->es_description); + while ((ss = es->es_subscription) != NULL) + stream_ss_rm(es, ss); while ((r = es->es_replay) != NULL){ if (r->r_xml) xml_free(r->r_xml); @@ -186,6 +201,54 @@ stream_get_xml(clicon_handle h, return 0; } +/*! Check all stream subscription stop timers, set up new timer + * @param[in] fd No-op + * @param[in] arg Clicon handle + * @note format is given by event_reg_timeout callback function (fd not needed) + */ +int +stream_timer_setup(int fd, + void *arg) +{ + int retval = -1; + clicon_handle h = (clicon_handle)arg; + struct timeval now; + struct timeval t; + struct timeval t1 = {STREAM_TIMER_TIMEOUT_S, 0}; + event_stream_t *es; + struct stream_subscription *ss; + struct stream_subscription *ss1; + + /* Go thru callbacks and see if any have timed out, if so remove them + * Could also be done by a separate timer. + */ + gettimeofday(&now, NULL); + /* for all event streams, remove subscription if past stop time */ + for (es=clicon_stream(h); es; es=es->es_next){ + if ((ss = es->es_subscription) != NULL) + do { + if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){ + ss1 = NEXTQ(struct stream_subscription *, ss); + if (stream_ss_rm(es, ss) < 0) + goto done; + ss = ss1; + } + else + ss = NEXTQ(struct stream_subscription *, ss); + } while (ss && ss != es->es_subscription); + } + /* Initiate new timer */ + timeradd(&now, &t1, &t); + if (event_reg_timeout(t, + stream_timer_setup, /* this function */ + h, /* clicon handle */ + "stream timer setup") < 0) + goto done; + retval = 0; + done: + return retval; +} + #ifdef NYI /*! Delete single notification event stream * XXX notused @@ -197,21 +260,22 @@ stream_del() } #endif - /*! Add an event notification callback to a stream given a callback function - * @param[in] h Clicon handle - * @param[in] stream Name of stream - * @param[in] xpath Filter selector - xpath - * @param[in] stoptime If set, - * @param[in] fn Callback when event occurs - * @param[in] arg Argument to use with callback. Also handle when deleting - * @retval 0 OK - * @retval -1 Error, ie no such stream + * @param[in] h Clicon handle + * @param[in] stream Name of stream + * @param[in] xpath Filter selector - xpath + * @param[in] startime If set, Make a replay + * @param[in] stoptime If set, dont continue past this time + * @param[in] fn Callback when event occurs + * @param[in] arg Argument to use with callback. Also handle when deleting + * @retval 0 OK + * @retval -1 Error, ie no such stream */ struct stream_subscription * -stream_cb_add(clicon_handle h, +stream_ss_add(clicon_handle h, char *stream, char *xpath, + struct timeval *starttime, struct timeval *stoptime, stream_fn_t fn, void *arg) @@ -235,14 +299,15 @@ stream_cb_add(clicon_handle h, } if (stoptime) ss->ss_stoptime = *stoptime; + if (starttime) + ss->ss_starttime = *starttime; if (xpath && (ss->ss_xpath = strdup(xpath)) == NULL){ clicon_err(OE_CFG, errno, "strdup"); goto done; } ss->ss_fn = fn; ss->ss_arg = arg; - ss->ss_next = es->es_subscription; - es->es_subscription = ss; + ADDQ(ss, es->es_subscription); return ss; done: if (ss) @@ -250,7 +315,7 @@ stream_cb_add(clicon_handle h, return NULL; } -/*! Delete event notification callback to a stream given a callback and arg +/*! Delete event stream subscription to a stream given a callback and arg * @param[in] h Clicon handle * @param[in] stream Name of stream or NULL for all streams * @param[in] fn Callback when event occurs @@ -258,77 +323,61 @@ stream_cb_add(clicon_handle h, * @retval 0 OK * @retval -1 Error */ -static int -stream_cb_rm(event_stream_t *es, - struct stream_subscription *ssrm) +int +stream_ss_rm(event_stream_t *es, + struct stream_subscription *ss) { - struct stream_subscription **ss_prev; - struct stream_subscription *ss; - struct stream_subscription *ss_next; - - ss_prev = &es->es_subscription; - for (ss = *ss_prev; ss; ss = ss_next){ - ss_next = ss->ss_next; - if (ss == ssrm){ - *ss_prev = ss->ss_next; - if (ss->ss_stream) - free(ss->ss_stream); - if (ss->ss_xpath) - free(ss->ss_xpath); - free(ss); - continue; - // break; if more > 1 - } - ss_prev = &ss->ss_next; - } + DELQ(ss, es->es_subscription, struct stream_subscription *); + if (ss->ss_stream) + free(ss->ss_stream); + if (ss->ss_xpath) + free(ss->ss_xpath); + free(ss); return 0; } -/*! Remove stream subscription identified with fn and arg +/*! Find stream callback given callback function and its (unique) argument + * @param[in] es Pointer to event stream + * @param[in] fn Stream callback + * @param[in] arg Argument - typically unique client handle + * @retval ss Event stream subscription structure + * @retval NULL Not found + */ +struct stream_subscription * +stream_ss_find(event_stream_t *es, + stream_fn_t fn, + void *arg) +{ + struct stream_subscription *ss; + + if ((ss = es->es_subscription) != NULL) + do { + if (fn == ss->ss_fn && arg == ss->ss_arg) + return ss; + ss = NEXTQ(struct stream_subscription *, ss); + } while (ss && ss != es->es_subscription); + return NULL; +} + +/*! Remove stream subscription identified with fn and arg in all streams + * @param[in] h Clicon handle + * @param[in] stream Name of stream or NULL for all streams + * @param[in] fn Stream callback + * @param[in] arg Argument - typically unique client handle */ int -stream_cb_delete(clicon_handle h, - char *stream, - stream_fn_t fn, - void *arg) +stream_ss_delete_all(clicon_handle h, + stream_fn_t fn, + void *arg) { int retval = -1; event_stream_t *es; struct stream_subscription *ss; - - for (es=clicon_stream(h); es; es=es->es_next){ - if (stream && strcmp(stream, es->es_name)!=0) - continue; - for (ss = es->es_subscription; ss; ss = ss->ss_next){ - if (fn == ss->ss_fn && arg == ss->ss_arg){ - if (stream_cb_rm(es, ss) < 0) - goto done; - break; /* scb_rm removes element, must break - but only one match */ - } - } - } - retval = 0; - done: - return retval; -} -/*! Check if any stream subscriptions have timed out, if so remove them - */ -int -stream_cb_timeout(event_stream_t *es, - struct timeval *tv) -{ - int retval = -1; - struct stream_subscription *ss; - - again: /* If we remove must start again */ - for (ss = es->es_subscription; ss; ss = ss->ss_next){ - if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, tv, <)){ - if (stream_cb_rm(es, ss) < 0) + for (es=clicon_stream(h); es; es=es->es_next) + if ((ss = stream_ss_find(es, fn, arg)) != NULL) + if (stream_ss_rm(es, ss) < 0) goto done; - goto again; - } - } retval = 0; done: return retval; @@ -337,29 +386,37 @@ stream_cb_timeout(event_stream_t *es, /*! Stream notify event and distribute to all registered callbacks * @param[in] h Clicon handle * @param[in] stream Name of event stream. CLICON is predefined as LOG stream + * @param[in] tv Timestamp. Dont notify if subscription has stoptimees_subscription; ss; ss = ss->ss_next){ - /* Check if this is of interest, ie not larger than stoptime? */ - if (ss->ss_xpath == NULL || - strlen(ss->ss_xpath)==0 || - xpath_first(xevent, "%s", ss->ss_xpath) != NULL) - if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0) - goto done; - } + /* Go thru all subscriptions and find matches */ + if ((ss = es->es_subscription) != NULL) + do { + if (timerisset(&ss->ss_stoptime) && /* stoptime has passed */ + timercmp(&ss->ss_stoptime, tv, <)) + ; + else /* xpath match */ + if (ss->ss_xpath == NULL || + strlen(ss->ss_xpath)==0 || + xpath_first(xevent, "%s", ss->ss_xpath) != NULL) + if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0) + goto done; + ss = NEXTQ(struct stream_subscription *, ss); + } while (ss && ss != es->es_subscription); retval = 0; done: return retval; @@ -411,18 +468,11 @@ stream_notify(clicon_handle h, clicon_err(OE_YANG, 0, "No yang spec"); goto done; } - gettimeofday(&tv, NULL); -#if 1 - /* Go thru callbacks and see if any have timed out, if so remove them - * Could also be done by a separate timer. - */ - if (stream_cb_timeout(es, &tv) < 0) - goto done; -#endif if ((cb = cbuf_new()) == NULL){ clicon_err(OE_UNIX, errno, "cbuf_new"); goto done; } + gettimeofday(&tv, NULL); if (time2str(tv, timestr, sizeof(timestr)) < 0){ clicon_err(OE_UNIX, errno, "time2str"); goto done; @@ -432,7 +482,7 @@ stream_notify(clicon_handle h, goto done; if (xml_rootchild(xev, 0, &xev) < 0) goto done; - if (stream_notify_xml(h, es, xev) < 0) + if (stream_notify_xml(h, es, &tv, xev) < 0) goto done; if (es->es_replay_enabled){ if (stream_replay_add(es, &tv, xev) < 0) @@ -451,8 +501,8 @@ stream_notify(clicon_handle h, return retval; } - -/*! Replay a stream +/*! Replay a stream by sending notification messages + * @see RFC5277 Sec 2.1.1: * Start Time: A parameter, , used to trigger the replay feature and indicate that the replay should start at the time @@ -474,42 +524,39 @@ stream_notify(clicon_handle h, compliant to [RFC3339]. Implementations must support time zones. - * Should we fork?? * Assume no future sample timestamps. */ -int -stream_replay(clicon_handle h, - char *stream, - struct timeval *start, - struct timeval *stop) +static int +stream_replay_notify(clicon_handle h, + event_stream_t *es, + struct stream_subscription *ss) { int retval = -1; - event_stream_t *es; + struct stream_replay *r; - if ((es = stream_find(h, stream)) == NULL){ - clicon_err(OE_CFG, ENOENT, "Stream %s not found", stream); - goto done; - } + /* If is not present, this is not a replay */ + if (!timerisset(&ss->ss_starttime)) + goto ok; if (!es->es_replay_enabled) goto ok; + /* Get replay linked list */ if ((r = es->es_replay) == NULL) goto ok; /* First loop to skip until start */ - if (start){ - do { - if (timercmp(&r->r_tv, start, >=)) - break; - r = NEXTQ(struct stream_replay *, r); - } while (r && r!=es->es_replay); - if (r == NULL) - goto ok; - } + do { + if (timercmp(&r->r_tv, &ss->ss_starttime, >=)) + break; + r = NEXTQ(struct stream_replay *, r); + } while (r && r!=es->es_replay); + if (r == NULL) + goto ok; /* No samples to replay */ /* Then notify until stop */ do { - if (stop && timercmp(&r->r_tv, stop, >)) + if (timerisset(&ss->ss_stoptime) && + timercmp(&r->r_tv, &ss->ss_stoptime, >)) break; - if (stream_notify_xml(h, es, r->r_xml) < 0) + if ((*ss->ss_fn)(h, r->r_xml, ss->ss_arg) < 0) goto done; r = NEXTQ(struct stream_replay *, r); } while (r && r!=es->es_replay); @@ -545,6 +592,88 @@ stream_replay_add(event_stream_t *es, return retval; } +/* tmp struct for timeout callback containing clicon handle, + * stream and subscription + */ +struct replay_arg{ + clicon_handle ra_h; + char *ra_stream; /* Name of stream - malloced: free by cb */ + stream_fn_t ra_fn; /* Stream callback */ + void *ra_arg; /* Argument - typically unique client handle */ +}; + +/*! Timeout callback for replaying stream + * @param[in] fd Ignore + * @param[in] arg tmp struct including clicon handle, stream and subscription + */ +static int +stream_replay_cb(int fd, + void *arg) +{ + int retval = -1; + struct replay_arg *ra= (struct replay_arg*)arg; + event_stream_t *es; + struct stream_subscription *ss; + + if (ra == NULL) + goto ok; + if (ra->ra_stream == NULL) + goto ok; + if ((es = stream_find(ra->ra_h, ra->ra_stream)) == NULL) + goto ok; + if ((ss = stream_ss_find(es, ra->ra_fn, ra->ra_arg)) == NULL) + goto ok; + if (stream_replay_notify(ra->ra_h, es, ss) < 0) + goto done; + ok: + retval = 0; + done: + if (ra){ + if (ra->ra_stream) + free(ra->ra_stream); + free(ra); + } + return retval; +} + +/*! Schedule stream replay to occur asap, eg "now" + * + * @param[in] h clicon handle + * @param[in] stream Name of stream + * @param[in] fn Stream callback + * @param[in] arg Argument - typically unique client handle + */ +int +stream_replay_trigger(clicon_handle h, + char *stream, + stream_fn_t fn, + void *arg) +{ + int retval = -1; + struct timeval now; + struct replay_arg *ra; + + if ((ra = malloc(sizeof(*ra))) == NULL){ + clicon_err(OE_UNIX, errno, "malloc"); + goto done; + } + memset(ra, 0, sizeof(*ra)); + ra->ra_h = h; + if ((ra->ra_stream = strdup(stream)) == NULL){ + clicon_err(OE_UNIX, errno, "strdup"); + goto done; + } + ra->ra_fn = fn; + ra->ra_arg = arg; + gettimeofday(&now, NULL); + if (event_reg_timeout(now, stream_replay_cb, ra, + "create-subscribtion stream replay") < 0) + goto done; + retval = 0; + done: + return retval; +} + #ifdef CLIXON_PUBLISH_STREAMS /* SSE support using Nginx Nchan. This code needs to be enabled at configure * time using: --enable-publish configure option @@ -649,8 +778,8 @@ url_post(char *url, * Push via curl_post to publish stream event * @param[in] h Clicon handle * @param[in] event Event as XML - * @param[in] arg Extra argument provided in stream_cb_add - * @see stream_cb_add + * @param[in] arg Extra argument provided in stream_ss_add + * @see stream_ss_add */ static int stream_publish_cb(clicon_handle h, @@ -711,7 +840,7 @@ stream_publish(clicon_handle h, #ifdef CLIXON_PUBLISH_STREAMS int retval = -1; - if (stream_cb_add(h, stream, NULL, stream_publish_cb, (void*)stream) < 0) + if (stream_ss_add(h, stream, NULL, stream_publish_cb, (void*)stream) < 0) goto done; retval = 0; done: diff --git a/test/test_xml.sh b/test/test_xml.sh index d3a978ce..b24e9f70 100755 --- a/test/test_xml.sh +++ b/test/test_xml.sh @@ -49,5 +49,8 @@ expecteof "$PROG" 0 '' '^$' new "Single quotes for attributes (returns double quotes but at least parses right)" expecteof "$PROG" 0 "" '^$' +new "Mixed quotes" +expecteof "$PROG" 0 "" '^$' + rm -rf $dir