control: Queue packets received Out-of-Order

Retransmissions are done very slowly, so we'd really better store the
Out-of-Order messages so that we can catch up quickly once we get the
missing piece.
This commit is contained in:
Samuel Thibault 2024-02-06 20:52:54 +01:00
parent 3ab80a9d66
commit 85044bc6a4
2 changed files with 91 additions and 12 deletions

101
l2tpns.c
View file

@ -3456,6 +3456,7 @@ static controlt *controlnew(uint16_t mtype)
} }
assert(c); assert(c);
c->next = 0; c->next = 0;
c->ns = 0; // only used for OoO receives
c->buf[0] = 0xC8; // flags c->buf[0] = 0xC8; // flags
c->buf[1] = 0x02; // ver c->buf[1] = 0x02; // ver
c->length = 12; c->length = 12;
@ -4033,6 +4034,7 @@ void processudp(uint8_t *buf, int len, struct sockaddr_in *addr, uint16_t indexu
uint8_t *recvchalresponse = NULL; uint8_t *recvchalresponse = NULL;
uint16_t l = len, t = 0, s = 0, ns = 0, nr = 0; uint16_t l = len, t = 0, s = 0, ns = 0, nr = 0;
uint8_t *p = buf + 2; uint8_t *p = buf + 2;
controlt *c;
CSTAT(processudp); CSTAT(processudp);
@ -4175,19 +4177,63 @@ void processudp(uint8_t *buf, int len, struct sockaddr_in *addr, uint16_t indexu
fmtaddr(htonl(tunnel[t].ip), 0), tunnel[t].port, t); fmtaddr(htonl(tunnel[t].ip), 0), tunnel[t].port, t);
} }
// If the 'ns' just received is not the 'nr' we're // If the 'ns' just received is is less than the 'nr'
// expecting, just send an ack and drop it. // we're expecting, we got a retransmitted packet.
// // Just send an ack and drop it.
// if 'ns' is less, then we got a retransmitted packet. if (ns - tunnel[t].nr >= 0x8000u)
// if 'ns' is greater than missed a packet. Either way {
// we should ignore it. if (l) // Is this not a ZLB?
controlnull(t);
return;
}
// If the 'ns' just received is greater than the 'nr'
// we're expecting, we missed a packet. If it's not too
// big and new, store this one to look after it after we
// get the retransmission of the missing piece.
if (ns != tunnel[t].nr) if (ns != tunnel[t].nr)
{ {
// is this the sequence we were expecting?
STAT(tunnel_rx_errors); STAT(tunnel_rx_errors);
LOG(1, 0, t, " Out of sequence tunnel %u, (%u is not the expected %u)\n", LOG(1, 0, t, " Out of sequence tunnel %u, (%u is not the expected %u)\n",
t, ns, tunnel[t].nr); t, ns, tunnel[t].nr);
if (tunnel[t].state == TUNNELOPEN
&& ns - tunnel[t].nr <= 10 && len <= MAXCONTROL)
{
// Not too big and not too new
controlt **curp;
LOG(2, 0, t, " Queueing it\n");
// Find where to put it in the queue
for (curp = &tunn_local[t].controlr; (c = *curp); curp = &c->next)
{
if (ns == c->ns)
{
LOG(2, 0, t, " We already had this piece\n");
break;
}
if (ns < c->ns)
{
// The rest is greater than this, put this before
c = NULL;
break;
}
}
if (curp && !c)
{
// We don't already have this piece, store it
c = controlnew(0);
c->next = *curp;
*curp = c;
c->length = len;
c->ns = ns;
memcpy(c->buf, buf, len);
}
}
// Tell peer what we have
if (l) // Is this not a ZLB? if (l) // Is this not a ZLB?
controlnull(t); controlnull(t);
return; return;
@ -4199,7 +4245,7 @@ void processudp(uint8_t *buf, int len, struct sockaddr_in *addr, uint16_t indexu
// some to clear maybe? // some to clear maybe?
while (tunnel[t].controlc > 0 && (((tunnel[t].ns - tunnel[t].controlc) - nr) & 0x8000)) while (tunnel[t].controlc > 0 && (((tunnel[t].ns - tunnel[t].controlc) - nr) & 0x8000))
{ {
controlt *c = tunnel[t].controls; c = tunnel[t].controls;
tunnel[t].controls = c->next; tunnel[t].controls = c->next;
tunnel[t].controlc--; tunnel[t].controlc--;
c->next = controlfree; c->next = controlfree;
@ -4215,7 +4261,7 @@ void processudp(uint8_t *buf, int len, struct sockaddr_in *addr, uint16_t indexu
{ {
// some control packets can now be sent that were previous stuck out of window // some control packets can now be sent that were previous stuck out of window
int tosend = tunnel[t].window - skip; int tosend = tunnel[t].window - skip;
controlt *c = tunnel[t].controls; c = tunnel[t].controls;
while (c && skip) while (c && skip)
{ {
c = c->next; c = c->next;
@ -4259,7 +4305,7 @@ void processudp(uint8_t *buf, int len, struct sockaddr_in *addr, uint16_t indexu
STAT(tunnel_rx_errors); STAT(tunnel_rx_errors);
free(sendchalresponse); free(sendchalresponse);
free(recvchalresponse); free(recvchalresponse);
return; goto out;
} }
p += n; // next p += n; // next
l -= n; l -= n;
@ -4815,7 +4861,7 @@ void processudp(uint8_t *buf, int len, struct sockaddr_in *addr, uint16_t indexu
} }
free(sendchalresponse); free(sendchalresponse);
free(recvchalresponse); free(recvchalresponse);
return; goto out;
case 11: // ICRP case 11: // ICRP
LOG(3, s, t, "Received ICRP\n"); LOG(3, s, t, "Received ICRP\n");
if (session[s].forwardtosession) if (session[s].forwardtosession)
@ -4878,6 +4924,37 @@ void processudp(uint8_t *buf, int len, struct sockaddr_in *addr, uint16_t indexu
free(sendchalresponse); free(sendchalresponse);
free(recvchalresponse); free(recvchalresponse);
cluster_send_tunnel(t); cluster_send_tunnel(t);
out:
// We processed a control packet, check if we can process the OoO queue
c = tunn_local[t].controlr;
while (c && c->ns - tunnel[t].nr >= 0x8000u)
{
// We received this again in the meanwhile! Drop.
LOG(2, 0, t, " We received again %u, drop\n", c->ns);
tunn_local[t].controlr = c->next;
c->next = controlfree;
controlfree = c;
c = tunn_local[t].controlr;
}
if (c && c->ns == tunnel[t].nr)
{
// We caught up with what we saved for later! Dequeue this.
LOG(2, 0, t, " We caught up with %u\n", c->ns);
tunn_local[t].controlr = c->next;
// And process it.
// Note: this might recurse for the rest of the queue, but the
// queue is bound and while processing it we are not queueing more.
processudp(c->buf, c->length, addr, indexudpfd);
c->next = controlfree;
controlfree = c;
}
} }
else else
{ {
@ -7634,7 +7711,7 @@ int load_tunnel(tunnelidt t, tunnelt *new)
// Clear tunnel control messages. These are dynamically allocated. // Clear tunnel control messages. These are dynamically allocated.
// If we get unlucky, this may cause the tunnel to drop! // If we get unlucky, this may cause the tunnel to drop!
// //
tunnel[t].controls = tunnel[t].controle = NULL; tunnel[t].controls = tunnel[t].controle = tunn_local[t].controlr = NULL;
tunnel[t].controlc = 0; tunnel[t].controlc = 0;
if (tunnel[t].state == TUNNELFREE) if (tunnel[t].state == TUNNELFREE)

View file

@ -275,6 +275,7 @@ typedef struct controls // control message
{ {
struct controls *next; // next in queue struct controls *next; // next in queue
uint16_t length; // length uint16_t length; // length
uint16_t ns; // sequence number
uint8_t buf[MAXCONTROL]; uint8_t buf[MAXCONTROL];
} }
controlt; controlt;
@ -506,6 +507,7 @@ tunnelt;
typedef struct typedef struct
{ {
controlt *controlr; // queue of OoO-received messages
int l2tp_fd; // kernel acceleration UDP socket int l2tp_fd; // kernel acceleration UDP socket
} }
tunnellocalt; tunnellocalt;