* Restconf stream notification support - two variants.

* Both a "native" stream support and one using nginx/nchan pub/sub.
  * See (apps/restconf/README.md) for details.
* clixon-config YAML file has new revision: 2018-10-21.
This commit is contained in:
Olof hagsand 2018-10-21 22:19:38 +02:00
parent a4e29bcdb7
commit 71eddeaa74
21 changed files with 811 additions and 144 deletions

View file

@ -850,6 +850,11 @@ from_client_create_subscription(clicon_handle h,
goto done;
}
}
if ((stream_find(h, stream)) == NULL){
if (netconf_invalid_value(cbret, "application", "No such stream") < 0)
goto done;
goto ok;
}
if (stream_cb_add(h, stream, selector, ce_event_cb, (void*)ce) < 0)
goto done;
cprintf(cbret, "<rpc-reply><ok/></rpc-reply>");

View file

@ -728,6 +728,14 @@ netconf_discard_changes(clicon_handle h,
<severity>major</severity>
</event>
</notification>
* @see rfc5277:
* An event notification is sent to the client who initiated a
* <create-subscription> command asynchronously when an event of
* interest...
* Parameters: eventTime type dateTime and compliant to [RFC3339]
* Also contains notification-specific tagged content, if any. With
* the exception of <eventTime>, the content of the notification is
* beyond the scope of this document.
*/
static int
netconf_notification_cb(int s,
@ -822,6 +830,8 @@ netconf_create_subscription(clicon_handle h,
}
if (clicon_rpc_netconf_xml(h, xml_parent(xn), xret, &s) < 0)
goto done;
if (xpath_first(*xret, "rpc-reply/rpc-error") != NULL)
goto ok;
if (event_reg_fd(s,
netconf_notification_cb,
NULL,

View file

@ -22,14 +22,9 @@ server {
fastcgi_pass unix:/www-data/fastcgi_restconf.sock;
include fastcgi_params;
}
location /stream { # for restconf notifications
fastcgi_pass unix:/www-data/fastcgi_restconf.sock;
include fastcgi_params;
proxy_http_version 1.1;
proxy_set_header Connection "";
}
}
```
Start nginx daemon
```
sudo /etc/init.d nginx start
@ -73,12 +68,77 @@ olof@vandal> curl -G http://127.0.0.1/restconf/data/interfaces/interface/name=et
curl -sX POST -d '{"interfaces":{"interface":{"name":"eth1","type":"eth","enabled":"true"}}}' http://localhost/restconf/data
```
### Nginx Nchan for streams
Restconf notification event streams needs a server-side push
package. Clixon has used Nchan (nchan.io) for this
### Event streams
Download and install nchan, see nchan.io, Install section.
Clixon have two experimental restconf event stream implementations following
RFC8040 Section 6 using SSE. One native and one using Nginx
nchan. The two variants to subscribe to the stream is described in the
next section.
The example [../../example/README.md] creates and EXAMPLE stream.
Set the Clixon configuration options if they differ from default values - if they are OK you do not need to modify them:
```
<CLICON_STREAM_PATH>streams</CLICON_STREAM_PATH>
<CLICON_STREAM_URL>https://example.com</CLICON_STREAM_URL>
<CLICON_STREAM_PUB>http://localhost/pub</CLICON_STREAM_PUB>
```
where
- https://example.com/streams is the public fronting subscription base URL. A specific stream NAME can be accessed as https://example.com/streams/NAME
- http://localhost/pub is the local internal base publish stream.
You access the streams using curl, but they differ slightly in behaviour as described in the following two sections.
### Native event streams
Add the following to extend the nginx configuration file with the following statements:
```
location /streams {
fastcgi_pass unix:/www-data/fastcgi_restconf.sock;
include fastcgi_params;
proxy_http_version 1.1;
proxy_set_header Connection "";
}
```
You access a native stream as follos:
```
curl -H "Accept: text/event-stream" -s -X GET http://localhost/streams/EXAMPLE
curl -H "Accept: text/event-stream" -s -X GET http://localhost/streams/EXAMPLE?start-time=2014-10-25T10%3A02%3A00Z&stop-time=2014-10-25T12%3A31%3A00Z
```
where the first command retrieves only new notifications, and the second receives a range of messages.
### Nginx Nchan streams
Nginx uses pub/sub channels and can be configured in a variety of
ways. The following uses a simple variant with one generic subscription
channel (streams) and one publication channel (pub).
Configure clixon with `--enable-publish` which enables curl code for publishing streams to nchan.
Download and install nchan, see (https://nchan.io/#install).
Add the following to extend the nginx configuration file with the following statements:
```
location ~ /streams/(\w+)$ {
nchan_subscriber;
nchan_channel_id $1; #first capture of the location match
}
location ~ /pub/(\w+)$ {
nchan_publisher;
nchan_channel_id $1; #first capture of the location match
}
```
Access the event stream EXAMPLE using curl:
```
curl -H "Accept: text/event-stream" -s -X GET http://localhost/streams/EXAMPLE
curl -H "Accept: text/event-stream" -H "Last-Event-ID: 1539961709:0" -s -X GET http://localhost/streams/EXAMPLE
```
where the first command retrieves the whole stream history, and the second only retreives the most recent messages given by the ID.
See (https://nchan.io/#eventsource) on more info on how to access an SSE sub endpoint.
### Debugging

View file

@ -221,6 +221,26 @@ notfound(FCGX_Request *r)
return 0;
}
/*! HTTP error 406 Not acceptable
* @param[in] r Fastcgi request handle
*/
int
notacceptable(FCGX_Request *r)
{
char *path;
clicon_debug(1, "%s", __FUNCTION__);
path = FCGX_GetParam("DOCUMENT_URI", r->envp);
FCGX_FPrintF(r->out, "Status: 406\r\n"); /* 406 not acceptible */
FCGX_FPrintF(r->out, "Content-Type: text/html\r\n\r\n");
FCGX_FPrintF(r->out, "<h1>Not Acceptable</h1>\n");
FCGX_FPrintF(r->out, "Not Acceptable\n");
FCGX_FPrintF(r->out, "The target resource does not have a current representation that would be acceptable to the user agent.\n",
path);
return 0;
}
/*! HTTP error 409
* @param[in] r Fastcgi request handle
*/

View file

@ -40,7 +40,6 @@
* Constants
*/
#define RESTCONF_API "restconf"
#define RESTCONF_STREAM "stream"
/*
* Prototypes (also in clixon_restconf.h)
@ -52,6 +51,7 @@ int badrequest(FCGX_Request *r);
int unauthorized(FCGX_Request *r);
int forbidden(FCGX_Request *r);
int notfound(FCGX_Request *r);
int notacceptable(FCGX_Request *r);
int conflict(FCGX_Request *r);
int internal_server_error(FCGX_Request *r);
int notimplemented(FCGX_Request *r);

View file

@ -529,6 +529,7 @@ main(int argc,
yang_spec *yspec = NULL;
yang_spec *yspecfg = NULL; /* For config XXX clixon bug */
char *yang_filename = NULL;
char *stream_path;
/* In the startup, logs to stderr & debug flag set later */
clicon_log_init(__PROGRAM__, LOG_INFO, logdst);
@ -561,7 +562,6 @@ main(int argc,
goto done;
break;
} /* switch getopt */
/*
* Logs, error and debug to stderr or syslog, set debug level
*/
@ -583,7 +583,7 @@ main(int argc,
/* Find and read configfile */
if (clicon_options_main(h, yspecfg) < 0)
goto done;
stream_path = clicon_option_str(h, "CLICON_STREAM_PATH");
/* Now rest of options, some overwrite option file */
optind = 1;
opterr = 0;
@ -652,10 +652,6 @@ main(int argc,
if (clicon_option_bool(h, "CLICON_STREAM_DISCOVERY_RFC5277") &&
yang_spec_parse_module(h, "ietf-netconf-notification", CLIXON_DATADIR, NULL, yspec, NULL)< 0)
goto done;
if (stream_register(h, "NETCONF", "default NETCONF event stream") < 0)
goto done;
/* Call start function in all plugins before we go interactive
Pass all args after the standard options to plugin_start
*/
@ -692,8 +688,8 @@ main(int argc,
clicon_debug(1, "path: %s", path);
if (strncmp(path, "/" RESTCONF_API, strlen("/" RESTCONF_API)) == 0)
api_restconf(h, r); /* This is the function */
else if (strncmp(path, "/" RESTCONF_STREAM, strlen("/" RESTCONF_STREAM)) == 0) {
api_stream(h, r);
else if (strncmp(path+1, stream_path, strlen(stream_path)) == 0) {
api_stream(h, r, stream_path);
}
else if (strncmp(path, RESTCONF_WELL_KNOWN, strlen(RESTCONF_WELL_KNOWN)) == 0) {
api_well_known(h, r); /* */

View file

@ -35,6 +35,26 @@
See RFC 8040 RESTCONF Protocol
Sections 3.8, 6, 9.3
RFC8040:
A RESTCONF server MAY send the "retry" field, and if it does, RESTCONF
clients SHOULD use it. A RESTCONF server SHOULD NOT send the "event"
or "id" fields, as there are no meaningful values. RESTCONF
servers that do not send the "id" field also do not need to support
the HTTP header field "Last-Event-ID"
The RESTCONF client can then use this URL value to start monitoring
the event stream:
GET /streams/NETCONF HTTP/1.1
Host: example.com
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
The server MAY support the "start-time", "stop-time", and "filter"
query parameters, defined in Section 4.8. Refer to Appendix B.3.6
for filter parameter examples.
*/
#ifdef HAVE_CONFIG_H
@ -62,31 +82,136 @@
/* clicon */
#include <clixon/clixon.h>
#include <fcgi_stdio.h> /* Need to be after clixon_xml-h due to attribute format */
#include <fcgi_stdio.h> /* Need to be after clixon_xml.h due to attribute format */
#include "restconf_lib.h"
#include "restconf_stream.h"
/*! Callback when stream notifications arrive from backend
*/
static int
restconf_stream(clicon_handle h,
FCGX_Request *r,
event_stream_t *es)
restconf_stream_cb(int s,
void *arg)
{
int retval = -1;
int retval = -1;
FCGX_Request *r = (FCGX_Request *)arg;
int eof;
struct clicon_msg *reply = NULL;
cxobj *xtop = NULL; /* top xml */
cxobj *xn; /* notification xml */
cbuf *cb;
int pretty = 0; /* XXX should be via arg */
clicon_debug(1, "%s", __FUNCTION__);
/* get msg (this is the reason this function is called) */
if (clicon_msg_rcv(s, &reply, &eof) < 0){
clicon_debug(1, "%s msg_rcv error", __FUNCTION__);
goto done;
}
clicon_debug(1, "%s msg: %s", __FUNCTION__, reply->op_body);
/* handle close from remote end: this will exit the client */
if (eof){
clicon_debug(1, "%s eof", __FUNCTION__);
clicon_err(OE_PROTO, ESHUTDOWN, "Socket unexpected close");
close(s);
errno = ESHUTDOWN;
event_unreg_fd(s, restconf_stream_cb);
FCGX_FPrintF(r->out, "SHUTDOWN\r\n");
FCGX_FPrintF(r->out, "\r\n");
FCGX_FFlush(r->out);
clicon_exit_set();
goto done;
}
if (clicon_msg_decode(reply, &xtop) < 0)
goto done;
/* create event */
if ((cb = cbuf_new()) == NULL){
clicon_err(OE_PLUGIN, errno, "cbuf_new");
goto done;
}
if ((xn = xpath_first(xtop, "notification")) == NULL)
goto ok;
#ifdef notused
xt = xpath_first(xn, "eventTime");
if ((xe = xpath_first(xn, "event")) == NULL) /* event can depend on yang? */
goto ok;
if (xt)
FCGX_FPrintF(r->out, "M#id: %s\r\n", xml_body(xt));
else{ /* XXX */
gettimeofday(&tv, NULL);
FCGX_FPrintF(r->out, "M#id: %02d:0\r\n", tv.tv_sec);
}
#endif
if (clicon_xml2cbuf(cb, xn, 0, pretty) < 0)
goto done;
FCGX_FPrintF(r->out, "data: %s\r\n", cbuf_get(cb));
FCGX_FPrintF(r->out, "\r\n");
FCGX_FFlush(r->out);
ok:
retval = 0;
done:
clicon_debug(1, "%s retval: %d", __FUNCTION__, retval);
if (xtop != NULL)
xml_free(xtop);
if (reply)
free(reply);
if (cb)
cbuf_free(cb);
return retval;
}
/*! Send subsctription to backend
* @param[in] h Clicon handle
* @param[in] r Fastcgi request handle
* @param[in] name Stream name
* @param[out] sp Socket -1 if not set
*/
static int
restconf_stream(clicon_handle h,
FCGX_Request *r,
char *name,
int pretty,
int use_xml,
int *sp)
{
int retval = -1;
cxobj *xret = NULL;
cxobj *xe;
cbuf *cb = NULL;
int s; /* socket */
*sp = -1;
clicon_debug(1, "%s", __FUNCTION__);
if ((cb = cbuf_new()) == NULL){
clicon_err(OE_XML, errno, "cbuf_new");
goto done;
}
cprintf(cb, "<rpc><create-subscription><stream>%s</stream></create-subscription></rpc>]]>]]>", name);
if (clicon_rpc_netconf(h, cbuf_get(cb), &xret, &s) < 0)
goto done;
if ((xe = xpath_first(xret, "rpc-reply/rpc-error")) != NULL){
if (api_return_err(h, r, xe, pretty, use_xml) < 0)
goto done;
goto ok;
}
/* Setting up stream */
FCGX_SetExitStatus(201, r->out); /* Created */
FCGX_FPrintF(r->out, "Content-Type: text/event-stream\r\n");
FCGX_FPrintF(r->out, "Cache-Control: no-cache\r\n");
FCGX_FPrintF(r->out, "Connection: keep-alive\r\n");
FCGX_FPrintF(r->out, "X-Accel-Buffering: no\r\n");
FCGX_FPrintF(r->out, "\r\n");
FCGX_FPrintF(r->out, "Here is output\r\n");
FCGX_FPrintF(r->out, "\r\n");
FCGX_FFlush(r->out);
sync();
sleep(1);
FCGX_FPrintF(r->out, "Here is output 2\r\n");
*sp = s;
ok:
retval = 0;
// done:
done:
if (xret)
xml_free(xret);
if (cb)
cbuf_free(cb);
return retval;
}
@ -94,12 +219,33 @@ restconf_stream(clicon_handle h,
#include "restconf_lib.h"
#include "restconf_stream.h"
int
stream_timeout(int s,
void *arg)
{
struct timeval t;
struct timeval t1;
FCGX_Request *r = (FCGX_Request *)arg;
clicon_debug(1, "%s", __FUNCTION__);
if (FCGX_GetError(r->out) != 0) /* break loop */
clicon_exit_set();
else{
gettimeofday(&t, NULL);
t1.tv_sec = 1; t1.tv_usec = 0;
timeradd(&t, &t1, &t);
event_reg_timeout(t, stream_timeout, arg, "Stream timeout");
}
return 0;
}
/*! Process a FastCGI request
* @param[in] r Fastcgi request handle
*/
int
api_stream(clicon_handle h,
FCGX_Request *r)
FCGX_Request *r,
char *streampath)
{
int retval = -1;
char *path;
@ -113,28 +259,18 @@ api_stream(clicon_handle h,
cbuf *cb = NULL;
char *data;
int authenticated = 0;
char *media_accept;
char *media_content_type;
int pretty;
int parse_xml = 0; /* By default expect and parse JSON */
int use_xml = 0; /* By default use JSON */
int use_xml = 1; /* default */
cbuf *cbret = NULL;
cxobj *xret = NULL;
cxobj *xerr;
event_stream_t *es;
int s=-1;
clicon_debug(1, "%s", __FUNCTION__);
path = FCGX_GetParam("REQUEST_URI", r->envp);
query = FCGX_GetParam("QUERY_STRING", r->envp);
pretty = clicon_option_bool(h, "CLICON_RESTCONF_PRETTY");
/* get xml/json in put and output */
media_accept = FCGX_GetParam("HTTP_ACCEPT", r->envp);
if (media_accept && strcmp(media_accept, "application/yang-data+xml")==0)
use_xml++;
media_content_type = FCGX_GetParam("HTTP_CONTENT_TYPE", r->envp);
if (media_content_type &&
strcmp(media_content_type, "application/yang-data+xml")==0)
parse_xml++;
test(r, 1);
if ((pvec = clicon_strsep(path, "/", &pn)) == NULL)
goto done;
/* Sanity check of path. Should be /stream/<name> */
@ -146,11 +282,11 @@ api_stream(clicon_handle h,
retval = notfound(r);
goto done;
}
if (strcmp(pvec[1], RESTCONF_STREAM)){
if (strcmp(pvec[1], streampath)){
retval = notfound(r);
goto done;
}
test(r, 1);
if ((method = pvec[2]) == NULL){
retval = notfound(r);
goto done;
@ -158,6 +294,7 @@ api_stream(clicon_handle h,
clicon_debug(1, "%s: method=%s", __FUNCTION__, method);
if (str2cvec(query, '&', '=', &qvec) < 0)
goto done;
if (str2cvec(path, '/', '=', &pcvec) < 0) /* rest url eg /album=ricky/foo */
goto done;
/* data */
@ -167,7 +304,6 @@ api_stream(clicon_handle h,
clicon_debug(1, "%s DATA=%s", __FUNCTION__, data);
if (str2cvec(data, '&', '=', &dvec) < 0)
goto done;
/* If present, check credentials. See "plugin_credentials" in plugin
* See RFC 8040 section 2.5
*/
@ -191,12 +327,22 @@ api_stream(clicon_handle h,
goto ok;
}
clicon_debug(1, "%s auth2:%d %s", __FUNCTION__, authenticated, clicon_username_get(h));
if ((es = stream_find(h, method)) == NULL){
retval = notfound(r);
if (restconf_stream(h, r, method, pretty, use_xml, &s) < 0)
goto done;
if (s != -1){
/* Listen to backend socket */
if (event_reg_fd(s,
restconf_stream_cb,
(void*)r,
"stream socket") < 0)
goto done;
/* Poll upstream errors */
stream_timeout(0, (void*)r);
/* Start loop */
event_loop();
event_unreg_fd(s, restconf_stream_cb);
clicon_exit_reset();
}
if (restconf_stream(h, r, es) < 0)
goto done;
ok:
retval = 0;
done:

View file

@ -39,6 +39,6 @@
/*
* Prototypes
*/
int api_stream(clicon_handle h, FCGX_Request *r);
int api_stream(clicon_handle h, FCGX_Request *r, char *streampath);
#endif /* _RESTCONF_STREAM_H_ */