From 270bf78e1f552b8a20f4c272bed54d868bf08b5e Mon Sep 17 00:00:00 2001 From: Olof Hagsand Date: Sun, 4 Nov 2018 16:56:56 +0100 Subject: [PATCH] Backward compatible testing w streams. Nchan manual tests. --- CHANGELOG.md | 6 ++- apps/restconf/README.md | 23 +++++--- example/example_backend.c | 3 +- lib/clixon/clixon_stream.h | 3 +- lib/src/clixon_stream.c | 104 ++++++++++++++++++++++++++++++++++--- test/test_stream.sh | 23 ++++---- 6 files changed, 134 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ace3179b..5c99596d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,9 +46,11 @@ * Major rewrite of event streams (as described above) * If you used old event callbacks API, you need to switch to the streams API * See clixon_stream.[ch] - * Old streams API which needs to be removed include: - * clicon_log_register_callback() + * Old streams API which needs to be modified: + * clicon_log_register_callback() removed * subscription_add() --> stream_add() + * stream_cb_add() --> stream_ss_add() + * stream_cb_delete() --> stream_ss_delete() * backend_notify() and backend_notify_xml() - use stream_notify() instead * Example uses "NETCONF" stream instead of "ROUTING" * clixon_restconf and clixon_netconf changed to take -D `` as command-line option instead of just -D (without debig level) diff --git a/apps/restconf/README.md b/apps/restconf/README.md index 4db81e72..972ab99f 100644 --- a/apps/restconf/README.md +++ b/apps/restconf/README.md @@ -132,16 +132,17 @@ Nginx uses pub/sub channels and can be configured in a variety of ways. The following uses a simple variant with one generic subscription channel (streams) and one publication channel (pub). -The advantage with Nchan is the large eco-system of +The advantage with Nchan is the large eco-system around Nginx and Nchan. -Native mode and Nchan mode can co-exist? -Nchan mode does not use Clixon retention, +Native mode and Nchan mode can co-exist, but the publish URL of Nchan should be different from the streams URL of the native streams. + +Nchan mode does not use Clixon retention, since it uses its own replay mechanism. Download and install nchan, see (https://nchan.io/#install). Add the following to extend the Nginx configuration file with the following statements (example): ``` - location ~ /streams/(\w+)$ { + location ~ /sub/(\w+)$ { nchan_subscriber; nchan_channel_id $1; #first capture of the location match } @@ -163,9 +164,19 @@ Clicon will then publish events from stream EXAMPLE to `http://localhost/pub/EXA Access the event stream EXAMPLE using curl: ``` curl -H "Accept: text/event-stream" -s -X GET http://localhost/streams/EXAMPLE - curl -H "Accept: text/event-stream" -H "Last-Event-ID: 1539961709:0" -s -X GET http://localhost/streams/EXAMPLE +: hi + +id: 1541344320:0 +data: 2018-11-04T15:12:00.435769faultEthernet0major + +id: 1541344325:0 +data: 2018-11-04T15:12:05.446425faultEthernet0major + +``` +Note that the SSE stream output is different than for native streams, and that `Last-Event-Id` is used for replay: +``` +curl -H "Accept: text/event-stream" -H "Last-Event-ID: 1539961709:0" -s -X GET http://localhost/streams/EXAMPLE ``` -where the first command retrieves the whole stream history, and the second only retreives the most recent messages given by the ID. See (https://nchan.io/#eventsource) on more info on how to access an SSE sub endpoint. diff --git a/example/example_backend.c b/example/example_backend.c index 3feb0409..3a420a0e 100644 --- a/example/example_backend.c +++ b/example/example_backend.c @@ -306,7 +306,8 @@ clixon_plugin_init(clicon_handle h) retention.tv_sec = clicon_option_int(h, "CLICON_STREAM_RETENTION"); if (stream_add(h, "EXAMPLE", "Example event stream", 1, &retention) < 0) goto done; - /* assumes: CLIXON_PUBLISH_STREAMS, eg configure --enable-publish + /* Enable nchan pub/sub streams + * assumes: CLIXON_PUBLISH_STREAMS, eg configure --enable-publish */ if (clicon_option_exists(h, "CLICON_STREAM_PUB") && stream_publish(h, "EXAMPLE") < 0) diff --git a/lib/clixon/clixon_stream.h b/lib/clixon/clixon_stream.h index 19d6d65c..a3a1457f 100644 --- a/lib/clixon/clixon_stream.h +++ b/lib/clixon/clixon_stream.h @@ -95,8 +95,9 @@ int stream_ss_rm(clicon_handle h, event_stream_t *es, struct stream_subscription struct stream_subscription *stream_ss_find(event_stream_t *es, stream_fn_t fn, void *arg); int stream_ss_delete_all(clicon_handle h, stream_fn_t fn, void *arg); +int stream_ss_delete(clicon_handle h, char *name, stream_fn_t fn, void *arg); -int stream_notify_xml(clicon_handle h, event_stream_t *es, struct timeval *tv, cxobj *xevent); +int stream_notify_xml(clicon_handle h, char *stream, cxobj *xml); #if defined(__GNUC__) && __GNUC__ >= 3 int stream_notify(clicon_handle h, char *stream, const char *event, ...) __attribute__ ((format (printf, 3, 4))); #else diff --git a/lib/src/clixon_stream.c b/lib/src/clixon_stream.c index 3fc8c914..b5e8dd52 100644 --- a/lib/src/clixon_stream.c +++ b/lib/src/clixon_stream.c @@ -420,9 +420,9 @@ stream_ss_find(event_stream_t *es, /*! Remove stream subscription identified with fn and arg in all streams * @param[in] h Clicon handle - * @param[in] stream Name of stream or NULL for all streams * @param[in] fn Stream callback * @param[in] arg Argument - typically unique client handle + * @see stream_ss_delete For single stream */ int stream_ss_delete_all(clicon_handle h, @@ -447,6 +447,34 @@ stream_ss_delete_all(clicon_handle h, return retval; } +/*! Delete a single stream + * @see stream_ss_delete_all (merge with this?) + */ +int +stream_ss_delete(clicon_handle h, + char *name, + stream_fn_t fn, + void *arg) +{ + int retval = -1; + event_stream_t *es; + struct stream_subscription *ss; + + if ((es = clicon_stream(h)) != NULL){ + do { + if (strcmp(name, es->es_name)==0) + if ((ss = stream_ss_find(es, fn, arg)) != NULL){ + if (stream_ss_rm(h, es, ss) < 0) + goto done; + } + es = NEXTQ(struct event_stream *, es); + } while (es && es != clicon_stream(h)); + } + retval = 0; + done: + return retval; +} + /*! Stream notify event and distribute to all registered callbacks * @param[in] h Clicon handle * @param[in] stream Name of event stream. CLICON is predefined as LOG stream @@ -457,11 +485,11 @@ stream_ss_delete_all(clicon_handle h, * @see stream_notify * @see stream_ss_timeout where subscriptions are removed if stoptimefaultEthernet0major") < 0) * err; * @endcode - * @see stream_notify_xml + * @see stream_notify1 Internal */ int stream_notify(clicon_handle h, @@ -553,7 +581,7 @@ stream_notify(clicon_handle h, goto done; if (xml_rootchild(xev, 0, &xev) < 0) goto done; - if (stream_notify_xml(h, es, &tv, xev) < 0) + if (stream_notify1(h, es, &tv, xev) < 0) goto done; if (es->es_replay_enabled){ if (stream_replay_add(es, &tv, xev) < 0) @@ -572,6 +600,66 @@ stream_notify(clicon_handle h, return retval; } +/*! Backward compatible function + * @see stream_notify Should be merged with this + */ +int +stream_notify_xml(clicon_handle h, + char *stream, + cxobj *xml) +{ + int retval = -1; + cxobj *xev = NULL; + yang_spec *yspec = NULL; + char *str = NULL; + cbuf *cb = NULL; + char timestr[27]; + struct timeval tv; + event_stream_t *es; + + clicon_debug(2, "%s", __FUNCTION__); + if ((es = stream_find(h, stream)) == NULL) + goto ok; + if ((yspec = clicon_dbspec_yang(h)) == NULL){ + clicon_err(OE_YANG, 0, "No yang spec"); + goto done; + } + if ((cb = cbuf_new()) == NULL){ + clicon_err(OE_UNIX, errno, "cbuf_new"); + goto done; + } + gettimeofday(&tv, NULL); + if (time2str(tv, timestr, sizeof(timestr)) < 0){ + clicon_err(OE_UNIX, errno, "time2str"); + goto done; + } + cprintf(cb, "%s%s", timestr, str); + if (xml_parse_string(cbuf_get(cb), yspec, &xev) < 0) + goto done; + if (xml_rootchild(xev, 0, &xev) < 0) + goto done; + if (xml_addsub(xev, xml) < 0) + goto done; + if (stream_notify1(h, es, &tv, xev) < 0) + goto done; + if (es->es_replay_enabled){ + if (stream_replay_add(es, &tv, xev) < 0) + goto done; + xev = NULL; /* xml stored in replay_add and should not be freed */ + } + ok: + retval = 0; + done: + if (cb) + cbuf_free(cb); + if (xev) + xml_free(xev); + if (str) + free(str); + return retval; +} + + /*! Replay a stream by sending notification messages * @see RFC5277 Sec 2.1.1: * Start Time: diff --git a/test/test_stream.sh b/test/test_stream.sh index 7529911e..94becf7e 100755 --- a/test/test_stream.sh +++ b/test/test_stream.sh @@ -22,6 +22,10 @@ APPNAME=example UTIL=../util/clixon_util_stream +if [ ! -x $UTIL ]; then + echo "$UTIL not found. To build: (cd ../util; make clixon_util_stream)" + exit 1 +fi DATE=$(date +"%Y-%m-%d") # include err() and new() functions and creates $dir . ./lib.sh @@ -50,6 +54,7 @@ cat < $cfg streams https://localhost 60 + http://localhost/pub EOF @@ -253,6 +258,11 @@ if [ $nr -lt 1 -o $nr -gt 2 ]; then err 2 "$nr" fi +#-------------------------------------------------------------------- +# NCHAN Need manual testing +echo "Nchan streams requires manual testing" +echo "Eg: curl -H \"Accept: text/event-stream\" -s -X GET http://localhost/sub/EXAMPLE" + #----------------- sudo pkill -u www-data clixon_restconf @@ -272,16 +282,9 @@ fi rm -rf $dir -exit # DONT REMOVE MANUAL TESTING - -#-------------------------------------------------------------------- -# NCHAN Need manual testing -new "restconf monitor streams nchan NEEDS manual testing" -if false; then - # url -H "Accept: text/event-stream" http://localhost/streams/EXAMPLE - # Expect: - echo foo -fi + + +