Skip to content
This repository has been archived by the owner on Dec 26, 2022. It is now read-only.

Commit

Permalink
feat(cache): Check cache capacity
Browse files Browse the repository at this point in the history
`cache_occupied_space()` would return the used size in caching database.

When the capacity of redis server exceeds the maximun capacity, the
health_track thread will pop the oldest UUID and the corresponding
bundle.

To avoid data racing, at the moment that health track is cleaning
redis server, all the other thread is not able to cache service.

Add the Document for buffer service.

close #488
  • Loading branch information
howjmay committed May 20, 2020
1 parent 2100122 commit ccfbaec
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 102 deletions.
4 changes: 3 additions & 1 deletion accelerator/cli_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef enum ta_cli_arg_value_e {
BUFFER_LIST,
DONE_LIST,
HTTP_THREADS_CLI,
CACHE_CAPACITY,

/** LOGGER */
QUIET,
Expand Down Expand Up @@ -83,14 +84,15 @@ static struct ta_cli_argument_s {
{"milestone_depth", optional_argument, NULL, MILESTONE_DEPTH_CLI, "IRI milestone depth"},
{"mwm", optional_argument, NULL, MWM_CLI, "minimum weight magnitude"},
{"seed", optional_argument, NULL, SEED_CLI, "IOTA seed"},
{"cache", no_argument, NULL, CACHE, "Enable cache server"},
{"cache", required_argument, NULL, CACHE, "Enable/Disable cache server. It defaults to off"},
{"config", required_argument, NULL, CONF_CLI, "Read configuration file"},
{"proxy_passthrough", no_argument, NULL, PROXY_API, "Pass proxy API directly to IRI without processing"},
{"health_track_period", no_argument, NULL, HEALTH_TRACK_PERIOD,
"The period for checking IRI host connection status"},
{"no-gtta", no_argument, NULL, NO_GTTA, "Disable getTransactionToConfirm (gTTA) when sending transaction"},
{"buffer_list", required_argument, NULL, BUFFER_LIST, "Set the value of `buffer_list_name`"},
{"done_list", required_argument, NULL, DONE_LIST, "Set the value of `done_list_name`"},
{"cache_capacity", required_argument, NULL, CACHE_CAPACITY, "Set the maximum capacity of caching server`"},
{"quiet", no_argument, NULL, QUIET, "Disable logger"},
{NULL, 0, NULL, 0, NULL}};

Expand Down
49 changes: 33 additions & 16 deletions accelerator/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
ta_conf->port = (int)strtol_temp;
} else {
ta_log_error("Malformed input or illegal input character\n");
ta_log_error("Malformed input\n");
}
break;
case HTTP_THREADS_CLI:
Expand All @@ -79,7 +79,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
ta_conf->http_tpool_size = (uint8_t)strtol_temp;
}
} else {
ta_log_error("Malformed input or illegal input character\n");
ta_log_error("Malformed input\n");
}
break;

Expand All @@ -98,7 +98,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
if (strtol_p != p && errno != ERANGE && strtol_temp >= 0 && strtol_temp <= USHRT_MAX) {
ta_conf->iota_port_list[idx] = (uint16_t)strtol_temp;
} else {
ta_log_error("Malformed input or illegal input character\n");
ta_log_error("Malformed input\n");
}
}
iota_service->http.port = ta_conf->iota_port_list[0];
Expand All @@ -108,9 +108,19 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
ta_conf->health_track_period = (int)strtol_temp;
} else {
ta_log_error("Malformed input or illegal input character\n");
ta_log_error("Malformed input\n");
}
break;
case CACHE:
ta_log_info("Initializing cache state\n");
cache->state = !cache->state;
if (cache->state) {
if (cache_init(&cache->rwlock, cache->state, cache->host, cache->port)) {
ta_log_error("%s.\n", "Lock initialization failed");
}
}

break;

