LF OS
Hobby operating system for amd64 with high ambitions
Loading...
Searching...
No Matches
mq.c
Go to the documentation of this file.
1#include <mq.h>
2#include <string.h>
3#include <tpa.h>
4#include <flexarray.h>
5#include <panic.h>
6#include <errno.h>
7#include <scheduler.h>
8
10static const size_t MessageQueuePageItemsTarget = 16;
11
12static tpa_t* mqs;
13static uint64_t next_mq = 1;
14
18 size_t allocated;
19
21 size_t bytes;
22
24 size_t items;
25
28
31
34};
35
64
66 mqs = tpa_new(alloc, sizeof(struct MessageQueue), 4080, 0);
67}
68
70 if(!next_mq) {
71 panic_message("Mutex namespace overflow!");
72 }
73
74 struct MessageQueue mq = {
75 .max_items = 0,
76 .max_bytes = 0,
77 .bytes = 0,
78 .items = 0,
79 .average_message_size = 0,
80
81 .alloc = alloc,
82
83 .first_page = 0,
84 .last_page = 0,
85 .notify_teardown = new_flexarray(sizeof(mq_notifier), 0, alloc),
86 };
87
89
90 return next_mq++;
91}
92
94 struct MessageQueue* data = (struct MessageQueue*)tpa_get(mqs, mq);
95
96 if(!data) {
97 return;
98 }
99
100 size_t teardown_notifiers = flexarray_length(data->notify_teardown);
101 for(size_t i = 0; i < teardown_notifiers; ++i) {
102 mq_notifier notif = 0;
103 flexarray_get(data->notify_teardown, &notif, i);
104 if(notif) {
105 notif(mq);
106 }
107 }
108
109 struct MessageQueuePage* current = data->first_page;
110 while(current) {
111 struct MessageQueuePage* next = current->next;
112 data->alloc->dealloc(data->alloc, current);
113 current = next;
114 }
115
117
118 tpa_set(mqs, mq, 0);
119}
120
121static void mq_alloc_page(struct MessageQueue* mq, size_t min_size) {
122 size_t alloc_size = mq->average_message_size * MessageQueuePageItemsTarget;
123
124 if(alloc_size < min_size) {
125 alloc_size = min_size; // let's hope this size is uncommon and next page has average messages again
126 }
127
128 struct MessageQueuePage* page = (struct MessageQueuePage*)mq->alloc->alloc(mq->alloc, alloc_size + sizeof(struct MessageQueuePage));
129 memset(page, 0, alloc_size + sizeof(struct MessageQueuePage));
130
131 page->allocated = alloc_size;
132
133 if(!mq->first_page) {
134 mq->first_page = page;
135 }
136
137 if(mq->last_page) {
138 mq->last_page->next = page;
139 }
140
141 mq->last_page = page;
142}
143
145 struct MessageQueue* data = (struct MessageQueue*)tpa_get(mqs, mq);
146
147 if(!data) {
148 return ENOENT;
149 }
150
151 if((
152 data->max_bytes &&
153 data->bytes + message->size >= data->max_bytes
154 ) ||
155 (
156 data->max_items &&
157 data->items >= data->max_items
158 )
159 ) {
160 return ENOMEM;
161 }
162
164 if(data->average_message_size != message->size) {
165 data->average_message_size /= 2;
166 }
167
168 if(!data->last_page ||
170 mq_alloc_page(data, message->size);
171 }
172
173 void* message_pos = (void*)((uint64_t)data->last_page + data->last_page->push_position + sizeof(struct MessageQueuePage));
174 memcpy(message_pos, message, message->size); // TODO: either validate size or make sure only the process crashes
175
177 data->last_page->bytes += message->size;
178 ++data->last_page->items;
179
180 data->bytes += message->size;
181 ++data->items;
182
183 union wait_data wd;
184 wd.message_queue = mq;
186
187 return 0;
188}
189
192 if((error = mq_peek(mq, msg))) {
193 return error;
194 }
195
196 struct MessageQueue* data = (struct MessageQueue*)tpa_get(mqs, mq);
197
198 data->first_page->pop_position += msg->size;
199 data->first_page->bytes -= msg->size;
200 --data->first_page->items;
201
202 data->bytes -= msg->size;
203 data->items--;
204
205 if(!data->first_page->items) {
206 if(data->first_page == data->last_page) {
207 data->last_page = 0;
208 }
209
210 struct MessageQueuePage* next = data->first_page->next;
211 data->alloc->dealloc(data->alloc, data->first_page);
212 data->first_page = next;
213 }
214
215 return 0;
216}
217
219 struct MessageQueue* data = (struct MessageQueue*)tpa_get(mqs, mq);
220
221 if(!data) {
222 return ENOENT;
223 }
224
225 if(!data->items) {
226 return ENOMSG;
227 }
228
229 struct Message* msg_in_page = (struct Message*)((uint64_t)data->first_page + data->first_page->pop_position + sizeof(struct MessageQueuePage));
230
231 if(msg_in_page->size <= msg->size) {
232 memcpy(msg, msg_in_page, msg_in_page->size);
233 return 0;
234 }
235 else {
236 msg->size = msg_in_page->size;
237 msg->type = MT_Invalid;
238 return EMSGSIZE;
239 }
240}
241
243 struct MessageQueue* data = (struct MessageQueue*)tpa_get(mqs, mq);
244
245 if(!data) {
246 return ENOENT;
247 }
248
249 size_t teardown_notifiers = flexarray_length(data->notify_teardown);
250 for(size_t i = 0; i < teardown_notifiers; ++i) {
251 mq_notifier existing = 0;
252 flexarray_get(data->notify_teardown, &existing, i);
253 if(existing == notifier) {
254 return EEXIST;
255 }
256 }
257
258 flexarray_append(data->notify_teardown, &notifier);
259 return 0;
260}
unsigned long uint64_t
Definition arch.h:14
#define ENOENT
Definition errno-defs.h:5
#define EEXIST
Definition errno-defs.h:14
#define ENOMEM
Definition errno-defs.h:11
#define EMSGSIZE
Definition errno-defs.h:33
#define ENOMSG
Definition errno-defs.h:29
flexarray_t new_flexarray(size_t member_size, size_t initial_alloc, allocator_t *alloc)
Definition flexarray.c:14
void delete_flexarray(flexarray_t array)
Definition flexarray.c:29
uint64_t flexarray_append(flexarray_t array, void *data)
Definition flexarray.c:56
void flexarray_get(flexarray_t array, void *buffer, uint64_t idx)
Definition flexarray.c:110
size_t flexarray_length(flexarray_t array)
Definition flexarray.c:102
void * memcpy(void *dest, void const *source, size_t size)
Definition string.c:80
void * memset(void *dest, int c, size_t size)
Definition string.c:72
allocator_t * alloc
size_t size
Size of the message, including metadata.
@ MT_Invalid
Invalid message, only size is valid.
enum MessageType type
Type of the message.
size_t bytes
bytes active in this page (messages not yet popped)
Definition mq.c:21
static void mq_alloc_page(struct MessageQueue *mq, size_t min_size)
Definition mq.c:121
uint64_t mq_push(uint64_t mq, struct Message *message)
Definition mq.c:144
size_t max_items
Maximum number of items in the queue after which mq_push will fail with ENOMEM (0 means no limit).
Definition mq.c:39
size_t bytes
Counting active-message bytes.
Definition mq.c:45
size_t average_message_size
Average message size, for new page allocation.
Definition mq.c:51
uint64_t mq_notify_teardown(mq_id_t mq, mq_notifier notifier)
Definition mq.c:242
struct MessageQueuePage * next
Pointer to next page.
Definition mq.c:33
uint64_t mq_pop(uint64_t mq, struct Message *msg)
Definition mq.c:190
uint64_t mq_peek(uint64_t mq, struct Message *msg)
Definition mq.c:218
static tpa_t * mqs
Definition mq.c:12
size_t items
Counting active messages.
Definition mq.c:48
uint64_t mq_create(allocator_t *alloc)
Definition mq.c:69
allocator_t * alloc
Allocator for new pages.
Definition mq.c:54
struct MessageQueuePage * first_page
Pointer to first page for popping and peeking.
Definition mq.c:57
static const size_t MessageQueuePageItemsTarget
Target number of messages per page, multiplied by average message size for allocation size of new page...
Definition mq.c:10
size_t items
items active in this page (messages not yet popped)
Definition mq.c:24
struct MessageQueuePage * last_page
Pointer to last page for pushing.
Definition mq.c:60
flexarray_t notify_teardown
Definition mq.c:62
void init_mq(allocator_t *alloc)
Definition mq.c:65
size_t max_bytes
Maximum number of bytes in non-popped messages in the queue after which mq_push will fail with ENOMEM (0 means no limit).
Definition mq.c:42
void mq_destroy(uint64_t mq)
Definition mq.c:93
size_t allocated
allocated bytes for the page, excluding header
Definition mq.c:18
static uint64_t next_mq
Definition mq.c:13
size_t push_position
first free byte in page
Definition mq.c:30
size_t pop_position
first byte of first non-popped message
Definition mq.c:27
This is the implementation data for a message queue.
Definition mq.c:37
This is the header for a message queue page.
Definition mq.c:16
void(* mq_notifier)(mq_id_t mq)
Definition mq.h:11
uint64_t mq_id_t
Definition mq.h:9
void panic_message(const char *message)
Definition panic.c:64
void scheduler_waitable_done(enum wait_reason reason, union wait_data data, size_t max_amount)
Definition scheduler.c:363
uint64_t message_queue
Definition scheduler.h:28
@ wait_reason_message
Definition scheduler.h:16
void(* dealloc)(struct allocator *alloc, void *mem)
Definition allocator.h:10
static bool size_t size_t uint64_t * mq
Definition syscalls.h:303
static bool struct Message * message
Definition syscalls.h:338
static uint16_t bool uint64_t * error
Definition syscalls.h:126
tpa_t * tpa_new(allocator_t *alloc, uint64_t entry_size, uint64_t page_size, tpa_t *tpa)
Definition tpa.c:48
void * tpa_get(tpa_t *tpa, uint64_t idx)
Definition tpa.c:144
void tpa_set(tpa_t *tpa, uint64_t idx, void *data)
Definition tpa.c:174
Header of a TPA.
Definition tpa.c:34