diff --git a/apps/backend/backend_client.c b/apps/backend/backend_client.c index b4c851af..4f69de8d 100644 --- a/apps/backend/backend_client.c +++ b/apps/backend/backend_client.c @@ -83,29 +83,32 @@ ce_find_bypid(struct client_entry *ce_list, /*! Stream callback for netconf stream notification (RFC 5277) * @param[in] h Clicon handle + * @param[in] op 0:event, 1:rm * @param[in] event Event as XML * @param[in] arg Extra argument provided in stream_ss_add * @see stream_ss_add */ static int ce_event_cb(clicon_handle h, + int op, cxobj *event, void *arg) { struct client_entry *ce = (struct client_entry *)arg; - if (send_msg_notify_xml(ce->ce_s, event) < 0){ - if (errno == ECONNRESET || errno == EPIPE){ - clicon_log(LOG_WARNING, "client %d reset", ce->ce_nr); -#if 0 - /* We should remove here but removal is not possible - from a client since backend_client is not linked. - Maybe we should add it to the plugin, but it feels - "safe" that you cant remove a client. - Instead, the client is (hopefully) removed elsewhere? - */ + clicon_debug(1, "%s op:%d", __FUNCTION__, op); + switch (op){ + case 1: + /* Risk of recursion here */ + if (ce->ce_s) backend_client_rm(h, ce); -#endif + break; + default: + if (send_msg_notify_xml(ce->ce_s, event) < 0){ + if (errno == ECONNRESET || errno == EPIPE){ + clicon_log(LOG_WARNING, "client %d reset", ce->ce_nr); + } + break; } } return 0; @@ -126,6 +129,7 @@ backend_client_rm(clicon_handle h, struct client_entry *c0; struct client_entry **ce_prev; + clicon_debug(1, "%s", __FUNCTION__); c0 = backend_client_list(h); ce_prev = &c0; /* this points to stack and is not real backpointer */ for (c = *ce_prev; c; c = c->ce_next){ @@ -842,20 +846,19 @@ from_client_create_subscription(clicon_handle h, if ((x = xpath_first(xe, "//stream")) != NULL) stream = xml_find_value(x, "body"); if ((x = xpath_first(xe, "//stopTime")) != NULL){ - stoptime = xml_find_value(x, "body"); - if (str2time(stoptime, &stop) < 0){ + if ((stoptime = xml_find_value(x, "body")) != NULL && + str2time(stoptime, &stop) < 0){ if (netconf_bad_element(cbret, "application", "stopTime", "Expected timestamp") < 0) goto done; goto ok; } } if ((x = xpath_first(xe, "//startTime")) != NULL){ - starttime = xml_find_value(x, "body"); - if (str2time(starttime, &start) < 0){ + if ((starttime = xml_find_value(x, "body")) != NULL && + str2time(starttime, &start) < 0){ if (netconf_bad_element(cbret, "application", "startTime", "Expected timestamp") < 0) goto done; goto ok; - goto done; } } if ((xfilter = xpath_first(xe, "//filter")) != NULL){ diff --git a/apps/backend/backend_main.c b/apps/backend/backend_main.c index c247d709..780381a4 100644 --- a/apps/backend/backend_main.c +++ b/apps/backend/backend_main.c @@ -133,7 +133,7 @@ usage(clicon_handle h, char *confpid = clicon_backend_pidfile(h); char *group = clicon_sock_group(h); - fprintf(stderr, "usage:%s\n" + fprintf(stderr, "usage:%s *\n" "where options are\n" "\t-h\t\tHelp\n" "\t-D \tDebug level\n" @@ -860,6 +860,8 @@ main(int argc, if (debug) clicon_option_dump(h, debug); + if (stream_timer_setup(0, h) < 0) + goto done; if (event_loop() < 0) goto done; retval = 0; diff --git a/apps/restconf/README.md b/apps/restconf/README.md index 14c2dcf4..2bf8c224 100644 --- a/apps/restconf/README.md +++ b/apps/restconf/README.md @@ -90,8 +90,6 @@ where You access the streams using curl, but they differ slightly in behaviour as described in the following two sections. -### Native event streams - Add the following to extend the nginx configuration file with the following statements: ``` location /streams { @@ -111,11 +109,13 @@ where the first command retrieves only new notifications, and the second receive ### Nginx Nchan streams +As an alternative, Nginx/Nchan can be used for streams. Nginx uses pub/sub channels and can be configured in a variety of ways. The following uses a simple variant with one generic subscription channel (streams) and one publication channel (pub). Configure clixon with `--enable-publish` which enables curl code for publishing streams to nchan. +Set configure option CLICON_STREAM_PUB to, for example, http://localhost/pub to enable pushing notifications to nchan. Download and install nchan, see (https://nchan.io/#install). diff --git a/apps/restconf/restconf_stream.c b/apps/restconf/restconf_stream.c index 04edd129..fa9b1ec3 100644 --- a/apps/restconf/restconf_stream.c +++ b/apps/restconf/restconf_stream.c @@ -108,14 +108,12 @@ restconf_stream_cb(int s, clicon_debug(1, "%s msg_rcv error", __FUNCTION__); goto done; } - clicon_debug(1, "%s msg: %s", __FUNCTION__, reply->op_body); + clicon_debug(1, "%s msg: %s", __FUNCTION__, reply?reply->op_body:"null"); /* handle close from remote end: this will exit the client */ if (eof){ clicon_debug(1, "%s eof", __FUNCTION__); clicon_err(OE_PROTO, ESHUTDOWN, "Socket unexpected close"); - close(s); errno = ESHUTDOWN; - event_unreg_fd(s, restconf_stream_cb); FCGX_FPrintF(r->out, "SHUTDOWN\r\n"); FCGX_FPrintF(r->out, "\r\n"); FCGX_FFlush(r->out); @@ -148,7 +146,6 @@ restconf_stream_cb(int s, FCGX_FPrintF(r->out, "data: %s\r\n", cbuf_get(cb)); FCGX_FPrintF(r->out, "\r\n"); FCGX_FFlush(r->out); - ok: retval = 0; done: @@ -186,8 +183,8 @@ restconf_stream(clicon_handle h, cg_var *cv; char *vname; - *sp = -1; clicon_debug(1, "%s", __FUNCTION__); + *sp = -1; if ((cb = cbuf_new()) == NULL){ clicon_err(OE_XML, errno, "cbuf_new"); goto done; @@ -228,6 +225,7 @@ restconf_stream(clicon_handle h, ok: retval = 0; done: + clicon_debug(1, "%s retval: %d", __FUNCTION__, retval); if (xret) xml_free(xret); if (cb) @@ -239,6 +237,19 @@ restconf_stream(clicon_handle h, #include "restconf_lib.h" #include "restconf_stream.h" +static int +stream_checkuplink(int s, + void *arg) +{ + FCGX_Request *r = (FCGX_Request *)arg; + clicon_debug(1, "%s", __FUNCTION__); + if (FCGX_GetError(r->out) != 0){ /* break loop */ + clicon_debug(1, "%s FCGX_GetError upstream", __FUNCTION__); + clicon_exit_set(); + } + return 0; +} + int stream_timeout(int s, void *arg) @@ -248,8 +259,10 @@ stream_timeout(int s, FCGX_Request *r = (FCGX_Request *)arg; clicon_debug(1, "%s", __FUNCTION__); - if (FCGX_GetError(r->out) != 0) /* break loop */ + if (FCGX_GetError(r->out) != 0){ /* break loop */ + clicon_debug(1, "%s FCGX_GetError upstream", __FUNCTION__); clicon_exit_set(); + } else{ gettimeofday(&t, NULL); t1.tv_sec = 1; t1.tv_usec = 0; @@ -355,10 +368,16 @@ api_stream(clicon_handle h, (void*)r, "stream socket") < 0) goto done; + if (event_reg_fd(r->listen_sock, + stream_checkuplink, + (void*)r, + "stream socket") < 0) + goto done; /* Poll upstream errors */ stream_timeout(0, (void*)r); /* Start loop */ event_loop(); + close(s); event_unreg_fd(s, restconf_stream_cb); clicon_exit_reset(); } diff --git a/lib/clixon/clixon_stream.h b/lib/clixon/clixon_stream.h index 98be4fb4..19d6d65c 100644 --- a/lib/clixon/clixon_stream.h +++ b/lib/clixon/clixon_stream.h @@ -41,11 +41,12 @@ */ /* Subscription callback * @param[in] h Clicon handle + * @param[in] op Operation: 0 OK, 1 Close * @param[in] event Event as XML * @param[in] arg Extra argument provided in stream_ss_add * @see stream_ss_add */ -typedef int (*stream_fn_t)(clicon_handle h, cxobj *event, void *arg); +typedef int (*stream_fn_t)(clicon_handle h, int op, cxobj *event, void *arg); struct stream_subscription{ qelem_t ss_q; /* queue header */ @@ -90,7 +91,7 @@ int stream_timer_setup(int fd, void *arg); struct stream_subscription *stream_ss_add(clicon_handle h, char *stream, char *xpath, struct timeval *start, struct timeval *stop, stream_fn_t fn, void *arg); -int stream_ss_rm(event_stream_t *es, struct stream_subscription *ss); +int stream_ss_rm(clicon_handle h, event_stream_t *es, struct stream_subscription *ss); struct stream_subscription *stream_ss_find(event_stream_t *es, stream_fn_t fn, void *arg); int stream_ss_delete_all(clicon_handle h, stream_fn_t fn, void *arg); diff --git a/lib/src/clixon_stream.c b/lib/src/clixon_stream.c index c9b7c143..e0c145e3 100644 --- a/lib/src/clixon_stream.c +++ b/lib/src/clixon_stream.c @@ -164,7 +164,7 @@ stream_delete_all(clicon_handle h) if (es->es_description) free(es->es_description); while ((ss = es->es_subscription) != NULL) - stream_ss_rm(es, ss); + stream_ss_rm(h, es, ss); while ((r = es->es_replay) != NULL){ DELQ(r, es->es_replay, struct stream_replay *); if (r->r_xml) @@ -239,6 +239,7 @@ stream_timer_setup(int fd, struct stream_replay *r; struct stream_replay *r1; + clicon_debug(2, "%s", __FUNCTION__); /* Go thru callbacks and see if any have timed out, if so remove them * Could also be done by a separate timer. */ @@ -255,7 +256,8 @@ stream_timer_setup(int fd, do { if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){ ss1 = NEXTQ(struct stream_subscription *, ss); - if (stream_ss_rm(es, ss) < 0) + /* Signal to remove stream for upper levels */ + if (stream_ss_rm(h, es, ss) < 0) goto done; ss = ss1; } @@ -369,15 +371,20 @@ stream_ss_add(clicon_handle h, * @retval -1 Error */ int -stream_ss_rm(event_stream_t *es, +stream_ss_rm(clicon_handle h, + event_stream_t *es, struct stream_subscription *ss) { + clicon_debug(1, "%s", __FUNCTION__); DELQ(ss, es->es_subscription, struct stream_subscription *); + /* Remove from upper layers - close socket etc. */ + (*ss->ss_fn)(h, 1, NULL, ss->ss_arg); if (ss->ss_stream) free(ss->ss_stream); if (ss->ss_xpath) free(ss->ss_xpath); free(ss); + return 0; } @@ -421,9 +428,10 @@ stream_ss_delete_all(clicon_handle h, if ((es = clicon_stream(h)) != NULL){ do { - if ((ss = stream_ss_find(es, fn, arg)) != NULL) - if (stream_ss_rm(es, ss) < 0) + if ((ss = stream_ss_find(es, fn, arg)) != NULL){ + if (stream_ss_rm(h, es, ss) < 0) goto done; + } es = NEXTQ(struct event_stream *, es); } while (es && es != clicon_stream(h)); } @@ -451,18 +459,24 @@ stream_notify_xml(clicon_handle h, int retval = -1; struct stream_subscription *ss; - clicon_debug(1, "%s", __FUNCTION__); + clicon_debug(2, "%s", __FUNCTION__); /* Go thru all subscriptions and find matches */ if ((ss = es->es_subscription) != NULL) do { if (timerisset(&ss->ss_stoptime) && /* stoptime has passed */ - timercmp(&ss->ss_stoptime, tv, <)) - ; + timercmp(&ss->ss_stoptime, tv, <)){ + struct stream_subscription *ss1; + ss1 = NEXTQ(struct stream_subscription *, ss); + /* Signal to remove stream for upper levels */ + if (stream_ss_rm(h, es, ss) < 0) + goto done; + ss = ss1; + } else /* xpath match */ if (ss->ss_xpath == NULL || strlen(ss->ss_xpath)==0 || xpath_first(xevent, "%s", ss->ss_xpath) != NULL) - if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0) + if ((*ss->ss_fn)(h, 0, xevent, ss->ss_arg) < 0) goto done; ss = NEXTQ(struct stream_subscription *, ss); } while (ss && ss != es->es_subscription); @@ -499,7 +513,7 @@ stream_notify(clicon_handle h, struct timeval tv; event_stream_t *es; - clicon_debug(1, "%s", __FUNCTION__); + clicon_debug(2, "%s", __FUNCTION__); if ((es = stream_find(h, stream)) == NULL) goto ok; va_start(args, event); @@ -604,7 +618,7 @@ stream_replay_notify(clicon_handle h, if (timerisset(&ss->ss_stoptime) && timercmp(&r->r_tv, &ss->ss_stoptime, >)) break; - if ((*ss->ss_fn)(h, r->r_xml, ss->ss_arg) < 0) + if ((*ss->ss_fn)(h, 0, r->r_xml, ss->ss_arg) < 0) goto done; r = NEXTQ(struct stream_replay *, r); } while (r && r!=es->es_replay); diff --git a/test/test_stream.sh b/test/test_stream.sh index f1f84bc6..117bd8ac 100755 --- a/test/test_stream.sh +++ b/test/test_stream.sh @@ -4,9 +4,24 @@ # 1. http server setup, such as nginx described in apps/restconf/README.md # especially SSE - ngchan setup # 2. Example stream as Clixon example which needs registration, callback and -# notification generating code +# notification generating code every 5s +# +# Testing of streams is quite complicated. +# Here are some testing dimensions in restconf alone: +# - start/stop subscription +# - start-time/stop-time in subscription +# - stream retention time +# - native vs nchan implementation +# Focussing on 1-3 +# 2a) start sub 8s - see 2 notifications +# 2b) start sub 8s - stoptime after 5s - see 1 notifications +# 2c) start sub 8s - replay from start -8s - see 4 notifications +# 2d) start sub 8s - replay from start -8s to stop +4s - see 3 notifications +# 2e) start sub 8s - replay from -90s w retention 60s - see 10 notifications APPNAME=example +UTIL=../util/clixon_util_stream +DATE=$(date +"%Y-%m-%d") # include err() and new() functions and creates $dir . ./lib.sh cfg=$dir/conf.xml @@ -33,6 +48,7 @@ cat < $cfg true streams https://localhost + 60 EOF @@ -84,34 +100,131 @@ if [ $? -ne 0 ]; then err fi new "start backend -s init -f $cfg -y $fyang" -sudo $clixon_backend -s init -f $cfg -y $fyang # -D 1 +sudo $clixon_backend -s init -f $cfg -y $fyang -D 1 if [ $? -ne 0 ]; then err fi new "kill old restconf daemon" -sudo pkill -u www-data clixon_restconf +#sudo pkill -u www-data clixon_restconf new "start restconf daemon" -sudo start-stop-daemon -S -q -o -b -x /www-data/clixon_restconf -d /www-data -c www-data -- -f $cfg -y $fyang # -D 1 +#sudo start-stop-daemon -S -q -o -b -x /www-data/clixon_restconf -d /www-data -c www-data -- -f $cfg -y $fyang -D 1 +sleep 2 + +# +# 1. Netconf RFC5277 stream testing +new "1. Netconf RFC5277 stream testing" +# 1.1 Stream discovery new "netconf event stream discovery RFC5277 Sec 3.2.5" -expecteof "$clixon_netconf -qf $cfg -y $fyang" 0 ']]>]]>' 'EXAMPLEExample event streamfalse]]>]]>' +expecteof "$clixon_netconf -qf $cfg -y $fyang" 0 ']]>]]>' 'EXAMPLEExample event streamtrue]]>]]>' new "netconf event stream discovery RFC8040 Sec 6.2" -expecteof "$clixon_netconf -qf $cfg -y $fyang" 0 ']]>]]>' 'EXAMPLEExample event streamfalsexmlhttps://localhost/streams/EXAMPLE]]>]]>' +expecteof "$clixon_netconf -qf $cfg -y $fyang" 0 ']]>]]>' 'EXAMPLEExample event streamtruexmlhttps://localhost/streams/EXAMPLE]]>]]>' +# +# 1.2 Netconf stream subscription +new "netconf EXAMPLE subscription" +expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLE]]>]]>' '^]]>]]>20' 5 + +new "netconf subscription with empty startTime" +expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLE]]>]]>' '^]]>]]>20' 5 + +new "netconf EXAMPLE subscription with simple filter" +expectwait "$clixon_netconf -qf $cfg -y $fyang" "EXAMPLE]]>]]>" '^]]>]]>20' 5 + +new "netconf EXAMPLE subscription with filter classifier" +expectwait "$clixon_netconf -qf $cfg -y $fyang" "EXAMPLE]]>]]>" '^]]>]]>20' 5 + +new "netconf NONEXIST subscription" +expectwait "$clixon_netconf -qf $cfg -y $fyang" 'NONEXIST]]>]]>' '^invalid-valueapplicationerrorNo such stream]]>]]>$' 5 + +new "netconf EXAMPLE subscription with wrong date" +expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLEkallekaka]]>]]>' '^bad-elementapplicationstartTimeerrorExpected timestamp]]>]]>$' 0 + +#new "netconf EXAMPLE subscription with replay" +#NOW=$(date +"%Y-%m-%dT%H:%M:%S") +#sleep 10 +#expectwait "$clixon_netconf -qf $cfg -y $fyang" "EXAMPLE$NOW]]>]]>" '^]]>]]>20' 10 + +# +# 2. Restconf RFC8040 stream testing +new "2. Restconf RFC8040 stream testing" +# 2.1 Stream discovery new "restconf event stream discovery RFC8040 Sec 6.2" -expectfn "curl -s -X GET http://localhost/restconf/data/ietf-restconf-monitoring:restconf-state/streams" 0 '{"streams": {"stream": \[{"name": "EXAMPLE","description": "Example event stream","replay-support": false,"access": \[{"encoding": "xml","location": "https://localhost/streams/EXAMPLE"}\]}\]}' +expectfn "curl -s -X GET http://localhost/restconf/data/ietf-restconf-monitoring:restconf-state/streams" 0 '{"streams": {"stream": \[{"name": "EXAMPLE","description": "Example event stream","replay-support": true,"access": \[{"encoding": "xml","location": "https://localhost/streams/EXAMPLE"}\]}\]}' new "restconf subscribe RFC8040 Sec 6.3, get location" expectfn "curl -s -X GET http://localhost/restconf/data/ietf-restconf-monitoring:restconf-state/streams/stream=EXAMPLE/access=xml/location" 0 '{"location": "https://localhost/streams/EXAMPLE"}' -# Restconf stream subscription RFC8040 Sec 6.3 - Native solution +# Restconf stream subscription RFC8040 Sec 6.3 +# Start Subscription w error new "restconf monitor event nonexist stream" expectwait 'curl -s -X GET -H "Accept: text/event-stream" -H "Cache-Control: no-cache" -H "Connection: keep-alive" http://localhost/streams/NOTEXIST' 0 'invalid-valueapplicationerrorNo such stream' 2 +# 2a) start subscription 8s - see 2 notifications +new "2a) start subscriptions 8s - see 2 notifications" +ret=$($UTIL -u http://localhost/streams/EXAMPLE -t 8) +expect="data: ${DATE}T[0-9:.]*faultEthernet0major" + +match=$(echo "$ret" | grep -Eo "$expect") +if [ -z "$match" ]; then + err "$expect" "$ret" +fi +nr=$(echo "$ret" | grep -c "data:") +if [ $nr != 1 -a $nr != 2 ]; then + err 2 "$nr" +fi + +# 2b) start subscription 8s - stoptime after 5s - see 1 notifications + +new "2b) start subscriptions 8s - stoptime after 5s - see 1 notifications" +ret=$($UTIL -u http://localhost/streams/EXAMPLE -t 8 -e +10) +expect="data: ${DATE}T[0-9:.]*faultEthernet0major" +match=$(echo "$ret" | grep -Eo "$expect") +if [ -z "$match" ]; then + err "$expect" "$ret" +fi +nr=$(echo "$ret" | grep -c "data:") +if [ $nr != 1 -a $nr != 2 ]; then + err 1 "$nr" +fi + +# 2c +new "2c) start sub 8s - replay from start -8s - see 4 notifications" +ret=$($UTIL -u http://localhost/streams/EXAMPLE -t 10 -s -8) +expect="data: ${DATE}T[0-9:.]*faultEthernet0major" +match=$(echo "$ret" | grep -Eo "$expect") +if [ -z "$match" ]; then + err "$expect" "$ret" +fi +nr=$(echo "$ret" | grep -c "data:") +if [ $nr != 4 ]; then + err 4 "$nr" +fi +exit +#----------------- + +sudo pkill -u www-data clixon_restconf + +new "Kill backend" +# kill backend +sudo clixon_backend -zf $cfg +if [ $? -ne 0 ]; then + err "kill backend" +fi + +# Check if still alive +pid=`pgrep clixon_backend` +if [ -n "$pid" ]; then + sudo kill $pid +fi + +rm -rf $dir +exit +#-------------------------------------------------------------------- # Need manual testing new "restconf monitor streams native NEEDS manual testing" if false; then @@ -138,45 +251,6 @@ if false; then # Expect: echo foo fi -# Netconf stream subscription -# Switch here since subscriptions takes time -if true; then -new "netconf EXAMPLE subscription" -expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLE]]>]]>' '^]]>]]>20' 5 -new "netconf EXAMPLE subscription with simple filter" -expectwait "$clixon_netconf -qf $cfg -y $fyang" "EXAMPLE]]>]]>" '^]]>]]>20' 5 -new "netconf EXAMPLE subscription with filter classifier" -expectwait "$clixon_netconf -qf $cfg -y $fyang" "EXAMPLE]]>]]>" '^]]>]]>20' 5 - -new "netconf NONEXIST subscription" -expectwait "$clixon_netconf -qf $cfg -y $fyang" 'NONEXIST]]>]]>' '^invalid-valueapplicationerrorNo such stream]]>]]>$' 5 -fi - -new "netconf EXAMPLE subscription with wrong date" -expectwait "$clixon_netconf -qf $cfg -y $fyang" 'EXAMPLEkallekaka]]>]]>' '^bad-elementapplicationstartTimeerrorExpected timestamp]]>]]>$' 0 - -new "netconf EXAMPLE subscription with replay" -NOW=$(date +"%Y-%m-%dT%H:%M:%S") -sleep 10 -expectwait "$clixon_netconf -qf $cfg -y $fyang" "EXAMPLE$NOW]]>]]>" '^]]>]]>20' 10 - -new "Kill restconf daemon" -sudo pkill -u www-data clixon_restconf - -new "Kill backend" -# kill backend -sudo clixon_backend -zf $cfg -if [ $? -ne 0 ]; then - err "kill backend" -fi - -# Check if still alive -pid=`pgrep clixon_backend` -if [ -n "$pid" ]; then - sudo kill $pid -fi - -rm -rf $dir diff --git a/util/clixon_util_stream.c b/util/clixon_util_stream.c index 2cff755a..6c802799 100644 --- a/util/clixon_util_stream.c +++ b/util/clixon_util_stream.c @@ -33,7 +33,7 @@ * Stream restconf support functions. * (Original in grideye) - * Example: clixon_util_stream http://localhost/streams/EXAMPLE 10 + * Example: clixon_util_stream -u http://localhost/streams/EXAMPLE -s 2018-10-21T19:22:16 */ #ifdef HAVE_CONFIG_H @@ -49,6 +49,7 @@ #include #include #include +#include #include /* cligen */ @@ -93,8 +94,7 @@ curl_get_cb(void *ptr, * * If getdata is set, return the (malloced) data (which should be freed). * - * @param[in] query 'q' parameter that should be URL-encoded, ie ?q= - * XXX: dont add q=, there may be more parameters. + * @param[in] start 'start-time' parameter that will be URL-encoded * @retval -1 fatal error * @retval 1 ok * @@ -103,10 +103,11 @@ curl_get_cb(void *ptr, * better TCP performance */ int -url_get(char *url, - char *query, - int timeout, - char **getdata) +stream_url_get(char *url, + char *start, + char *stop, + int timeout, + char **getdata) { int retval = -1; CURL *curl; @@ -126,30 +127,36 @@ url_get(char *url, if ((cbf = cbuf_new()) == NULL) goto done; - if (query){ - if ((encoded = curl_easy_escape(curl, query, 0)) == NULL){ - clicon_debug(1, "curl_easy_escape"); - goto done; - } - } if ((err = malloc(CURL_ERROR_SIZE)) == NULL) { clicon_debug(1, "%s: malloc", __FUNCTION__); goto done; } - /* specify URL to get */ - if (query) - cprintf(cbf, "%s?q=%s", url, encoded); - else - cprintf(cbf, "%s", url); curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); + /* HEADERS */ list = curl_slist_append(list, "Accept: text/event-stream"); list = curl_slist_append(list, "Cache-Control: no-cache"); list = curl_slist_append(list, "Connection: keep-alive"); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list); - - /* For reference, this url works - "http://192.36.171.239:8086/db/nordunet/series?q=select%20tcmp2%20from%20%22dk-ore%22%20limit%201" - */ + /* specify URL to get */ + cprintf(cbf, "%s", url); + if (strlen(start)||strlen(stop)) + cprintf(cbf, "?"); + if (strlen(start)){ + if ((encoded = curl_easy_escape(curl, start, 0)) == NULL) + goto done; + cprintf(cbf, "start-time=%s", encoded); + curl_free(encoded); + encoded = NULL; + } + if (strlen(stop)){ + if (strlen(start)) + cprintf(cbf, "&"); + if ((encoded = curl_easy_escape(curl, stop, 0)) == NULL) + goto done; + cprintf(cbf, "stop-time=%s", encoded); + curl_free(encoded); + encoded = NULL; + } clicon_debug(1, "url: %s\n", cbuf_get(cbf)); curl_easy_setopt(curl, CURLOPT_URL, cbuf_get(cbf)); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_get_cb); @@ -159,9 +166,7 @@ url_get(char *url, field, so we provide one */ curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, err); - // curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, timeout); - // curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout); curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L); ret = curl_easy_perform(curl); if (ret != CURLE_OPERATION_TIMEDOUT && ret != CURLE_OK){ @@ -190,7 +195,16 @@ url_get(char *url, static int usage(char *argv0) { - fprintf(stderr, "usage:%s .\n\tInput on stdin\n", argv0); + fprintf(stderr, "usage:%s *\n" + "where options are:\n" + "\t-h\t\tHelp\n" + "\t-D \tDebug level\n" + "\t-u \tURL (mandatory)\n" + "\t-s \tStart-time (format: 2018-10-21T19:22:16 OR +/-s\n" + "\t-e \tStop-time (same format as start)\n" + "\t-t \tTimeout (default: 10)\n" + , argv0); + exit(0); } @@ -198,22 +212,69 @@ int main(int argc, char **argv) { cbuf *cb = cbuf_new(); - char *url; - char *query = NULL; + char *url = NULL; char *getdata = NULL; - int timeout; + int timeout = 10; + char start[27] = {0,}; /* strlen = 0 */ + char stop[27] = {0,}; + char c; + char *argv0 = argv[0]; + struct timeval now; - if (argc != 3){ + clicon_log_init("xpath", LOG_DEBUG, CLICON_LOG_STDERR); + gettimeofday(&now, NULL); + optind = 1; + opterr = 0; + while ((c = getopt(argc, argv, "hDu:s:e:t:")) != -1) + switch (c) { + case 'h': + usage(argv0); + break; + case 'D': + debug++; + break; + case 'u': /* URL */ + url = optarg; + break; + case 's': /* start-time */ + if (*optarg == '+' || *optarg == '-'){ + struct timeval t; + t = now; + t.tv_sec += atoi(optarg); + if (time2str(t, start, sizeof(start)) < 0) + goto done; + } + else + strcpy(start, optarg); + break; + case 'e': /* stop-time */ + if (*optarg == '+' || *optarg == '-'){ + struct timeval t; + t = now; + t.tv_sec += atoi(optarg); + if (time2str(t, stop, sizeof(stop)) < 0) + goto done; + } + else + strcpy(stop, optarg); + break; + case 't': /* timeout */ + timeout = atoi(optarg); + break; + default: + usage(argv[0]); + break; + } + if (url == NULL) usage(argv[0]); - return 0; - } - url = argv[1]; - timeout = atoi(argv[2]); - if (url_get(url, query, timeout, &getdata) < 0) + curl_global_init(0); + if (stream_url_get(url, start, stop, timeout, &getdata) < 0) goto done; - fprintf(stdout, "%s", getdata); + if (getdata) + fprintf(stdout, "%s", getdata); fflush(stdout); done: + curl_global_cleanup(); if (getdata) free(getdata); if (cb)