Message Queue

A message queue is a FIFO linked list of message structures stored in the kernel’s memory space. Multiple processes can access it simultaneously, with synchronisation provided by the kernel. Each message carries a long field named type that identifies the message. New messages are appended to the end of the queue.

A typical message queue descriptor, struct msqid_ds, is defined as follows (the exact fields vary between implementations):

struct msqid_ds {
  struct ipc_perm msg_perm; // permissions
  struct msg* msg_first;    // pointer to the first message on the queue
  struct msg* msg_last;     // pointer to the last message on the queue
  msglen_t msg_cbytes;      // current number of bytes on the queue
  msgqnum_t msg_qnum;       // current number of messages on the queue
  msglen_t msg_qbytes;      // maximum number of bytes allowed on queue
  pid_t msg_lspid;          // pid of last msgsnd()
  pid_t msg_lrpid;          // pid of last msgrcv()
  time_t msg_stime;         // time of last msgsnd()
  time_t msg_rtime;         // time of last msgrcv()
  time_t msg_ctime;         // time of last msgctl()
};
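
As a minimal sketch, the fields above can be read from user space with msgctl() and the IPC_STAT command, which copies the kernel's msqid_ds into a caller-supplied structure. The helper name print_queue_status is hypothetical, and only fields from the POSIX definition are printed, since msg_first, msg_last and msg_cbytes are not part of it.

```c
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>

// Hypothetical helper: print a few msqid_ds fields for the given queue.
// Returns 0 on success, or -1 with errno set by msgctl().
int print_queue_status(int msqid)
{
    struct msqid_ds ds;

    // IPC_STAT needs read permission on the queue.
    if (msgctl(msqid, IPC_STAT, &ds) == -1)
        return -1;

    printf("messages on queue : %lu\n", (unsigned long)ds.msg_qnum);
    printf("byte limit        : %lu\n", (unsigned long)ds.msg_qbytes);
    printf("pid of last send  : %ld\n", (long)ds.msg_lspid);
    printf("pid of last recv  : %ld\n", (long)ds.msg_lrpid);
    return 0;
}
```

The casts are there because msgqnum_t, msglen_t and pid_t have no dedicated printf conversion.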

msgget() returns a message queue ID for the associated IPC facilities key. Pass a flag value of 0 to access an existing queue, or IPC_CREAT (combined with permission bits) to create one.

To get or alter the msqid_ds, call msgctl() with the message queue ID and a command: IPC_STAT to get the msqid_ds, or IPC_SET to alter msg_perm.uid, msg_perm.gid, msg_perm.mode and msg_qbytes.

To send a message, call msgsnd() with the target message queue’s ID, a pointer to a message structure (shown below), the length of the message text (excluding the type field), and a flag. By default msgsnd() blocks while the queue is full. If the flag is IPC_NOWAIT, the call returns immediately with errno set to EAGAIN when there is no room on the queue for the new message.

To receive a message, call msgrcv() with almost the same parameters as msgsnd(), plus an additional long type argument. How msgrcv() handles type is explained below. By default msgrcv() blocks until a message is available, the message queue is removed, or the call is interrupted by a signal. If IPC_NOWAIT is set, msgrcv() returns immediately with errno set to ENOMSG if no message of the requested type is on the queue.

To remove a message queue, call msgctl() with the command IPC_RMID.
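
The IPC_STAT and IPC_SET round trip described above could look like the following sketch, which assumes the caller owns the queue. The helper name set_queue_limit is hypothetical. Reading the current msqid_ds first means the ownership and mode fields are written back unchanged and only msg_qbytes differs.

```c
#include <sys/ipc.h>
#include <sys/msg.h>

// Hypothetical helper: change the maximum number of bytes allowed on the queue.
// Returns 0 on success, or -1 with errno set by msgctl().
int set_queue_limit(int msqid, msglen_t new_limit)
{
    struct msqid_ds ds;

    // Fetch the current settings so uid, gid and mode stay as they are.
    if (msgctl(msqid, IPC_STAT, &ds) == -1)
        return -1;

    ds.msg_qbytes = new_limit;

    // IPC_SET copies msg_perm.uid, msg_perm.gid, msg_perm.mode and
    // msg_qbytes from ds into the kernel's structure.
    return msgctl(msqid, IPC_SET, &ds);
}
```

Lowering the limit needs no special privilege. On Linux, raising it above the system-wide default (MSGMNB) requires a privileged process.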

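#include <errno.h>    // errno
#include <sys/ipc.h>  // ftok(), IPC_* flags
#include <sys/msg.h>  // msgget(), msgctl(), msgsnd(), msgrcv()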

key_t key = ftok(...); // key initialisation

int msqid;
struct msqid_ds buff;
struct mymsg {
  long mtype;     // message type
  char mtext[N];  // message text
};

struct mymsg msg = {
  ...
};

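// The low 9 bits of the flag are the permission bits; without them the new
// queue is created with mode 0 and ordinary users cannot send or receive.
msqid = msgget(key, IPC_CREAT | IPC_EXCL | 0600);  // or
msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);     // to create a unique message queue
msqid = msgget(key, 0);                            // just to access the message queue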

if (msqid == -1) {      // error handling
  if (errno == ...) {
  } ...
}

msgctl(msqid, IPC_STAT, &buff); // get current status of the message queue with
                                // the ID

msgsnd(msqid, &msg, N, 0);      // or IPC_NOWAIT as flag, sending message to the
                                // message queue
msgrcv(msqid, &msg, N, 0, 0);   // or IPC_NOWAIT as flag, receiving message from
                                // the message queue

msgctl(msqid, IPC_RMID, NULL);  // remove the message queue with the ID
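
The IPC_NOWAIT behaviour of msgsnd() and msgrcv() can be sketched as two small wrappers that turn the "would block" cases into a return value of 0. The names TEXT_MAX, text_msg, try_send and try_receive are hypothetical and chosen only for this illustration.

```c
#include <errno.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

#define TEXT_MAX 64  // hypothetical capacity of the message text

struct text_msg {
    long mtype;            // message type, must be greater than 0
    char mtext[TEXT_MAX];  // message text
};

// Returns 1 if the message was queued, 0 if the queue is full, -1 on error.
int try_send(int msqid, long type, const char *text)
{
    struct text_msg m;
    size_t len = strlen(text) + 1;  // include the terminating NUL

    if (type <= 0 || len > TEXT_MAX) {
        errno = EINVAL;
        return -1;
    }
    m.mtype = type;
    memcpy(m.mtext, text, len);

    // The length argument counts only mtext, not the mtype field.
    if (msgsnd(msqid, &m, len, IPC_NOWAIT) == 0)
        return 1;
    return errno == EAGAIN ? 0 : -1;
}

// Returns 1 if a message was copied to out, 0 if none matched, -1 on error.
int try_receive(int msqid, long type, char out[TEXT_MAX])
{
    struct text_msg m;
    ssize_t n = msgrcv(msqid, &m, sizeof m.mtext, type, IPC_NOWAIT);

    if (n == -1)
        return errno == ENOMSG ? 0 : -1;

    memcpy(out, m.mtext, (size_t)n);
    out[n < TEXT_MAX ? n : TEXT_MAX - 1] = '\0';  // always NUL-terminate
    return 1;
}
```

A caller can poll with try_receive() and do other work while it returns 0, instead of sleeping inside a blocking msgrcv().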

If the type value passed to msgrcv() is 0, the function returns the first message on the queue. If it is greater than 0, the first message with an equal type value is returned. If it is less than 0, msgrcv() returns the first message with the lowest type value that is less than or equal to the absolute value of type.
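
To make the three cases concrete, here is a small sketch with two helpers that send and receive one-byte messages and report only the type. The names tag_msg, send_type and recv_type are hypothetical.

```c
#include <errno.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

struct tag_msg {
    long mtype;     // message type, must be greater than 0
    char mtext[1];  // one-byte placeholder body
};

// Queue a message of the given type. Returns 0 on success, -1 on error.
int send_type(int msqid, long type)
{
    struct tag_msg m = { .mtype = type, .mtext = { 0 } };
    return msgsnd(msqid, &m, sizeof m.mtext, IPC_NOWAIT);
}

// Receive without blocking, selecting by type as msgrcv() defines it.
// Returns the mtype of the message taken off the queue, or -1 if none
// matched (errno == ENOMSG) or the call failed.
long recv_type(int msqid, long type)
{
    struct tag_msg m;
    if (msgrcv(msqid, &m, sizeof m.mtext, type, IPC_NOWAIT) == -1)
        return -1;
    return m.mtype;
}
```

Suppose messages of types 3, 1, 2 and 5 are sent in that order. recv_type(msqid, 0) returns 3, the first message on the queue. With 1, 2 and 5 left, recv_type(msqid, -2) returns 1, the lowest type not exceeding 2. recv_type(msqid, 5) then returns 5, and recv_type(msqid, 4) returns -1 with errno set to ENOMSG because no message of type 4 exists.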

#operating-system #list #parallelism