cluster: Support running multiple instances on the same host

With IP_MULTICAST_LOOP they can see each other. We "just" have to make sure
they use different IP addresses and route metrics to distinguish them from each
other.
This commit is contained in:
Samuel Thibault 2025-03-17 20:51:06 +01:00
parent 366faaea76
commit b2942b3c53
5 changed files with 86 additions and 22 deletions

View file

@ -38,7 +38,8 @@
*/ */
// Module variables. // Module variables.
extern int cluster_sockfd; // The filedescriptor for the cluster communications port. extern int cluster_sockfd; // The filedescriptor for the cluster communications multicast.
extern int cluster_sockfd2; // The filedescriptor for the cluster communications p2p.
in_addr_t my_address = 0; // The network address of my ethernet port. in_addr_t my_address = 0; // The network address of my ethernet port.
static int walk_session_number = 0; // The next session to send when doing the slow table walk. static int walk_session_number = 0; // The next session to send when doing the slow table walk.
@ -103,9 +104,10 @@ int cluster_init()
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_port = htons(config->cluster_port); addr.sin_port = htons(config->cluster_port);
addr.sin_addr.s_addr = INADDR_ANY; addr.sin_addr.s_addr = htonl(INADDR_ANY);
opt = 1; opt = 1;
setsockopt(cluster_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); setsockopt(cluster_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
setsockopt(cluster_sockfd, SOL_IP, IP_PKTINFO, &opt, sizeof(opt)); // recvfromto
opt = fcntl(cluster_sockfd, F_GETFL, 0); opt = fcntl(cluster_sockfd, F_GETFL, 0);
fcntl(cluster_sockfd, F_SETFL, opt | O_NONBLOCK); fcntl(cluster_sockfd, F_SETFL, opt | O_NONBLOCK);
@ -117,21 +119,33 @@ int cluster_init()
} }
strcpy(ifr.ifr_name, config->cluster_interface); strcpy(ifr.ifr_name, config->cluster_interface);
if (ioctl(cluster_sockfd, SIOCGIFADDR, &ifr) < 0) if (ioctl(cluster_sockfd, SIOCGIFADDR, &ifr) == 0)
{
memcpy(&interface_addr, &ifr.ifr_addr, sizeof(interface_addr));
my_address = interface_addr.sin_addr.s_addr;
}
else
{
struct in_addr myaddr;
if (inet_aton(config->cluster_interface, &myaddr) != 0)
{
memset(&interface_addr, 0, sizeof(interface_addr));
interface_addr.sin_family = AF_INET;
interface_addr.sin_addr = myaddr;
my_address = myaddr.s_addr;
}
else
{ {
LOG(0, 0, 0, "Failed to get interface address for (%s): %s\n", config->cluster_interface, strerror(errno)); LOG(0, 0, 0, "Failed to get interface address for (%s): %s\n", config->cluster_interface, strerror(errno));
return -1; return -1;
} }
}
memcpy(&interface_addr, &ifr.ifr_addr, sizeof(interface_addr));
my_address = interface_addr.sin_addr.s_addr;
// Join multicast group. // Join multicast group.
mreq.imr_multiaddr.s_addr = config->cluster_address; mreq.imr_multiaddr.s_addr = config->cluster_address;
mreq.imr_interface = interface_addr.sin_addr; mreq.imr_interface = interface_addr.sin_addr;
opt = 1; // Turn on multicast loopback.
opt = 0; // Turn off multicast loopback.
setsockopt(cluster_sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, &opt, sizeof(opt)); setsockopt(cluster_sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, &opt, sizeof(opt));
opt = 7; // Highest priority to avoid getting hit by high-traffic opt = 7; // Highest priority to avoid getting hit by high-traffic
@ -161,6 +175,21 @@ int cluster_init()
config->cluster_last_hb = TIME; config->cluster_last_hb = TIME;
config->cluster_seq_number = -1; config->cluster_seq_number = -1;
cluster_sockfd2 = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
addr.sin_addr.s_addr = my_address;
opt = 1;
setsockopt(cluster_sockfd2, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if (bind(cluster_sockfd2, (void *) &addr, sizeof(addr)) < 0)
{
LOG(0, 0, 0, "Failed to bind cluster socket2: %s\n", strerror(errno));
return -1;
}
opt = fcntl(cluster_sockfd2, F_GETFL, 0);
fcntl(cluster_sockfd2, F_SETFL, opt | O_NONBLOCK);
return cluster_sockfd; return cluster_sockfd;
} }
@ -173,6 +202,7 @@ int cluster_init()
static int cluster_send_data(void *data, int datalen) static int cluster_send_data(void *data, int datalen)
{ {
struct sockaddr_in addr = {0}; struct sockaddr_in addr = {0};
struct in_addr myaddr = { .s_addr = my_address };
if (!cluster_sockfd) return -1; if (!cluster_sockfd) return -1;
if (!config->cluster_address) return 0; if (!config->cluster_address) return 0;
@ -183,7 +213,7 @@ static int cluster_send_data(void *data, int datalen)
LOG(5, 0, 0, "Cluster send data: %d bytes\n", datalen); LOG(5, 0, 0, "Cluster send data: %d bytes\n", datalen);
if (sendto(cluster_sockfd, data, datalen, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) if (sendtofrom(cluster_sockfd, data, datalen, MSG_NOSIGNAL, (void *) &addr, sizeof(addr), &myaddr) < 0)
{ {
LOG(0, 0, 0, "sendto: %s\n", strerror(errno)); LOG(0, 0, 0, "sendto: %s\n", strerror(errno));
return -1; return -1;
@ -254,8 +284,7 @@ static int peer_send_data(in_addr_t peer, uint8_t *data, int size)
{ {
struct sockaddr_in addr = {0}; struct sockaddr_in addr = {0};
if (!cluster_sockfd) return -1; if (!cluster_sockfd2) return -1;
if (!config->cluster_address) return 0;
if (!peer) // Odd?? if (!peer) // Odd??
return -1; return -1;
@ -266,7 +295,7 @@ static int peer_send_data(in_addr_t peer, uint8_t *data, int size)
LOG_HEX(5, "Peer send", data, size); LOG_HEX(5, "Peer send", data, size);
if (sendto(cluster_sockfd, data, size, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) if (sendto(cluster_sockfd2, data, size, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0)
{ {
LOG(0, 0, 0, "sendto: %s\n", strerror(errno)); LOG(0, 0, 0, "sendto: %s\n", strerror(errno));
return -1; return -1;

View file

@ -348,7 +348,12 @@ should be set by a line like: set configstring \"value\" set ipaddress
`cluster_interface` (string) `cluster_interface` (string)
: Interface for cluster packets (default: eth0) : Interface for cluster packets (default: eth0).
An IPv4 address can also be specified, e.g. to run several l2tpns instances on
the same host. In that case, make sure to enable kernel acceleration on only
one instance, and to give each instance a different `bind_portremotelns` and a
different `route_metric`. It is also useful to set a different `route_protocol`
number.
`cluster_mcast_ttl` (int) `cluster_mcast_ttl` (int)

View file

@ -92,7 +92,8 @@ int daefd = -1; // Socket listening for DAE connections.
int snoopfd = -1; // UDP file handle for sending out intercept data int snoopfd = -1; // UDP file handle for sending out intercept data
int *radfds = NULL; // RADIUS requests file handles int *radfds = NULL; // RADIUS requests file handles
int rand_fd = -1; // Random data source int rand_fd = -1; // Random data source
int cluster_sockfd = -1; // Intra-cluster communications socket. int cluster_sockfd = -1; // Intra-cluster multicast communications socket.
int cluster_sockfd2 = -1; // Intra-cluster p2p communications socket.
int epollfd = -1; // event polling int epollfd = -1; // event polling
time_t basetime = 0; // base clock time_t basetime = 0; // base clock
char hostname[MAXHOSTNAME] = ""; // us. char hostname[MAXHOSTNAME] = ""; // us.
@ -2386,6 +2387,9 @@ static void initdae(void)
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_port = htons(config->radius_dae_port); addr.sin_port = htons(config->radius_dae_port);
daefd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); daefd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
#ifdef SO_REUSEPORT
setsockopt(daefd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
#endif
setsockopt(daefd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); setsockopt(daefd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
setsockopt(daefd, SOL_IP, IP_PKTINFO, &on, sizeof(on)); // recvfromto setsockopt(daefd, SOL_IP, IP_PKTINFO, &on, sizeof(on)); // recvfromto
if (bind(daefd, (struct sockaddr *) &addr, sizeof(addr)) < 0) if (bind(daefd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
@ -2407,6 +2411,9 @@ static int initudp(int * pudpfd, in_addr_t ip_bind, in_addr_t ip_dest, uint16_t
addr.sin_port = htons(L2TPPORT); addr.sin_port = htons(L2TPPORT);
addr.sin_addr.s_addr = ip_bind; addr.sin_addr.s_addr = ip_bind;
(*pudpfd) = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); (*pudpfd) = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
#ifdef SO_REUSEPORT
setsockopt((*pudpfd), SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
#endif
setsockopt((*pudpfd), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); setsockopt((*pudpfd), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
{ {
int flags = fcntl((*pudpfd), F_GETFL, 0); int flags = fcntl((*pudpfd), F_GETFL, 0);
@ -5845,8 +5852,8 @@ static int still_busy(void)
return 0; return 0;
} }
// the base set of fds polled: cli, cluster, tun, udp (MAX_UDPFD), control, dae, netlink, udplac, pppoedisc, pppoesess, dhcpv6, icmpv6 // the base set of fds polled: cli, cluster, cluster2, tun, udp (MAX_UDPFD), control, dae, netlink, udplac, pppoedisc, pppoesess, dhcpv6, icmpv6
#define BASE_FDS (11 + MAX_UDPFD) #define BASE_FDS (12 + MAX_UDPFD)
// additional polled fds // additional polled fds
#ifdef BGP #ifdef BGP
@ -5883,8 +5890,8 @@ static void mainloop(void)
exit(1); exit(1);
} }
LOG(4, 0, 0, "Beginning of main loop. clifd=%d, cluster_sockfd=%d, tunfd=%d, udpfd=%d, controlfd=%d, daefd=%d, rtnlfd=%d , udplacfd=%d, pppoefd=%d, pppoesessfd=%d\n", LOG(4, 0, 0, "Beginning of main loop. clifd=%d, cluster_sockfd=%d, cluster_sockfd=%d, tunfd=%d, udpfd=%d, controlfd=%d, daefd=%d, rtnlfd=%d , udplacfd=%d, pppoefd=%d, pppoesessfd=%d\n",
clifd, cluster_sockfd, tunfd, udpfd[0], controlfd, daefd, rtnlfd, udplacfd, pppoediscfd, pppoesessfd); clifd, cluster_sockfd, cluster_sockfd2, tunfd, udpfd[0], controlfd, daefd, rtnlfd, udplacfd, pppoediscfd, pppoesessfd);
/* setup our fds to poll for input */ /* setup our fds to poll for input */
{ {
@ -5905,6 +5912,10 @@ static void mainloop(void)
e.data.ptr = &d[i++]; e.data.ptr = &d[i++];
epoll_ctl(epollfd, EPOLL_CTL_ADD, cluster_sockfd, &e); epoll_ctl(epollfd, EPOLL_CTL_ADD, cluster_sockfd, &e);
d[i].type = FD_TYPE_CLUSTER2;
e.data.ptr = &d[i++];
epoll_ctl(epollfd, EPOLL_CTL_ADD, cluster_sockfd2, &e);
d[i].type = FD_TYPE_TUN; d[i].type = FD_TYPE_TUN;
e.data.ptr = &d[i++]; e.data.ptr = &d[i++];
epoll_ctl(epollfd, EPOLL_CTL_ADD, tunfd, &e); epoll_ctl(epollfd, EPOLL_CTL_ADD, tunfd, &e);
@ -6009,7 +6020,7 @@ static void mainloop(void)
int pppoesess_ready = 0; int pppoesess_ready = 0;
int pppoesess_pkts = 0; int pppoesess_pkts = 0;
int tun_ready = 0; int tun_ready = 0;
int cluster_ready = 0; int cluster_ready = 0, cluster_ready2 = 0;
int udp_pkts[MAX_UDPFD + 1] = INIT_TABUDPVAR; int udp_pkts[MAX_UDPFD + 1] = INIT_TABUDPVAR;
int tun_pkts = 0; int tun_pkts = 0;
int cluster_pkts = 0; int cluster_pkts = 0;
@ -6043,6 +6054,7 @@ static void mainloop(void)
// these are handled below, with multiple interleaved reads // these are handled below, with multiple interleaved reads
case FD_TYPE_CLUSTER: cluster_ready++; break; case FD_TYPE_CLUSTER: cluster_ready++; break;
case FD_TYPE_CLUSTER2: cluster_ready2++; break;
case FD_TYPE_TUN: tun_ready++; break; case FD_TYPE_TUN: tun_ready++; break;
case FD_TYPE_UDP: udp_ready[d->index]++; break; case FD_TYPE_UDP: udp_ready[d->index]++; break;
case FD_TYPE_PPPOESESS: pppoesess_ready++; break; case FD_TYPE_PPPOESESS: pppoesess_ready++; break;
@ -6283,6 +6295,22 @@ static void mainloop(void)
n--; n--;
} }
} }
// cluster2
if (cluster_ready2)
{
alen = sizeof(addr);
if ((s = recvfrom(cluster_sockfd2, p, size_bufp, MSG_WAITALL, (void *) &addr, &alen)) > 0)
{
processcluster(p, s, addr.sin_addr.s_addr);
cluster_pkts++;
}
else
{
cluster_ready2 = 0;
n--;
}
}
} }
if (udp_pkts[0] > 1 || tun_pkts > 1 || cluster_pkts > 1) if (udp_pkts[0] > 1 || tun_pkts > 1 || cluster_pkts > 1)

View file

@ -1119,6 +1119,7 @@ struct event_data {
enum { enum {
FD_TYPE_CLI, FD_TYPE_CLI,
FD_TYPE_CLUSTER, FD_TYPE_CLUSTER,
FD_TYPE_CLUSTER2,
FD_TYPE_TUN, FD_TYPE_TUN,
FD_TYPE_UDP, FD_TYPE_UDP,
FD_TYPE_CONTROL, FD_TYPE_CONTROL,

3
util.c
View file

@ -67,7 +67,7 @@ void *shared_malloc(unsigned int size)
} }
extern int forked; extern int forked;
extern int cluster_sockfd, tunfd, controlfd, daefd, snoopfd, ifrfd, ifr6fd, rand_fd; extern int cluster_sockfd, cluster_sockfd2, tunfd, controlfd, daefd, snoopfd, ifrfd, ifr6fd, rand_fd;
extern int pppoediscfd, pppoesessfd; extern int pppoediscfd, pppoesessfd;
extern int *radfds; extern int *radfds;
extern int udpfd[MAX_UDPFD + 1]; extern int udpfd[MAX_UDPFD + 1];
@ -103,6 +103,7 @@ pid_t fork_and_close()
// Close sockets // Close sockets
if (clifd != -1) close(clifd); if (clifd != -1) close(clifd);
if (cluster_sockfd != -1) close(cluster_sockfd); if (cluster_sockfd != -1) close(cluster_sockfd);
if (cluster_sockfd2 != -1) close(cluster_sockfd2);
if (tunfd != -1) close(tunfd); if (tunfd != -1) close(tunfd);
for (i = 0; i < config->nbudpfd; i++) for (i = 0; i < config->nbudpfd; i++)