add Multilink support from Khaled Al Hamwi

This commit is contained in:
Brendan O'Dea 2006-04-27 09:53:49 +00:00
parent 57a847d9a3
commit 5faf075c8d
13 changed files with 1021 additions and 97 deletions

160
cluster.c
View file

@ -1,6 +1,6 @@
// L2TPNS Clustering Stuff
char const *cvs_id_cluster = "$Id: cluster.c,v 1.50 2006-04-05 02:13:48 bodea Exp $";
char const *cvs_id_cluster = "$Id: cluster.c,v 1.51 2006-04-27 09:53:49 bodea Exp $";
#include <stdio.h>
#include <stdlib.h>
@ -42,6 +42,7 @@ extern int cluster_sockfd; // The filedescriptor for the cluster communications
in_addr_t my_address = 0; // The network address of my ethernet port.
static int walk_session_number = 0; // The next session to send when doing the slow table walk.
static int walk_bundle_number = 0; // The next bundle to send when doing the slow table walk.
static int walk_tunnel_number = 0; // The next tunnel to send when doing the slow table walk.
int forked = 0; // Sanity check: CLI must not diddle with heartbeat table
@ -85,6 +86,7 @@ int cluster_init()
int opt;
config->cluster_undefined_sessions = MAXSESSION-1;
config->cluster_undefined_bundles = MAXBUNDLE-1;
config->cluster_undefined_tunnels = MAXTUNNEL-1;
if (!config->cluster_address)
@ -227,7 +229,7 @@ static void cluster_uptodate(void)
if (config->cluster_iam_uptodate)
return;
if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels)
if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles)
return;
config->cluster_iam_uptodate = 1;
@ -520,7 +522,7 @@ void cluster_check_slaves(void)
//
void cluster_check_master(void)
{
int i, count, tcount, high_unique_id = 0;
int i, count, tcount, bcount, high_unique_id = 0;
int last_free = 0;
clockt t = TIME;
static int probed = 0;
@ -614,6 +616,19 @@ void cluster_check_master(void)
config->cluster_highest_tunnelid = i;
}
//
// Go through and mark all the bundles as defined.
// Count the highest used bundle number as well.
//
config->cluster_highest_bundleid = 0;
for (i = 0, bcount = 0; i < MAXBUNDLE; ++i) {
if (bundle[i].state == BUNDLEUNDEF)
bundle[i].state = BUNDLEFREE;
if (bundle[i].state != BUNDLEFREE && i > config->cluster_highest_bundleid)
config->cluster_highest_bundleid = i;
}
//
// Go through and mark all the sessions as being defined.
// reset the idle timeouts.
@ -675,10 +690,11 @@ void cluster_check_master(void)
rebuild_address_pool();
// If we're not the very first master, this is a big issue!
if(count>0)
if (count > 0)
LOG(0, 0, 0, "Warning: Fixed %d uninitialized sessions in becoming master!\n", count);
config->cluster_undefined_sessions = 0;
config->cluster_undefined_bundles = 0;
config->cluster_undefined_tunnels = 0;
config->cluster_iam_uptodate = 1; // assume all peers are up-to-date
@ -696,7 +712,7 @@ void cluster_check_master(void)
// we fix it up here, and we ensure that the 'first free session'
// pointer is valid.
//
static void cluster_check_sessions(int highsession, int freesession_ptr, int hightunnel)
static void cluster_check_sessions(int highsession, int freesession_ptr, int highbundle, int hightunnel)
{
int i;
@ -705,7 +721,7 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig
if (config->cluster_iam_uptodate)
return;
if (highsession > config->cluster_undefined_sessions && hightunnel > config->cluster_undefined_tunnels)
if (highsession > config->cluster_undefined_sessions && highbundle > config->cluster_undefined_bundles && hightunnel > config->cluster_undefined_tunnels)
return;
// Clear out defined sessions, counting the number of
@ -721,6 +737,19 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig
++config->cluster_undefined_sessions;
}
// Clear out defined bundles, counting the number of
// undefs remaining.
config->cluster_undefined_bundles = 0;
for (i = 1 ; i < MAXBUNDLE; ++i) {
if (i > highbundle) {
if (bundle[i].state == BUNDLEUNDEF) bundle[i].state = BUNDLEFREE; // Defined.
continue;
}
if (bundle[i].state == BUNDLEUNDEF)
++config->cluster_undefined_bundles;
}
// Clear out defined tunnels, counting the number of
// undefs remaining.
config->cluster_undefined_tunnels = 0;
@ -735,9 +764,9 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig
}
if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels) {
LOG(2, 0, 0, "Cleared undefined sessions/tunnels. %d sess (high %d), %d tunn (high %d)\n",
config->cluster_undefined_sessions, highsession, config->cluster_undefined_tunnels, hightunnel);
if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles) {
LOG(2, 0, 0, "Cleared undefined sessions/bundles/tunnels. %d sess (high %d), %d bund (high %d), %d tunn (high %d)\n",
config->cluster_undefined_sessions, highsession, config->cluster_undefined_bundles, highbundle, config->cluster_undefined_tunnels, hightunnel);
return;
}
@ -770,6 +799,27 @@ static int hb_add_type(uint8_t **p, int type, int id)
add_type(p, C_SESSION, id, (uint8_t *) &session[id], sizeof(sessiont));
break;
case C_CBUNDLE: { // Compressed C_BUNDLE
uint8_t c[sizeof(bundlet) * 2]; // Bigger than worst case.
uint8_t *d = (uint8_t *) &bundle[id];
uint8_t *orig = d;
int size;
size = rle_compress( &d, sizeof(bundlet), c, sizeof(c) );
// Did we compress the full structure, and is the size actually
// reduced??
if ( (d - orig) == sizeof(bundlet) && size < sizeof(bundlet) ) {
add_type(p, C_CBUNDLE, id, c, size);
break;
}
// Failed to compress : Fall through.
}
case C_BUNDLE:
add_type(p, C_BUNDLE, id, (uint8_t *) &bundle[id], sizeof(bundlet));
break;
case C_CTUNNEL: { // Compressed C_TUNNEL
uint8_t c[sizeof(tunnelt) * 2]; // Bigger than worst case.
uint8_t *d = (uint8_t *) &tunnel[id];
@ -802,7 +852,7 @@ static int hb_add_type(uint8_t **p, int type, int id)
//
void cluster_heartbeat()
{
int i, count = 0, tcount = 0;
int i, count = 0, tcount = 0, bcount = 0;
uint8_t buff[MAX_HEART_SIZE + sizeof(heartt) + sizeof(int) ];
heartt h;
uint8_t *p = buff;
@ -823,7 +873,9 @@ void cluster_heartbeat()
h.highsession = config->cluster_highest_sessionid;
h.freesession = sessionfree;
h.hightunnel = config->cluster_highest_tunnelid;
h.highbundle = config->cluster_highest_bundleid;
h.size_sess = sizeof(sessiont); // Just in case.
h.size_bund = sizeof(bundlet);
h.size_tunn = sizeof(tunnelt);
h.interval = config->cluster_hb_interval;
h.timeout = config->cluster_hb_timeout;
@ -878,6 +930,22 @@ void cluster_heartbeat()
++tcount;
}
//
// Fill out the packet with bundles from the bundle table...
while ( (p + sizeof(uint32_t) * 2 + sizeof(bundlet) ) < (buff + MAX_HEART_SIZE) ) {
if (!walk_bundle_number) // bundle #0 isn't valid.
++walk_bundle_number;
if (bcount >= config->cluster_highest_bundleid)
break;
hb_add_type(&p, C_CTUNNEL, walk_bundle_number);
walk_tunnel_number = (1+walk_bundle_number)%(config->cluster_highest_bundleid+1); // +1 avoids divide by zero.
++bcount;
}
//
// Did we do something wrong?
if (p > (buff + sizeof(buff))) { // Did we somehow manage to overun the buffer?
@ -887,9 +955,9 @@ void cluster_heartbeat()
}
LOG(3, 0, 0, "Sending v%d heartbeat #%d, change #%" PRIu64 " with %d changes "
"(%d x-sess, %d x-tunnels, %d highsess, %d hightun, size %d)\n",
"(%d x-sess, %d x-bundles, %d x-tunnels, %d highsess, %d highbund, %d hightun, size %d)\n",
HB_VERSION, h.seq, h.table_version, config->cluster_num_changes,
count, tcount, config->cluster_highest_sessionid,
count, bcount, tcount, config->cluster_highest_sessionid, config->cluster_highest_bundleid,
config->cluster_highest_tunnelid, (int) (p - buff));
config->cluster_num_changes = 0;
@ -938,6 +1006,17 @@ int cluster_send_session(int sid)
return type_changed(C_CSESSION, sid);
}
// A particular bundle has been changed!
int cluster_send_bundle(int bid)
{
if (!config->cluster_iam_master) {
LOG(0, 0, bid, "I'm not a master, but I just tried to change a bundle!\n");
return -1;
}
return type_changed(C_CBUNDLE, bid);
}
// A particular tunnel has been changed!
int cluster_send_tunnel(int tid)
{
@ -1175,6 +1254,31 @@ static int cluster_recv_session(int more, uint8_t *p)
return 0;
}
static int cluster_recv_bundle(int more, uint8_t *p)
{
if (more >= MAXBUNDLE) {
LOG(0, 0, 0, "DANGER: Received a bundle id > MAXBUNDLE!\n");
return -1;
}
if (bundle[more].state == BUNDLEUNDEF) {
if (config->cluster_iam_uptodate) { // Sanity.
LOG(0, 0, 0, "I thought I was uptodate but I just found an undefined bundle!\n");
} else {
--config->cluster_undefined_bundles;
}
}
memcpy(&bundle[more], p, sizeof(bundle[more]) );
LOG(5, 0, more, "Received bundle update\n");
if (!config->cluster_iam_uptodate)
cluster_uptodate(); // Check to see if we're up to date.
return 0;
}
static int cluster_recv_tunnel(int more, uint8_t *p)
{
if (more >= MAXTUNNEL) {
@ -1461,7 +1565,7 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t
// Check that we don't have too many undefined sessions, and
// that the free session pointer is correct.
cluster_check_sessions(h->highsession, h->freesession, h->hightunnel);
cluster_check_sessions(h->highsession, h->freesession, h->highbundle, h->hightunnel);
if (h->interval != config->cluster_hb_interval)
{
@ -1569,6 +1673,34 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t
p += sizeof(tunnel[more]);
s -= sizeof(tunnel[more]);
break;
case C_CBUNDLE: { // Compressed bundle structure.
uint8_t c[ sizeof(bundlet) + 2];
int size;
uint8_t *orig_p = p;
size = rle_decompress((uint8_t **) &p, s, c, sizeof(c));
s -= (p - orig_p);
if (size != sizeof(bundlet) ) { // Ouch! Very very bad!
LOG(0, 0, 0, "DANGER: Received a CBUNDLE that didn't decompress correctly!\n");
// Now what? Should exit! No-longer up to date!
break;
}
cluster_recv_bundle(more, c);
break;
}
case C_BUNDLE:
if ( s < sizeof(bundle[more]))
goto shortpacket;
cluster_recv_bundle(more, p);
p += sizeof(bundle[more]);
s -= sizeof(bundle[more]);
break;
default:
LOG(0, 0, 0, "DANGER: I received a heartbeat element where I didn't understand the type! (%d)\n", type);
return -1; // can't process any more of the packet!!
@ -1754,11 +1886,13 @@ int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc)
cli_print(cli, "Table version # : %" PRIu64, config->cluster_table_version);
cli_print(cli, "Next sequence number expected: %d", config->cluster_seq_number);
cli_print(cli, "%d sessions undefined of %d", config->cluster_undefined_sessions, config->cluster_highest_sessionid);
cli_print(cli, "%d bundles undefined of %d", config->cluster_undefined_bundles, config->cluster_highest_bundleid);
cli_print(cli, "%d tunnels undefined of %d", config->cluster_undefined_tunnels, config->cluster_highest_tunnelid);
} else {
cli_print(cli, "Table version # : %" PRIu64, config->cluster_table_version);
cli_print(cli, "Next heartbeat # : %d", config->cluster_seq_number);
cli_print(cli, "Highest session : %d", config->cluster_highest_sessionid);
cli_print(cli, "Highest bundle : %d", config->cluster_highest_bundleid);
cli_print(cli, "Highest tunnel : %d", config->cluster_highest_tunnelid);
cli_print(cli, "%d changes queued for sending", config->cluster_num_changes);
}