7 #define LOG_TAG "amqp_send" 
    /*
     * amqp_connect: create an AMQP connection, log in and open channel 1,
     * retrying while fewer than `tries` attempts have been made.
     *
     * hostname, port  broker address handed to amqp_socket_open().
     * conn            out-parameter; receives the state returned by
     *                 amqp_new_connection().
     * tries           upper bound on the number of loop iterations.
     *
     * NOTE(review): this listing is partial (the embedded line numbers skip).
     * The braces, the declarations of `status` and `success`, the remaining
     * amqp_login() arguments, the conditions guarding some log calls, the
     * retry bookkeeping and the return statement are not visible here, so the
     * meaning of the int return value cannot be stated from this view.
     */
    9 static int amqp_connect(
const char* hostname, 
int port, amqp_connection_state_t* conn,
 
   10                         const unsigned int tries)
 
   22     unsigned int current_try = 0;
 
          /* NOTE(review): no use of `backoff` is visible in this listing; presumably a delay between attempts - confirm. */
   23     unsigned int backoff = 1;
 
   24     amqp_socket_t* socket = NULL;
 
   25     amqp_rpc_reply_t reply;
 
          /* The connection state is created once, before the retry loop, and stored through the caller's pointer. */
   27     *conn = amqp_new_connection();
 
          /* Keep trying until an attempt succeeds or `tries` attempts have been used. */
   29     while (current_try < tries && !success)
 
              /* A TCP socket is requested for *conn on every attempt. */
   31         socket = amqp_tcp_socket_new(*conn);
 
   32         status = amqp_socket_open(socket, hostname, port);
 
                  /* NOTE(review): the (char*) cast looks unnecessary for a %s argument - confirm against the return type of amqp_error_string2(). */
   36             LOGC(
"AMQP error opening socket: %s", (
char*) amqp_error_string2(status));
 
              /* Log in on virtual host "/" with the library's default channel/frame/heartbeat limits and SASL PLAIN; further arguments are on lines not shown. */
   41         reply = amqp_login(*conn, 
"/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE,
 
   42                            AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN,
 
   46         if (reply.reply_type != AMQP_RESPONSE_NORMAL)
 
   48             LOGC(
"AMQP login error");
 
              /* Open channel 1; the outcome is read back through amqp_get_rpc_reply(). */
   53         amqp_channel_open(*conn, 1);
 
   54         reply = amqp_get_rpc_reply(*conn);
 
   56         if (reply.reply_type != AMQP_RESPONSE_NORMAL)
 
   58             LOGC(
"AMQP Channel error");
 
                  /* The channel is closed again when opening it was not acknowledged normally. */
   59             amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS);
 
              /* NOTE(review): current_try is unsigned int but is formatted with %d; %u would match its type. */
   66         LOGE(
"Could not login to AMQP after %d tries, quitting...", current_try);
 
   /*
    * amqp_send: send a payload to an AMQP exchange.
    *
    * hostname, port  broker address, forwarded to amqp_connect().
    * exchange        exchange name; declared here with type "fanout".
    * bindingkey      used both as the name of the declared queue and as the
    *                 binding key between that queue and the exchange.
    * size            number of bytes in messagebody.
    * messagebody     payload bytes; referenced, not copied, by this function.
    *
    * Visible steps: connect (3 tries), declare the exchange, declare and bind
    * the queue, publish on channel 1, destroy the connection.
    *
    * NOTE(review): this listing is partial (the embedded line numbers skip);
    * braces and the return statement are not visible, so the meaning of the
    * int return value cannot be stated from this view.
    */
   71 int amqp_send(
char const* hostname, 
int port, 
char const* exchange, 
char const* bindingkey,
 
   72               int size, 
unsigned char* messagebody)
 
   74     amqp_bytes_t messageByte;
 
   75     amqp_basic_properties_t props;
 
          /* NOTE(review): `size` is a signed int; a negative value would be converted when stored in the length field - confirm callers never pass one. */
   76     messageByte.len = size;  
 
   77     messageByte.bytes = messagebody;
 
   79     amqp_connection_state_t conn;
 
          /* NOTE(review): the int result of amqp_connect() is discarded here. */
   80     amqp_connect(hostname, port, &conn, 3);
 
          /* Only the content-type and delivery-mode properties are flagged as present. */
   82     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
 
   83     props.content_type = amqp_cstring_bytes(
"text/text");
 
          /* Presumably 2 selects AMQP "persistent" delivery - confirm against the AMQP 0-9-1 basic properties. */
   84     props.delivery_mode = 2;  
 
          /* NOTE(review): content_encoding is assigned, but AMQP_BASIC_CONTENT_ENCODING_FLAG is not part of _flags above; presumably the field is not transmitted - confirm. */
   85     props.content_encoding = amqp_cstring_bytes(
"UTF-8");
 
   87     amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(
"fanout"), 0, 0,
 
   88                           0, 0, amqp_empty_table);
 
   90     amqp_queue_declare_ok_t* r =
 
   91         amqp_queue_declare(conn, 1, amqp_cstring_bytes(bindingkey), 0, 1, 0, 0, amqp_empty_table);
 
          /* NOTE(review): the reply returned by amqp_get_rpc_reply() is discarded. */
   92     amqp_get_rpc_reply(conn);
 
          /* NOTE(review): this compares the address of a member of *r, not r itself; testing `r == NULL` would state the intent directly. */
   93     if (&(r->queue) == NULL)
 
   95         LOGE(
"Unable to declare the queue");
 
          /* Private heap copy of the queue name. NOTE(review): no matching amqp_bytes_free() is visible in this listing. */
   98     amqp_bytes_t queuename = amqp_bytes_malloc_dup(r->queue);
 
   99     if (queuename.bytes == NULL)
 
  100         LOGE(
"Out of memory");
 
  102     amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
 
  103                     amqp_cstring_bytes(bindingkey), amqp_empty_table);
 
          /* Publish with the queue name as routing key. NOTE(review): the result of amqp_basic_publish() is not checked. */
  105     amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange), queuename, 0, 0, &props, messageByte);
 
          /* NOTE(review): no amqp_channel_close()/amqp_connection_close() call is visible before the connection is destroyed - confirm. */
  106     amqp_destroy_connection(conn);
 
#define LOGE(...)
Log at ERROR level (makes the application abort) 
 
#define LOGC(...)
Log at CRITICAL level. 
 
char * configvar_string(char *varname)
Get the value of a configuration variable from the environment. 
 
int amqp_send(char const *hostname, int port, char const *exchange, char const *bindingkey, int size, unsigned char *messagebody)
Send a payload to an AMQP exchange.