aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ev.c26
1 files changed, 18 insertions, 8 deletions
diff --git a/src/core/ev.c b/src/core/ev.c
index 79b9c02c..19d4b39a 100644
--- a/src/core/ev.c
+++ b/src/core/ev.c
@@ -524,9 +524,9 @@ static void janet_schedule_general(JanetFiber *fiber, Janet value, JanetSignal s
fiber->gc.flags |= JANET_FIBER_FLAG_ROOT;
if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED;
if (soon) {
- janet_q_push_head(&janet_vm.spawn, &t, sizeof(t));
+ janet_assert(!janet_q_push_head(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow");
} else {
- janet_q_push(&janet_vm.spawn, &t, sizeof(t));
+ janet_assert(!janet_q_push(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow");
}
}
@@ -959,11 +959,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
janet_schedule(fiber, janet_wrap_nil());
}
} else if (mode != JANET_CP_MODE_CLOSE) {
- /* Fiber has already been cancelled or resumed. */
+ /* Fiber has already been canceled or resumed. */
/* Resend event to another waiting thread, depending on mode */
int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ);
if (is_read) {
JanetChannelPending reader;
+ int sent = 0;
while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
JanetVM *vm = reader.thread;
if (!vm) continue;
@@ -974,8 +975,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
msg.argp = channel;
msg.argj = x;
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
+ sent = 1;
break;
}
+ if (!sent) {
+ janet_chan_unpack(channel, &x, 1);
+ }
} else {
JanetChannelPending writer;
while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
@@ -1001,14 +1006,14 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode) {
JanetChannelPending reader;
int is_empty;
- if (janet_chan_pack(channel, &x)) {
- janet_chan_unlock(channel);
- janet_panicf("failed to pack value for channel: %v", x);
- }
if (channel->closed) {
janet_chan_unlock(channel);
janet_panic("cannot write to closed channel");
}
+ if (janet_chan_pack(channel, &x)) {
+ janet_chan_unlock(channel);
+ janet_panicf("failed to pack value for channel: %v", x);
+ }
int is_threaded = janet_chan_is_threaded(channel);
if (is_threaded) {
/* don't dereference fiber from another thread */
@@ -1021,6 +1026,7 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode
if (is_empty) {
/* No pending reader */
if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
+ janet_chan_unpack(channel, &x, 1);
janet_chan_unlock(channel);
janet_panicf("channel overflow: %v", x);
} else if (janet_q_count(&channel->items) > channel->limit) {
@@ -1054,6 +1060,9 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode
msg.argj = x;
if (vm) {
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
+ } else {
+ /* If no vm to send to, we must clean up (unpack) the packed payload to avoid leak */
+ janet_chan_unpack(channel, &x, 1);
}
} else {
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
@@ -1458,11 +1467,12 @@ static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) {
int32_t limit = janet_unmarshal_int(ctx);
int32_t count = janet_unmarshal_int(ctx);
if (count < 0) janet_panic("invalid negative channel count");
+ if (count > limit) janet_panic("invalid channel count");
janet_chan_init(abst, limit, 0);
abst->closed = !!is_closed;
for (int32_t i = 0; i < count; i++) {
Janet item = janet_unmarshal_janet(ctx);
- janet_q_push(&abst->items, &item, sizeof(item));
+ janet_assert(!janet_q_push(&abst->items, &item, sizeof(item)), "bad unmarshal channel");
}
return abst;
}