AiCPlayer
Interface of aic vm - for rendering aspect, sensors, video records
sensors.c
Go to the documentation of this file.
1 
6 #include <pthread.h> // for pthread_join, pthread_t
7 #include <stdint.h> // for int32_t
8 #include <stdio.h> // for NULL
9 #include <string.h> // for strncmp
10 #include <stdlib.h> // for calloc
11 #include <signal.h> // for SIGPIPE, SIG_IGN, signal
12 #include <unistd.h> // for sleep, close
13 #include <time.h> // for nanosleep
14 
15 #include "amqp_listen.h"
16 #include "config_env.h"
17 #include "logger.h"
18 #include "buffer_sizes.h"
19 #include "player_nfc.h"
20 #include "protobuf_framing.h"
21 #include "sensors.h"
22 #include "socket.h"
23 
24 #define LOG_TAG "sensors"
25 
26 #ifdef WITH_TESTING
27 
30 static int write_protobuf_for_test(socket_t sock, amqp_envelope_t* envelope)
31 {
32  return send(sock, envelope->message.body.bytes, envelope->message.body.len, 0);
33 }
34 #endif
35 
36 static void* amqp_generic(void* args)
37 {
38  amqp_envelope_t envelope;
39  amqp_connection_state_t conn;
40  int err_amqlisten = 0;
41  int IS_SENSORS = 0;
42  socket_t sock = -1;
43 
44  const sensor_params* params = (sensor_params*) args;
45  if (!strncmp(params->sensor, "sensors", 7))
46  IS_SENSORS = 1;
47  else
48  IS_SENSORS = 0;
49 
50  LOGM("listen_GPS_or_BATT_app - %s %s %s %s", params->exchange, params->queue, params->sensor,
51  params->queue);
52 
53  amqp_listen_retry(params->amqp_host, 5672, params->queue, &conn, 5);
54  if (IS_SENSORS)
55  sock = open_socket_reuseaddr(params->gvmip, params->port);
56 
57  while (1)
58  {
59  if (!IS_SENSORS)
60  sock = open_socket(params->gvmip, params->port);
61  LOGD("FREQ - %s sock=%d", params->sensor, sock);
62  if (sock != SOCKET_ERROR)
63  {
64  err_amqlisten = amqp_consume(&conn, &envelope);
65  if (err_amqlisten == 0)
66  {
67 #ifndef WITH_TESTING
68  LOGM("Sending %d bytes to %s hardware device (:%d)", envelope.message.body.len + 4,
69  params->sensor, params->port);
70  unsigned int size = write_protobuf(sock, &envelope);
71  if (size != envelope.message.body.len + 4)
72  LOGW("Failed to send %d bytes to %s hardware device (:%d), error %d",
73  envelope.message.body.len + 4, params->sensor, params->port, size);
74 #else
75  unsigned int size = write_protobuf_for_test(sock, &envelope);
76  LOGM("Sending %d bytes ", size);
77 #endif
78  }
79  }
80  else
81  {
82  LOGW("Unable to connect to hardware device %s (:%d)", params->sensor, params->port);
83 
84  if (IS_SENSORS)
85  {
86  close(sock);
87  sleep(3);
88  sock = open_socket_reuseaddr(params->gvmip, params->port);
89  }
90  else
91  {
92  sleep(3);
93  }
94  }
95 
96  struct timespec duration = {0, params->frequency * 1000};
97  nanosleep(&duration, NULL);
98  if (!IS_SENSORS)
99  close(sock);
100  }
101 
102  return NULL;
103 }
104 
105 sensor_params* ParamEventsWorker(const char* vmip, const char* vmid, const char* sensor_name,
106  const char* amqp_host)
107 {
108  const int32_t str_length = BUF_SIZE;
109  sensor_params* paramListener = (sensor_params*) calloc(sizeof(sensor_params), 1);
110  if (!paramListener)
111  LOGE("ParamEventsWorker: out of memory");
112 
113  paramListener->amqp_host = amqp_host;
114  paramListener->gvmip = vmip;
115 
116  g_strlcpy(paramListener->sensor, sensor_name, str_length);
117  g_strlcpy(paramListener->exchange, sensor_name, str_length);
118  snprintf(paramListener->queue, str_length, "android-events.%s.%s", vmid, sensor_name);
119 
120  if (!strncmp(sensor_name, "battery", 7))
121  {
122  paramListener->port = PORT_BAT;
123  paramListener->frequency = FREQ_BAT;
124  }
125  else if (!strncmp(sensor_name, "sensors", 7))
126  {
127  paramListener->port = PORT_SENSORS;
128  paramListener->frequency = FREQ_SENSORS;
129  }
130  else if (!strncmp(sensor_name, "gps", 3))
131  {
132  paramListener->port = PORT_GPS;
133  paramListener->frequency = FREQ_GPS;
134  }
135  else if (!strncmp(sensor_name, "gsm", 3))
136  {
137  paramListener->port = PORT_GSM;
138  paramListener->frequency = FREQ_DEFAULT;
139  }
140  else if (!strncmp(sensor_name, "nfc", 3))
141  {
142  paramListener->port = PORT_NFC;
143  paramListener->frequency = FREQ_DEFAULT;
144  }
145  else
146  LOGE("Unkwnown sensor type: %s", sensor_name);
147 
148  LOGD("ParamEventsWorker - %d ; %s ; %s ; %s ", paramListener->port, paramListener->gvmip,
149  paramListener->exchange, paramListener->queue);
150 
151  return paramListener;
152 }
153 
154 void start_sensor(sensor_params* params, pthread_t* thread)
155 {
156  pthread_create(thread, 0, &amqp_generic, params);
157 }
158 
159 #ifndef WITH_TESTING
160 int main()
161 {
162  char* amqp_host = NULL;
163  char* vmid = NULL;
164  char* vmip = NULL;
165 
166  int gps = 0;
167  int gsm = 0;
168  int nfc = 0;
169  int battery = 0;
170  int sensors = 0;
171 
172  pthread_t threadbat;
173  pthread_t threadsens;
174  pthread_t threadgps;
175  pthread_t threadgsm;
176 
177  signal(SIGPIPE, SIG_IGN);
178  LOGI("Starting sensor listening");
179 
180  amqp_host = configvar_string("AIC_PLAYER_AMQP_HOST");
181  vmid = configvar_string("AIC_PLAYER_VM_ID");
182  vmip = configvar_string("AIC_PLAYER_VM_HOST");
183  sensors = configvar_bool("AIC_PLAYER_ENABLE_SENSORS");
184  battery = configvar_bool("AIC_PLAYER_ENABLE_BATTERY");
185  gps = configvar_bool("AIC_PLAYER_ENABLE_GPS");
186  gsm = configvar_bool("AIC_PLAYER_ENABLE_GSM");
187  nfc = configvar_bool("AIC_PLAYER_ENABLE_NFC");
188 
189  // ensure the variables are there
190  configvar_string("AIC_PLAYER_AMQP_PASSWORD");
191  configvar_string("AIC_PLAYER_AMQP_USERNAME");
192 
193  if (battery)
194  start_sensor(ParamEventsWorker(vmip, vmid, "battery", amqp_host), &threadbat);
195  if (sensors)
196  start_sensor(ParamEventsWorker(vmip, vmid, "sensors", amqp_host), &threadsens);
197  if (gps)
198  start_sensor(ParamEventsWorker(vmip, vmid, "gps", amqp_host), &threadgps);
199  if (gsm)
200  start_sensor(ParamEventsWorker(vmip, vmid, "gsm", amqp_host), &threadgsm);
201  if (nfc)
202  listen_NFC(ParamEventsWorker(vmip, vmid, "nfc", amqp_host));
203 
204  if (battery)
205  pthread_join(threadbat, NULL);
206  if (sensors)
207  pthread_join(threadsens, NULL);
208  if (gps)
209  pthread_join(threadgps, NULL);
210  if (gsm)
211  pthread_join(threadgsm, NULL);
212 
213  return 0;
214 }
215 #endif // UNIT_TESTING
Utilities to get config values from the environment.
#define LOGD(...)
Log at DEBUG level.
Definition: logger.h:21
Utilities for consuming RabbitMQ messages.
#define PORT_GSM
Port for the GSM command in the VM.
Definition: sensors.h:16
char sensor[BUF_SIZE]
Sensor name.
Definition: sensors.h:36
Parameter for sensor threads.
Definition: sensors.h:31
int configvar_bool(char *varname)
Get the value of a boolean config variable from the env.
Definition: config_env.c:45
#define LOGE(...)
Log at ERROR level (makes the application abort)
Definition: logger.h:31
#define BUF_SIZE
Small, fixed-size buffers.
Definition: buffer_sizes.h:9
#define FREQ_GPS
Definition: sensors.h:22
sensor_params * ParamEventsWorker(const char *vmip, const char *vmid, const char *sensor_name, const char *amqp_host)
Creates the data structure for sensor threads.
Definition: sensors.c:105
#define PORT_GPS
Port for the GPS command in the VM.
Definition: sensors.h:14
const char * gvmip
VM IP.
Definition: sensors.h:42
int amqp_listen_retry(const char *hostname, int port, const char *bindingkey, amqp_connection_state_t *conn, const unsigned int tries)
Setup a consumer for a specific queue.
Definition: amqp_listen.c:17
int socket_t
Alias to differenciate between regular ints and socket fds.
Definition: socket.h:13
int main()
Definition: sensors.c:160
int write_protobuf(socket_t sock, amqp_envelope_t *envelope)
Write a protobuf, with a varint32 framing, and padding at the end from an envelope.
Nfc player.
socket_t open_socket(const char *ip, short port)
Connect to a host:port couple.
Definition: socket.c:29
#define FREQ_BAT
Definition: sensors.h:21
char queue[BUF_SIZE]
Queue name.
Definition: sensors.h:40
void * listen_NFC(void *args)
Listen to AMQP and send data to the NFC sensor in the VM.
Definition: player_nfc.c:16
Defines ports and structures for sensor threads.
char * configvar_string(char *varname)
Get the value of a config variable from the env.
Definition: config_env.c:29
char exchange[BUF_SIZE]
Exchange name.
Definition: sensors.h:38
#define PORT_SENSORS
Port for the "sensors" command in the VM.
Definition: sensors.h:10
#define PORT_NFC
Port for the NFC command in the VM.
Definition: sensors.h:18
int amqp_consume(amqp_connection_state_t *conn, amqp_envelope_t *envelope)
Consume one message from a connection object.
Definition: amqp_listen.c:90
#define PORT_BAT
Port for the battery command in the VM.
Definition: sensors.h:12
void start_sensor(sensor_params *params, pthread_t *thread)
Start the sensor listener thread.
Definition: sensors.c:154
#define FREQ_DEFAULT
Definition: sensors.h:23
Logging macros.
#define FREQ_SENSORS
Definition: sensors.h:20
#define LOGM(...)
Log at MESSAGE level.
Definition: logger.h:25
int32_t frequency
Sensor throttling.
Definition: sensors.h:46
#define LOGW(...)
Log at WARNING level.
Definition: logger.h:27
socket_t open_socket_reuseaddr(const char *ip, short port)
Open a socket with SO_REUSEADDR.
Definition: socket.c:39
#define SOCKET_ERROR
Alias for the recv() return value in case of error.
Definition: socket.h:10
Define common buffer sizes.
Define socket utilities to simplify networking.
Utility to convert framing to varint32 for google’s parser.
#define LOGI(...)
Log at INFO level.
Definition: logger.h:23
const char * amqp_host
AMQP host.
Definition: sensors.h:44
int32_t port
Remote port.
Definition: sensors.h:34