* Major rewrite of event streams

* If you used old event callbacks API, you need to switch to the streams API
    * See clixon_stream.[ch]
  * Old streams API which needs to be removed include:
    * clicon_log_register_callback()
    * subscription_add() --> stream_register()
    * backend_notify() and backend_notify_xml() - use stream_notify() instead
* Example uses "NETCONF" stream instead of "ROUTING"
* Added timeout option -t for clixon_netconf - quit after max time.
This commit is contained in:
Olof hagsand 2018-09-30 14:51:30 +02:00
parent d7fbe75c9e
commit 98f3cd0e32
31 changed files with 597 additions and 635 deletions

View file

@ -44,6 +44,7 @@
#include <string.h>
#include <errno.h>
#include <inttypes.h>
#include <sys/time.h>
/* cligen */
#include <cligen/cligen.h>
@ -51,8 +52,14 @@
/* clicon */
#include "clixon_queue.h"
#include "clixon_err.h"
#include "clixon_string.h"
#include "clixon_hash.h"
#include "clixon_handle.h"
#include "clixon_yang.h"
#include "clixon_xml.h"
#include "clixon_options.h"
#include "clixon_xpath_ctx.h"
#include "clixon_xpath.h"
#include "clixon_stream.h"
/*! Find an event notification stream given name
@ -74,6 +81,7 @@ stream_find(clicon_handle h,
}
/*! Add notification event stream
*
*/
int
stream_register(clicon_handle h,
@ -105,10 +113,10 @@ stream_register(clicon_handle h,
return retval;
}
/*! Delete complete notification event stream list
/*! Delete complete notification event stream list (not just single stream)
*/
int
stream_free(event_stream_t *es)
stream_delete_all(event_stream_t *es)
{
event_stream_t *e_next;
@ -148,8 +156,11 @@ stream_get_xml(clicon_handle h,
if (access){
cprintf(cb, "<access>");
cprintf(cb, "<encoding>xml</encoding>");
/* Note /stream need to be in http proxy declaration */
cprintf(cb, "<location>https://example.com/stream/%s</location>", es->es_name);
/* Note /stream need to be in http proxy declaration
* XXX
*/
cprintf(cb, "<location>/stream/%s</location>", es->es_name);
cprintf(cb, "</access>");
}
cprintf(cb, "</stream>");
@ -173,6 +184,7 @@ stream_del()
/*! Add an event notification callback to a stream given a callback function
* @param[in] h Clicon handle
* @param[in] stream Name of stream
* @param[in] xpath Filter selector - xpath
* @param[in] fn Callback when event occurs
* @param[in] arg Argument to use with callback. Also handle when deleting
* @retval 0 OK
@ -182,6 +194,7 @@ stream_del()
int
stream_cb_add(clicon_handle h,
char *stream,
char *xpath,
stream_fn_t fn,
void *arg)
{
@ -190,7 +203,7 @@ stream_cb_add(clicon_handle h,
struct stream_subscription *ss;
if ((es = stream_find(h, stream)) == NULL){
clicon_err(OE_CFG, ENOENT, "Stream not found");
clicon_err(OE_CFG, ENOENT, "Stream %s not found", stream);
goto done;
}
if ((ss = malloc(sizeof(*ss))) == NULL){
@ -198,7 +211,14 @@ stream_cb_add(clicon_handle h,
goto done;
}
memset(ss, 0, sizeof(*ss));
ss->ss_stream = strdup(stream);
if ((ss->ss_stream = strdup(stream)) == NULL){
clicon_err(OE_CFG, errno, "strdup");
goto done;
}
if (xpath && (ss->ss_xpath = strdup(xpath)) == NULL){
clicon_err(OE_CFG, errno, "strdup");
goto done;
}
ss->ss_fn = fn;
ss->ss_arg = arg;
ss->ss_next = es->es_subscription;
@ -208,34 +228,149 @@ stream_cb_add(clicon_handle h,
return retval;
}
/*! Delete event notification callback to a stream given a callback function
* Alt just send in an ss struct?
/*! Delete event notification callback to a stream given a callback and arg
* @param[in] h Clicon handle
* @param[in] stream Name of stream or NULL for all streams
* @param[in] fn Callback when event occurs
* @param[in] arg Argument to use with callback. Also handle when deleting
* @retval 0 OK
* @retval -1 Error
*/
int
stream_cb_delete(clicon_handle h,
char *stream,
stream_fn_t fn)
stream_fn_t fn,
void *arg)
{
int retval = -1;
event_stream_t *es;
struct stream_subscription *ss;
struct stream_subscription **ss_prev;
if ((es = stream_find(h, stream)) == NULL)
goto ok;
ss_prev = &es->es_subscription;
for (ss = *ss_prev; ss; ss = ss->ss_next){
if (ss->ss_fn == fn){
*ss_prev = ss->ss_next;
free(ss->ss_stream);
if (ss->ss_arg)
free(ss->ss_arg);
free(ss);
break;
struct stream_subscription *ss;
struct stream_subscription *ss_next;
for (es=clicon_stream(h); es; es=es->es_next){
if (stream && strcmp(stream, es->es_name)!=0)
continue;
ss_prev = &es->es_subscription;
for (ss = *ss_prev; ss; ss = ss_next){
ss_next = ss->ss_next;
if (fn == ss->ss_fn && arg == ss->ss_arg){
*ss_prev = ss->ss_next;
if (ss->ss_stream)
free(ss->ss_stream);
if (ss->ss_xpath)
free(ss->ss_xpath);
free(ss);
continue;
// break; if more > 1
}
ss_prev = &ss->ss_next;
}
ss_prev = &ss->ss_next;
}
ok:
retval = 0;
return retval;
}
/*! Stream notify event and distribute to all registered callbacks
* @param[in] h Clicon handle
* @param[in] stream Name of event stream. CLICON is predefined as LOG stream
* @param[in] event Notification as xml tree
* @retval 0 OK
* @retval -1 Error with clicon_err called
* @see stream_notify
*/
int
stream_notify_xml(clicon_handle h,
char *stream,
cxobj *xevent)
{
int retval = -1;
event_stream_t *es;
struct stream_subscription *ss;
if ((es = stream_find(h, stream)) == NULL)
goto ok;
/* Go thru all global (handle) subscriptions and find matches */
for (ss = es->es_subscription; ss; ss = ss->ss_next){
if (ss->ss_xpath == NULL ||
strlen(ss->ss_xpath)==0 ||
xpath_first(xevent, "%s", ss->ss_xpath) != NULL)
if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0)
goto done;
}
ok:
retval = 0;
done:
return retval;
}
/*! Stream notify event and distribute to all registered callbacks
* @param[in] h Clicon handle
* @param[in] stream Name of event stream. CLICON is predefined as LOG stream
* @param[in] event Notification as format string according to printf(3)
* @retval 0 OK
* @retval -1 Error with clicon_err called
* @code
* if (stream_notify(h, "NETCONF", "<event><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>") < 0)
* err;
* @endcode
* @see stream_notify_xml
* @see backend_notify
*/
int
stream_notify(clicon_handle h,
char *stream,
const char *event, ...)
{
int retval = -1;
va_list args;
int len;
cxobj *xev = NULL;
yang_spec *yspec = NULL;
char *str = NULL;
cbuf *cb = NULL;
char timestr[27];
struct timeval tv;
va_start(args, event);
len = vsnprintf(NULL, 0, event, args) + 1;
va_end(args);
if ((str = malloc(len)) == NULL){
clicon_err(OE_UNIX, errno, "malloc");
goto done;
}
memset(str, 0, len);
va_start(args, event);
len = vsnprintf(str, len, event, args) + 1;
va_end(args);
if ((yspec = clicon_dbspec_yang(h)) == NULL){
clicon_err(OE_YANG, 0, "No yang spec");
goto done;
}
if ((cb = cbuf_new()) == NULL){
clicon_err(OE_UNIX, errno, "cbuf_new");
goto done;
}
gettimeofday(&tv, NULL);
if (time2str(tv, timestr, sizeof(timestr)) < 0){
clicon_err(OE_UNIX, errno, "time2str");
goto done;
}
cprintf(cb, "<notification xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\"><eventTime>%s</eventTime>%s</notification>", timestr, str);
if (xml_parse_string(cbuf_get(cb), yspec, &xev) < 0)
goto done;
if (xml_rootchild(xev, 0, &xev) < 0)
goto done;
if (stream_notify_xml(h, stream, xev) < 0)
goto done;
retval = 0;
done:
if (cb)
cbuf_free(cb);
if (xev)
xml_free(xev);
if (str)
free(str);
return retval;
}