diff --git a/cluster.c b/cluster.c index 866b49c..5c662c2 100644 --- a/cluster.c +++ b/cluster.c @@ -38,7 +38,8 @@ */ // Module variables. -extern int cluster_sockfd; // The filedescriptor for the cluster communications port. +extern int cluster_sockfd; // The filedescriptor for the cluster communications multicast. +extern int cluster_sockfd2; // The filedescriptor for the cluster communications p2p. in_addr_t my_address = 0; // The network address of my ethernet port. static int walk_session_number = 0; // The next session to send when doing the slow table walk. @@ -103,9 +104,10 @@ int cluster_init() memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(config->cluster_port); - addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_addr.s_addr = htonl(INADDR_ANY); opt = 1; setsockopt(cluster_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + setsockopt(cluster_sockfd, SOL_IP, IP_PKTINFO, &opt, sizeof(opt)); // recvfromto opt = fcntl(cluster_sockfd, F_GETFL, 0); fcntl(cluster_sockfd, F_SETFL, opt | O_NONBLOCK); @@ -117,21 +119,33 @@ int cluster_init() } strcpy(ifr.ifr_name, config->cluster_interface); - if (ioctl(cluster_sockfd, SIOCGIFADDR, &ifr) < 0) + if (ioctl(cluster_sockfd, SIOCGIFADDR, &ifr) == 0) { - LOG(0, 0, 0, "Failed to get interface address for (%s): %s\n", config->cluster_interface, strerror(errno)); - return -1; + memcpy(&interface_addr, &ifr.ifr_addr, sizeof(interface_addr)); + my_address = interface_addr.sin_addr.s_addr; + } + else + { + struct in_addr myaddr; + if (inet_aton(config->cluster_interface, &myaddr) != 0) + { + memset(&interface_addr, 0, sizeof(interface_addr)); + interface_addr.sin_family = AF_INET; + interface_addr.sin_addr = myaddr; + my_address = myaddr.s_addr; + } + else + { + LOG(0, 0, 0, "Failed to get interface address for (%s): %s\n", config->cluster_interface, strerror(errno)); + return -1; + } } - - memcpy(&interface_addr, &ifr.ifr_addr, sizeof(interface_addr)); - my_address = interface_addr.sin_addr.s_addr; // Join multicast group. mreq.imr_multiaddr.s_addr = config->cluster_address; mreq.imr_interface = interface_addr.sin_addr; - - opt = 0; // Turn off multicast loopback. + opt = 1; // Turn on multicast loopback. setsockopt(cluster_sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, &opt, sizeof(opt)); opt = 7; // Highest priority to avoid getting hit by high-traffic @@ -161,6 +175,21 @@ int cluster_init() config->cluster_last_hb = TIME; config->cluster_seq_number = -1; + cluster_sockfd2 = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + addr.sin_addr.s_addr = my_address; + opt = 1; + setsockopt(cluster_sockfd2, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + if (bind(cluster_sockfd2, (void *) &addr, sizeof(addr)) < 0) + { + LOG(0, 0, 0, "Failed to bind cluster socket2: %s\n", strerror(errno)); + return -1; + } + + opt = fcntl(cluster_sockfd2, F_GETFL, 0); + fcntl(cluster_sockfd2, F_SETFL, opt | O_NONBLOCK); + return cluster_sockfd; } @@ -173,6 +202,7 @@ int cluster_init() static int cluster_send_data(void *data, int datalen) { struct sockaddr_in addr = {0}; + struct in_addr myaddr = { .s_addr = my_address }; if (!cluster_sockfd) return -1; if (!config->cluster_address) return 0; @@ -183,7 +213,7 @@ static int cluster_send_data(void *data, int datalen) LOG(5, 0, 0, "Cluster send data: %d bytes\n", datalen); - if (sendto(cluster_sockfd, data, datalen, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) + if (sendtofrom(cluster_sockfd, data, datalen, MSG_NOSIGNAL, (void *) &addr, sizeof(addr), &myaddr) < 0) { LOG(0, 0, 0, "sendto: %s\n", strerror(errno)); return -1; @@ -254,8 +284,7 @@ static int peer_send_data(in_addr_t peer, uint8_t *data, int size) { struct sockaddr_in addr = {0}; - if (!cluster_sockfd) return -1; - if (!config->cluster_address) return 0; + if (!cluster_sockfd2) return -1; if (!peer) // Odd?? return -1; @@ -266,7 +295,7 @@ static int peer_send_data(in_addr_t peer, uint8_t *data, int size) LOG_HEX(5, "Peer send", data, size); - if (sendto(cluster_sockfd, data, size, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) + if (sendto(cluster_sockfd2, data, size, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) { LOG(0, 0, 0, "sendto: %s\n", strerror(errno)); return -1; diff --git a/docs/src/html/manual.md b/docs/src/html/manual.md index 41d0f36..cff4b6a 100644 --- a/docs/src/html/manual.md +++ b/docs/src/html/manual.md @@ -348,7 +348,12 @@ should be set by a line like: set configstring \"value\" set ipaddress `cluster_interface` (string) -: Interface for cluster packets (default: eth0) +: Interface for cluster packets (default: eth0). + An IPv4 address can also be specified, e.g. to run several l2tpns on the + same host (but beware of enabling kernel acceleration only on one instance, + to set a different `bind_portremotelns`, and to set a different + `route_metric`. It is also useful to set a different `route_protocol` + number) `cluster_mcast_ttl` (int) diff --git a/l2tpns.c b/l2tpns.c index 9b343de..2e0d89b 100644 --- a/l2tpns.c +++ b/l2tpns.c @@ -92,7 +92,8 @@ int daefd = -1; // Socket listening for DAE connections. int snoopfd = -1; // UDP file handle for sending out intercept data int *radfds = NULL; // RADIUS requests file handles int rand_fd = -1; // Random data source -int cluster_sockfd = -1; // Intra-cluster communications socket. +int cluster_sockfd = -1; // Intra-cluster multicast communications socket. +int cluster_sockfd2 = -1; // Intra-cluster p2p communications socket. int epollfd = -1; // event polling time_t basetime = 0; // base clock char hostname[MAXHOSTNAME] = ""; // us. @@ -2386,6 +2387,9 @@ static void initdae(void) addr.sin_family = AF_INET; addr.sin_port = htons(config->radius_dae_port); daefd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +#ifdef SO_REUSEPORT + setsockopt(daefd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)); +#endif setsockopt(daefd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); setsockopt(daefd, SOL_IP, IP_PKTINFO, &on, sizeof(on)); // recvfromto if (bind(daefd, (struct sockaddr *) &addr, sizeof(addr)) < 0) @@ -2407,6 +2411,9 @@ static int initudp(int * pudpfd, in_addr_t ip_bind, in_addr_t ip_dest, uint16_t addr.sin_port = htons(L2TPPORT); addr.sin_addr.s_addr = ip_bind; (*pudpfd) = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +#ifdef SO_REUSEPORT + setsockopt((*pudpfd), SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)); +#endif setsockopt((*pudpfd), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); { int flags = fcntl((*pudpfd), F_GETFL, 0); @@ -5845,8 +5852,8 @@ static int still_busy(void) return 0; } -// the base set of fds polled: cli, cluster, tun, udp (MAX_UDPFD), control, dae, netlink, udplac, pppoedisc, pppoesess, dhcpv6, icmpv6 -#define BASE_FDS (11 + MAX_UDPFD) +// the base set of fds polled: cli, cluster, cluster2, tun, udp (MAX_UDPFD), control, dae, netlink, udplac, pppoedisc, pppoesess, dhcpv6, icmpv6 +#define BASE_FDS (12 + MAX_UDPFD) // additional polled fds #ifdef BGP @@ -5883,8 +5890,8 @@ static void mainloop(void) exit(1); } - LOG(4, 0, 0, "Beginning of main loop. clifd=%d, cluster_sockfd=%d, tunfd=%d, udpfd=%d, controlfd=%d, daefd=%d, rtnlfd=%d , udplacfd=%d, pppoefd=%d, pppoesessfd=%d\n", - clifd, cluster_sockfd, tunfd, udpfd[0], controlfd, daefd, rtnlfd, udplacfd, pppoediscfd, pppoesessfd); + LOG(4, 0, 0, "Beginning of main loop. clifd=%d, cluster_sockfd=%d, cluster_sockfd=%d, tunfd=%d, udpfd=%d, controlfd=%d, daefd=%d, rtnlfd=%d , udplacfd=%d, pppoefd=%d, pppoesessfd=%d\n", + clifd, cluster_sockfd, cluster_sockfd2, tunfd, udpfd[0], controlfd, daefd, rtnlfd, udplacfd, pppoediscfd, pppoesessfd); /* setup our fds to poll for input */ { @@ -5905,6 +5912,10 @@ static void mainloop(void) e.data.ptr = &d[i++]; epoll_ctl(epollfd, EPOLL_CTL_ADD, cluster_sockfd, &e); + d[i].type = FD_TYPE_CLUSTER2; + e.data.ptr = &d[i++]; + epoll_ctl(epollfd, EPOLL_CTL_ADD, cluster_sockfd2, &e); + d[i].type = FD_TYPE_TUN; e.data.ptr = &d[i++]; epoll_ctl(epollfd, EPOLL_CTL_ADD, tunfd, &e); @@ -6009,7 +6020,7 @@ static void mainloop(void) int pppoesess_ready = 0; int pppoesess_pkts = 0; int tun_ready = 0; - int cluster_ready = 0; + int cluster_ready = 0, cluster_ready2 = 0; int udp_pkts[MAX_UDPFD + 1] = INIT_TABUDPVAR; int tun_pkts = 0; int cluster_pkts = 0; @@ -6043,6 +6054,7 @@ static void mainloop(void) // these are handled below, with multiple interleaved reads case FD_TYPE_CLUSTER: cluster_ready++; break; + case FD_TYPE_CLUSTER2: cluster_ready2++; break; case FD_TYPE_TUN: tun_ready++; break; case FD_TYPE_UDP: udp_ready[d->index]++; break; case FD_TYPE_PPPOESESS: pppoesess_ready++; break; @@ -6283,6 +6295,22 @@ static void mainloop(void) n--; } } + + // cluster2 + if (cluster_ready2) + { + alen = sizeof(addr); + if ((s = recvfrom(cluster_sockfd2, p, size_bufp, MSG_WAITALL, (void *) &addr, &alen)) > 0) + { + processcluster(p, s, addr.sin_addr.s_addr); + cluster_pkts++; + } + else + { + cluster_ready2 = 0; + n--; + } + } } if (udp_pkts[0] > 1 || tun_pkts > 1 || cluster_pkts > 1) diff --git a/l2tpns.h b/l2tpns.h index 4affa5c..efaf836 100644 --- a/l2tpns.h +++ b/l2tpns.h @@ -1119,6 +1119,7 @@ struct event_data { enum { FD_TYPE_CLI, FD_TYPE_CLUSTER, + FD_TYPE_CLUSTER2, FD_TYPE_TUN, FD_TYPE_UDP, FD_TYPE_CONTROL, diff --git a/util.c b/util.c index 6829203..1af31e8 100644 --- a/util.c +++ b/util.c @@ -67,7 +67,7 @@ void *shared_malloc(unsigned int size) } extern int forked; -extern int cluster_sockfd, tunfd, controlfd, daefd, snoopfd, ifrfd, ifr6fd, rand_fd; +extern int cluster_sockfd, cluster_sockfd2, tunfd, controlfd, daefd, snoopfd, ifrfd, ifr6fd, rand_fd; extern int pppoediscfd, pppoesessfd; extern int *radfds; extern int udpfd[MAX_UDPFD + 1]; @@ -103,6 +103,7 @@ pid_t fork_and_close() // Close sockets if (clifd != -1) close(clifd); if (cluster_sockfd != -1) close(cluster_sockfd); + if (cluster_sockfd2 != -1) close(cluster_sockfd2); if (tunfd != -1) close(tunfd); for (i = 0; i < config->nbudpfd; i++)