aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorCalvin Rose <calsrose@gmail.com>2026-02-20 14:45:32 -0600
committerCalvin Rose <calsrose@gmail.com>2026-02-20 14:47:00 -0600
commitca9ffaa5bb7ae19e158c555eb7a2dfc6c8bd66b2 (patch)
treeabeb45fff847e498e1315ebf7a27286ecdeb037d
parentRemove older extra channel unlocks. (diff)
Avoid memory leak when canceling fibers with threaded channels.
Objects in channels are sent as messages that need to be freed by the consumer. However, in certain cases, no consumer is available and the messages were being discarded without properly being freed. This should also fix `-fsanitize=address` on GCC and CLANG with the default test suite.
-rw-r--r--src/core/ev.c26
1 files changed, 18 insertions, 8 deletions
diff --git a/src/core/ev.c b/src/core/ev.c
index 79b9c02c..19d4b39a 100644
--- a/src/core/ev.c
+++ b/src/core/ev.c
@@ -524,9 +524,9 @@ static void janet_schedule_general(JanetFiber *fiber, Janet value, JanetSignal s
fiber->gc.flags |= JANET_FIBER_FLAG_ROOT;
if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED;
if (soon) {
- janet_q_push_head(&janet_vm.spawn, &t, sizeof(t));
+ janet_assert(!janet_q_push_head(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow");
} else {
- janet_q_push(&janet_vm.spawn, &t, sizeof(t));
+ janet_assert(!janet_q_push(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow");
}
}
@@ -959,11 +959,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
janet_schedule(fiber, janet_wrap_nil());
}
} else if (mode != JANET_CP_MODE_CLOSE) {
- /* Fiber has already been cancelled or resumed. */
+ /* Fiber has already been canceled or resumed. */
/* Resend event to another waiting thread, depending on mode */
int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ);
if (is_read) {
JanetChannelPending reader;
+ int sent = 0;
while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
JanetVM *vm = reader.thread;
if (!vm) continue;
@@ -974,8 +975,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
msg.argp = channel;
msg.argj = x;
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
+ sent = 1;
break;
}
+ if (!sent) {
+ janet_chan_unpack(channel, &x, 1);
+ }
} else {
JanetChannelPending writer;
while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
@@ -1001,14 +1006,14 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode) {
JanetChannelPending reader;
int is_empty;
- if (janet_chan_pack(channel, &x)) {
- janet_chan_unlock(channel);
- janet_panicf("failed to pack value for channel: %v", x);
- }
if (channel->closed) {
janet_chan_unlock(channel);
janet_panic("cannot write to closed channel");
}
+ if (janet_chan_pack(channel, &x)) {
+ janet_chan_unlock(channel);
+ janet_panicf("failed to pack value for channel: %v", x);
+ }
int is_threaded = janet_chan_is_threaded(channel);
if (is_threaded) {
/* don't dereference fiber from another thread */
@@ -1021,6 +1026,7 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode
if (is_empty) {
/* No pending reader */
if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
+ janet_chan_unpack(channel, &x, 1);
janet_chan_unlock(channel);
janet_panicf("channel overflow: %v", x);
} else if (janet_q_count(&channel->items) > channel->limit) {
@@ -1054,6 +1060,9 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode
msg.argj = x;
if (vm) {
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
+ } else {
+ /* If no vm to send to, we must clean up (unpack) the packed payload to avoid leak */
+ janet_chan_unpack(channel, &x, 1);
}
} else {
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
@@ -1458,11 +1467,12 @@ static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) {
int32_t limit = janet_unmarshal_int(ctx);
int32_t count = janet_unmarshal_int(ctx);
if (count < 0) janet_panic("invalid negative channel count");
+ if (count > limit) janet_panic("invalid channel count");
janet_chan_init(abst, limit, 0);
abst->closed = !!is_closed;
for (int32_t i = 0; i < count; i++) {
Janet item = janet_unmarshal_janet(ctx);
- janet_q_push(&abst->items, &item, sizeof(item));
+ janet_assert(!janet_q_push(&abst->items, &item, sizeof(item)), "bad unmarshal channel");
}
return abst;
}