stream replay and subscription update
This commit is contained in:
parent
fb0d0606e3
commit
f23a21d5db
9 changed files with 103 additions and 66 deletions
|
|
@ -77,6 +77,7 @@ clicon_hash_t *clicon_data(clicon_handle h);
|
|||
/* Return internal stream hash-array given a handle.*/
|
||||
struct event_stream *clicon_stream(clicon_handle h);
|
||||
struct event_stream;
|
||||
int clicon_stream_set(clicon_handle h, struct event_stream *es);
|
||||
int clicon_stream_append(clicon_handle h, struct event_stream *es);
|
||||
|
||||
#endif /* _CLIXON_HANDLE_H_ */
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ struct stream_replay{
|
|||
/* See RFC8040 9.3, stream list, no replay support for now
|
||||
*/
|
||||
struct event_stream{
|
||||
struct event_stream *es_next;
|
||||
qelem_t es_q; /* queue header */
|
||||
char *es_name; /* name of notification event stream */
|
||||
char *es_description;
|
||||
struct stream_subscription *es_subscription;
|
||||
|
|
@ -81,7 +81,7 @@ typedef struct event_stream event_stream_t;
|
|||
*/
|
||||
event_stream_t *stream_find(clicon_handle h, const char *name);
|
||||
int stream_register(clicon_handle h, const char *name, const char *description, int replay_enabled);
|
||||
int stream_delete_all(event_stream_t *es);
|
||||
int stream_delete_all(clicon_handle h);
|
||||
int stream_get_xml(clicon_handle h, int access, cbuf *cb);
|
||||
int stream_timer_setup(int fd, void *arg);
|
||||
/* Subscriptions */
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ clicon_handle_exit(clicon_handle h)
|
|||
hash_free(ha);
|
||||
if ((ha = clicon_data(h)) != NULL)
|
||||
hash_free(ha);
|
||||
stream_delete_all(clicon_stream(h));
|
||||
stream_delete_all(h);
|
||||
free(ch);
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -187,14 +187,22 @@ clicon_stream(clicon_handle h)
|
|||
return ch->ch_stream;
|
||||
}
|
||||
|
||||
int
|
||||
clicon_stream_set(clicon_handle h,
|
||||
event_stream_t *es)
|
||||
{
|
||||
struct clicon_handle *ch = handle(h);
|
||||
|
||||
ch->ch_stream = es;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
clicon_stream_append(clicon_handle h,
|
||||
event_stream_t *es)
|
||||
{
|
||||
struct clicon_handle *ch = handle(h);
|
||||
event_stream_t **ep;
|
||||
|
||||
for (ep = &ch->ch_stream; (*ep); ep=&(*ep)->es_next);
|
||||
*ep = es;
|
||||
|
||||
ADDQ(es, ch->ch_stream);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,12 +90,17 @@ event_stream_t *
|
|||
stream_find(clicon_handle h,
|
||||
const char *name)
|
||||
{
|
||||
event_stream_t *es0;
|
||||
event_stream_t *es = NULL;
|
||||
|
||||
for (es=clicon_stream(h); es; es=es->es_next)
|
||||
if (strcmp(name, es->es_name)==0)
|
||||
break;
|
||||
return es;
|
||||
es0 = clicon_stream(h);
|
||||
if ((es = es0) != NULL)
|
||||
do {
|
||||
if (strcmp(name, es->es_name)==0)
|
||||
return es;
|
||||
es = NEXTQ(struct event_stream *, es);
|
||||
} while (es && es != es0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*! Add notification event stream
|
||||
|
|
@ -137,14 +142,16 @@ stream_register(clicon_handle h,
|
|||
* @param[in] es
|
||||
*/
|
||||
int
|
||||
stream_delete_all(event_stream_t *es)
|
||||
stream_delete_all(clicon_handle h)
|
||||
{
|
||||
event_stream_t *e_next;
|
||||
struct stream_replay *r;
|
||||
struct stream_subscription *ss;
|
||||
event_stream_t *es;
|
||||
event_stream_t *head = clicon_stream(h);
|
||||
|
||||
while (es){
|
||||
e_next = es->es_next;
|
||||
while ((es = clicon_stream(h)) != NULL){
|
||||
DELQ(es, head, event_stream_t *);
|
||||
clicon_stream_set(h, head);
|
||||
if (es->es_name)
|
||||
free(es->es_name);
|
||||
if (es->es_description)
|
||||
|
|
@ -158,7 +165,6 @@ stream_delete_all(event_stream_t *es)
|
|||
free(r);
|
||||
}
|
||||
free(es);
|
||||
es = e_next;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -180,22 +186,25 @@ stream_get_xml(clicon_handle h,
|
|||
char *stream_path;
|
||||
|
||||
cprintf(cb, "<streams>");
|
||||
for (es=clicon_stream(h); es; es=es->es_next){
|
||||
cprintf(cb, "<stream>");
|
||||
cprintf(cb, "<name>%s</name>", es->es_name);
|
||||
if (es->es_description)
|
||||
cprintf(cb, "<description>%s</description>", es->es_description);
|
||||
cprintf(cb, "<replay-support>false</replay-support>");
|
||||
if (access){
|
||||
cprintf(cb, "<access>");
|
||||
cprintf(cb, "<encoding>xml</encoding>");
|
||||
url_prefix = clicon_option_str(h, "CLICON_STREAM_URL");
|
||||
stream_path = clicon_option_str(h, "CLICON_STREAM_PATH");
|
||||
cprintf(cb, "<location>%s/%s/%s</location>",
|
||||
url_prefix, stream_path, es->es_name);
|
||||
cprintf(cb, "</access>");
|
||||
}
|
||||
cprintf(cb, "</stream>");
|
||||
if ((es = clicon_stream(h)) != NULL){
|
||||
do {
|
||||
cprintf(cb, "<stream>");
|
||||
cprintf(cb, "<name>%s</name>", es->es_name);
|
||||
if (es->es_description)
|
||||
cprintf(cb, "<description>%s</description>", es->es_description);
|
||||
cprintf(cb, "<replay-support>false</replay-support>");
|
||||
if (access){
|
||||
cprintf(cb, "<access>");
|
||||
cprintf(cb, "<encoding>xml</encoding>");
|
||||
url_prefix = clicon_option_str(h, "CLICON_STREAM_URL");
|
||||
stream_path = clicon_option_str(h, "CLICON_STREAM_PATH");
|
||||
cprintf(cb, "<location>%s/%s/%s</location>",
|
||||
url_prefix, stream_path, es->es_name);
|
||||
cprintf(cb, "</access>");
|
||||
}
|
||||
cprintf(cb, "</stream>");
|
||||
es = NEXTQ(struct event_stream *, es);
|
||||
} while (es && es != clicon_stream(h));
|
||||
}
|
||||
cprintf(cb, "</streams>");
|
||||
return 0;
|
||||
|
|
@ -224,18 +233,24 @@ stream_timer_setup(int fd,
|
|||
*/
|
||||
gettimeofday(&now, NULL);
|
||||
/* for all event streams, remove subscription if past stop time */
|
||||
for (es=clicon_stream(h); es; es=es->es_next){
|
||||
if ((ss = es->es_subscription) != NULL)
|
||||
do {
|
||||
if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){
|
||||
ss1 = NEXTQ(struct stream_subscription *, ss);
|
||||
if (stream_ss_rm(es, ss) < 0)
|
||||
goto done;
|
||||
ss = ss1;
|
||||
}
|
||||
else
|
||||
ss = NEXTQ(struct stream_subscription *, ss);
|
||||
} while (ss && ss != es->es_subscription);
|
||||
|
||||
|
||||
|
||||
if ((es = clicon_stream(h)) != NULL){
|
||||
do {
|
||||
if ((ss = es->es_subscription) != NULL)
|
||||
do {
|
||||
if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){
|
||||
ss1 = NEXTQ(struct stream_subscription *, ss);
|
||||
if (stream_ss_rm(es, ss) < 0)
|
||||
goto done;
|
||||
ss = ss1;
|
||||
}
|
||||
else
|
||||
ss = NEXTQ(struct stream_subscription *, ss);
|
||||
} while (ss && ss != es->es_subscription);
|
||||
es = NEXTQ(struct event_stream *, es);
|
||||
} while (es && es != clicon_stream(h));
|
||||
}
|
||||
/* Initiate new timer */
|
||||
timeradd(&now, &t1, &t);
|
||||
|
|
@ -374,10 +389,14 @@ stream_ss_delete_all(clicon_handle h,
|
|||
event_stream_t *es;
|
||||
struct stream_subscription *ss;
|
||||
|
||||
for (es=clicon_stream(h); es; es=es->es_next)
|
||||
if ((ss = stream_ss_find(es, fn, arg)) != NULL)
|
||||
if (stream_ss_rm(es, ss) < 0)
|
||||
goto done;
|
||||
if ((es = clicon_stream(h)) != NULL){
|
||||
do {
|
||||
if ((ss = stream_ss_find(es, fn, arg)) != NULL)
|
||||
if (stream_ss_rm(es, ss) < 0)
|
||||
goto done;
|
||||
es = NEXTQ(struct event_stream *, es);
|
||||
} while (es && es != clicon_stream(h));
|
||||
}
|
||||
retval = 0;
|
||||
done:
|
||||
return retval;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue