Backward compatible testing w streams. Nchan manual tests.

This commit is contained in:
Olof Hagsand 2018-11-04 16:56:56 +01:00
parent fdfbbcdb9e
commit 270bf78e1f
6 changed files with 134 additions and 28 deletions

View file

@ -46,9 +46,11 @@
* Major rewrite of event streams (as described above) * Major rewrite of event streams (as described above)
* If you used old event callbacks API, you need to switch to the streams API * If you used old event callbacks API, you need to switch to the streams API
* See clixon_stream.[ch] * See clixon_stream.[ch]
* Old streams API which needs to be removed include: * Old streams API which needs to be modified:
* clicon_log_register_callback() * clicon_log_register_callback() removed
* subscription_add() --> stream_add() * subscription_add() --> stream_add()
* stream_cb_add() --> stream_ss_add()
* stream_cb_delete() --> stream_ss_delete()
* backend_notify() and backend_notify_xml() - use stream_notify() instead * backend_notify() and backend_notify_xml() - use stream_notify() instead
* Example uses "NETCONF" stream instead of "ROUTING" * Example uses "NETCONF" stream instead of "ROUTING"
* clixon_restconf and clixon_netconf changed to take -D `<level>` as command-line option instead of just -D (without debig level) * clixon_restconf and clixon_netconf changed to take -D `<level>` as command-line option instead of just -D (without debig level)

View file

