The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

Использование select() для построения очереди сообщений (select gcc file io syscall linux)


<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>
Ключевые слова: select, gcc, file, io, syscall, linux,  (найти похожие документы)
From: Андрей Киселев <kis_an [at] linuxgazette [dot] ru> Newsgroups: Russian Linux Gazette Date: Mon, 20 Sep 2004 18:21:07 +0000 (UTC) Subject: Использование select() для построения очереди сообщений Оригинал: http://gazette.linux.ru.net/lg92/hawk.html select() для очереди сообщений Автор: Hyouck "Hawk" Kim Перевод: Андрей Киселев Введение Самое неудобное, при работе с файлами или сокетами и одновременно с очередями сообщений, заключается в отсутствии поддержки системным вызовом select() очередей сообщений. Поэтому, Unix-программисты обычно решают проблему примерно таким, довольно "корявым", способом while(1) { select для сокета с таймаутом; ... wait для очереди сообщений с флагом IPC_NOWAIT } Безусловно, такая реализация выглядит довольно уродливо. И мне она не нравится. Как один из вариантов решения проблемы можно было бы рассмотреть многопоточность. Но в данной статье я хотел бы продемонстрировать вам не совсем обычный подход, суть которого сводится к реализации нового системного вызова с именем msgqToFd(). Я вовсе не пытаюсь продемонстрировать вам завершенный и безошибочный код ядра. Я лишь хочу представить на ваш суд результаты моих экспериментов. Эта статья могла бы быть полезной для любителей поиграть с исходными текстами ядра GNU/Linux. msgqToFd() - Новый, нестандартный системный вызов Сигнатура вызова: int msgqToFd(int msgq_id) Возвращает файловый дескриптор, соответствующий очереди сообщений, который может быть использован системным вызовом select(). При возникновении какой либо ошибки -- возвращается значение -1. Приложение может использовать этот вызов примерно таким образом: ... q_fd = msgqToFd(msgq_id); while(1) { FD_ZERO(&rset); FD_SET(0, &rset); FD_SET(q_fd, &rset); select(q_fd + 1, &rset, NULL, NULL, NULL); if(FD_ISSET(0, &rset)) { ... } if(FD_ISSET(q_fd, &rset)) { r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0); ... } } Как работает select() Дескриптор файла связан с соответствующей структурой типа file (include/linux/fs.h). В структуре file имеется поле struct file_operations, которое определяет набор операций, которые могут выполняться над данным файлом. В структуре file_operations имеется поле с именем poll. Которое, в общем случае, содержит ссылку на системный вызов select(), который в свою очередь вызывает функцию poll(), чтобы получить состояние файла (или сокета, или чего-то еще). В общих чертах системный вызов select() работает примерно так: while(1) { для каждого файлового дескриптора из данного набора { вызов функции poll(), чтобы получить маску -- mask. if(mask & can_read or mask & can_write or mask & exception) { установить бит для этого fd т.к. этот файл доступен на чтение/запись либо возникло исключение. retval++; } } if(retval != 0) break; schedule_timeout(__timeout); } Саму реализацию системного вызова select() вы найдете в виде функций sys_select() и do_select() в файле fs/select.c. в исходном коде ядра. Еще необходимо упомянуть о функции poll_wait(). Она помещает текущий процесс в очередь ожидания, которая предусмотрена теми или иными средствами ядра, такими как файлы, каналы, сокеты или, как в нашем случае, очередью сообщений. Обратите внимание -- текущий процесс может быть размещен в нескольких очередях ожидания вызовом select() long sys_msgqToFd(long msqid) Системный вызов должен возвращать файловый дескриптор, соответствующий очереди сообщений. Файловый дескриптор должен указывать на структуру file, которая содержит file_operations -- список операций для манипуляций с очередью сообщений. Что должна делать функция sys_msgqToFd() 1. для заданного msqid, отыскать соответствующую struct msg_queue 2. разместить новый inode вызовом get_msgq_inode() 3. разместить новый файловый дескриптор вызовом get_unused_fd() 4. разместить новую структуру file вызовом get_empty_filp() 5. инициировать inode и структуру file 6. записать ссылку на msgq_file_ops в поле file_operations 7. записать в поле private_data, структуры file, msq->q_perm.key 8. установить fd (дескриптор) и структуру file, вызовом fd_install() 9. вернуть новый fd Реализацию этого системного вызова вы найдете в файлах msg.c и msg.h, которые должны сопровождать эту статью. А так же загляните в файл sys_i386.c msgq_poll() реализация msgq_poll() очень проста. Что она делает 1. Находит очередь сообщений по ключу file->private_data. 2. помещает текущий процесс в очередь ожидания, соответствующую данной очереди сообщений, вызовом poll_wait() 3. если очередь сообщений пуста (msq->q_qnum == 0), установить маску, как доступную для записи (этому есть свое объяснение, но давайте пока забудем об этом). Иначе -- как доступную на чтение 4. вернуть маску Изменения существующего исходного кода очередей сообщений Чтобы добавить поддержку poll() для очередей сообщений, нам необходимо внести некоторые изменения в существующий исходный код. Эти изменения включают в себя 1. добавление поля wait_queue_head в struct msg_queue, которое будет использоваться при размещении процесса в очереди ожидания вызовом select(). Дополнительно необходимо предусмотреть инициализацию этого поля при создании очереди сообщений. Обратите внимание на struct msg_queue и newque() в файле msg.c. 2. Всякий раз, когда в очередь помещается новое сообщение, необходимо предусмотреть активизацию ожидающего процесса, который был размещен в очереди ожидания системным вызовом select(). Смотрите функцию sys_msgsnd() в файле msg.c. 3. При удалении очереди сообщений или при изменении ее характеристик, все ожидающие процессы должны быть "разбужены". Смотрите функции sys_msgctl() и freeque() в файле msg.c. 4. Чтобы иметь возможность размещать новые inode и структуры file, необходимо установить соответствующую файловую систему 5. VFS подходит для этого как нельзя лучше. Нам необходимо лишь добавить код, который будет регистрировать новую файловую систему и выполнять некоторые настройки. Смотрите функцию msg_init() в файле msg.c. Все изменения заключены в команду препроцессора "ifdef MSGQ_POLL_SUPPORT". Так что вы достаточно легко их найдете. Сведения, касающиеся файловой системы Прежде чем разместить новую структуру file, необходимо прежде инициировать поля f_vfsmnt и f_dentry. В противном случае вы получите сообщение OOPS на консоли. Для того, чтобы VFS могла корректно работать с этой новой структурой file, необходимо выполнить дополнительные настройки, которые уже упоминались ранее. Так как в file_operations поддерживается только вызов poll(), для нас достаточно должным образом установить поля f_dentry и f_vfsmnt. Большая часть кода была скопирована из файла pipe.c. Добавление нового системного вызова Чтобы добавить новый системный вызов, необходимо сделать две вещи. Первое -- добавить реализацию системного вызова в ядро, которая будет работать на уровне ядра, что мы уже сделали (sys_msgqToFd()). В ядре GNU/Linux, все вызовы, имеющие отношение к system V IPC диспетчеризуются через функцию sys_ipc(), размещенную в файле arch/i386/kernel/sys_i386.c. Для идентификации системного вызова, sys_ipc() использует номер этого вызова. Прежде чем мы сможем обращаться к новому системному вызову, необходимо определиться с его номером (для sys_msgqToFd() это будет число 25) и добавить его обработку в функцию sys_ipc(). Просто загляните в файл arch/i386/kernel/entry.S (в исходных текстах ядра) и посмотрите на код функции sys_ipc() в файле sys_i386.c, который должен следовать с данной статьей. Второе -- добавить функцию-заглушку, которая будет вызываться пользовательскими приложениями. Практически все такие функции-заглушки, выполняющие системные вызовы, реализованы в GLIBC. Чтобы добавить новый системный вызов, вам потребуется внести изменения в GLIBC, пересобрать и установить ее. О черт!!! Нет уж, СПАСИБО!!! Меня как то не прельщает делать все это, к тому же я не думаю что и вам эта идея понравится. Чтобы как то разрешить эту проблему, я скопировал часть кода из GLIBC. Если вы заглянете в файл user/syscall_stuff.c, поставляемый с данной статьей, то вы обнаружите функцию с именем msgqToFd(), которая и является функцией-заглушкой для системного вызова msgqToFd(). Она содержит единственную строку кода return INLINE_SYSCALL(ipc, 5, 25, key, 0, 0, NULL); Ниже приводится краткое описание этого макроопределения. ipc : номер системного вызова для sys_ipc(). ipc описано как __NR_ipc и равно 117. 5 : число аргументов этого макроопределения. 25 : номер системного вызова для sys_msgqToFd() key : аргумент функции sys_msgqToFd() Макрос INLINE_SYSCALL подготавливает аргументы к передаче и вызывает прерывание 0x80 для перехода в режим ядра и осуществления системного вызова. Заключение Я не уверен в практической полезности приведенных мною изменений. Я лишь хотел убедиться в том, что такие изменения возможны. Кроме того, я хотел бы сообщить о некоторых проблемах. 1. Если два потока/процесса пытаются получить доступ к очереди сообщений, и один из них ожидает на msgrcv(), а другой -- на select(), то новое сообщение получит процесс/поток, который встал на ожидание раньше. Взгляните на pipelined_send() в msg.c. 2. Маска доступности на запись, в функции msgq_poll() устанавливается только если очередь пуста. Фактически, эту маску можно устанавливать для случая, когда очередь не заполнена до конца, и тут не будет большой разницы. Но я выбрал такую реализацию для простоты. 3. А теперь рассмотрим такой сценарий. 1. Очередь создана 2. Создан файловый дескриптор для очереди 3. Очередь удалена Что следует сделать в этом случае? Корректно было бы закрыть файловый дескриптор во время удаления очереди. Но это невозможно, поскольку очередь сообщений может быть удалена любым процессом, который имеет права на это. Это означает, что процесс, удаляющий очередь сообщений, мог и не связывать очередь с файловым дескриптором, не смотря на то, что файловый дескриптор для очереди мог быть создан другим процессом. И еще, если после удаления вновь будет создана очередь с таким же ключом, то она будет обслуживать ранее размещенный файловый дескриптор. 4. Проблема эффективности. При поступлении нового сообщения в очередь будут "разбужены" все процессы, ожидающие на вызове select(). Но сообщение получит только один из них, а остальные опять "заснут".. 5. Отсутствует поддержка типов сообщений. Независимо от типа сообщения, если оно поступило, то select() вернет управление процессу. Ошибки и исправления Найдите и исправьте сами :-) Исходный код msg.c Измененная реализация очередей сообщений http://gazette.linux.ru.net/lg92/misc/hawk/msg.c.txt msg.h Заголовочный файл для исходного кода с реализацией очередей сообщений http://gazette.linux.ru.net/lg92/misc/hawk/msg.h.txt sys_i386.c Изменения, с целью добавления нового системного вызова http://gazette.linux.ru.net/lg92/misc/hawk/sys_i386.c.txt user/Makefile Makefile для сборки тестовой программы (переименуйте из Makefile.txt в Makefile) http://gazette.linux.ru.net/lg92/misc/hawk/user/Makefile.txt user/syscall_stuff.c Функция-заглушка для msgqToFd() http://gazette.linux.ru.net/lg92/misc/hawk/user/syscall_stuff.c.txt user/msg_test.h Заголовочный файл для msgqToFd() http://gazette.linux.ru.net/lg92/misc/hawk/user/msg_test.h.txt user/msgq.c Исходный код тестовой программы http://gazette.linux.ru.net/lg92/misc/hawk/user/msgq.c.txt user/msgq2.c Еще одна тестовая программа http://gazette.linux.ru.net/lg92/misc/hawk/user/msgq2.c.txt В своих экспериментах я использовал ядро GNU/Linux 2-4-20 на платформе x86. Для сборки тестового ядра просто скопируйте msg.c в ipc/msg.c msg.h в include/linux/msg.h sys_i386.c в arch/i386/kernel/sys_i386.c пересоберите ядро и установите его!!!! Перед запуском тестовых программ необходимо создать файлы ключей: touch .msgq_key1 touch .msgq_key2 Copyright (C) 2003, Hyouck "Hawk" Kim. Copying license http://www.linuxgazette.com/copying.html Published in Issue 92 of Linux Gazette, July 2003
msg.c /* * linux/ipc/msg.c * Copyright (C) 1992 Krishna Balasubramanian * * Removed all the remaining kerneld mess * Catch the -EFAULT stuff properly * Use GFP_KERNEL for messages as in 1.2 * Fixed up the unchecked user space derefs * Copyright (C) 1998 Alan Cox & Andi Kleen * * /proc/sysvipc/msg support (c) 1999 Dragos Acostachioaie <dragos@iname.com.> * * mostly rewritten, threaded and wake-one semantics added * MSGMAX limit removed, sysctl's added * (c) 1999 Manfred Spraul <manfreds@colorfullife.com.> */ /* * Modified By Hyouck Kim for poll support. * peakhunt@yahoo.com * * The modification is totally experimental and use it at your own risk. */ #include <linux/config.h> #include <linux/slab.h> #include <linux/msg.h> #include <linux/spinlock.h> #include <linux/poll.h> #include <linux/file.h> #include <linux/fs.h> #include <linux/mount.h> #include <linux/init.h> #include <linux/module.h> #include <linux/proc_fs.h> #include <linux/list.h> #include <asm/uaccess.h> #include "util.h" #define MSGQ_POLL_SUPPORT /* sysctl: */ int msg_ctlmax = MSGMAX; int msg_ctlmnb = MSGMNB; int msg_ctlmni = MSGMNI; /* one msg_receiver structure for each sleeping receiver */ struct msg_receiver { struct list_head r_list; struct task_struct* r_tsk; int r_mode; long r_msgtype; long r_maxsize; struct msg_msg* volatile r_msg; }; /* one msg_sender for each sleeping sender */ struct msg_sender { struct list_head list; struct task_struct* tsk; }; struct msg_msgseg { struct msg_msgseg* next; /* the next part of the message follows immediately */ }; /* one msg_msg structure for each message */ struct msg_msg { struct list_head m_list; long m_type; int m_ts; /* message text size */ struct msg_msgseg* next; /* the actual message follows immediately */ }; #define DATALEN_MSG (PAGE_SIZE-sizeof(struct msg_msg)) #define DATALEN_SEG (PAGE_SIZE-sizeof(struct msg_msgseg)) /* one msq_queue structure for each present queue on the system */ struct msg_queue { struct kern_ipc_perm q_perm; time_t q_stime; /* last msgsnd time */ time_t q_rtime; /* last msgrcv time */ time_t q_ctime; /* last change time */ unsigned long q_cbytes; /* current number of bytes on queue */ unsigned long q_qnum; /* number of messages in queue */ unsigned long q_qbytes; /* max number of bytes on queue */ pid_t q_lspid; /* pid of last msgsnd */ pid_t q_lrpid; /* last receive pid */ struct list_head q_messages; struct list_head q_receivers; struct list_head q_senders; #ifdef MSGQ_POLL_SUPPORT wait_queue_head_t poll_wait; #endif }; #define SEARCH_ANY 1 #define SEARCH_EQUAL 2 #define SEARCH_NOTEQUAL 3 #define SEARCH_LESSEQUAL 4 static atomic_t msg_bytes = ATOMIC_INIT(0); static atomic_t msg_hdrs = ATOMIC_INIT(0); static struct ipc_ids msg_ids; #define msg_lock(id) ((struct msg_queue*)ipc_lock(&msg_ids,id)) #define msg_unlock(id) ipc_unlock(&msg_ids,id) #define msg_rmid(id) ((struct msg_queue*)ipc_rmid(&msg_ids,id)) #define msg_checkid(msq, msgid) \ ipc_checkid(&msg_ids,&msq->q_perm,msgid) #define msg_buildid(id, seq) \ ipc_buildid(&msg_ids, id, seq) static void freeque (int id); static int newque (key_t key, int msgflg); #ifdef CONFIG_PROC_FS static int sysvipc_msg_read_proc(char *buffer, char **start, off_t offset, int length, int *eof, void *data); #endif #ifdef MSGQ_POLL_SUPPORT static int init_msgq_fs(void); static void exit_msgq_fs(void); static void poll_wakeup_all(wait_queue_head_t* wq); #endif /* MSGQ_POLL_SUPPORT */ void __init msg_init (void) { ipc_init_ids(&msg_ids,msg_ctlmni); #ifdef MSGQ_POLL_SUPPORT init_msgq_fs(); #endif /* MSGQ_POLL_SUPPORT*/ #ifdef CONFIG_PROC_FS create_proc_read_entry("sysvipc/msg", 0, 0, sysvipc_msg_read_proc, NULL); #endif } static int newque (key_t key, int msgflg) { int id; struct msg_queue *msq; msq = (struct msg_queue *) kmalloc (sizeof (*msq), GFP_KERNEL); if (!msq) return -ENOMEM; id = ipc_addid(&msg_ids, &msq->q_perm, msg_ctlmni); if(id == -1) { kfree(msq); return -ENOSPC; } msq->q_perm.mode = (msgflg & S_IRWXUGO); msq->q_perm.key = key; msq->q_stime = msq->q_rtime = 0; msq->q_ctime = CURRENT_TIME; msq->q_cbytes = msq->q_qnum = 0; msq->q_qbytes = msg_ctlmnb; msq->q_lspid = msq->q_lrpid = 0; INIT_LIST_HEAD(&msq->q_messages); INIT_LIST_HEAD(&msq->q_receivers); INIT_LIST_HEAD(&msq->q_senders); #ifdef MSGQ_POLL_SUPPORT init_waitqueue_head(&msq->poll_wait); #endif /* MSGQ_POLL_SUPPORT */ msg_unlock(id); return msg_buildid(id,msq->q_perm.seq); } static void free_msg(struct msg_msg* msg) { struct msg_msgseg* seg; seg = msg->next; kfree(msg); while(seg != NULL) { struct msg_msgseg* tmp = seg->next; kfree(seg); seg = tmp; } } static struct msg_msg* load_msg(void* src, int len) { struct msg_msg* msg; struct msg_msgseg** pseg; int err; int alen; alen = len; if(alen > DATALEN_MSG) alen = DATALEN_MSG; msg = (struct msg_msg *) kmalloc (sizeof(*msg) + alen, GFP_KERNEL); if(msg==NULL) return ERR_PTR(-ENOMEM); msg->next = NULL; if (copy_from_user(msg+1, src, alen)) { err = -EFAULT; goto out_err; } len -= alen; src = ((char*)src)+alen; pseg = &msg->next; while(len > 0) { struct msg_msgseg* seg; alen = len; if(alen > DATALEN_SEG) alen = DATALEN_SEG; seg = (struct msg_msgseg *) kmalloc (sizeof(*seg) + alen, GFP_KERNEL); if(seg==NULL) { err=-ENOMEM; goto out_err; } *pseg = seg; seg->next = NULL; if(copy_from_user (seg+1, src, alen)) { err = -EFAULT; goto out_err; } pseg = &seg->next; len -= alen; src = ((char*)src)+alen; } return msg; out_err: free_msg(msg); return ERR_PTR(err); } static int store_msg(void* dest, struct msg_msg* msg, int len) { int alen; struct msg_msgseg *seg; alen = len; if(alen > DATALEN_MSG) alen = DATALEN_MSG; if(copy_to_user (dest, msg+1, alen)) return -1; len -= alen; dest = ((char*)dest)+alen; seg = msg->next; while(len > 0) { alen = len; if(alen > DATALEN_SEG) alen = DATALEN_SEG; if(copy_to_user (dest, seg+1, alen)) return -1; len -= alen; dest = ((char*)dest)+alen; seg=seg->next; } return 0; } static inline void ss_add(struct msg_queue* msq, struct msg_sender* mss) { mss->tsk=current; current->state=TASK_INTERRUPTIBLE; list_add_tail(&mss->list,&msq->q_senders); } static inline void ss_del(struct msg_sender* mss) { if(mss->list.next != NULL) list_del(&mss->list); } static void ss_wakeup(struct list_head* h, int kill) { struct list_head *tmp; tmp = h->next; while (tmp != h) { struct msg_sender* mss; mss = list_entry(tmp,struct msg_sender,list); tmp = tmp->next; if(kill) mss->list.next=NULL; wake_up_process(mss->tsk); } } static void expunge_all(struct msg_queue* msq, int res) { struct list_head *tmp; tmp = msq->q_receivers.next; while (tmp != &msq->q_receivers) { struct msg_receiver* msr; msr = list_entry(tmp,struct msg_receiver,r_list); tmp = tmp->next; msr->r_msg = ERR_PTR(res); wake_up_process(msr->r_tsk); } } static void freeque (int id) { struct msg_queue *msq; struct list_head *tmp; msq = msg_rmid(id); expunge_all(msq,-EIDRM); ss_wakeup(&msq->q_senders,1); #ifdef MSGQ_POLL_SUPPORT poll_wakeup_all(&msq->poll_wait); #endif /*MSGQ_POLL_SUPPORT*/ msg_unlock(id); tmp = msq->q_messages.next; while(tmp != &msq->q_messages) { struct msg_msg* msg = list_entry(tmp,struct msg_msg,m_list); tmp = tmp->next; atomic_dec(&msg_hdrs); free_msg(msg); } atomic_sub(msq->q_cbytes, &msg_bytes); kfree(msq); } asmlinkage long sys_msgget (key_t key, int msgflg) { int id, ret = -EPERM; struct msg_queue *msq; down(&msg_ids.sem); if (key == IPC_PRIVATE) ret = newque(key, msgflg); else if ((id = ipc_findkey(&msg_ids, key)) == -1) { /* key not used */ if (!(msgflg & IPC_CREAT)) ret = -ENOENT; else ret = newque(key, msgflg); } else if (msgflg & IPC_CREAT && msgflg & IPC_EXCL) { ret = -EEXIST; } else { msq = msg_lock(id); if(msq==NULL) BUG(); if (ipcperms(&msq->q_perm, msgflg)) ret = -EACCES; else ret = msg_buildid(id, msq->q_perm.seq); msg_unlock(id); } up(&msg_ids.sem); return ret; } static inline unsigned long copy_msqid_to_user(void *buf, struct msqid64_ds *in, int version) { switch(version) { case IPC_64: return copy_to_user (buf, in, sizeof(*in)); case IPC_OLD: { struct msqid_ds out; memset(&out,0,sizeof(out)); ipc64_perm_to_ipc_perm(&in->msg_perm, &out.msg_perm); out.msg_stime = in->msg_stime; out.msg_rtime = in->msg_rtime; out.msg_ctime = in->msg_ctime; if(in->msg_cbytes > USHRT_MAX) out.msg_cbytes = USHRT_MAX; else out.msg_cbytes = in->msg_cbytes; out.msg_lcbytes = in->msg_cbytes; if(in->msg_qnum > USHRT_MAX) out.msg_qnum = USHRT_MAX; else out.msg_qnum = in->msg_qnum; if(in->msg_qbytes > USHRT_MAX) out.msg_qbytes = USHRT_MAX; else out.msg_qbytes = in->msg_qbytes; out.msg_lqbytes = in->msg_qbytes; out.msg_lspid = in->msg_lspid; out.msg_lrpid = in->msg_lrpid; return copy_to_user (buf, &out, sizeof(out)); } default: return -EINVAL; } } struct msq_setbuf { unsigned long qbytes; uid_t uid; gid_t gid; mode_t mode; }; static inline unsigned long copy_msqid_from_user(struct msq_setbuf *out, void *buf, int version) { switch(version) { case IPC_64: { struct msqid64_ds tbuf; if (copy_from_user (&tbuf, buf, sizeof (tbuf))) return -EFAULT; out->qbytes = tbuf.msg_qbytes; out->uid = tbuf.msg_perm.uid; out->gid = tbuf.msg_perm.gid; out->mode = tbuf.msg_perm.mode; return 0; } case IPC_OLD: { struct msqid_ds tbuf_old; if (copy_from_user (&tbuf_old, buf, sizeof (tbuf_old))) return -EFAULT; out->uid = tbuf_old.msg_perm.uid; out->gid = tbuf_old.msg_perm.gid; out->mode = tbuf_old.msg_perm.mode; if(tbuf_old.msg_qbytes == 0) out->qbytes = tbuf_old.msg_lqbytes; else out->qbytes = tbuf_old.msg_qbytes; return 0; } default: return -EINVAL; } } asmlinkage long sys_msgctl (int msqid, int cmd, struct msqid_ds *buf) { int err, version; struct msg_queue *msq; struct msq_setbuf setbuf; struct kern_ipc_perm *ipcp; if (msqid < 0 || cmd < 0) return -EINVAL; version = ipc_parse_version(&cmd); switch (cmd) { case IPC_INFO: case MSG_INFO: { struct msginfo msginfo; int max_id; if (!buf) return -EFAULT; /* We must not return kernel stack data. * due to padding, it's not enough * to set all member fields. */ memset(&msginfo,0,sizeof(msginfo)); msginfo.msgmni = msg_ctlmni; msginfo.msgmax = msg_ctlmax; msginfo.msgmnb = msg_ctlmnb; msginfo.msgssz = MSGSSZ; msginfo.msgseg = MSGSEG; down(&msg_ids.sem); if (cmd == MSG_INFO) { msginfo.msgpool = msg_ids.in_use; msginfo.msgmap = atomic_read(&msg_hdrs); msginfo.msgtql = atomic_read(&msg_bytes); } else { msginfo.msgmap = MSGMAP; msginfo.msgpool = MSGPOOL; msginfo.msgtql = MSGTQL; } max_id = msg_ids.max_id; up(&msg_ids.sem); if (copy_to_user (buf, &msginfo, sizeof(struct msginfo))) return -EFAULT; return (max_id < 0) ? 0: max_id; } case MSG_STAT: case IPC_STAT: { struct msqid64_ds tbuf; int success_return; if (!buf) return -EFAULT; if(cmd == MSG_STAT && msqid >= msg_ids.size) return -EINVAL; memset(&tbuf,0,sizeof(tbuf)); msq = msg_lock(msqid); if (msq == NULL) return -EINVAL; if(cmd == MSG_STAT) { success_return = msg_buildid(msqid, msq->q_perm.seq); } else { err = -EIDRM; if (msg_checkid(msq,msqid)) goto out_unlock; success_return = 0; } err = -EACCES; if (ipcperms (&msq->q_perm, S_IRUGO)) goto out_unlock; kernel_to_ipc64_perm(&msq->q_perm, &tbuf.msg_perm); tbuf.msg_stime = msq->q_stime; tbuf.msg_rtime = msq->q_rtime; tbuf.msg_ctime = msq->q_ctime; tbuf.msg_cbytes = msq->q_cbytes; tbuf.msg_qnum = msq->q_qnum; tbuf.msg_qbytes = msq->q_qbytes; tbuf.msg_lspid = msq->q_lspid; tbuf.msg_lrpid = msq->q_lrpid; msg_unlock(msqid); if (copy_msqid_to_user(buf, &tbuf, version)) return -EFAULT; return success_return; } case IPC_SET: if (!buf) return -EFAULT; if (copy_msqid_from_user (&setbuf, buf, version)) return -EFAULT; break; case IPC_RMID: break; default: return -EINVAL; } down(&msg_ids.sem); msq = msg_lock(msqid); err=-EINVAL; if (msq == NULL) goto out_up; err = -EIDRM; if (msg_checkid(msq,msqid)) goto out_unlock_up; ipcp = &msq->q_perm; err = -EPERM; if (current->euid != ipcp->cuid && current->euid != ipcp->uid && !capable(CAP_SYS_ADMIN)) /* We _could_ check for CAP_CHOWN above, but we don't */ goto out_unlock_up; switch (cmd) { case IPC_SET: { if (setbuf.qbytes > msg_ctlmnb && !capable(CAP_SYS_RESOURCE)) goto out_unlock_up; msq->q_qbytes = setbuf.qbytes; ipcp->uid = setbuf.uid; ipcp->gid = setbuf.gid; ipcp->mode = (ipcp->mode & ~S_IRWXUGO) | (S_IRWXUGO & setbuf.mode); msq->q_ctime = CURRENT_TIME; /* sleeping receivers might be excluded by * stricter permissions. */ expunge_all(msq,-EAGAIN); /* sleeping senders might be able to send * due to a larger queue size. */ ss_wakeup(&msq->q_senders,0); #ifdef MSGQ_POLL_SUPPORT poll_wakeup_all(&msq->poll_wait); #endif /*MSGQ_POLL_SUPPORT*/ msg_unlock(msqid); break; } case IPC_RMID: freeque (msqid); break; } err = 0; out_up: up(&msg_ids.sem); return err; out_unlock_up: msg_unlock(msqid); goto out_up; out_unlock: msg_unlock(msqid); return err; } static int testmsg(struct msg_msg* msg,long type,int mode) { switch(mode) { case SEARCH_ANY: return 1; case SEARCH_LESSEQUAL: if(msg->m_type <=type) return 1; break; case SEARCH_EQUAL: if(msg->m_type == type) return 1; break; case SEARCH_NOTEQUAL: if(msg->m_type != type) return 1; break; } return 0; } static int inline pipelined_send(struct msg_queue* msq, struct msg_msg* msg) { struct list_head* tmp; tmp = msq->q_receivers.next; while (tmp != &msq->q_receivers) { struct msg_receiver* msr; msr = list_entry(tmp,struct msg_receiver,r_list); tmp = tmp->next; if(testmsg(msg,msr->r_msgtype,msr->r_mode)) { list_del(&msr->r_list); if(msr->r_maxsize < msg->m_ts) { msr->r_msg = ERR_PTR(-E2BIG); wake_up_process(msr->r_tsk); } else { msr->r_msg = msg; msq->q_lrpid = msr->r_tsk->pid; msq->q_rtime = CURRENT_TIME; wake_up_process(msr->r_tsk); return 1; } } } return 0; } asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg) { struct msg_queue *msq; struct msg_msg *msg; long mtype; int err; if (msgsz > msg_ctlmax || (long) msgsz < 0 || msqid < 0) return -EINVAL; if (get_user(mtype, &msgp->mtype)) return -EFAULT; if (mtype < 1) return -EINVAL; msg = load_msg(msgp->mtext, msgsz); if(IS_ERR(msg)) return PTR_ERR(msg); msg->m_type = mtype; msg->m_ts = msgsz; msq = msg_lock(msqid); err=-EINVAL; if(msq==NULL) goto out_free; retry: err= -EIDRM; if (msg_checkid(msq,msqid)) goto out_unlock_free; err=-EACCES; if (ipcperms(&msq->q_perm, S_IWUGO)) goto out_unlock_free; if(msgsz + msq->q_cbytes > msq->q_qbytes || 1 + msq->q_qnum > msq->q_qbytes) { struct msg_sender s; if(msgflg&IPC_NOWAIT) { err=-EAGAIN; goto out_unlock_free; } ss_add(msq, &s); msg_unlock(msqid); schedule(); current->state= TASK_RUNNING; msq = msg_lock(msqid); err = -EIDRM; if(msq==NULL) goto out_free; ss_del(&s); if (signal_pending(current)) { err=-EINTR; goto out_unlock_free; } goto retry; } msq->q_lspid = current->pid; msq->q_stime = CURRENT_TIME; if(!pipelined_send(msq,msg)) { /* noone is waiting for this message, enqueue it */ list_add_tail(&msg->m_list,&msq->q_messages); msq->q_cbytes += msgsz; msq->q_qnum++; atomic_add(msgsz,&msg_bytes); atomic_inc(&msg_hdrs); #ifdef MSGQ_POLL_SUPPORT poll_wakeup_all(&msq->poll_wait); #endif } err = 0; msg = NULL; out_unlock_free: msg_unlock(msqid); out_free: if(msg!=NULL) free_msg(msg); return err; } static int inline convert_mode(long* msgtyp, int msgflg) { /* * find message of correct type. * msgtyp = 0 => get first. * msgtyp > 0 => get first message of matching type. * msgtyp < 0 => get message with least type must be < abs(msgtype). */ if(*msgtyp==0) return SEARCH_ANY; if(*msgtyp<0) { *msgtyp=-(*msgtyp); return SEARCH_LESSEQUAL; } if(msgflg & MSG_EXCEPT) return SEARCH_NOTEQUAL; return SEARCH_EQUAL; } asmlinkage long sys_msgrcv (int msqid, struct msgbuf *msgp, size_t msgsz, long msgtyp, int msgflg) { struct msg_queue *msq; struct msg_receiver msr_d; struct list_head* tmp; struct msg_msg* msg, *found_msg; int err; int mode; if (msqid < 0 || (long) msgsz < 0) return -EINVAL; mode = convert_mode(&msgtyp,msgflg); msq = msg_lock(msqid); if(msq==NULL) return -EINVAL; retry: err = -EIDRM; if (msg_checkid(msq,msqid)) goto out_unlock; err=-EACCES; if (ipcperms (&msq->q_perm, S_IRUGO)) goto out_unlock; tmp = msq->q_messages.next; found_msg=NULL; while (tmp != &msq->q_messages) { msg = list_entry(tmp,struct msg_msg,m_list); if(testmsg(msg,msgtyp,mode)) { found_msg = msg; if(mode == SEARCH_LESSEQUAL && msg->m_type != 1) { found_msg=msg; msgtyp=msg->m_type-1; } else { found_msg=msg; break; } } tmp = tmp->next; } if(found_msg) { msg=found_msg; if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) { err=-E2BIG; goto out_unlock; } list_del(&msg->m_list); msq->q_qnum--; msq->q_rtime = CURRENT_TIME; msq->q_lrpid = current->pid; msq->q_cbytes -= msg->m_ts; atomic_sub(msg->m_ts,&msg_bytes); atomic_dec(&msg_hdrs); ss_wakeup(&msq->q_senders,0); #ifdef MSGQ_POLL_SUPPORT poll_wakeup_all(&msq->poll_wait); #endif msg_unlock(msqid); out_success: msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz; if (put_user (msg->m_type, &msgp->mtype) || store_msg(msgp->mtext, msg, msgsz)) { msgsz = -EFAULT; } free_msg(msg); return msgsz; } else { struct msg_queue *t; /* no message waiting. Prepare for pipelined * receive. */ if (msgflg & IPC_NOWAIT) { err=-ENOMSG; goto out_unlock; } list_add_tail(&msr_d.r_list,&msq->q_receivers); msr_d.r_tsk = current; msr_d.r_msgtype = msgtyp; msr_d.r_mode = mode; if(msgflg & MSG_NOERROR) msr_d.r_maxsize = INT_MAX; else msr_d.r_maxsize = msgsz; msr_d.r_msg = ERR_PTR(-EAGAIN); current->state = TASK_INTERRUPTIBLE; msg_unlock(msqid); schedule(); current->state = TASK_RUNNING; msg = (struct msg_msg*) msr_d.r_msg; if(!IS_ERR(msg)) goto out_success; t = msg_lock(msqid); if(t==NULL) msqid=-1; msg = (struct msg_msg*)msr_d.r_msg; if(!IS_ERR(msg)) { /* our message arived while we waited for * the spinlock. Process it. */ if(msqid!=-1) msg_unlock(msqid); goto out_success; } err = PTR_ERR(msg); if(err == -EAGAIN) { if(msqid==-1) BUG(); list_del(&msr_d.r_list); if (signal_pending(current)) err=-EINTR; else goto retry; } } out_unlock: if(msqid!=-1) msg_unlock(msqid); return err; } #ifdef CONFIG_PROC_FS static int sysvipc_msg_read_proc(char *buffer, char **start, off_t offset, int length, int *eof, void *data) { off_t pos = 0; off_t begin = 0; int i, len = 0; down(&msg_ids.sem); len += sprintf(buffer, " key msqid perms cbytes qnum lspid lrpid uid gid cuid cgid stime rtime ctime\n"); for(i = 0; i <= msg_ids.max_id; i++) { struct msg_queue * msq; msq = msg_lock(i); if(msq != NULL) { len += sprintf(buffer + len, "%10d %10d %4o %10lu %10lu %5u %5u %5u %5u %5u %5u %10lu %10lu %10lu\n", msq->q_perm.key, msg_buildid(i,msq->q_perm.seq), msq->q_perm.mode, msq->q_cbytes, msq->q_qnum, msq->q_lspid, msq->q_lrpid, msq->q_perm.uid, msq->q_perm.gid, msq->q_perm.cuid, msq->q_perm.cgid, msq->q_stime, msq->q_rtime, msq->q_ctime); msg_unlock(i); pos += len; if(pos < offset) { len = 0; begin = pos; } if(pos > offset + length) goto done; } } *eof = 1; done: up(&msg_ids.sem); *start = buffer + (offset - begin); len -= (offset - begin); if(len > length) len = length; if(len < 0) len = 0; return len; } #endif #ifdef MSGQ_POLL_SUPPORT static unsigned int msgq_poll(struct file* file, struct poll_table_struct* wait); /* file operation for a msg Q */ static struct file_operations msgq_file_ops = { poll: msgq_poll, }; static unsigned int msgq_poll(struct file* file, struct poll_table_struct* wait) { long msqid; struct msg_queue *msq; unsigned int mask = 0; key_t key = (key_t)file->private_data; /* first with key, find ID */ /* this is a kind of sanity check */ /* just think about this scenario. * 1. a fd is mapped to a Message Q * 2. The Q is removed * 3. A new Q is created. It uses the same mseesage Q id. * 4. app uses this fd. eventually the fd will point to * this new but wrong message queue. * Bottom line is message Q id is reused. That means two different * messages queues may use a message queue ID. * But key is unique. * If the key is equal, we can safely map the fd to the queue * even if the message q is recreated. */ down(&msg_ids.sem); msqid = ipc_findkey(&msg_ids, key); up(&msg_ids.sem); if(msqid == -1) return (POLLERR | POLLPRI); /* maybe this is an exception too */ msq = msg_lock(msqid); if(msq == NULL) return (POLLERR | POLLPRI); /* exception!!! */ /* wait */ poll_wait(file, &msq->poll_wait, wait); /* check whether msg Q is empty */ /* this is pretty simple implementation to check writability */ if(msq->q_qnum == 0) mask = POLLOUT | POLLWRNORM; else mask = POLLIN | POLLRDNORM; msg_unlock(msqid); return mask; } static struct super_block* msgqfs_read_super(struct super_block *sb, void *data, int silent) { struct inode *root = new_inode(sb); if (!root) return NULL; root->i_mode = S_IFDIR | S_IRUSR | S_IWUSR; root->i_uid = root->i_gid = 0; root->i_atime = root->i_mtime = root->i_ctime = CURRENT_TIME; sb->s_blocksize = 1024; sb->s_blocksize_bits = 10; sb->s_magic = 0x12345678; sb->s_op = NULL; sb->s_root = d_alloc(NULL, &(const struct qstr) { "msgq:", 5, 0 }); if (!sb->s_root) { iput(root); return NULL; } sb->s_root->d_sb = sb; sb->s_root->d_parent = sb->s_root; d_instantiate(sb->s_root, root); return sb; } /* MSGQ file system */ static int msgqfs_delete_dentry(struct dentry *dentry) { return 1; } static struct dentry_operations msgqfs_dentry_operations = { d_delete: msgqfs_delete_dentry, }; static DECLARE_FSTYPE(msgq_fs_type, "msgqfs", msgqfs_read_super, FS_NOMOUNT); static struct vfsmount* msgq_mnt; static struct inode* get_msgq_inode(void) { struct inode* inode = new_inode(msgq_mnt->mnt_sb); if(!inode) return NULL; inode->i_fop = &msgq_file_ops; inode->i_state = I_DIRTY; inode->i_mode = S_IRUSR | S_IWUSR; inode->i_uid = current->fsuid; inode->i_gid = current->fsgid; inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; inode->i_blksize = PAGE_SIZE; return inode; } /* * sys_msgqToFd() * * Converts a message Q to file descriptor. * */ asmlinkage long sys_msgqToFd(long msqid) { struct msg_queue* msq; struct file* f; struct inode* inode; int fd = -1; struct qstr this; char name[32]; struct dentry* dentry; printk(KERN_DEBUG"entering\n"); if(msqid < 0 ) { printk(KERN_DEBUG"invalid msgqid %lx\n", msqid); return -EINVAL; } /* get msg_queue for this msqid */ msq = msg_lock(msqid); if(msq == NULL) { printk(KERN_DEBUG"msg_lock failed\n"); return -EINVAL; } inode = get_msgq_inode(); if(!inode) { printk(KERN_DEBUG"get_msgq_inode failed\n"); msg_unlock(msqid); return -1; } /* now * we gotta allocate a file structure for this message queue and * initialize the file structure. */ /* step 1: * allocate an unused fd */ fd = get_unused_fd(); if(fd < 0) { printk(KERN_DEBUG"get_unused_fd failed\n"); goto fd_alloc_fail; } /* step 2: * allocate an file structure for this fd */ f = get_empty_filp(); if(f == NULL) { printk(KERN_DEBUG"get_empty_filp failed\n"); goto filp_alloc_fail; } /* step 3. initialize*/ sprintf(name, "[%lu]", inode->i_ino); this.name = name; this.len = strlen(name); this.hash = inode->i_ino; dentry = d_alloc(msgq_mnt->mnt_sb->s_root, &this); if(!dentry) { printk(KERN_DEBUG"d_alloc failed\n"); goto dentry_alloc_fail; } dentry->d_op = &msgqfs_dentry_operations; d_add(dentry, inode); f->f_vfsmnt = mntget(msgq_mnt); f->f_dentry = dget(dentry); f->f_pos = 0; f->f_reada = 0; f->f_op = &msgq_file_ops; f->f_iobuf = NULL; f->f_iobuf_lock = 0; f->f_flags = O_RDWR; f->f_mode = S_IRUSR | S_IWUSR; f->private_data = (void*)msq->q_perm.key; /* step 4. * install the filp */ fd_install(fd, f); msg_unlock(msqid); return fd; dentry_alloc_fail: put_filp(f); filp_alloc_fail: put_unused_fd(fd); fd_alloc_fail: iput(inode); msg_unlock(msqid); return -ENOMEM; } static int init_msgq_fs(void) { int err = register_filesystem(&msgq_fs_type); if(!err) { msgq_mnt = kern_mount(&msgq_fs_type); err = PTR_ERR(msgq_mnt); if(IS_ERR(msgq_mnt)) unregister_filesystem(&msgq_fs_type); else err = 0; } return err; } static void exit_msgq_fs(void) { unregister_filesystem(&msgq_fs_type); mntput(msgq_mnt); } static void poll_wakeup_all(wait_queue_head_t* wq) { wake_up_all(wq); } #endif
msg.h #ifndef _LINUX_MSG_H #define _LINUX_MSG_H #include <linux/ipc.h> /* ipcs ctl commands */ #define MSG_STAT 11 #define MSG_INFO 12 /* msgrcv options */ #define MSG_NOERROR 010000 /* no error if message is too big */ #define MSG_EXCEPT 020000 /* recv any msg except of specified type.*/ /* Obsolete, used only for backwards compatibility and libc5 compiles */ struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* first message on queue,unused */ struct msg *msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ }; /* Include the definition of msqid64_ds */ #include <asm/msgbuf.h> /* message buffer for msgsnd and msgrcv calls */ struct msgbuf { long mtype; /* type of message */ char mtext[1]; /* message text */ }; /* buffer for msgctl calls IPC_INFO, MSG_INFO */ struct msginfo { int msgpool; int msgmap; int msgmax; int msgmnb; int msgmni; int msgssz; int msgtql; unsigned short msgseg; }; #define MSGMNI 16 /* <= IPCMNI */ /* max # of msg queue identifiers */ #define MSGMAX 8192 /* <= INT_MAX */ /* max size of message (bytes) */ #define MSGMNB 16384 /* <= INT_MAX */ /* default max size of a message queue */ /* unused */ #define MSGPOOL (MSGMNI*MSGMNB/1024) /* size in kilobytes of message pool */ #define MSGTQL MSGMNB /* number of system message headers */ #define MSGMAP MSGMNB /* number of entries in message map */ #define MSGSSZ 16 /* message segment size */ #define __MSGSEG ((MSGPOOL*1024)/ MSGSSZ) /* max no. of segments */ #define MSGSEG (__MSGSEG <= 0xffff ? __MSGSEG : 0xffff) #ifdef __KERNEL__ asmlinkage long sys_msgget (key_t key, int msgflg); asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg); asmlinkage long sys_msgrcv (int msqid, struct msgbuf *msgp, size_t msgsz, long msgtyp, int msgflg); asmlinkage long sys_msgctl (int msqid, int cmd, struct msqid_ds *buf); /* MSGQ_POLL_SUPPORT */ asmlinkage long sys_msgqToFd(long msqid); /* end of MSGQ_POLL_SUPPORT */ #endif /* __KERNEL__ */ #endif /* _LINUX_MSG_H */
sys_i386.c /* * linux/arch/i386/kernel/sys_i386.c * * This file contains various random system calls that * have a non-standard calling sequence on the Linux/i386 * platform. */ /* * Modified By Hyouck Kim for sys_msgqToFd(). * peakhunt@yahoo.com * */ #include <linux/errno.h> #include <linux/sched.h> #include <linux/mm.h> #include <linux/smp.h> #include <linux/smp_lock.h> #include <linux/sem.h> #include <linux/msg.h> #include <linux/shm.h> #include <linux/stat.h> #include <linux/mman.h> #include <linux/file.h> #include <linux/utsname.h> #include <asm/uaccess.h> #include <asm/ipc.h> /* * sys_pipe() is the normal C calling standard for creating * a pipe. It's not the way Unix traditionally does this, though. */ asmlinkage int sys_pipe(unsigned long * fildes) { int fd[2]; int error; error = do_pipe(fd); if (!error) { if (copy_to_user(fildes, fd, 2*sizeof(int))) error = -EFAULT; } return error; } /* common code for old and new mmaps */ static inline long do_mmap2( unsigned long addr, unsigned long len, unsigned long prot, unsigned long flags, unsigned long fd, unsigned long pgoff) { int error = -EBADF; struct file * file = NULL; flags &= ~(MAP_EXECUTABLE | MAP_DENYWRITE); if (!(flags & MAP_ANONYMOUS)) { file = fget(fd); if (!file) goto out; } down_write(&current->mm->mmap_sem); error = do_mmap_pgoff(file, addr, len, prot, flags, pgoff); up_write(&current->mm->mmap_sem); if (file) fput(file); out: return error; } asmlinkage long sys_mmap2(unsigned long addr, unsigned long len, unsigned long prot, unsigned long flags, unsigned long fd, unsigned long pgoff) { return do_mmap2(addr, len, prot, flags, fd, pgoff); } /* * Perform the select(nd, in, out, ex, tv) and mmap() system * calls. Linux/i386 didn't use to be able to handle more than * 4 system call parameters, so these system calls used a memory * block for parameter passing.. */ struct mmap_arg_struct { unsigned long addr; unsigned long len; unsigned long prot; unsigned long flags; unsigned long fd; unsigned long offset; }; asmlinkage int old_mmap(struct mmap_arg_struct *arg) { struct mmap_arg_struct a; int err = -EFAULT; if (copy_from_user(&a, arg, sizeof(a))) goto out; err = -EINVAL; if (a.offset & ~PAGE_MASK) goto out; err = do_mmap2(a.addr, a.len, a.prot, a.flags, a.fd, a.offset >> PAGE_SHIFT); out: return err; } extern asmlinkage int sys_select(int, fd_set *, fd_set *, fd_set *, struct timeval *); struct sel_arg_struct { unsigned long n; fd_set *inp, *outp, *exp; struct timeval *tvp; }; asmlinkage int old_select(struct sel_arg_struct *arg) { struct sel_arg_struct a; if (copy_from_user(&a, arg, sizeof(a))) return -EFAULT; /* sys_select() does the appropriate kernel locking */ return sys_select(a.n, a.inp, a.outp, a.exp, a.tvp); } /* * sys_ipc() is the de-multiplexer for the SysV IPC calls.. * * This is really horribly ugly. */ asmlinkage int sys_ipc (uint call, int first, int second, int third, void *ptr, long fifth) { int version, ret; version = call >> 16; /* hack for backward compatibility */ call &= 0xffff; switch (call) { case SEMOP: return sys_semop (first, (struct sembuf *)ptr, second); case SEMGET: return sys_semget (first, second, third); case SEMCTL: { union semun fourth; if (!ptr) return -EINVAL; if (get_user(fourth.__pad, (void **) ptr)) return -EFAULT; return sys_semctl (first, second, third, fourth); } case MSGSND: return sys_msgsnd (first, (struct msgbuf *) ptr, second, third); case MSGRCV: switch (version) { case 0: { struct ipc_kludge tmp; if (!ptr) return -EINVAL; if (copy_from_user(&tmp, (struct ipc_kludge *) ptr, sizeof (tmp))) return -EFAULT; return sys_msgrcv (first, tmp.msgp, second, tmp.msgtyp, third); } default: return sys_msgrcv (first, (struct msgbuf *) ptr, second, fifth, third); } case MSGGET: return sys_msgget ((key_t) first, second); case MSGCTL: return sys_msgctl (first, second, (struct msqid_ds *) ptr); case SHMAT: switch (version) { default: { ulong raddr; ret = sys_shmat (first, (char *) ptr, second, &raddr); if (ret) return ret; return put_user (raddr, (ulong *) third); } case 1: /* iBCS2 emulator entry point */ if (!segment_eq(get_fs(), get_ds())) return -EINVAL; return sys_shmat (first, (char *) ptr, second, (ulong *) third); } case SHMDT: return sys_shmdt ((char *)ptr); case SHMGET: return sys_shmget (first, second, third); case SHMCTL: return sys_shmctl (first, second, (struct shmid_ds *) ptr); /* MSGQ_POLL_SUPPORT */ case 25: printk(KERN_ALERT"case # 25\n"); return sys_msgqToFd((key_t)first); default: return -EINVAL; } } /* * Old cruft */ asmlinkage int sys_uname(struct old_utsname * name) { int err; if (!name) return -EFAULT; down_read(&uts_sem); err=copy_to_user(name, &system_utsname, sizeof (*name)); up_read(&uts_sem); return err?-EFAULT:0; } asmlinkage int sys_olduname(struct oldold_utsname * name) { int error; if (!name) return -EFAULT; if (!access_ok(VERIFY_WRITE,name,sizeof(struct oldold_utsname))) return -EFAULT; down_read(&uts_sem); error = __copy_to_user(&name->sysname,&system_utsname.sysname,__OLD_UTS_LEN); error |= __put_user(0,name->sysname+__OLD_UTS_LEN); error |= __copy_to_user(&name->nodename,&system_utsname.nodename,__OLD_UTS_LEN); error |= __put_user(0,name->nodename+__OLD_UTS_LEN); error |= __copy_to_user(&name->release,&system_utsname.release,__OLD_UTS_LEN); error |= __put_user(0,name->release+__OLD_UTS_LEN); error |= __copy_to_user(&name->version,&system_utsname.version,__OLD_UTS_LEN); error |= __put_user(0,name->version+__OLD_UTS_LEN); error |= __copy_to_user(&name->machine,&system_utsname.machine,__OLD_UTS_LEN); error |= __put_user(0,name->machine+__OLD_UTS_LEN); up_read(&uts_sem); error = error ? -EFAULT : 0; return error; } asmlinkage int sys_pause(void) { current->state = TASK_INTERRUPTIBLE; schedule(); return -ERESTARTNOHAND; }
Makefile all: msg_test msg_test2 OBJS= msgq.o\ syscall_stuff.o CC=gcc CFLAGS=-c -g LIBS=-lpthread %.o:%.c ${CC} ${CFLAGS} $^ msg_test: ${OBJS} ${CC} -o $@ $^ ${LIBS} msg_test2: msgq2.o syscall_stuff.o ${CC} -o $@ $^ ${LIBS} clean: rm -f msg_test msg_test2 *.o
syscall_stuff.c /* * азчъьЮчрпщч ь тчъчшщущч Hyouck Kim, peakhunt@yahoo.com, 2003 * * ╥псшЦХзп тшО АьАБуэщчсч рКвчрп msgqToFd() * */ #include <errno.h> #include <sys/msg.h> #include <stdlib.h> #include <sys/syscall.h> /* * АшутЦНИьу эпзЮччъЮутушущьО АзчъьЮчрпщК ьв ьАЕчтщКЕ БузАБчр glibc. * БчГщуу ьв sysdeps/unix/sysv/linux/i386/i386/sysdep.h */ /* ╢шО сущуЮпФьь ячшуу чъБьэпшЛщчсч зчтп ъЮьяусщуэ з ЦАшЦспэ пААуэяшуЮп. ╥туАЛ чъЮутушОуБАО ЮОт эпзЮчАчр, зчБчЮКу яЦтЦБ ьАъчшЛвчрпБЛАО щьжу. */ asm (".L__X'%ebx = 1\n\t" ".L__X'%ecx = 2\n\t" ".L__X'%edx = 2\n\t" ".L__X'%eax = 3\n\t" ".L__X'%esi = 3\n\t" ".L__X'%edi = 3\n\t" ".L__X'%ebp = 3\n\t" ".L__X'%esp = 3\n\t" ".macro bpushl name reg\n\t" ".if 1 - \\name\n\t" ".if 2 - \\name\n\t" "pushl %ebx\n\t" ".else\n\t" "xchgl \\reg, %ebx\n\t" ".endif\n\t" ".endif\n\t" ".endm\n\t" ".macro bpopl name reg\n\t" ".if 1 - \\name\n\t" ".if 2 - \\name\n\t" "popl %ebx\n\t" ".else\n\t" "xchgl \\reg, %ebx\n\t" ".endif\n\t" ".endif\n\t" ".endm\n\t" ".macro bmovl name reg\n\t" ".if 1 - \\name\n\t" ".if 2 - \\name\n\t" "movl \\reg, %ebx\n\t" ".endif\n\t" ".endif\n\t" ".endm\n\t"); #define INLINE_SYSCALL(name, nr, args...) \ ({ \ unsigned int resultvar; \ asm volatile ( \ LOADARGS_##nr \ "movl %1, %%eax\n\t" \ "int $0x80\n\t" \ RESTOREARGS_##nr \ : "=a" (resultvar) \ : "i" (__NR_##name) ASMFMT_##nr(args) : "memory", "cc"); \ if (resultvar >= 0xfffff001) \ { \ __set_errno (-resultvar); \ resultvar = 0xffffffff; \ } \ (int) resultvar; }) #define LOADARGS_0 #define LOADARGS_1 \ "bpushl .L__X'%k2, %k2\n\t" \ "bmovl .L__X'%k2, %k2\n\t" #define LOADARGS_2 LOADARGS_1 #define LOADARGS_3 LOADARGS_1 #define LOADARGS_4 LOADARGS_1 #define LOADARGS_5 LOADARGS_1 #define RESTOREARGS_0 #define RESTOREARGS_1 \ "bpopl .L__X'%k2, %k2\n\t" #define RESTOREARGS_2 RESTOREARGS_1 #define RESTOREARGS_3 RESTOREARGS_1 #define RESTOREARGS_4 RESTOREARGS_1 #define RESTOREARGS_5 RESTOREARGS_1 #define ASMFMT_0() #define ASMFMT_1(arg1) \ , "acdSD" (arg1) #define ASMFMT_2(arg1, arg2) \ , "adCD" (arg1), "c" (arg2) #define ASMFMT_3(arg1, arg2, arg3) \ , "aCD" (arg1), "c" (arg2), "d" (arg3) #define ASMFMT_4(arg1, arg2, arg3, arg4) \ , "aD" (arg1), "c" (arg2), "d" (arg3), "S" (arg4) #define ASMFMT_5(arg1, arg2, arg3, arg4, arg5) \ , "a" (arg1), "c" (arg2), "d" (arg3), "S" (arg4), "D" (arg5) /* ьв include/errno.h яьяшьчБузь glibc */ # define __set_errno(val) (errno = (val)) /* ╨╬╫╣ф а╨╬©╦ю╬╡╟╫ке ╪╟╨ю╬а╬╡ */ int msgqToFd(key_t key) { /* * ipc : ╫чэуЮ АьАБуэщчсч рКвчрп IPC (__NR_ipc) * include/asm/unistd.h * 5 : ГьАшч пЮсЦэущБчр * 25 : щчэуЮ АьАБуэщчсч рКвчрп msgqToFd. * впсшОщьБу р include/i386/ipc.h ь * arch/i386/kernel/sys_i386.c */ return INLINE_SYSCALL(ipc, 5, 25, key, 0, 0, NULL); }
msg_test.h /* * Written By Hyouck Kim, peakhunt@yahoo.com, 2003 * * A simple system call stuff for msgqToFd() * */ #ifndef __MSG_TEST_H #define __MSG_TEST_H extern int msgqToFd(int msgq_id); #endif /*!__MSG_TEST_H*/
msgq.c /* * Written By Hyouck Kim, peakhunt@yahoo.com, 2003 * * A simple message queue test * */ #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h> #include "msg_test.h" #include <sys/select.h> #include <unistd.h> #include <sys/time.h> #define MSGQ_KEY1 "./.msgq_key1" #define MSGQ_KEY2 "./.msgq_key2" void* msgq_thread(void* data); typedef struct _msg { long mtype; char buffer[1024+1]; } MSG; main(int argc, char** argv) { key_t key_mine, key_dest; int msgq_id; int msgq_id_dest; pthread_t thrd; int q_fd; int r; fd_set rset; if(argc != 2) { printf("ysage : msgq 1|2\n"); return -1; } if(argv[1][0] == '1') { key_mine = ftok(MSGQ_KEY1, 0); if((int)key_mine == -1) { perror("ftok:"); return -1; } key_dest = ftok(MSGQ_KEY2, 0); if((int)key_dest == -1) { perror("ftok:"); return -1; } } else { key_mine = ftok(MSGQ_KEY2, 0); if((int)key_mine == -1) { perror("ftok:"); return -1; } key_dest = ftok(MSGQ_KEY1, 0); if((int)key_dest == -1) { perror("ftok:"); return -1; } } msgq_id = msgget(key_mine, IPC_CREAT | S_IRWXU); if(msgq_id == -1) { perror("msgget:"); return -1; } msgq_id_dest = msgget(key_dest, IPC_CREAT | S_IRWXU); if(msgq_id_dest == -1) { perror("msgget:"); return -1; } /* r = pthread_create(&thrd, NULL, msgq_thread, (void*)msgq_id); if(r != 0) { perror("pthread_create:"); msgctl(msgq_id, IPC_RMID, NULL); return -1; } */ q_fd = msgqToFd(msgq_id); if(q_fd < 0) { perror("msgqToFd:"); return -1; } while(1) { char buffer[1024+1]; int len; MSG msg; FD_ZERO(&rset); FD_SET(0, &rset); FD_SET(q_fd, &rset); /* XXX: no timeout */ if(select(q_fd + 1, &rset, NULL, NULL, NULL) <= 0) { perror("select error:"); exit(-1); } if(FD_ISSET(0, &rset)) { len = read(0, buffer, 1024); if(len <= 0) { perror("read:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } buffer[len] = '\0'; strcpy(msg.buffer,buffer); msg.mtype = 1; r = msgsnd(msgq_id_dest, &msg, sizeof(msg.buffer), 0); if( r == -1) { perror("msgsnd:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } } if(FD_ISSET(q_fd, &rset)) { MSG msg; int r; r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0); if( r == -1) { perror("msgrcv:"); exit(-1); } printf("%s\n", msg.buffer); } } } void* msgq_thread(void* data) { int msgq_id = (int)data; MSG msg; int r; while(1) { r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0); if( r == -1) { perror("msgrcv:"); return NULL; } printf("%s\n", msg.buffer); } }
msgq2.c /* * Written By Hyouck Kim, peakhunt@yahoo.com, 2003 * * A simple message queue test * */ #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h> #include "msg_test.h" #include <sys/select.h> #include <unistd.h> #include <sys/time.h> #define MSGQ_KEY1 "./.msgq_key1" #define MSGQ_KEY2 "./.msgq_key2" void* msgq_thread(void* data); typedef struct _msg { long mtype; char buffer[1024+1]; } MSG; main(int argc, char** argv) { key_t key_mine, key_dest; int msgq_id; int msgq_id_dest; pthread_t thrd; int q_fd, q_fd2; int r; fd_set rset, wset; if(argc != 2) { printf("ysage : msgq 1|2\n"); return -1; } if(argv[1][0] == '1') { key_mine = ftok(MSGQ_KEY1, 0); if((int)key_mine == -1) { perror("ftok:"); return -1; } key_dest = ftok(MSGQ_KEY2, 0); if((int)key_dest == -1) { perror("ftok:"); return -1; } } else { key_mine = ftok(MSGQ_KEY2, 0); if((int)key_mine == -1) { perror("ftok:"); return -1; } key_dest = ftok(MSGQ_KEY1, 0); if((int)key_dest == -1) { perror("ftok:"); return -1; } } msgq_id = msgget(key_mine, IPC_CREAT | S_IRWXU); if(msgq_id == -1) { perror("msgget:"); return -1; } msgq_id_dest = msgget(key_dest, IPC_CREAT | S_IRWXU); if(msgq_id_dest == -1) { perror("msgget:"); return -1; } /* r = pthread_create(&thrd, NULL, msgq_thread, (void*)msgq_id); if(r != 0) { perror("pthread_create:"); msgctl(msgq_id, IPC_RMID, NULL); return -1; } */ q_fd = msgqToFd(msgq_id); q_fd2 = msgqToFd(msgq_id_dest); if(q_fd < 0 || q_fd2 < 0) { perror("msgqToFd:"); return -1; } while(1) { char buffer[1024+1]; int len; MSG msg; FD_ZERO(&rset); FD_ZERO(&wset); FD_SET(0, &rset); FD_SET(q_fd, &rset); FD_SET(q_fd2, &wset); /* XXX: no timeout */ printf("waiting on select\n"); if(select(q_fd2 + 1, &rset, &wset, NULL, NULL) <= 0) { perror("select error:"); exit(-1); } if(FD_ISSET(0, &rset)) { len = read(0, buffer, 1024); if(len <= 0) { perror("read:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } buffer[len] = '\0'; strcpy(msg.buffer,buffer); msg.mtype = 1; r = msgsnd(msgq_id_dest, &msg, sizeof(msg.buffer), 0); if( r == -1) { perror("msgsnd:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } } if(FD_ISSET(q_fd, &rset)) { MSG msg; int r; printf("read to receive\n"); r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0); if( r == -1) { perror("msgrcv:"); exit(-1); } printf("%s\n", msg.buffer); } if(FD_ISSET(q_fd2, &wset)) { strcpy(msg.buffer,"ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ\n"); msg.mtype = 1; printf("before writing\n"); r = msgsnd(msgq_id_dest, &msg, sizeof(msg.buffer), IPC_NOWAIT); if( r == -1 && errno != EAGAIN) { perror("msgsnd:"); msgctl(msgq_id, IPC_RMID, NULL); return 0; } else if(errno == EAGAIN) { perror("queue full"); } } } } void* msgq_thread(void* data) { int msgq_id = (int)data; MSG msg; int r; while(1) { r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0); if( r == -1) { perror("msgrcv:"); return NULL; } printf("%s\n", msg.buffer); } }

<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>

Обсуждение [ RSS ]
  • 1, Vladislav Lazarenko (?), 13:17, 07/04/2005 [ответить]  
  • +/
    Как решение - очень интересно.
    Но проще все сделать с помощью контейнера сообщений, мютекса для синхронизации доступа к нему и семафора.
     

     Добавить комментарий
    Имя:
    E-Mail:
    Заголовок:
    Текст:




    Партнёры:
    PostgresPro
    Inferno Solutions
    Hosting by Hoster.ru
    Хостинг:

    Закладки на сайте
    Проследить за страницей
    Created 1996-2025 by Maxim Chirkov
    Добавить, Поддержать, Вебмастеру