12 #include <amqp_framing.h>
13 #include <amqp_tcp_socket.h>
15 #define LOG_TAG "amqp_listen"
18 amqp_connection_state_t* conn,
const unsigned int tries)
29 unsigned int current_try = 0;
30 unsigned int backoff = 1;
31 amqp_rpc_reply_t reply;
33 *conn = amqp_new_connection();
35 while (current_try < tries && !success)
37 amqp_socket_t* socket = amqp_tcp_socket_new(*conn);
38 int status = amqp_socket_open(socket, hostname, port);
42 LOGC(
"AMQP error opening socket: %s", (
char*) amqp_error_string2(status));
47 reply = amqp_login(*conn,
"/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE,
48 AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN,
52 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
54 LOGC(
"AMQP login error");
59 amqp_channel_open(*conn, 1);
60 reply = amqp_get_rpc_reply(*conn);
62 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
64 LOGC(
"AMQP Channel error");
65 amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS);
70 amqp_basic_consume(*conn, 1, amqp_cstring_bytes(bindingkey), amqp_empty_bytes, 0, 0, 0,
73 reply = amqp_get_rpc_reply(*conn);
75 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
77 LOGC(
"AMQP consume error");
78 amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS);
85 LOGE(
"Could not login to AMQP after %d tries, quitting...", current_try);
90 int amqp_consume(amqp_connection_state_t* conn, amqp_envelope_t* envelope)
93 amqp_maybe_release_buffers(*conn);
94 res = amqp_consume_message(*conn, envelope, NULL, 0);
96 if (AMQP_RESPONSE_NORMAL != res.reply_type)
Utilities to get config values from the environment.
Utilities for consuming RabbitMQ messages.
#define LOGE(...)
Log at ERROR level (causes the application to abort).
#define LOGC(...)
Log at CRITICAL level.
int amqp_listen_retry(const char *hostname, int port, const char *bindingkey, amqp_connection_state_t *conn, const unsigned int tries)
Set up a consumer for a specific queue.
int amqp_consume(amqp_connection_state_t *conn, amqp_envelope_t *envelope)
Consume one message from a connection object.
char * configvar_string(char *varname)
Get the value of a config variable from the env.