@ -132,16 +132,17 @@ Nginx uses pub/sub channels and can be configured in a variety of
ways. The following uses a simple variant with one generic subscription ways. The following uses a simple variant with one generic subscription
channel (streams) and one publication channel (pub). channel (streams) and one publication channel (pub).
The advantage with Nchan is the large eco-system of The advantage with Nchan is the large eco-system around Nginx and Nchan.
Native mode and Nchan mode can co-exist? Native mode and Nchan mode can co-exist, but the publish URL of Nchan should be different from the streams URL of the native streams.
Nchan mode does not use Clixon retention,
Nchan mode does not use Clixon retention, since it uses its own replay mechanism.
Download and install nchan, see (https://nchan.io/#install). Download and install nchan, see (https://nchan.io/#install).
Add the following to extend the Nginx configuration file with the following statements (example): Add the following to extend the Nginx configuration file with the following statements (example):
``` ```
location ~ /streams/(\w+)$ { location ~ /sub/(\w+)$ {
nchan_subscriber; nchan_subscriber;
nchan_channel_id $1; #first capture of the location match nchan_channel_id $1; #first capture of the location match
} }
@ -163,9 +164,19 @@ Clicon will then publish events from stream EXAMPLE to `http://localhost/pub/EXA
Access the event stream EXAMPLE using curl: Access the event stream EXAMPLE using curl:
``` ```
curl -H "Accept: text/event-stream" -s -X GET http://localhost/streams/EXAMPLE curl -H "Accept: text/event-stream" -s -X GET http://localhost/streams/EXAMPLE
curl -H "Accept: text/event-stream" -H "Last-Event-ID: 1539961709:0" -s -X GET http://localhost/streams/EXAMPLE : hi
id: 1541344320:0
data: <notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0"><eventTime>2018-11-04T15:12:00.435769</eventTime><event><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event></notification>
id: 1541344325:0
data: <notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0"><eventTime>2018-11-04T15:12:05.446425</eventTime><event><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event></notification>
```
Note that the SSE stream output is different than for native streams, and that `Last-Event-Id` is used for replay:
```
curl -H "Accept: text/event-stream" -H "Last-Event-ID: 1539961709:0" -s -X GET http://localhost/streams/EXAMPLE
``` ```
where the first command retrieves the whole stream history, and the second only retreives the most recent messages given by the ID.
See (https://nchan.io/#eventsource) on more info on how to access an SSE sub endpoint. See (https://nchan.io/#eventsource) on more info on how to access an SSE sub endpoint.

View file

@ -306,7 +306,8 @@ clixon_plugin_init(clicon_handle h)
retention.tv_sec = clicon_option_int(h, "CLICON_STREAM_RETENTION"); retention.tv_sec = clicon_option_int(h, "CLICON_STREAM_RETENTION");
if (stream_add(h, "EXAMPLE", "Example event stream", 1, &retention) < 0) if (stream_add(h, "EXAMPLE", "Example event stream", 1, &retention) < 0)
goto done; goto done;
/* assumes: CLIXON_PUBLISH_STREAMS, eg configure --enable-publish /* Enable nchan pub/sub streams
* assumes: CLIXON_PUBLISH_STREAMS, eg configure --enable-publish
*/ */
if (clicon_option_exists(h, "CLICON_STREAM_PUB") && if (clicon_option_exists(h, "CLICON_STREAM_PUB") &&
stream_publish(h, "EXAMPLE") < 0) stream_publish(h, "EXAMPLE") < 0)

View file

@ -95,8 +95,9 @@ int stream_ss_rm(clicon_handle h, event_stream_t *es, struct stream_subscription
struct stream_subscription *stream_ss_find(event_stream_t *es, struct stream_subscription *stream_ss_find(event_stream_t *es,
stream_fn_t fn, void *arg); stream_fn_t fn, void *arg);
int stream_ss_delete_all(clicon_handle h, stream_fn_t fn, void *arg); int stream_ss_delete_all(clicon_handle h, stream_fn_t fn, void *arg);
int stream_ss_delete(clicon_handle h, char *name, stream_fn_t fn, void *arg);
int stream_notify_xml(clicon_handle h, event_stream_t *es, struct timeval *tv, cxobj *xevent); int stream_notify_xml(clicon_handle h, char *stream, cxobj *xml);
#if defined(__GNUC__) && __GNUC__ >= 3 #if defined(__GNUC__) && __GNUC__ >= 3
int stream_notify(clicon_handle h, char *stream, const char *event, ...) __attribute__ ((format (printf, 3, 4))); int stream_notify(clicon_handle h, char *stream, const char *event, ...) __attribute__ ((format (printf, 3, 4)));
#else #else

View file

@ -420,9 +420,9 @@ stream_ss_find(event_stream_t *es,
/*! Remove stream subscription identified with fn and arg in all streams /*! Remove stream subscription identified with fn and arg in all streams
* @param[in] h Clicon handle * @param[in] h Clicon handle
* @param[in] stream Name of stream or NULL for all streams
* @param[in] fn Stream callback * @param[in] fn Stream callback
* @param[in] arg Argument - typically unique client handle * @param[in] arg Argument - typically unique client handle
* @see stream_ss_delete For single stream
*/ */
int int
stream_ss_delete_all(clicon_handle h, stream_ss_delete_all(clicon_handle h,
@ -447,6 +447,34 @@ stream_ss_delete_all(clicon_handle h,
return retval; return retval;
} }
/*! Delete a single stream
* @see stream_ss_delete_all (merge with this?)
*/
int
stream_ss_delete(clicon_handle h,
char *name,
stream_fn_t fn,
void *arg)
{
int retval = -1;
event_stream_t *es;
struct stream_subscription *ss;
if ((es = clicon_stream(h)) != NULL){
do {
if (strcmp(name, es->es_name)==0)
if ((ss = stream_ss_find(es, fn, arg)) != NULL){
if (stream_ss_rm(h, es, ss) < 0)
goto done;
}
es = NEXTQ(struct event_stream *, es);
} while (es && es != clicon_stream(h));
}
retval = 0;
done:
return retval;
}
/*! Stream notify event and distribute to all registered callbacks /*! Stream notify event and distribute to all registered callbacks
* @param[in] h Clicon handle * @param[in] h Clicon handle
* @param[in] stream Name of event stream. CLICON is predefined as LOG stream * @param[in] stream Name of event stream. CLICON is predefined as LOG stream
@ -457,11 +485,11 @@ stream_ss_delete_all(clicon_handle h,
* @see stream_notify * @see stream_notify
* @see stream_ss_timeout where subscriptions are removed if stoptime<now * @see stream_ss_timeout where subscriptions are removed if stoptime<now
*/ */
int static int
stream_notify_xml(clicon_handle h, stream_notify1(clicon_handle h,
event_stream_t *es, event_stream_t *es,
struct timeval *tv, struct timeval *tv,
cxobj *xevent) cxobj *xevent)
{ {
int retval = -1; int retval = -1;
struct stream_subscription *ss; struct stream_subscription *ss;
@ -503,7 +531,7 @@ stream_notify_xml(clicon_handle h,
* if (stream_notify(h, "NETCONF", "<event><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>") < 0) * if (stream_notify(h, "NETCONF", "<event><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>") < 0)
* err; * err;
* @endcode * @endcode
* @see stream_notify_xml * @see stream_notify1 Internal
*/ */
int int
stream_notify(clicon_handle h, stream_notify(clicon_handle h,
@ -553,7 +581,7 @@ stream_notify(clicon_handle h,
goto done; goto done;
if (xml_rootchild(xev, 0, &xev) < 0) if (xml_rootchild(xev, 0, &xev) < 0)
goto done; goto done;
if (stream_notify_xml(h, es, &tv, xev) < 0) if (stream_notify1(h, es, &tv, xev) < 0)
goto done; goto done;
if (es->es_replay_enabled){ if (es->es_replay_enabled){
if (stream_replay_add(es, &tv, xev) < 0) if (stream_replay_add(es, &tv, xev) < 0)
@ -572,6 +600,66 @@ stream_notify(clicon_handle h,
return retval; return retval;
} }
/*! Backward compatible function
* @see stream_notify Should be merged with this
*/
int
stream_notify_xml(clicon_handle h,
char *stream,
cxobj *xml)
{
int retval = -1;
cxobj *xev = NULL;
yang_spec *yspec = NULL;
char *str = NULL;
cbuf *cb = NULL;
char timestr[27];
struct timeval tv;
event_stream_t *es;
clicon_debug(2, "%s", __FUNCTION__);
if ((es = stream_find(h, stream)) == NULL)
goto ok;
if ((yspec = clicon_dbspec_yang(h)) == NULL){
clicon_err(OE_YANG, 0, "No yang spec");
goto done;
}
if ((cb = cbuf_new()) == NULL){
clicon_err(OE_UNIX, errno, "cbuf_new");
goto done;
}
gettimeofday(&tv, NULL);
if (time2str(tv, timestr, sizeof(timestr)) < 0){
clicon_err(OE_UNIX, errno, "time2str");
goto done;
}
cprintf(cb, "<notification xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\"><eventTime>%s</eventTime>%s</notification>", timestr, str);
if (xml_parse_string(cbuf_get(cb), yspec, &xev) < 0)
goto done;
if (xml_rootchild(xev, 0, &xev) < 0)
goto done;
if (xml_addsub(xev, xml) < 0)
goto done;
if (stream_notify1(h, es, &tv, xev) < 0)
goto done;
if (es->es_replay_enabled){
if (stream_replay_add(es, &tv, xev) < 0)
goto done;
xev = NULL; /* xml stored in replay_add and should not be freed */
}
ok:
retval = 0;
done:
if (cb)
cbuf_free(cb);
if (xev)
xml_free(xev);
if (str)
free(str);
return retval;
}
/*! Replay a stream by sending notification messages /*! Replay a stream by sending notification messages
* @see RFC5277 Sec 2.1.1: * @see RFC5277 Sec 2.1.1:
* Start Time: * Start Time:

View file

@ -22,6 +22,10 @@
APPNAME=example APPNAME=example
UTIL=../util/clixon_util_stream UTIL=../util/clixon_util_stream
if [ ! -x $UTIL ]; then
echo "$UTIL not found. To build: (cd ../util; make clixon_util_stream)"
exit 1
fi
DATE=$(date +"%Y-%m-%d") DATE=$(date +"%Y-%m-%d")
# include err() and new() functions and creates $dir # include err() and new() functions and creates $dir
. ./lib.sh . ./lib.sh
@ -50,6 +54,7 @@ cat <<EOF > $cfg
<CLICON_STREAM_PATH>streams</CLICON_STREAM_PATH> <CLICON_STREAM_PATH>streams</CLICON_STREAM_PATH>
<CLICON_STREAM_URL>https://localhost</CLICON_STREAM_URL> <CLICON_STREAM_URL>https://localhost</CLICON_STREAM_URL>
<CLICON_STREAM_RETENTION>60</CLICON_STREAM_RETENTION> <CLICON_STREAM_RETENTION>60</CLICON_STREAM_RETENTION>
<CLICON_STREAM_PUB>http://localhost/pub</CLICON_STREAM_PUB>
</config> </config>
EOF EOF
@ -253,6 +258,11 @@ if [ $nr -lt 1 -o $nr -gt 2 ]; then
err 2 "$nr" err 2 "$nr"
fi fi
#--------------------------------------------------------------------
# NCHAN Need manual testing
echo "Nchan streams requires manual testing"
echo "Eg: curl -H \"Accept: text/event-stream\" -s -X GET http://localhost/sub/EXAMPLE"
#----------------- #-----------------
sudo pkill -u www-data clixon_restconf sudo pkill -u www-data clixon_restconf
@ -272,16 +282,9 @@ fi
rm -rf $dir rm -rf $dir
exit # DONT REMOVE MANUAL TESTING
#--------------------------------------------------------------------
# NCHAN Need manual testing
new "restconf monitor streams nchan NEEDS manual testing"
if false; then
# url -H "Accept: text/event-stream" http://localhost/streams/EXAMPLE
# Expect:
echo foo
fi