Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Please read `README.md` for general information about LibUDPard, and `CONTRIBUTING.md` for development-related notes.

Keep the code and comments very brief. Be sure every significant code block is preceded with a brief comment.
DO NOT COMMENT THE CODE unless comments add critical information that is impossible to infer from reading the code (design rationale, gotchas, etc), in which case extremely terse comments are allowed.

If you need a build directory, create one in the project root named with a `build` prefix;
you can also use existing build directories if you prefer so,
Expand Down
52 changes: 36 additions & 16 deletions libudpard/udpard.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ static byte_t* header_serialize(byte_t* const buffer,
const uint32_t frame_payload_offset,
const uint32_t prefix_crc)
{
assert(meta.priority < 8U);
UDPARD_ASSERT(meta.priority < 8U);
byte_t* ptr = buffer;
*ptr++ = (byte_t)(HEADER_VERSION | (meta.priority << 5U));
*ptr++ = 0;
Expand Down Expand Up @@ -643,21 +643,29 @@ static void tx_transfer_retire(udpard_tx_t* const tx, tx_transfer_t* const tr)
/// The heuristics are subject to review and improvement.
static tx_transfer_t* tx_sacrifice(udpard_tx_t* const tx) { return LIST_TAIL(tx->agewise, tx_transfer_t, agewise); }

/// Zero if the count already reached the limit, which can happen if the limit was lowered at runtime.
static size_t tx_queue_vacancy(const udpard_tx_t* const tx)
{
return (tx->enqueued_frames_count < tx->enqueued_frames_limit)
? (tx->enqueued_frames_limit - tx->enqueued_frames_count)
: 0U;
}

/// True on success, false if not possible to reclaim enough space.
static bool tx_ensure_queue_space(udpard_tx_t* const tx, const size_t total_frames_needed)
{
if (total_frames_needed > tx->enqueued_frames_limit) {
return false; // not gonna happen
}
while (total_frames_needed > (tx->enqueued_frames_limit - tx->enqueued_frames_count)) {
while (total_frames_needed > tx_queue_vacancy(tx)) {
tx_transfer_t* const tr = tx_sacrifice(tx);
if (tr == NULL) {
break; // We may have no transfers anymore but the NIC TX driver could still be holding some frames.
}
tx_transfer_retire(tx, tr);
tx->errors_sacrifice++;
}
return total_frames_needed <= (tx->enqueued_frames_limit - tx->enqueued_frames_count);
return total_frames_needed <= tx_queue_vacancy(tx);
}

static int32_t tx_cavl_compare_deadline(const void* const user, const udpard_tree_t* const node)
Expand Down Expand Up @@ -792,9 +800,11 @@ static bool tx_push(udpard_tx_t* const tx,
UDPARD_ASSERT(now <= deadline);
UDPARD_ASSERT(tx != NULL);

const uint16_t iface_bitmap = valid_ep_bitmap(endpoints);
uint16_t iface_bitmap = valid_ep_bitmap(endpoints);
UDPARD_ASSERT((iface_bitmap & UDPARD_IFACE_BITMAP_ALL) != 0);
UDPARD_ASSERT((iface_bitmap & UDPARD_IFACE_BITMAP_ALL) == iface_bitmap);
iface_bitmap &= tx->iface_bitmap;
UDPARD_ASSERT(iface_bitmap != 0U);

// Purge expired transfers before accepting a new one to make room in the queue.
tx_purge_expired_transfers(tx, now);
Expand Down Expand Up @@ -894,18 +904,20 @@ bool udpard_tx_new(udpard_tx_t* const self,
const uint64_t local_uid,
const uint64_t unicast_transfer_id_seed,
const size_t enqueued_frames_limit,
const uint16_t iface_bitmap,
const udpard_tx_mem_resources_t memory,
const udpard_tx_vtable_t* const vtable)
{
const bool ok = (NULL != self) && (local_uid != 0) && tx_validate_mem_resources(memory) && (vtable != NULL) &&
(vtable->eject != NULL);
const bool ok = (NULL != self) && (local_uid != 0) && ((iface_bitmap & UDPARD_IFACE_BITMAP_ALL) == iface_bitmap) &&
tx_validate_mem_resources(memory) && (vtable != NULL) && (vtable->eject != NULL);
if (ok) {
mem_zero(sizeof(*self), self);
self->vtable = vtable;
self->local_uid = local_uid;
self->unicast_transfer_id = unicast_transfer_id_seed + local_uid; // extra entropy
self->enqueued_frames_limit = enqueued_frames_limit;
self->enqueued_frames_count = 0;
self->iface_bitmap = iface_bitmap;
self->memory = memory;
self->index_deadline = NULL;
self->agewise = (udpard_list_t){ NULL, NULL };
Expand All @@ -931,9 +943,11 @@ bool udpard_tx_push(udpard_tx_t* const self,
const udpard_bytes_scattered_t payload,
void* const user)
{
// Only the head payload fragment is validated; inner fragments of the caller-owned chain are not checked.
bool ok = (self != NULL) && (deadline >= now) && (now >= 0) && (self->local_uid != 0) &&
((iface_bitmap & UDPARD_IFACE_BITMAP_ALL) != 0) && (priority < UDPARD_PRIORITY_COUNT) &&
udpard_is_valid_endpoint(endpoint) && ((payload.bytes.data != NULL) || (payload.bytes.size == 0U));
((iface_bitmap & UDPARD_IFACE_BITMAP_ALL & self->iface_bitmap) != 0) &&
(priority < UDPARD_PRIORITY_COUNT) && udpard_is_valid_endpoint(endpoint) &&
((payload.bytes.data != NULL) || (payload.bytes.size == 0U));
if (ok) {
const meta_t meta = {
.priority = priority,
Expand All @@ -960,8 +974,9 @@ bool udpard_tx_push_unicast(udpard_tx_t* const self,
const udpard_bytes_scattered_t payload,
void* const user)
{
// Only the head payload fragment is validated; inner fragments of the caller-owned chain are not checked.
bool ok = (self != NULL) && (deadline >= now) && (now >= 0) && (self->local_uid != 0) &&
(valid_ep_bitmap(endpoints) != 0) && (priority < UDPARD_PRIORITY_COUNT) &&
((valid_ep_bitmap(endpoints) & self->iface_bitmap) != 0) && (priority < UDPARD_PRIORITY_COUNT) &&
((payload.bytes.data != NULL) || (payload.bytes.size == 0U));
if (ok) {
const meta_t meta = {
Expand Down Expand Up @@ -1093,6 +1108,8 @@ void udpard_tx_free(udpard_tx_t* const self)
while (self->agewise.tail != NULL) {
tx_transfer_retire(self, LIST_TAIL(self->agewise, tx_transfer_t, agewise));
}
// Datagram references retained via udpard_tx_refcount_inc() must be released before discarding.
UDPARD_ASSERT(self->enqueued_frames_count == 0U);
}
}

Expand Down Expand Up @@ -1567,7 +1584,6 @@ static void rx_session_eject(rx_session_t* const self, udpard_rx_t* const rx, rx
self->history_current = (self->history_current + 1U) % RX_TRANSFER_HISTORY_COUNT;
self->history[self->history_current] = slot->transfer_id;

// Construct the arguments and invoke the callback.
const udpard_rx_transfer_t transfer = {
.timestamp = slot->ts_min,
.priority = slot->priority,
Expand All @@ -1577,11 +1593,13 @@ static void rx_session_eject(rx_session_t* const self, udpard_rx_t* const rx, rx
.payload_size_wire = slot->total_size,
.payload = (udpard_fragment_t*)slot->fragments,
};
self->port->vtable->on_message(rx, self->port, transfer);

// Finally, destroy the slot to reclaim memory.
// Destroy the slot before the callback: a re-entrant udpard_rx_port_push() could otherwise evict and
// free this slot, whose fragments are now owned by the application.
slot->fragments = NULL; // Transfer ownership to the application.
rx_slot_destroy(slot_ref, self->port->memory.fragment, self->port->memory.slot);

self->port->vtable->on_message(rx, self->port, transfer);
}

/// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one. Returns NULL on OOM.
Expand Down Expand Up @@ -1771,10 +1789,12 @@ void udpard_rx_new(udpard_rx_t* const self)

void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now)
{
// Retire at most one per poll to avoid burstiness.
rx_session_t* const ses = LIST_TAIL(self->list_session_by_animation, rx_session_t, list_by_animation);
if ((ses != NULL) && (now >= (ses->last_animated_ts + SESSION_LIFETIME))) {
rx_session_free(ses, &self->list_session_by_animation);
if (self != NULL) {
// Retire at most one per poll to avoid burstiness.
rx_session_t* const ses = LIST_TAIL(self->list_session_by_animation, rx_session_t, list_by_animation);
if ((ses != NULL) && (now >= (ses->last_animated_ts + SESSION_LIFETIME))) {
rx_session_free(ses, &self->list_session_by_animation);
}
}
}

Expand Down
15 changes: 11 additions & 4 deletions libudpard/udpard.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ typedef struct udpard_tx_mem_resources_t
} udpard_tx_mem_resources_t;

/// Request to transmit a UDP datagram over the specified interface.
/// Which interface indexes are available is determined by the user when pushing the transfer.
/// If Berkeley sockets or similar API is used, the application should use a dedicated socket per redundant interface.
typedef struct udpard_tx_ejection_t
{
Expand All @@ -309,7 +308,7 @@ typedef struct udpard_tx_ejection_t

/// Specifies when the frame should be considered expired and dropped if not yet transmitted by then;
/// it is optional to use depending on the implementation of the NIC driver (most traditional drivers ignore it).
/// The library guarantees that now >= deadline at the time of ejection -- expired frames are purged beforehand.
/// The library guarantees that now <= deadline at the time of ejection -- expired frames are purged beforehand.
udpard_us_t deadline;

udpard_udpip_ep_t destination;
Expand Down Expand Up @@ -356,6 +355,9 @@ struct udpard_tx_t
/// able to avoid frame duplication and instead reuse each frame across all interfaces.
size_t mtu[UDPARD_IFACE_COUNT_MAX];

/// Transfers are enqueued only on this subset of UDPARD_IFACE_BITMAP_ALL (applied at push); zero = listen-only.
uint16_t iface_bitmap;

/// Optional user-managed mapping from the Cyphal priority level in [0,7] (highest priority at index 0)
/// to the IP DSCP field value for use by the application when transmitting. By default, all entries are zero.
uint_least8_t dscp_value_per_priority[UDPARD_PRIORITY_COUNT];
Expand Down Expand Up @@ -403,11 +405,14 @@ struct udpard_tx_t
/// If the limit is reached, the library will apply heuristics to sacrifice some older transfers to make room
/// for the new one. This behavior allows the library to make progress even when some interfaces are stalled.
///
/// iface_bitmap must be a subset of UDPARD_IFACE_BITMAP_ALL (zero = listen-only); see its field docs.
///
/// True on success, false if any of the arguments are invalid.
bool udpard_tx_new(udpard_tx_t* const self,
const uint64_t local_uid,
const uint64_t unicast_transfer_id_seed,
const size_t enqueued_frames_limit,
const uint16_t iface_bitmap,
const udpard_tx_mem_resources_t memory,
const udpard_tx_vtable_t* const vtable);

Expand All @@ -425,7 +430,7 @@ bool udpard_tx_new(udpard_tx_t* const self,
/// Excess most significant bits are ignored.
/// Related thread on random transfer-ID init: https://forum.opencyphal.org/t/improve-the-transfer-id-timeout/2375
///
/// The enqueued transfer will be emitted over all interfaces specified in the iface_bitmap.
/// The transfer is emitted over iface_bitmap masked by udpard_tx_t.iface_bitmap; an empty result returns false.
///
/// The user context value is carried through to the callbacks.
///
Expand All @@ -448,6 +453,7 @@ bool udpard_tx_push(udpard_tx_t* const self,
/// This is a specialization of the general push function for unicast transfers.
/// The transfer-ID counter is managed automatically.
/// Endpoints may be empty (zero) for some ifaces, in which case no transmission over those ifaces will be attempted.
/// The iface set is also masked by udpard_tx_t.iface_bitmap.
bool udpard_tx_push_unicast(udpard_tx_t* const self,
const udpard_us_t now,
const udpard_us_t deadline,
Expand All @@ -466,7 +472,7 @@ void udpard_tx_poll(udpard_tx_t* const self, const udpard_us_t now, const uint16

/// Returns a bitmap of interfaces that have pending transmissions. This is useful for IO multiplexing loops.
/// Zero indicates that there are no pending transmissions.
/// Which interfaces are usable is defined by the remote endpoints provided when pushing transfers.
/// Which interfaces can carry transfers is set at push time by the remote endpoints and udpard_tx_t.iface_bitmap.
uint16_t udpard_tx_pending_ifaces(const udpard_tx_t* const self);

/// When a datagram is ejected and the application opts to keep it, these functions must be used to manage the
Expand All @@ -475,6 +481,7 @@ void udpard_tx_refcount_inc(const udpard_bytes_t tx_payload_view);
void udpard_tx_refcount_dec(const udpard_bytes_t tx_payload_view);

/// Drops all enqueued items; afterward, the instance is safe to discard.
/// Any references retained via udpard_tx_refcount_inc() must be released beforehand.
void udpard_tx_free(udpard_tx_t* const self);

// =====================================================================================================================
Expand Down
2 changes: 1 addition & 1 deletion tests/src/test_e2e_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void test_subject_roundtrip()
res = instrumented_allocator_make_resource(&tx_alloc_payload);
}
udpard_tx_t tx{};
TEST_ASSERT_TRUE(udpard_tx_new(&tx, 0x1010101010101010ULL, 123U, 32U, tx_mem, &tx_vtable));
TEST_ASSERT_TRUE(udpard_tx_new(&tx, 0x1010101010101010ULL, 123U, 32U, UDPARD_IFACE_BITMAP_ALL, tx_mem, &tx_vtable));
tx.mtu[0] = 256U;
tx.mtu[1] = 256U;
tx.mtu[2] = 256U;
Expand Down
36 changes: 28 additions & 8 deletions tests/src/test_e2e_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ void test_zero_payload_transfer()

udpard_tx_t tx{};
std::vector<CapturedFrame> frames;
TEST_ASSERT_TRUE(udpard_tx_new(
&tx, 0x1111222233334444ULL, 123U, 8U, make_tx_mem(tx_alloc_transfer, tx_alloc_payload), &tx_vtable));
TEST_ASSERT_TRUE(udpard_tx_new(&tx,
0x1111222233334444ULL,
123U,
8U,
UDPARD_IFACE_BITMAP_ALL,
make_tx_mem(tx_alloc_transfer, tx_alloc_payload),
&tx_vtable));
tx.mtu[0] = 128U;
tx.mtu[1] = 128U;
tx.mtu[2] = 128U;
Expand Down Expand Up @@ -182,8 +187,13 @@ void test_out_of_order_multiframe_reassembly()

udpard_tx_t tx{};
std::vector<CapturedFrame> frames;
TEST_ASSERT_TRUE(udpard_tx_new(
&tx, 0xAAAABBBBCCCCDDDDULL, 321U, 32U, make_tx_mem(tx_alloc_transfer, tx_alloc_payload), &tx_vtable));
TEST_ASSERT_TRUE(udpard_tx_new(&tx,
0xAAAABBBBCCCCDDDDULL,
321U,
32U,
UDPARD_IFACE_BITMAP_ALL,
make_tx_mem(tx_alloc_transfer, tx_alloc_payload),
&tx_vtable));
tx.mtu[0] = 96U;
tx.mtu[1] = 96U;
tx.mtu[2] = 96U;
Expand Down Expand Up @@ -259,8 +269,13 @@ void test_stateless_single_frame_acceptance()

udpard_tx_t tx{};
std::vector<CapturedFrame> frames;
TEST_ASSERT_TRUE(udpard_tx_new(
&tx, 0x1234123412341234ULL, 777U, 8U, make_tx_mem(tx_alloc_transfer, tx_alloc_payload), &tx_vtable));
TEST_ASSERT_TRUE(udpard_tx_new(&tx,
0x1234123412341234ULL,
777U,
8U,
UDPARD_IFACE_BITMAP_ALL,
make_tx_mem(tx_alloc_transfer, tx_alloc_payload),
&tx_vtable));
tx.mtu[0] = 128U;
tx.mtu[1] = 128U;
tx.mtu[2] = 128U;
Expand Down Expand Up @@ -325,8 +340,13 @@ void test_stateless_multiframe_first_frame_handling(const std::size_t extent, co

udpard_tx_t tx{};
std::vector<CapturedFrame> frames;
TEST_ASSERT_TRUE(udpard_tx_new(
&tx, 0x5555666677778888ULL, 999U, 16U, make_tx_mem(tx_alloc_transfer, tx_alloc_payload), &tx_vtable));
TEST_ASSERT_TRUE(udpard_tx_new(&tx,
0x5555666677778888ULL,
999U,
16U,
UDPARD_IFACE_BITMAP_ALL,
make_tx_mem(tx_alloc_transfer, tx_alloc_payload),
&tx_vtable));
tx.mtu[0] = 128U;
tx.mtu[1] = 128U;
tx.mtu[2] = 128U;
Expand Down
9 changes: 7 additions & 2 deletions tests/src/test_e2e_random.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,13 @@ void test_randomized_deduplication()
instrumented_allocator_new(&tx_alloc_payload);
udpard_tx_t tx{};
std::vector<CapturedFrame> frames;
TEST_ASSERT_TRUE(udpard_tx_new(
&tx, 0x1010101010101010ULL, 123U, 512U, make_tx_mem(tx_alloc_transfer, tx_alloc_payload), &tx_vtable));
TEST_ASSERT_TRUE(udpard_tx_new(&tx,
0x1010101010101010ULL,
123U,
512U,
UDPARD_IFACE_BITMAP_ALL,
make_tx_mem(tx_alloc_transfer, tx_alloc_payload),
&tx_vtable));
tx.mtu[0] = 192U;
tx.mtu[1] = 192U;
tx.mtu[2] = 192U;
Expand Down
9 changes: 7 additions & 2 deletions tests/src/test_e2e_responses.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,13 @@ void test_unicast_response_roundtrip()
instrumented_allocator_new(&b_tx_payload);
udpard_tx_t b_tx{};
std::vector<CapturedFrame> b_frames;
TEST_ASSERT_TRUE(
udpard_tx_new(&b_tx, 0xBBBBBBBBBBBBBBBBULL, 10U, 16U, make_tx_mem(b_tx_transfer, b_tx_payload), &tx_vtable));
TEST_ASSERT_TRUE(udpard_tx_new(&b_tx,
0xBBBBBBBBBBBBBBBBULL,
10U,
16U,
UDPARD_IFACE_BITMAP_ALL,
make_tx_mem(b_tx_transfer, b_tx_payload),
&tx_vtable));
b_tx.mtu[0] = 256U;
b_tx.mtu[1] = 256U;
b_tx.mtu[2] = 256U;
Expand Down
Loading
Loading