AiCPlayer
Interface to the AiC VM, covering rendering, sensors, and video recording
amqp_send.c
Go to the documentation of this file.
1 #include <stdio.h>
2 #include <amqp.h>
3 #include <unistd.h>
4 #include "amqp_send.h"
5 #include "logger.h"
6 
7 #define LOG_TAG "amqp_send"
8 
9 static int amqp_connect(const char* hostname, int port, amqp_connection_state_t* conn,
10  const unsigned int tries)
11 {
13 #define RETRY \
14  do \
15  { \
16  sleep(backoff); \
17  backoff *= 2; \
18  current_try++; \
19  } while (0)
20  int status;
21  uint8_t success = 0;
22  unsigned int current_try = 0;
23  unsigned int backoff = 1;
24  amqp_socket_t* socket = NULL;
25  amqp_rpc_reply_t reply;
26 
27  *conn = amqp_new_connection();
28 
29  while (current_try < tries && !success)
30  {
31  socket = amqp_tcp_socket_new(*conn);
32  status = amqp_socket_open(socket, hostname, port);
33 
34  if (status)
35  {
36  LOGC("AMQP error opening socket: %s", (char*) amqp_error_string2(status));
37  RETRY;
38  continue;
39  }
40 
41  reply = amqp_login(*conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE,
42  AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN,
43  configvar_string("AIC_PLAYER_AMQP_USERNAME"),
44  configvar_string("AIC_PLAYER_AMQP_PASSWORD"));
45 
46  if (reply.reply_type != AMQP_RESPONSE_NORMAL)
47  {
48  LOGC("AMQP login error");
49  RETRY;
50  continue;
51  }
52 
53  amqp_channel_open(*conn, 1);
54  reply = amqp_get_rpc_reply(*conn);
55 
56  if (reply.reply_type != AMQP_RESPONSE_NORMAL)
57  {
58  LOGC("AMQP Channel error");
59  amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS);
60  RETRY;
61  continue;
62  }
63  success = 1;
64  }
65  if (!success)
66  LOGE("Could not login to AMQP after %d tries, quitting...", current_try);
67  return 0;
68 #undef RETRY
69 }
70 
71 int amqp_send(char const* hostname, int port, char const* exchange, char const* bindingkey,
72  int size, unsigned char* messagebody)
73 {
74  amqp_bytes_t messageByte;
75  amqp_basic_properties_t props;
76  messageByte.len = size; // sizeof(messagebody);
77  messageByte.bytes = messagebody;
78 
79  amqp_connection_state_t conn;
80  amqp_connect(hostname, port, &conn, 3);
81 
82  props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
83  props.content_type = amqp_cstring_bytes("text/text");
84  props.delivery_mode = 2; // persistent delivery mode
85  props.content_encoding = amqp_cstring_bytes("UTF-8");
86 
87  amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes("fanout"), 0, 0,
88  0, 0, amqp_empty_table);
89 
90  amqp_queue_declare_ok_t* r =
91  amqp_queue_declare(conn, 1, amqp_cstring_bytes(bindingkey), 0, 1, 0, 0, amqp_empty_table);
92  amqp_get_rpc_reply(conn);
93  if (&(r->queue) == NULL)
94  {
95  LOGE("Unable to declare the queue");
96  return 0;
97  }
98  amqp_bytes_t queuename = amqp_bytes_malloc_dup(r->queue);
99  if (queuename.bytes == NULL)
100  LOGE("Out of memory");
101 
102  amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
103  amqp_cstring_bytes(bindingkey), amqp_empty_table);
104 
105  amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange), queuename, 0, 0, &props, messageByte);
106  amqp_destroy_connection(conn);
107  return 0;
108 }
#define RETRY
Send AMQP messages.
#define LOGE(...)
Log at ERROR level (makes the application abort)
Definition: logger.h:31
#define LOGC(...)
Log at CRITICAL level.
Definition: logger.h:29
char * configvar_string(char *varname)
Get the value of a config variable from the env.
Definition: config_env.c:29
Logging macros.
int amqp_send(char const *hostname, int port, char const *exchange, char const *bindingkey, int size, unsigned char *messagebody)
Send a payload to an AMQP exchange.
Definition: amqp_send.c:71