Message queues raeder writer program with Epoll system call

  • In this program, you are going to learn

  • How to open a message queue ?

  • How to get/set message queue attributes ?

  • How to receive a message from a message queue?

  • How to send a message to a message queue?

Let us answer few basic questions in this socket

What does mq_open do in this context?

Why use O_CREAT | O_RDWR as flags?

What is the significance of the 0666 permission parameter?

Can mq_open open an existing queue?

What happens if the queue with queue_name does not exist?

How does mq_open handle errors during queue creation?

Why pass NULL as the last parameter?

Can multiple processes open the same message queue?

What is the lifetime of the message queue created with mq_open?

How do I unlink (delete) a message queue created with mq_open?

What is the primary purpose of the epoll system call?

What types of file descriptors can be monitored using epoll?

What data structure is used by epoll to store events?

How do you handle errors when using the epoll system call?

How does epoll handle a set of file descriptors with different states (e.g., reading, writing, exception)?

How does epoll Checking Ready File Descriptors?

What does it mean if epoll returns 0?

https://www.plantuml.com/plantuml/svg/JP7BRi8m44Nt-OhffhA5YyY6HIia46INGfMc3wcgB9DCm9Bp87P0hVhn7HjQuOKzziovzXtFhLEz6seJtRWzrWYsKmKQjOr28hgl3xBji6670GUKZQfHm4OkisIHmmz5sUej4p2PJgS2rYzfoXy2iqEddEjPVH1mwoD_ZbrhZ2nwL0vZDhawSeQTeweKa3njqbGkyrGkLYi1GGAFs3W2Jdjj45XOMCmvQ0li2ot_apefxQwq0cS5F2xUPVAQhFDd0NSn2UaAs3dvCVaSbygfSLN2R4RNSQ1MD1TPyCCxwh50VSJWQ3jK5VO2lB4nvMgxi_eR_plm3rbioa3KQAtQKOLrlQ5sNlOSHX1J-uX1OybOq_A865tv-M9scMVfg227xDZgafDiOLhhNqAOTMqNpQcO_l0N
  • There are many functions used in message queues. We can classify those functions based on functionalities.

    • mq_open

    • mq_getattr

    • epoll create1

    • epoll_ctl

    • epoll_wait

    • mq_receive

    • mq_send

    • mq_close

  • mq_open is used to open message queues. For example,

mq = mq_open(queue_name, O_CREAT | O_RDWR, 0666, NULL);
  • mq_getattr is used to get message queue attributes. For example,

mq_getattr(mq, &attr);
  • epoll_create1() creating an epoll instance using epoll_create1, The size parameter is an advisory hint for the kernel regarding the number of file descriptors expected to be monitored, For example,

epoll_fd = epoll_create1(0);
  • epoll_ctl() After creating an epoll instance, file descriptors are added to it using epoll_ctl, For example,

ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, mq, &event);
  • epoll_wait() The application then enters a loop where it waits for events using epoll_wait, For example,

ready_fds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
  • mq_receive is used to receive a message from a message queue. For example,

ret = mq_receive(mq, buffer, attr.mq_msgsize, NULL);;
  • mq_send is used to send a message to a message queue. For example,

ret = mq_send(mq, message, strlen(message) + 1, 0);
  • mq_close is used to close the opened message queues. For example,

mq_close(mq);
  • See the full program below,

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <unistd.h>
#include <signal.h>
#include <sys/epoll.h>

#define MAX_EVENTS 2

mqd_t mq;
int epoll_fd;
const char* queue_name = "/my_queue";

static void sigint_handler(
int signo)
{
  int ret;

  (void)close(mq);
  (void)close(epoll_fd);
  ret = mq_close(mq);
        
  if (ret < 0) {
     perror("mq_close");
     exit(EXIT_SUCCESS);
  }  

  ret = mq_unlink(
  queue_name);
  
  if (ret < 0) {
    perror("mq_unlink");
    exit(EXIT_SUCCESS);
  }
  sleep(2);
  printf("Caught sigINT!\n");
  exit(EXIT_SUCCESS);
}

void register_signal_handler(
int signum,
void (*handler)(int))
{
  if (signal(signum, handler) == 
  SIG_ERR) {
     printf("Cannot handle signal\n");
     exit(EXIT_FAILURE);
  }
}