#ifdef MQTT_ENABLE
// MQTT configuration
Expand All @@ -131,7 +141,19 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
cache->port = (int)strtol_temp;
} else {
ta_log_error("Malformed input or illegal input character\n");
ta_log_error("Malformed input character\n");
}
break;
case CACHE_MAX_CAPACITY:
strtol_temp = strtol(value, NULL, 10);
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
if (strtol_temp <= 0) {
ta_log_error("The capacity of caching service should greater then 0.\n");
break;
}
cache->capacity = strtol_temp;
} else {
ta_log_error("Malformed input\n");
}
break;

Expand All @@ -149,23 +171,20 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
iota_conf->milestone_depth = (int)strtol_temp;
} else {
ta_log_error("Malformed input or illegal input character\n");
ta_log_error("Malformed input\n");
}
break;
case MWM_CLI:
strtol_temp = strtol(value, NULL, 10);
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
iota_conf->mwm = (int)strtol_temp;
} else {
ta_log_error("Malformed input or illegal input character\n");
ta_log_error("Malformed input\n");
}
break;
case SEED_CLI:
iota_conf->seed = value;
break;
case CACHE:
cache->cache_state = true;
break;

// Quiet mode configuration
case QUIET:
Expand Down Expand Up @@ -234,7 +253,7 @@ status_t ta_core_default_init(ta_core_t* const core) {
}
ta_conf->http_tpool_size = DEFAULT_HTTP_TPOOL_SIZE;
ta_conf->proxy_passthrough = false;
ta_conf->health_track_period = IRI_HEALTH_TRACK_PERIOD;
ta_conf->health_track_period = HEALTH_TRACK_PERIOD;
ta_conf->gtta = true;
#ifdef MQTT_ENABLE
ta_conf->mqtt_host = MQTT_HOST;
Expand All @@ -243,9 +262,10 @@ status_t ta_core_default_init(ta_core_t* const core) {
ta_log_info("Initializing Redis information\n");
cache->host = REDIS_HOST;
cache->port = REDIS_PORT;
cache->cache_state = false;
cache->state = false;
cache->buffer_list_name = BUFFER_LIST_NAME;
cache->done_list_name = DONE_LIST_NAME;
cache->capacity = CACHE_MAX_CAPACITY;

ta_log_info("Initializing IRI configuration\n");
iota_conf->milestone_depth = MILESTONE_DEPTH;
Expand Down Expand Up @@ -415,7 +435,6 @@ status_t ta_core_cli_init(ta_core_t* const core, int argc, char** argv) {
status_t ta_core_set(ta_core_t* core) {
status_t ret = SC_OK;

ta_cache_t* const cache = &core->cache;
iota_client_service_t* const iota_service = &core->iota_service;
#ifdef DB_ENABLE
db_client_service_t* const db_service = &core->db_service;
Expand All @@ -430,8 +449,6 @@ status_t ta_core_set(ta_core_t* core) {
ta_log_info("Initializing PoW implementation context\n");
pow_init();

ta_log_info("Initializing cache state\n");
cache_init(cache->cache_state, cache->host, cache->port);
#ifdef DB_ENABLE
ta_log_info("Initializing db client service\n");
if ((ret = db_client_service_init(db_service, DB_USAGE_REATTACH)) != SC_OK) {
Expand All @@ -452,7 +469,7 @@ void ta_core_destroy(ta_core_t* const core) {
db_client_service_free(&core->db_service);
#endif
pow_destroy();
cache_stop();
cache_stop(&core->cache.rwlock);
logger_helper_release(logger_id);
br_logger_release();
}
19 changes: 12 additions & 7 deletions accelerator/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "cclient/serialization/json/json_serializer.h"
#include "common/logger.h"
#include "utils/cache/cache.h"
#include "utils/handles/lock.h"

#define FILE_PATH_SIZE 128

Expand Down Expand Up @@ -59,7 +60,8 @@ extern "C" {
#define MAM_FILE_PREFIX "/tmp/mam_bin_XXXXXX"
#define BUFFER_LIST_NAME "txn_buff_list"
#define DONE_LIST_NAME "done_txn_buff_list"
#define IRI_HEALTH_TRACK_PERIOD 1800 // Check every half hour in default
#define CACHE_MAX_CAPACITY 170 * 1024 * 1024 // default to 170MB
#define HEALTH_TRACK_PERIOD 1800 // Check every half hour in default

/** @name Redis connection config */
/** @{ */
Expand Down Expand Up @@ -94,12 +96,14 @@ typedef struct iota_config_s {

/** struct type of accelerator cache */
typedef struct ta_cache_s {
char* host; /**< Binding address of redis server */
uint64_t timeout; /**< Timeout for keys in redis */
char* buffer_list_name; /**< Name of the list to buffer transactions */
char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */
uint16_t port; /**< Binding port of redis server */
bool cache_state; /**< Set it true to turn on cache server */
char* host; /**< Binding address of redis server */
uint64_t timeout; /**< Timeout for keys in cache server */
char* buffer_list_name; /**< Name of the list to buffer transactions */
char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */
uint16_t port; /**< Binding port of redis server */
bool state; /**< Set it true to turn on cache server */
long int capacity; /**< The maximum capacity of cache server */
pthread_rwlock_t* rwlock; /**< Read/Write lock to avoid data racing in buffering */
} ta_cache_t;

/** struct type of accelerator core */
Expand All @@ -108,6 +112,7 @@ typedef struct ta_core_s {
ta_cache_t cache; /**< redis configuration structure */
iota_config_t iota_conf; /**< iota configuration structure */
iota_client_service_t iota_service; /**< iota connection structure */

#ifdef DB_ENABLE
db_client_service_t db_service; /**< db connection structure */
#endif
Expand Down
18 changes: 0 additions & 18 deletions accelerator/core/apis.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,6 @@ int apis_logger_release();
status_t api_get_ta_info(ta_config_t* const info, iota_config_t* const tangle, ta_cache_t* const cache,
char** json_result);

/**
* Initialize lock
*
* @return
* - zero on success
* - SC_CONF_LOCK_INIT on error
*/
status_t apis_lock_init();

/**
* Destroy lock
*
* @return
* - zero on success
* - SC_CONF_LOCK_DESTROY on error
*/
status_t apis_lock_destroy();

/**
* @brief Generate an unused address.
*
Expand Down
37 changes: 36 additions & 1 deletion accelerator/core/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,11 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
goto done;
}

if (uuid_list_len == 0) {
ta_log_debug("No buffered requests\n");
goto done;
}

ret = cache_list_peek(core->cache.buffer_list_name, UUID_STR_LEN, uuid);
if (ret) {
ta_log_error("%s\n", ta_error_to_string(ret));
Expand Down Expand Up @@ -755,6 +760,11 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
}
}

if (pthread_rwlock_trywrlock(core->cache.rwlock)) {
ret = SC_CACHE_LOCK;
ta_log_error("%s\n", ta_error_to_string(ret));
goto done;
}
// Pop transaction from buffered list
ret = cache_list_pop(core->cache.buffer_list_name, (char*)uuid);
if (ret) {
Expand All @@ -768,6 +778,12 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
ta_log_error("%s\n", ta_error_to_string(ret));
goto done;
}
if (pthread_rwlock_unlock(core->cache.rwlock)) {
ret = SC_CACHE_LOCK;
ta_log_error("%s\n", ta_error_to_string(ret));
goto done;
}

get_trytes_req_free(&req);
get_trytes_res_free(&res);
} while (!uuid_list_len);
Expand All @@ -782,15 +798,26 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const uuid,
ta_fetch_txn_with_uuid_res_t* res) {
status_t ret = SC_OK;
char pop_uuid[UUID_STR_LEN];
if (pthread_rwlock_tryrdlock(cache->rwlock)) {
ret = SC_CACHE_LOCK;
ta_log_error("%s\n", ta_error_to_string(ret));
goto done;
}

bool exist = false;
ret = cache_list_exist(cache->buffer_list_name, uuid, UUID_STR_LEN - 1, &exist);
if (ret) {
ta_log_error("%s\n", ta_error_to_string(ret));
if (pthread_rwlock_unlock(cache->rwlock)) {
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK));
}
goto done;
}
if (exist) {
res->status = UNSENT;
if (pthread_rwlock_unlock(cache->rwlock)) {
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK));
}
goto done;
}

Expand All @@ -808,6 +835,9 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const
ta_log_error("%s\n", ta_error_to_string(ret));
goto done;
}
if (pthread_rwlock_unlock(cache->rwlock)) {
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK));
}

for (int i = 0; i < len; ++i) {
flex_trit_t txn_flex_trits[NUM_FLEX_TRITS_SERIALIZED_TRANSACTION + 1];
Expand All @@ -833,11 +863,16 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const
goto done;
}

char pop_uuid[UUID_STR_LEN];
ret = cache_list_pop(cache->done_list_name, pop_uuid);
if (ret) {
ta_log_error("%s\n", ta_error_to_string(ret));
goto done;
}
} else {
if (pthread_rwlock_unlock(cache->rwlock)) {
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK));
}
}

done:
Expand Down
26 changes: 24 additions & 2 deletions accelerator/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static void ta_stop(int signal) {

static void* health_track(void* arg) {
ta_core_t* core = (ta_core_t*)arg;
while (true) {
while (core->cache.state) {
status_t ret = ta_get_iri_status(&core->iota_service);
if (ret == SC_CORE_IRI_UNSYNC || ret == SC_CCLIENT_FAILED_RESPONSE) {
ta_log_error("IRI status error %d. Try to connect to another IRI host on priority list\n", ret);
Expand All @@ -40,6 +40,28 @@ static void* health_track(void* arg) {
}
}

char uuid[UUID_STR_LEN] = {};
// The usage exceeds the maximum redis capacity
while (core->cache.capacity < cache_occupied_space()) {
if (pthread_rwlock_trywrlock(core->cache.rwlock)) {
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK));
break;
}

ret = cache_list_pop(core->cache.done_list_name, uuid);
if (ret) {
ta_log_error("%s\n", ta_error_to_string(ret));
}
ret = cache_del(uuid);
if (ret) {
ta_log_error("%s\n", ta_error_to_string(ret));
}

if (pthread_rwlock_unlock(core->cache.rwlock)) {
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK));
}
}

sleep(core->ta_conf.health_track_period);
}
return ((void*)NULL);
Expand Down Expand Up @@ -78,7 +100,7 @@ int main(int argc, char* argv[]) {
}

pthread_t thread;
pthread_create(&thread, NULL, health_track, &ta_core);
pthread_create(&thread, NULL, health_track, (void*)&ta_core);

if (ta_http_init(&ta_http, &ta_core) != SC_OK) {
ta_log_error("HTTP initialization failed %s.\n", MAIN_LOGGER);
Expand Down
4 changes: 4 additions & 0 deletions common/ta_errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ typedef enum {
/**< Failed in cache operations */
SC_CACHE_OFF = 0x03 | SC_MODULE_CACHE | SC_SEVERITY_MINOR,
/**< Cache server is not turned on */
SC_CACHE_LOCK_ERROR = 0x04 | SC_MODULE_CACHE | SC_SEVERITY_FATAL,
/**< Failed to initialize or destroy lock in cache */
SC_CACHE_LOCK = 0x05 | SC_MODULE_CACHE | SC_SEVERITY_FATAL,
/**< Failed to lock or unlock cache operations */

// MAM module
SC_MAM_NULL = 0x01 | SC_MODULE_MAM | SC_SEVERITY_FATAL,
Expand Down
Loading

0 comments on commit ccfbaec

Please sign in to comment.