/* Tag naming this translation unit; presumably picked up by the LOGE/LOGC
 * logging macros to label their output -- TODO confirm against the logging
 * header. */
7 #define LOG_TAG "amqp_send"
/**
 * Open a TCP socket to an AMQP broker, log in and open channel 1, retrying
 * while fewer than `tries` attempts have been made and none has succeeded.
 *
 * NOTE(review): this listing of the function is incomplete. The braces, the
 * declarations of `status` and `success`, the condition guarding the
 * socket-open error message, the trailing amqp_login() arguments, the retry
 * bookkeeping and the return statements are not visible, so the comments
 * below only describe the statements that are.
 *
 * hostname, port: forwarded to amqp_socket_open().
 * conn:           out-parameter; receives the result of amqp_new_connection().
 * tries:          upper bound compared against current_try in the loop.
 */
9 static int amqp_connect(
const char* hostname,
int port, amqp_connection_state_t* conn,
10 const unsigned int tries)
22 unsigned int current_try = 0;
/* Presumably the delay between attempts; its use is not visible here. */
23 unsigned int backoff = 1;
24 amqp_socket_t* socket = NULL;
25 amqp_rpc_reply_t reply;
/* Create the connection object and hand it back through the out-parameter. */
27 *conn = amqp_new_connection();
/* Keep attempting until one attempt succeeds or `tries` is reached. */
29 while (current_try < tries && !success)
/* NOTE(review): this appears to run on every iteration, creating a new
 * socket object for the same connection each time -- confirm that is
 * intended. */
31 socket = amqp_tcp_socket_new(*conn);
32 status = amqp_socket_open(socket, hostname, port);
/* Log the library's textual description of the socket-open status. */
36 LOGC(
"AMQP error opening socket: %s", (
char*) amqp_error_string2(status));
/* Log in using the library's default channel, frame-size and heartbeat
 * settings with SASL PLAIN; the credential arguments are not visible in
 * this listing. */
41 reply = amqp_login(*conn,
"/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE,
42 AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN,
46 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
48 LOGC(
"AMQP login error");
/* Open channel 1 and fetch the reply to that request. */
53 amqp_channel_open(*conn, 1);
54 reply = amqp_get_rpc_reply(*conn);
56 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
58 LOGC(
"AMQP Channel error");
/* Appears to belong to the failure branch above: close channel 1 again. */
59 amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS);
/* Presumably reached once the attempts are exhausted (the guarding condition
 * is not visible).
 * NOTE(review): current_try is an unsigned int but is formatted with %d;
 * %u would match its type. */
66 LOGE(
"Could not login to AMQP after %d tries, quitting...", current_try);
71 int amqp_send(
char const* hostname,
int port,
char const* exchange,
char const* bindingkey,
72 int size,
unsigned char* messagebody)
74 amqp_bytes_t messageByte;
75 amqp_basic_properties_t props;
76 messageByte.len = size;
77 messageByte.bytes = messagebody;
79 amqp_connection_state_t conn;
80 amqp_connect(hostname, port, &conn, 3);
82 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
83 props.content_type = amqp_cstring_bytes(
"text/text");
84 props.delivery_mode = 2;
85 props.content_encoding = amqp_cstring_bytes(
"UTF-8");
87 amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(
"fanout"), 0, 0,
88 0, 0, amqp_empty_table);
90 amqp_queue_declare_ok_t* r =
91 amqp_queue_declare(conn, 1, amqp_cstring_bytes(bindingkey), 0, 1, 0, 0, amqp_empty_table);
92 amqp_get_rpc_reply(conn);
93 if (&(r->queue) == NULL)
95 LOGE(
"Unable to declare the queue");
98 amqp_bytes_t queuename = amqp_bytes_malloc_dup(r->queue);
99 if (queuename.bytes == NULL)
100 LOGE(
"Out of memory");
102 amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
103 amqp_cstring_bytes(bindingkey), amqp_empty_table);
105 amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange), queuename, 0, 0, &props, messageByte);
106 amqp_destroy_connection(conn);
#define LOGE(...)
Log at ERROR level (makes the application abort).
#define LOGC(...)
Log at CRITICAL level.
char * configvar_string(char *varname)
Get the value of a config variable from the env.
int amqp_send(char const *hostname, int port, char const *exchange, char const *bindingkey, int size, unsigned char *messagebody)
Send a payload to an AMQP exchange.