AiCPlayer
Interface to the AiC VM: rendering, sensors, and video recording
amqp_listen.c
Go to the documentation of this file.
1 
3 #include <stdint.h>
4 #include <stdio.h>
5 #include <unistd.h>
6 
7 #include "logger.h"
8 #include "config_env.h"
9 #include "amqp_listen.h"
10 
11 #include <amqp.h>
12 #include <amqp_framing.h>
13 #include <amqp_tcp_socket.h>
14 
15 #define LOG_TAG "amqp_listen"
16 
17 int amqp_listen_retry(const char* hostname, int port, const char* bindingkey,
18  amqp_connection_state_t* conn, const unsigned int tries)
19 {
20 /* Macro to retry connecting to the AMQP server */
21 #define RETRY \
22  do \
23  { \
24  sleep(backoff); \
25  backoff *= 2; \
26  current_try++; \
27  } while (0)
28  uint8_t success = 0;
29  unsigned int current_try = 0;
30  unsigned int backoff = 1;
31  amqp_rpc_reply_t reply;
32 
33  *conn = amqp_new_connection();
34 
35  while (current_try < tries && !success)
36  {
37  amqp_socket_t* socket = amqp_tcp_socket_new(*conn);
38  int status = amqp_socket_open(socket, hostname, port);
39 
40  if (status)
41  {
42  LOGC("AMQP error opening socket: %s", (char*) amqp_error_string2(status));
43  RETRY;
44  continue;
45  }
46 
47  reply = amqp_login(*conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE,
48  AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN,
49  configvar_string("AIC_PLAYER_AMQP_USERNAME"),
50  configvar_string("AIC_PLAYER_AMQP_PASSWORD"));
51 
52  if (reply.reply_type != AMQP_RESPONSE_NORMAL)
53  {
54  LOGC("AMQP login error");
55  RETRY;
56  continue;
57  }
58 
59  amqp_channel_open(*conn, 1);
60  reply = amqp_get_rpc_reply(*conn);
61 
62  if (reply.reply_type != AMQP_RESPONSE_NORMAL)
63  {
64  LOGC("AMQP Channel error");
65  amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS);
66  RETRY;
67  continue;
68  }
69 
70  amqp_basic_consume(*conn, 1, amqp_cstring_bytes(bindingkey), amqp_empty_bytes, 0, 0, 0,
71  amqp_empty_table);
72 
73  reply = amqp_get_rpc_reply(*conn);
74 
75  if (reply.reply_type != AMQP_RESPONSE_NORMAL)
76  {
77  LOGC("AMQP consume error");
78  amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS);
79  RETRY;
80  continue;
81  }
82  success = 1;
83  }
84  if (!success)
85  LOGE("Could not login to AMQP after %d tries, quitting...", current_try);
86  return 0;
87 #undef RETRY
88 }
89 
90 int amqp_consume(amqp_connection_state_t* conn, amqp_envelope_t* envelope)
91 {
92  amqp_rpc_reply_t res;
93  amqp_maybe_release_buffers(*conn);
94  res = amqp_consume_message(*conn, envelope, NULL, 0);
95 
96  if (AMQP_RESPONSE_NORMAL != res.reply_type)
97  {
98  return -1;
99  }
100  return 0;
101 }
Utilities to get config values from the environment.
Utilities for consuming RabbitMQ messages.
#define LOGE(...)
Log at ERROR level (makes the application abort)
Definition: logger.h:31
#define LOGC(...)
Log at CRITICAL level.
Definition: logger.h:29
#define RETRY
int amqp_listen_retry(const char *hostname, int port, const char *bindingkey, amqp_connection_state_t *conn, const unsigned int tries)
Setup a consumer for a specific queue.
Definition: amqp_listen.c:17
int amqp_consume(amqp_connection_state_t *conn, amqp_envelope_t *envelope)
Consume one message from a connection object.
Definition: amqp_listen.c:90
char * configvar_string(char *varname)
Get the value of a config variable from the env.
Definition: config_env.c:29
Logging macros.