int main(void) 
{
  struct mq_attr attr;
  char* buffer;
  int ret;
  int ready_fds;
  struct epoll_event events[MAX_EVENTS];
  const char* message = "Hello, Writer!";

  register_signal_handler(SIGINT,
  sigint_handler);

  mq = mq_open(queue_name, 
  O_CREAT | O_RDWR, 0666, NULL);
  
  if (mq == (mqd_t)-1) {
    perror("mq_open");
    return -1;
  }

  ret = mq_getattr(mq, &attr); 

  if (ret < 0) {
    perror("mq_getattr");
    (void)close(mq);
    return -2;
  }

  if ((epoll_fd = epoll_create1(0)) == -1) {
	  perror("Epoll creation failed");
	  exit(EXIT_FAILURE);
  }

  struct epoll_event event;
  event.events = EPOLLIN;
  event.data.fd = mq;
  ret = epoll_ctl(epoll_fd, 
  EPOLL_CTL_ADD, mq, &event);
 
  if (ret < 0) {
     perror("Epoll_ctl failed");
     exit(EXIT_FAILURE);
  }

  while(1) {
    ready_fds = epoll_wait(epoll_fd, 
    events, MAX_EVENTS, -1);

    if (ready_fds == -1) {
	perror("Epoll wait failed");
	exit(EXIT_FAILURE);
    }

    if (events[0].data.fd == mq) {
      buffer = (char*)malloc(
      attr.mq_msgsize);
      
      if (buffer == NULL) {
        perror("malloc");
        (void)close(mq);
        return -4;
      }

      ret = mq_receive(mq, 
      buffer, attr.mq_msgsize, NULL);
      
      if (ret < 0) {
        perror("mq_receive");
        (void)close(mq);
        return -5;
      }
      printf("Received message: %s\n", 
      buffer);
      free(buffer);

      ret = mq_send(mq, message, 
      strlen(message) + 1, 0);

      if (ret < 0) {
        perror("mq_send");
        (void)close(mq);
        return -6;
      } else {
        printf("message = %s\n", 
        message);
      }
    }
  }

  (void)mq_close(mq);

  return 0;
}

 1$ gcc -o reader reader.c -lrt
 2
 3$ sudo ./reader
 4
 5Received message: Hello, reader!
 6message = Hello, Writer!
 7Received message: Hello, reader!
 8message = Hello, Writer!
 9Received message: Hello, Writer!
10message = Hello, Writer!
11Received message: Hello, Writer!
12message = Hello, Writer!
13Received message: Hello, Writer!
14message = Hello, Writer!
15Received message: Hello, Writer!
16message = Hello, Writer!
17Received message: Hello, Writer!
18message = Hello, Writer!
19Received message: Hello, Writer!
20message = Hello, Writer!
21Received message: Hello, reader!
22message = Hello, Writer!
23Received message: Hello, reader!
24message = Hello, Writer!
25^CCaught sigINT!
https://www.plantuml.com/plantuml/svg/JP5DRy8m38Rl_HNU1IKYG_J2WGa91BrrC7NTXpHDKMXTY9IqfKb1c_RZvvPk8eV4oUkFnyxIUTNurfhWpX_H8hXQPMZKFkY5O6vFiggnPASMMvIbiYXW9zT9j4hXXwnayvO8cCvcCm7RbpZc3y4SiAwCaLc3ocF8fjtRV7ZpXbsj8XSGFUtYMAxJMAus6m7s9624PomzHLoEsY2m_c0Xvw0Ti2zql6ToMEQiyxVed3eGbFED8SxXpc4C8N7Tu5oKzZUr-pfEmEFgNKQlqJPz5d0VahykW5t5Z-dd95TUJOeS5Wl2uq13AWSCeDMX7D0hxvkUPjHPbEDFQp13VSPUsxT5WOs0pcL2cdK7fx_nVsne7FLRLZoWtdJHxSCCrcaIZpDgd9qhdNEApKpbkgmKvdnL1qjofa_y1G==
  • There are many functions used in message queues. We can classify those functions based on functionalities.

    • mq_open

    • mq_send

    • epoll create1

    • epoll_ctl

    • epoll_wait

    • mq_getattr

    • mq_receive

    • mq_close

  • mq_open is used to open message queues. For example,

mq = mq_open(queue_name, O_CREAT | O_RDWR, 0666, NULL);
  • mq_send is used to send a message to a message queue. For example,

ret = mq_send(mq, message, strlen(message) + 1, 0);
  • epoll_create1() creating an epoll instance using epoll_create1, The size parameter is an advisory hint for the kernel regarding the number of file descriptors expected to be monitored, For example,

epoll_fd = epoll_create1(0);
  • epoll_ctl() After creating an epoll instance, file descriptors are added to it using epoll_ctl, For example,

ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, mq, &event);
  • epoll_wait() The application then enters a loop where it waits for events using epoll_wait, For example,

