extmod/modlwip: Implement a queue of incoming UDP/raw packets.

The bare-metal lwIP socket interface is currently quite limited when used
for UDP streams, because it only allows one outstanding incoming UDP
packet.  If one UDP packet is waiting to be socket.recv'd and another one
comes along, then the second one is simply dropped.

This commit implements a queue for incoming UDP and raw packets.  The queue
depth is fixed at compile time, and is currently 4.

This allows better use of UDP connections, eg more efficient.  It also
makes DTLS work better which sometimes has a queue of UDP packets (eg
during the connection phase).

Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
Damien George
2025-05-07 13:05:13 +10:00
parent 61eedbbd11
commit 26e978e7bc

View File

@@ -286,6 +286,15 @@ static const int error_lookup_table[] = {
#define MOD_NETWORK_SOCK_DGRAM (2) #define MOD_NETWORK_SOCK_DGRAM (2)
#define MOD_NETWORK_SOCK_RAW (3) #define MOD_NETWORK_SOCK_RAW (3)
// Total queue length for buffered UDP/raw incoming packets.
#define LWIP_INCOMING_PACKET_QUEUE_LEN (4)
typedef struct _lwip_incoming_packet_t {
struct pbuf *pbuf;
ip_addr_t peer_addr;
uint16_t peer_port;
} lwip_incoming_packet_t;
typedef struct _lwip_socket_obj_t { typedef struct _lwip_socket_obj_t {
mp_obj_base_t base; mp_obj_base_t base;
@@ -294,8 +303,11 @@ typedef struct _lwip_socket_obj_t {
struct udp_pcb *udp; struct udp_pcb *udp;
struct raw_pcb *raw; struct raw_pcb *raw;
} pcb; } pcb;
// Data structure that holds incoming pbuf's.
// Each socket type has different state that it needs to keep track of.
volatile union { volatile union {
struct pbuf *pbuf; // TCP listening sockets have a queue of incoming connections, implemented as a ringbuffer.
struct { struct {
uint8_t alloc; uint8_t alloc;
uint8_t iget; uint8_t iget;
@@ -305,10 +317,23 @@ typedef struct _lwip_socket_obj_t {
struct tcp_pcb **array; // if alloc != 0 struct tcp_pcb **array; // if alloc != 0
} tcp; } tcp;
} connection; } connection;
// Connected TCP sockets have a single incoming pbuf that new data is appended to.
struct {
struct pbuf *pbuf;
} tcp;
// UDP and raw sockets have a queue of incoming pbuf's, implemented as a ringbuffer.
struct {
uint8_t iget; // ringbuffer read index
uint8_t iput; // ringbuffer write index
lwip_incoming_packet_t *array;
} udp_raw;
} incoming; } incoming;
mp_obj_t callback; mp_obj_t callback;
ip_addr_t peer; ip_addr_t tcp_peer_addr;
mp_uint_t peer_port; mp_uint_t tcp_peer_port;
mp_uint_t timeout; mp_uint_t timeout;
uint16_t recv_offset; uint16_t recv_offset;
@@ -347,9 +372,21 @@ static void lwip_socket_free_incoming(lwip_socket_obj_t *socket) {
&& socket->pcb.tcp->state == LISTEN; && socket->pcb.tcp->state == LISTEN;
if (!socket_is_listener) { if (!socket_is_listener) {
if (socket->incoming.pbuf != NULL) { if (socket->type == MOD_NETWORK_SOCK_STREAM) {
pbuf_free(socket->incoming.pbuf); if (socket->incoming.tcp.pbuf != NULL) {
socket->incoming.pbuf = NULL; pbuf_free(socket->incoming.tcp.pbuf);
socket->incoming.tcp.pbuf = NULL;
}
} else {
for (size_t i = 0; i < LWIP_INCOMING_PACKET_QUEUE_LEN; ++i) {
lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[i];
if (slot->pbuf != NULL) {
pbuf_free(slot->pbuf);
slot->pbuf = NULL;
}
}
socket->incoming.udp_raw.iget = 0;
socket->incoming.udp_raw.iput = 0;
} }
} else { } else {
uint8_t alloc = socket->incoming.connection.alloc; uint8_t alloc = socket->incoming.connection.alloc;
@@ -407,6 +444,19 @@ static inline void exec_user_callback(lwip_socket_obj_t *socket) {
} }
} }
static void udp_raw_incoming(lwip_socket_obj_t *socket, struct pbuf *p, const ip_addr_t *addr, u16_t port) {
lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iput];
if (slot->pbuf != NULL) {
// No room in the inn, drop the packet.
pbuf_free(p);
} else {
slot->pbuf = p;
slot->peer_addr = *addr;
slot->peer_port = port;
socket->incoming.udp_raw.iput = (socket->incoming.udp_raw.iput + 1) % LWIP_INCOMING_PACKET_QUEUE_LEN;
}
}
#if MICROPY_PY_LWIP_SOCK_RAW #if MICROPY_PY_LWIP_SOCK_RAW
// Callback for incoming raw packets. // Callback for incoming raw packets.
#if LWIP_VERSION_MAJOR < 2 #if LWIP_VERSION_MAJOR < 2
@@ -416,13 +466,7 @@ static u8_t _lwip_raw_incoming(void *arg, struct raw_pcb *pcb, struct pbuf *p, c
#endif #endif
{ {
lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg; lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg;
udp_raw_incoming(socket, p, addr, 0);
if (socket->incoming.pbuf != NULL) {
pbuf_free(p);
} else {
socket->incoming.pbuf = p;
memcpy(&socket->peer, addr, sizeof(socket->peer));
}
return 1; // we ate the packet return 1; // we ate the packet
} }
#endif #endif
@@ -436,15 +480,7 @@ static void _lwip_udp_incoming(void *arg, struct udp_pcb *upcb, struct pbuf *p,
#endif #endif
{ {
lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg; lwip_socket_obj_t *socket = (lwip_socket_obj_t *)arg;
udp_raw_incoming(socket, p, addr, port);
if (socket->incoming.pbuf != NULL) {
// That's why they call it "unreliable". No room in the inn, drop the packet.
pbuf_free(p);
} else {
socket->incoming.pbuf = p;
socket->peer_port = (mp_uint_t)port;
memcpy(&socket->peer, addr, sizeof(socket->peer));
}
} }
// Callback for general tcp errors. // Callback for general tcp errors.
@@ -562,13 +598,13 @@ static err_t _lwip_tcp_recv(void *arg, struct tcp_pcb *tcpb, struct pbuf *p, err
return ERR_OK; return ERR_OK;
} }
if (socket->incoming.pbuf == NULL) { if (socket->incoming.tcp.pbuf == NULL) {
socket->incoming.pbuf = p; socket->incoming.tcp.pbuf = p;
} else { } else {
#ifdef SOCKET_SINGLE_PBUF #ifdef SOCKET_SINGLE_PBUF
return ERR_BUF; return ERR_BUF;
#else #else
pbuf_cat(socket->incoming.pbuf, p); pbuf_cat(socket->incoming.tcp.pbuf, p);
#endif #endif
} }
@@ -639,7 +675,9 @@ static mp_uint_t lwip_raw_udp_send(lwip_socket_obj_t *socket, const byte *buf, m
// Helper function for recv/recvfrom to handle raw/UDP packets // Helper function for recv/recvfrom to handle raw/UDP packets
static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_t len, ip_addr_t *ip, mp_uint_t *port, int *_errno) { static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_t len, ip_addr_t *ip, mp_uint_t *port, int *_errno) {
if (socket->incoming.pbuf == NULL) { lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iget];
if (slot->pbuf == NULL) {
if (socket->timeout == 0) { if (socket->timeout == 0) {
// Non-blocking socket. // Non-blocking socket.
*_errno = MP_EAGAIN; *_errno = MP_EAGAIN;
@@ -648,7 +686,7 @@ static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_u
// Wait for data to arrive on UDP socket. // Wait for data to arrive on UDP socket.
mp_uint_t start = mp_hal_ticks_ms(); mp_uint_t start = mp_hal_ticks_ms();
while (socket->incoming.pbuf == NULL) { while (slot->pbuf == NULL) {
if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) { if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) {
*_errno = MP_ETIMEDOUT; *_errno = MP_ETIMEDOUT;
return -1; return -1;
@@ -658,17 +696,18 @@ static mp_uint_t lwip_raw_udp_receive(lwip_socket_obj_t *socket, byte *buf, mp_u
} }
if (ip != NULL) { if (ip != NULL) {
memcpy(ip, &socket->peer, sizeof(socket->peer)); *ip = slot->peer_addr;
*port = socket->peer_port; *port = slot->peer_port;
} }
struct pbuf *p = socket->incoming.pbuf; struct pbuf *p = slot->pbuf;
MICROPY_PY_LWIP_ENTER MICROPY_PY_LWIP_ENTER
u16_t result = pbuf_copy_partial(p, buf, ((p->tot_len > len) ? len : p->tot_len), 0); u16_t result = pbuf_copy_partial(p, buf, ((p->tot_len > len) ? len : p->tot_len), 0);
pbuf_free(p); pbuf_free(p);
socket->incoming.pbuf = NULL; slot->pbuf = NULL;
socket->incoming.udp_raw.iget = (socket->incoming.udp_raw.iget + 1) % LWIP_INCOMING_PACKET_QUEUE_LEN;
MICROPY_PY_LWIP_EXIT MICROPY_PY_LWIP_EXIT
@@ -780,7 +819,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
// Check for any pending errors // Check for any pending errors
STREAM_ERROR_CHECK(socket); STREAM_ERROR_CHECK(socket);
if (socket->incoming.pbuf == NULL) { if (socket->incoming.tcp.pbuf == NULL) {
// Non-blocking socket // Non-blocking socket
if (socket->timeout == 0) { if (socket->timeout == 0) {
@@ -792,7 +831,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
} }
mp_uint_t start = mp_hal_ticks_ms(); mp_uint_t start = mp_hal_ticks_ms();
while (socket->state == STATE_CONNECTED && socket->incoming.pbuf == NULL) { while (socket->state == STATE_CONNECTED && socket->incoming.tcp.pbuf == NULL) {
if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) { if (socket->timeout != -1 && mp_hal_ticks_ms() - start > socket->timeout) {
*_errno = MP_ETIMEDOUT; *_errno = MP_ETIMEDOUT;
return -1; return -1;
@@ -801,7 +840,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
} }
if (socket->state == STATE_PEER_CLOSED) { if (socket->state == STATE_PEER_CLOSED) {
if (socket->incoming.pbuf == NULL) { if (socket->incoming.tcp.pbuf == NULL) {
// socket closed and no data left in buffer // socket closed and no data left in buffer
return 0; return 0;
} }
@@ -819,7 +858,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
assert(socket->pcb.tcp != NULL); assert(socket->pcb.tcp != NULL);
struct pbuf *p = socket->incoming.pbuf; struct pbuf *p = socket->incoming.tcp.pbuf;
mp_uint_t remaining = p->len - socket->recv_offset; mp_uint_t remaining = p->len - socket->recv_offset;
if (len > remaining) { if (len > remaining) {
@@ -830,7 +869,7 @@ static mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
remaining -= len; remaining -= len;
if (remaining == 0) { if (remaining == 0) {
socket->incoming.pbuf = p->next; socket->incoming.tcp.pbuf = p->next;
// If we don't ref here, free() will free the entire chain, // If we don't ref here, free() will free the entire chain,
// if we ref, it does what we need: frees 1st buf, and decrements // if we ref, it does what we need: frees 1st buf, and decrements
// next buf's refcount back to 1. // next buf's refcount back to 1.
@@ -854,8 +893,18 @@ static const mp_obj_type_t lwip_socket_type;
static void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) { static void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) {
lwip_socket_obj_t *self = MP_OBJ_TO_PTR(self_in); lwip_socket_obj_t *self = MP_OBJ_TO_PTR(self_in);
mp_printf(print, "<socket state=%d timeout=%d incoming=%p off=%d>", self->state, self->timeout, mp_printf(print, "<socket state=%d timeout=%d incoming=", self->state, self->timeout);
self->incoming.pbuf, self->recv_offset); if (self->type == MOD_NETWORK_SOCK_STREAM) {
mp_printf(print, "%p off=%d>", self->incoming.tcp.pbuf, self->recv_offset);
} else {
int num_in_queue = 0;
for (size_t i = 0; i < LWIP_INCOMING_PACKET_QUEUE_LEN; ++i) {
if (self->incoming.udp_raw.array[i].pbuf != NULL) {
++num_in_queue;
}
}
mp_printf(print, "%d>", num_in_queue);
}
} }
// FIXME: Only supports two arguments at present // FIXME: Only supports two arguments at present
@@ -884,16 +933,22 @@ static mp_obj_t lwip_socket_make_new(const mp_obj_type_t *type, size_t n_args, s
socket->incoming.connection.tcp.item = NULL; socket->incoming.connection.tcp.item = NULL;
break; break;
case MOD_NETWORK_SOCK_DGRAM: case MOD_NETWORK_SOCK_DGRAM:
socket->pcb.udp = udp_new();
socket->incoming.pbuf = NULL;
break;
#if MICROPY_PY_LWIP_SOCK_RAW #if MICROPY_PY_LWIP_SOCK_RAW
case MOD_NETWORK_SOCK_RAW: { case MOD_NETWORK_SOCK_RAW:
#endif
if (socket->type == MOD_NETWORK_SOCK_DGRAM) {
socket->pcb.udp = udp_new();
}
#if MICROPY_PY_LWIP_SOCK_RAW
else {
mp_int_t proto = n_args <= 2 ? 0 : mp_obj_get_int(args[2]); mp_int_t proto = n_args <= 2 ? 0 : mp_obj_get_int(args[2]);
socket->pcb.raw = raw_new(proto); socket->pcb.raw = raw_new(proto);
break;
} }
#endif #endif
socket->incoming.udp_raw.iget = 0;
socket->incoming.udp_raw.iput = 0;
socket->incoming.udp_raw.array = m_new0(lwip_incoming_packet_t, LWIP_INCOMING_PACKET_QUEUE_LEN);
break;
default: default:
mp_raise_OSError(MP_EINVAL); mp_raise_OSError(MP_EINVAL);
} }
@@ -1075,7 +1130,7 @@ static mp_obj_t lwip_socket_accept(mp_obj_t self_in) {
// ...and set up the new socket for it. // ...and set up the new socket for it.
socket2->domain = MOD_NETWORK_AF_INET; socket2->domain = MOD_NETWORK_AF_INET;
socket2->type = MOD_NETWORK_SOCK_STREAM; socket2->type = MOD_NETWORK_SOCK_STREAM;
socket2->incoming.pbuf = NULL; socket2->incoming.tcp.pbuf = NULL;
socket2->timeout = socket->timeout; socket2->timeout = socket->timeout;
socket2->state = STATE_CONNECTED; socket2->state = STATE_CONNECTED;
socket2->recv_offset = 0; socket2->recv_offset = 0;
@@ -1130,8 +1185,8 @@ static mp_obj_t lwip_socket_connect(mp_obj_t self_in, mp_obj_t addr_in) {
socket->state = STATE_NEW; socket->state = STATE_NEW;
mp_raise_OSError(error_lookup_table[-err]); mp_raise_OSError(error_lookup_table[-err]);
} }
socket->peer_port = (mp_uint_t)port; socket->tcp_peer_addr = dest;
memcpy(&socket->peer, &dest, sizeof(socket->peer)); socket->tcp_peer_port = (mp_uint_t)port;
MICROPY_PY_LWIP_EXIT MICROPY_PY_LWIP_EXIT
// And now we wait... // And now we wait...
@@ -1299,8 +1354,8 @@ static mp_obj_t lwip_socket_recvfrom(mp_obj_t self_in, mp_obj_t len_in) {
mp_uint_t ret = 0; mp_uint_t ret = 0;
switch (socket->type) { switch (socket->type) {
case MOD_NETWORK_SOCK_STREAM: { case MOD_NETWORK_SOCK_STREAM: {
memcpy(&ip, &socket->peer, sizeof(socket->peer)); ip = socket->tcp_peer_addr;
port = (mp_uint_t)socket->peer_port; port = (mp_uint_t)socket->tcp_peer_port;
ret = lwip_tcp_receive(socket, (byte *)vstr.buf, len, &_errno); ret = lwip_tcp_receive(socket, (byte *)vstr.buf, len, &_errno);
break; break;
} }
@@ -1537,9 +1592,15 @@ static mp_uint_t lwip_socket_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_
if (lwip_socket_incoming_array(socket)[socket->incoming.connection.iget] != NULL) { if (lwip_socket_incoming_array(socket)[socket->incoming.connection.iget] != NULL) {
ret |= MP_STREAM_POLL_RD; ret |= MP_STREAM_POLL_RD;
} }
} else if (socket->type == MOD_NETWORK_SOCK_STREAM) {
// For TCP sockets there is just one slot for incoming data
if (socket->incoming.tcp.pbuf != NULL) {
ret |= MP_STREAM_POLL_RD;
}
} else { } else {
// Otherwise there is just one slot for incoming data // Otherwise for UDP/raw there is a queue of incoming data
if (socket->incoming.pbuf != NULL) { lwip_incoming_packet_t *slot = &socket->incoming.udp_raw.array[socket->incoming.udp_raw.iget];
if (slot->pbuf != NULL) {
ret |= MP_STREAM_POLL_RD; ret |= MP_STREAM_POLL_RD;
} }
} }