Stream debug and tests

This commit is contained in:
Olof hagsand 2018-10-30 22:28:24 +01:00
parent e4adec413a
commit fa9b9c7e2e
8 changed files with 295 additions and 121 deletions

View file

@ -164,7 +164,7 @@ stream_delete_all(clicon_handle h)
if (es->es_description)
free(es->es_description);
while ((ss = es->es_subscription) != NULL)
stream_ss_rm(es, ss);
stream_ss_rm(h, es, ss);
while ((r = es->es_replay) != NULL){
DELQ(r, es->es_replay, struct stream_replay *);
if (r->r_xml)
@ -239,6 +239,7 @@ stream_timer_setup(int fd,
struct stream_replay *r;
struct stream_replay *r1;
clicon_debug(2, "%s", __FUNCTION__);
/* Go thru callbacks and see if any have timed out, if so remove them
* Could also be done by a separate timer.
*/
@ -255,7 +256,8 @@ stream_timer_setup(int fd,
do {
if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){
ss1 = NEXTQ(struct stream_subscription *, ss);
if (stream_ss_rm(es, ss) < 0)
/* Signal to remove stream for upper levels */
if (stream_ss_rm(h, es, ss) < 0)
goto done;
ss = ss1;
}
@ -369,15 +371,20 @@ stream_ss_add(clicon_handle h,
* @retval -1 Error
*/
int
stream_ss_rm(event_stream_t *es,
stream_ss_rm(clicon_handle h,
event_stream_t *es,
struct stream_subscription *ss)
{
clicon_debug(1, "%s", __FUNCTION__);
DELQ(ss, es->es_subscription, struct stream_subscription *);
/* Remove from upper layers - close socket etc. */
(*ss->ss_fn)(h, 1, NULL, ss->ss_arg);
if (ss->ss_stream)
free(ss->ss_stream);
if (ss->ss_xpath)
free(ss->ss_xpath);
free(ss);
return 0;
}
@ -421,9 +428,10 @@ stream_ss_delete_all(clicon_handle h,
if ((es = clicon_stream(h)) != NULL){
do {
if ((ss = stream_ss_find(es, fn, arg)) != NULL)
if (stream_ss_rm(es, ss) < 0)
if ((ss = stream_ss_find(es, fn, arg)) != NULL){
if (stream_ss_rm(h, es, ss) < 0)
goto done;
}
es = NEXTQ(struct event_stream *, es);
} while (es && es != clicon_stream(h));
}
@ -451,18 +459,24 @@ stream_notify_xml(clicon_handle h,
int retval = -1;
struct stream_subscription *ss;
clicon_debug(1, "%s", __FUNCTION__);
clicon_debug(2, "%s", __FUNCTION__);
/* Go thru all subscriptions and find matches */
if ((ss = es->es_subscription) != NULL)
do {
if (timerisset(&ss->ss_stoptime) && /* stoptime has passed */
timercmp(&ss->ss_stoptime, tv, <))
;
timercmp(&ss->ss_stoptime, tv, <)){
struct stream_subscription *ss1;
ss1 = NEXTQ(struct stream_subscription *, ss);
/* Signal to remove stream for upper levels */
if (stream_ss_rm(h, es, ss) < 0)
goto done;
ss = ss1;
}
else /* xpath match */
if (ss->ss_xpath == NULL ||
strlen(ss->ss_xpath)==0 ||
xpath_first(xevent, "%s", ss->ss_xpath) != NULL)
if ((*ss->ss_fn)(h, xevent, ss->ss_arg) < 0)
if ((*ss->ss_fn)(h, 0, xevent, ss->ss_arg) < 0)
goto done;
ss = NEXTQ(struct stream_subscription *, ss);
} while (ss && ss != es->es_subscription);
@ -499,7 +513,7 @@ stream_notify(clicon_handle h,
struct timeval tv;
event_stream_t *es;
clicon_debug(1, "%s", __FUNCTION__);
clicon_debug(2, "%s", __FUNCTION__);
if ((es = stream_find(h, stream)) == NULL)
goto ok;
va_start(args, event);
@ -604,7 +618,7 @@ stream_replay_notify(clicon_handle h,
if (timerisset(&ss->ss_stoptime) &&
timercmp(&r->r_tv, &ss->ss_stoptime, >))
break;
if ((*ss->ss_fn)(h, r->r_xml, ss->ss_arg) < 0)
if ((*ss->ss_fn)(h, 0, r->r_xml, ss->ss_arg) < 0)
goto done;
r = NEXTQ(struct stream_replay *, r);
} while (r && r!=es->es_replay);