ready_fds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
  • mq_getattr is used to get message queue attributes. For example,

ret = mq_getattr(mq, &attr);
  • mq_receive is used to receive a message from a message queue. For example,

ret = mq_receive(mq, buffer, attr.mq_msgsize, NULL);;
  • mq_close is used to close the opened message queues. For example,

mq_close(mq);
  • See the full program below,

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <unistd.h>
#include <signal.h>
#include <sys/epoll.h>

#define MAX_EVENTS 2

mqd_t mq;
int epoll_fd;

static void sigint_handler(
int signo)
{
  int ret;

  (void)close(mq);
  (void)close(epoll_fd);
  ret = mq_close(mq);
        
  if (ret < 0) {
   perror("mq_close");
   exit(EXIT_SUCCESS);
  }
  sleep(2);
  printf("Caught sigINT!\n");
  exit(EXIT_SUCCESS);
}

void register_signal_handler(
int signum,
void (*handler)(int))
{
  if (signal(signum, handler) ==
  SIG_ERR) {
     printf("Cannot handle signal\n");
     exit(EXIT_FAILURE);
  }
}

int main(void) 
{
  const char* queue_name = "/my_queue";
  const char* message = "Hello, reader!";
  int ret;
  int ready_fds;
  char *buffer;
  struct mq_attr attr;
  struct epoll_event events[MAX_EVENTS];

  register_signal_handler(SIGINT,
  sigint_handler);

  mq = mq_open(queue_name, 
  O_CREAT | O_RDWR, 0666, NULL);

  if (mq == (mqd_t)-1) {
    perror("mq_open");
    return -1;
  }

  if ((epoll_fd = epoll_create1(0)) == -1) {
          perror("Epoll creation failed");
          exit(EXIT_FAILURE);
  }

  struct epoll_event event;
  event.events = EPOLLIN;
  event.data.fd = mq;
  ret = epoll_ctl(epoll_fd,
  EPOLL_CTL_ADD, mq, &event);

  if (ret < 0) {
     perror("Epoll_ctl failed");
     exit(EXIT_FAILURE);
  }

  while (1) {
    ret = mq_send(mq, message, 
    strlen(message) + 1, 0);
   
    if (ret < 0) {
      perror("mq_send");
      (void)close(mq);
      return -2;
    } else {
      printf("message = %s\n", 
      message);
    }

    ready_fds = epoll_wait(epoll_fd,
    events, MAX_EVENTS, -1);

    if (ready_fds < 0) {
      perror("ready_fds");
      (void)close(mq);
      return -3;
    }
    
    if (events[0].data.fd == mq) {
      ret = mq_getattr(mq, &attr);

      if (ret < 0) {
        perror("mq_getattr");
        (void)close(mq);
        return -4;
      }  

      buffer = (char*)malloc(
      attr.mq_msgsize);
      
      if (buffer == NULL) {
        perror("malloc");
        (void)close(mq);
        return -5;
      }

      memset(buffer, 0, 
      sizeof(buffer));
      ret = mq_receive(mq, buffer, 
      attr.mq_msgsize, NULL);
      
      if (ret < 0) {
        perror("mq_receive");
        (void)close(mq);
        return -6;
      }
      
      printf("Received message: %s\n", 
      buffer);
      free(buffer);     
    }
  }

  (void)mq_close(mq);

  return 0;
}

 1$ gcc -o writer writer.c -lrt
 2
 3$ sudo ./writer
 4
 5messageSent = Hello, reader!
 6Received message: Hello, Writer!
 7message = Hello, reader!
 8Received message: Hello, reader!
 9message = Hello, reader!
10Received message: Hello, reader!
11message = Hello, reader!
12Received message: Hello, reader!
13message = Hello, reader!
14Received message: Hello, reader!
15message = Hello, reader!
16Received message: Hello, reader!
17message = Hello, reader!
18Received message: Hello, reader!
19message = Hello, reader!
20Received message: Hello, reader!
21message = Hello, reader!
22Received message: Hello, Writer!
23message = Hello, reader!
24^CCaught sigINT!

Message queues API

Learning

mq_open

creates a new message queue or opens an existing one, depending on the specified flags and parameters.

mq_getattr

is employed to obtain the attributes of an open message queue, including its current state and configuration.

epoll

handles a set of file descriptors with different states, such as reading, writing, and exceptions, by using the struct epoll_event structure and the associated event flags..

mq_receive

receiving (reading) messages from a message queue.

mq_send

sending (writing) messages to a message queue.

mq_close

used to close a message queue descriptor.