diff --git a/apps/backend/backend_client.c b/apps/backend/backend_client.c index 4a5fb622..b4c851af 100644 --- a/apps/backend/backend_client.c +++ b/apps/backend/backend_client.c @@ -107,7 +107,6 @@ ce_event_cb(clicon_handle h, backend_client_rm(h, ce); #endif } - } return 0; } @@ -845,14 +844,17 @@ from_client_create_subscription(clicon_handle h, if ((x = xpath_first(xe, "//stopTime")) != NULL){ stoptime = xml_find_value(x, "body"); if (str2time(stoptime, &stop) < 0){ - clicon_err(OE_PROTO, errno, "str2time"); - goto done; + if (netconf_bad_element(cbret, "application", "stopTime", "Expected timestamp") < 0) + goto done; + goto ok; } } if ((x = xpath_first(xe, "//startTime")) != NULL){ starttime = xml_find_value(x, "body"); if (str2time(starttime, &start) < 0){ - clicon_err(OE_PROTO, errno, "str2time"); + if (netconf_bad_element(cbret, "application", "startTime", "Expected timestamp") < 0) + goto done; + goto ok; goto done; } } diff --git a/apps/backend/backend_main.c b/apps/backend/backend_main.c index fcfa3570..c247d709 100644 --- a/apps/backend/backend_main.c +++ b/apps/backend/backend_main.c @@ -729,8 +729,12 @@ main(int argc, return -1; } - /* Publish stream on pubsub channels XXX conditional? */ - if (stream_publish_init() < 0) + /* Publish stream on pubsub channels. + * CLICON_STREAM_PUB should be set to URL to where streams are published + * and configure should be run with --enable-publish + */ + if (clicon_option_exists(h, "CLICON_STREAM_PUB") && + stream_publish_init() < 0) goto done; if ((xmldb_plugin = clicon_xmldb_plugin(h)) == NULL){ clicon_log(LOG_ERR, "No xmldb plugin given (specify option CLICON_XMLDB_PLUGIN).\n"); diff --git a/example/example_backend.c b/example/example_backend.c index d4415e34..16a8d6bb 100644 --- a/example/example_backend.c +++ b/example/example_backend.c @@ -305,7 +305,8 @@ clixon_plugin_init(clicon_handle h) goto done; /* assumes: CLIXON_PUBLISH_STREAMS, eg configure --enable-publish */ - if (stream_publish(h, "EXAMPLE") < 0) + if (clicon_option_exists(h, "CLICON_STREAM_PUB") && + stream_publish(h, "EXAMPLE") < 0) goto done; if (example_stream_timer_setup(h) < 0) goto done; diff --git a/lib/clixon/clixon_handle.h b/lib/clixon/clixon_handle.h index e0ebc07e..711e6cb8 100644 --- a/lib/clixon/clixon_handle.h +++ b/lib/clixon/clixon_handle.h @@ -77,6 +77,7 @@ clicon_hash_t *clicon_data(clicon_handle h); /* Return internal stream hash-array given a handle.*/ struct event_stream *clicon_stream(clicon_handle h); struct event_stream; +int clicon_stream_set(clicon_handle h, struct event_stream *es); int clicon_stream_append(clicon_handle h, struct event_stream *es); #endif /* _CLIXON_HANDLE_H_ */ diff --git a/lib/clixon/clixon_stream.h b/lib/clixon/clixon_stream.h index b4dbf42b..3ceb0dac 100644 --- a/lib/clixon/clixon_stream.h +++ b/lib/clixon/clixon_stream.h @@ -67,7 +67,7 @@ struct stream_replay{ /* See RFC8040 9.3, stream list, no replay support for now */ struct event_stream{ - struct event_stream *es_next; + qelem_t es_q; /* queue header */ char *es_name; /* name of notification event stream */ char *es_description; struct stream_subscription *es_subscription; @@ -81,7 +81,7 @@ typedef struct event_stream event_stream_t; */ event_stream_t *stream_find(clicon_handle h, const char *name); int stream_register(clicon_handle h, const char *name, const char *description, int replay_enabled); -int stream_delete_all(event_stream_t *es); +int stream_delete_all(clicon_handle h); int stream_get_xml(clicon_handle h, int access, cbuf *cb); int stream_timer_setup(int fd, void *arg); /* Subscriptions */ diff --git a/lib/src/clixon_handle.c b/lib/src/clixon_handle.c index 64d58628..fd30f0b7 100644 --- a/lib/src/clixon_handle.c +++ b/lib/src/clixon_handle.c @@ -135,7 +135,7 @@ clicon_handle_exit(clicon_handle h) hash_free(ha); if ((ha = clicon_data(h)) != NULL) hash_free(ha); - stream_delete_all(clicon_stream(h)); + stream_delete_all(h); free(ch); return 0; } @@ -187,14 +187,22 @@ clicon_stream(clicon_handle h) return ch->ch_stream; } +int +clicon_stream_set(clicon_handle h, + event_stream_t *es) +{ + struct clicon_handle *ch = handle(h); + + ch->ch_stream = es; + return 0; +} + int clicon_stream_append(clicon_handle h, event_stream_t *es) { struct clicon_handle *ch = handle(h); - event_stream_t **ep; - - for (ep = &ch->ch_stream; (*ep); ep=&(*ep)->es_next); - *ep = es; + + ADDQ(es, ch->ch_stream); return 0; } diff --git a/lib/src/clixon_stream.c b/lib/src/clixon_stream.c index 65d0d08a..d216076c 100644 --- a/lib/src/clixon_stream.c +++ b/lib/src/clixon_stream.c @@ -90,12 +90,17 @@ event_stream_t * stream_find(clicon_handle h, const char *name) { + event_stream_t *es0; event_stream_t *es = NULL; - for (es=clicon_stream(h); es; es=es->es_next) - if (strcmp(name, es->es_name)==0) - break; - return es; + es0 = clicon_stream(h); + if ((es = es0) != NULL) + do { + if (strcmp(name, es->es_name)==0) + return es; + es = NEXTQ(struct event_stream *, es); + } while (es && es != es0); + return NULL; } /*! Add notification event stream @@ -137,14 +142,16 @@ stream_register(clicon_handle h, * @param[in] es */ int -stream_delete_all(event_stream_t *es) +stream_delete_all(clicon_handle h) { - event_stream_t *e_next; struct stream_replay *r; struct stream_subscription *ss; + event_stream_t *es; + event_stream_t *head = clicon_stream(h); - while (es){ - e_next = es->es_next; + while ((es = clicon_stream(h)) != NULL){ + DELQ(es, head, event_stream_t *); + clicon_stream_set(h, head); if (es->es_name) free(es->es_name); if (es->es_description) @@ -158,7 +165,6 @@ stream_delete_all(event_stream_t *es) free(r); } free(es); - es = e_next; } return 0; } @@ -180,22 +186,25 @@ stream_get_xml(clicon_handle h, char *stream_path; cprintf(cb, ""); - for (es=clicon_stream(h); es; es=es->es_next){ - cprintf(cb, ""); - cprintf(cb, "%s", es->es_name); - if (es->es_description) - cprintf(cb, "%s", es->es_description); - cprintf(cb, "false"); - if (access){ - cprintf(cb, ""); - cprintf(cb, "xml"); - url_prefix = clicon_option_str(h, "CLICON_STREAM_URL"); - stream_path = clicon_option_str(h, "CLICON_STREAM_PATH"); - cprintf(cb, "%s/%s/%s", - url_prefix, stream_path, es->es_name); - cprintf(cb, ""); - } - cprintf(cb, ""); + if ((es = clicon_stream(h)) != NULL){ + do { + cprintf(cb, ""); + cprintf(cb, "%s", es->es_name); + if (es->es_description) + cprintf(cb, "%s", es->es_description); + cprintf(cb, "false"); + if (access){ + cprintf(cb, ""); + cprintf(cb, "xml"); + url_prefix = clicon_option_str(h, "CLICON_STREAM_URL"); + stream_path = clicon_option_str(h, "CLICON_STREAM_PATH"); + cprintf(cb, "%s/%s/%s", + url_prefix, stream_path, es->es_name); + cprintf(cb, ""); + } + cprintf(cb, ""); + es = NEXTQ(struct event_stream *, es); + } while (es && es != clicon_stream(h)); } cprintf(cb, ""); return 0; @@ -224,18 +233,24 @@ stream_timer_setup(int fd, */ gettimeofday(&now, NULL); /* for all event streams, remove subscription if past stop time */ - for (es=clicon_stream(h); es; es=es->es_next){ - if ((ss = es->es_subscription) != NULL) - do { - if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){ - ss1 = NEXTQ(struct stream_subscription *, ss); - if (stream_ss_rm(es, ss) < 0) - goto done; - ss = ss1; - } - else - ss = NEXTQ(struct stream_subscription *, ss); - } while (ss && ss != es->es_subscription); + + + + if ((es = clicon_stream(h)) != NULL){ + do { + if ((ss = es->es_subscription) != NULL) + do { + if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){ + ss1 = NEXTQ(struct stream_subscription *, ss); + if (stream_ss_rm(es, ss) < 0) + goto done; + ss = ss1; + } + else + ss = NEXTQ(struct stream_subscription *, ss); + } while (ss && ss != es->es_subscription); + es = NEXTQ(struct event_stream *, es); + } while (es && es != clicon_stream(h)); } /* Initiate new timer */ timeradd(&now, &t1, &t); @@ -374,10 +389,14 @@ stream_ss_delete_all(clicon_handle h, event_stream_t *es; struct stream_subscription *ss; - for (es=clicon_stream(h); es; es=es->es_next) - if ((ss = stream_ss_find(es, fn, arg)) != NULL) - if (stream_ss_rm(es, ss) < 0) - goto done; + if ((es = clicon_stream(h)) != NULL){ + do { + if ((ss = stream_ss_find(es, fn, arg)) != NULL) + if (stream_ss_rm(es, ss) < 0) + goto done; + es = NEXTQ(struct event_stream *, es); + } while (es && es != clicon_stream(h)); + } retval = 0; done: return retval; diff --git a/test/test_stream.sh b/test/test_stream.sh index c06afaf4..9f7ab600 100755 --- a/test/test_stream.sh +++ b/test/test_stream.sh @@ -10,10 +10,9 @@ APPNAME=example # include err() and new() functions and creates $dir . ./lib.sh cfg=$dir/conf.xml -fyang=$dir/restconf.yang +fyang=$dir/stream.yang xml=$dir/xml.xml - # example cat < $cfg @@ -34,7 +33,6 @@ cat < $cfg true streams https://localhost - http://localhost/pub EOF @@ -98,8 +96,6 @@ sudo pkill -u www-data clixon_restconf new "start restconf daemon" sudo start-stop-daemon -S -q -o -b -x /www-data/clixon_restconf -d /www-data -c www-data -- -f $cfg -y $fyang # -D 1 -sleep 1 - new "netconf event stream discovery RFC5277 Sec 3.2.5" expecteof "$clixon_netconf -qf $cfg -y $fyang" 0 ']]>]]>' 'EXAMPLEExample event streamfalse]]>]]>' @@ -153,8 +149,13 @@ new "netconf NONEXIST subscription" expectwait "$clixon_netconf -qf $cfg -y $fyang" 'NONEXIST]]>]]>' '^invalid-valueapplicationerrorNo such stream]]>]]>$' 5 fi -#new "netconf EXAMPLE subscription with replay" -#expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLE2018-10-21T19:22:162018-10-21T19:25:00]]>]]>' '^]]>]]>20' 5 +new "netconf EXAMPLE subscription with wrong date" +expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLEkallekaka]]>]]>' '^bad-elementapplicationstartTimeerrorExpected timestamp]]>]]>$' 0 + +new "netconf EXAMPLE subscription with replay" +NOW=$(date +"%Y-%m-%dT%H:%M:%S") +sleep 10 +expectwait "$clixon_netconf -qf $cfg -y $fyang" "EXAMPLE$NOW]]>]]>" '^]]>]]>20' 10 new "Kill restconf daemon" sudo pkill -u www-data clixon_restconf diff --git a/yang/clixon-config@2018-10-21.yang b/yang/clixon-config@2018-10-21.yang index 31718ab2..6ee83d29 100644 --- a/yang/clixon-config@2018-10-21.yang +++ b/yang/clixon-config@2018-10-21.yang @@ -411,11 +411,12 @@ module clixon-config { } leaf CLICON_STREAM_PUB { type string; - default "http://localhost/pub"; description "For stream publish using eg nchan, the base address - to publish to. - Example: http://localhost/pub/NETCONF. Note this may - be local URL behind reverse-proxy"; + to publish to. Example value: http://localhost/pub + Example: stream NETCONF would then be pushed to + http://localhost/pub/NETCONF. + Note this may be a local/provate URL behind reverse-proxy. + If not given, do NOT enable stream publishing using NCHAN."; } } }