diff --git a/examples/examples.h b/examples/examples.h index 83670b5f3..4a765830c 100644 --- a/examples/examples.h +++ b/examples/examples.h @@ -15,6 +15,7 @@ #define EXAMPLES_H_ #include +#include #include #include diff --git a/examples/microservice.c b/examples/microservice.c index 9be7c1f03..0617b6b14 100644 --- a/examples/microservice.c +++ b/examples/microservice.c @@ -11,82 +11,570 @@ // See the License for the specific language governing permissions and // limitations under the License. +#ifdef _WIN32 +#error "This example does not work on Windows" +#endif + #include +#include #include "examples.h" -static void -onMsg(natsMicroservice *m, natsMicroserviceRequest *req, void *closure) +static int fakeClosure = 0; + +// The "main" functions for the services, each is run as a pthread. +static void *run_arithmetics(void *closure); +static void *run_functions(void *closure); +static void *run_sequence(void *closure); + +// a generic service runner, used by the services' "main" functions. +static void *run_service(natsConnection *conn, natsMicroserviceConfig *svc, + natsMicroserviceEndpointConfig **endpoints, const char **ep_names, int len_endpoints); + +// Callers and handlers for operations (2 floating point args, and a single int arg). +static natsMicroserviceError * + call_arithmetics(long double *result, natsConnection *nc, const char *subject, long double a1, long double a2); +typedef natsMicroserviceError *(*arithmeticsOP)( + long double *result, natsConnection *conn, long double a1, long double a2); +static void handle_arithmetics_op(natsMicroserviceRequest *req, arithmeticsOP op); + +static natsMicroserviceError * + call_function(long double *result, natsConnection *nc, const char *subject, int n); +typedef natsMicroserviceError *(*functionOP)(long double *result, natsConnection *conn, int n); +static void handle_function_op(natsMicroserviceRequest *req, functionOP op); + +// Stop handler is the same for all services. +static void handle_stop(natsMicroservice *m, natsMicroserviceRequest *req); + +// Handler for "sequence", the main endpoint of the sequence service. +static void handle_sequence(natsMicroservice *m, natsMicroserviceRequest *req); + +// Math operations, wrapped as handlers. +static natsMicroserviceError *add(long double *result, natsConnection *nc, long double a1, long double a2); +static natsMicroserviceError *divide(long double *result, natsConnection *nc, long double a1, long double a2); +static natsMicroserviceError *multiply(long double *result, natsConnection *nc, long double a1, long double a2); + +static void handle_add(natsMicroservice *m, natsMicroserviceRequest *req) { handle_arithmetics_op(req, add); } +static void handle_divide(natsMicroservice *m, natsMicroserviceRequest *req) { handle_arithmetics_op(req, divide); } +static void handle_multiply(natsMicroservice *m, natsMicroserviceRequest *req) { handle_arithmetics_op(req, multiply); } + +static natsMicroserviceError *factorial(long double *result, natsConnection *nc, int n); +static natsMicroserviceError *fibonacci(long double *result, natsConnection *nc, int n); +static natsMicroserviceError *power2(long double *result, natsConnection *nc, int n); +static void handle_factorial(natsMicroservice *m, natsMicroserviceRequest *req) { handle_function_op(req, factorial); } +static void handle_fibonacci(natsMicroservice *m, natsMicroserviceRequest *req) { handle_function_op(req, fibonacci); } +static void handle_power2(natsMicroservice *m, natsMicroserviceRequest *req) { handle_function_op(req, power2); } + +static natsMicroserviceEndpointConfig stop_cfg = { + .subject = "stop", + .handler = handle_stop, + .closure = &fakeClosure, + .schema = NULL, +}; + +int main(int argc, char **argv) { - char buf[1024]; - snprintf(buf, sizeof(buf), "c-example-microservice: OK: %.*s", - natsMicroserviceRequest_GetDataLength(req), - natsMicroserviceRequest_GetData(req)); + natsStatus s = NATS_OK; + natsConnection *conn = NULL; + natsOptions *opts = NULL; + pthread_t arithmetics_th; + void *a_val = NULL; + pthread_t functions_th; + void *f_val = NULL; + pthread_t sequence_th; + void *s_val = NULL; + int errno; - if (print) - printf("%s\n", buf); + // Connect and start the services + opts = parseArgs(argc, argv, ""); + s = natsConnection_Connect(&conn, opts); + if (s == NATS_OK) + { + errno = pthread_create(&arithmetics_th, NULL, run_arithmetics, conn); + if (errno != 0) + { + printf("Error creating arithmetics thread: %d: %s\n", errno, strerror(errno)); + return 1; + } + + errno = pthread_create(&functions_th, NULL, run_functions, conn); + if (errno != 0) + { + printf("Error creating functions thread: %d: %s\n", errno, strerror(errno)); + return 1; + } + + errno = pthread_create(&sequence_th, NULL, run_sequence, conn); + if (errno != 0) + { + printf("Error creating sequence thread: %d: %s\n", errno, strerror(errno)); + return 1; + } + } + + // Wait for the services to stop and self-destruct. + if (s == NATS_OK) + { + pthread_join(arithmetics_th, &a_val); + s = (natsStatus)(uintptr_t)a_val; + } + if (s == NATS_OK) + { + pthread_join(functions_th, &f_val); + s = (natsStatus)(uintptr_t)f_val; + } + if (s == NATS_OK) + { + pthread_join(sequence_th, &s_val); + s = (natsStatus)(uintptr_t)s_val; + } - natsMicroservice_Respond(m, req, buf, strlen(buf)); + if (s == NATS_OK) + { + return 0; + } + else + { + printf("Error: %u - %s\n", s, natsStatus_GetText(s)); + nats_PrintLastErrorStack(stderr); + return 1; + } } -static void -asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure) +static void *run_sequence(void *closure) { - printf("Async error: %u - %s\n", err, natsStatus_GetText(err)); + natsConnection *conn = (natsConnection *)closure; + natsMicroserviceConfig cfg = { + .description = "Sequence adder - NATS microservice example in C", + .name = "c-sequence", + .version = "1.0.0", + }; + natsMicroserviceEndpointConfig sequence_cfg = { + .subject = "sequence", + .handler = handle_sequence, + .closure = &fakeClosure, + .schema = NULL, + }; + natsMicroserviceEndpointConfig *endpoints[] = {&sequence_cfg}; + const char *names[] = {"sequence"}; - natsSubscription_GetDropped(sub, (int64_t *)&dropped); + return run_service(conn, &cfg, endpoints, names, 1); } -int main(int argc, char **argv) +// calculates the sum of X/f(1) + X/f(2)... up to N (included). The inputs are X +// (float), f name (string), and N (int). E.g.: '10.0 "power2" 10' will +// calculate 10/2 + 10/4 + 10/8 + 10/16 + 10/32 + 10/64 + 10/128 + 10/256 + +// 10/512 + 10/1024 = 20.998046875 +static void handle_sequence(natsMicroservice *m, natsMicroserviceRequest *req) { - natsConnection *conn = NULL; - natsOptions *opts = NULL; - natsMicroservice *m = NULL; - natsStatus s; - int fakeClosure = 0; - natsMicroserviceEndpointConfig default_endpoint_cfg = { - .subject = "c-test", - .handler = onMsg, + natsMicroserviceError *err = NULL; + natsConnection *nc = natsMicroservice_GetConnection(m); + natsArgs *args = NULL; + int n = 0; + int i; + const char *function; + long double initialValue = 1.0; + long double value = 1.0; + long double denominator = 0; + char result[64]; + + err = natsParseAsArgs(&args, natsMicroserviceRequest_GetData(req), natsMicroserviceRequest_GetDataLength(req)); + if ((err == NULL) && + (natsArgs_Count(args) != 2)) + { + err = nats_NewMicroserviceError(NATS_INVALID_ARG, 400, "Invalid number of arguments"); + } + + if (err == NULL) + { + err = natsArgs_GetString(&function, args, 0); + } + if (err == NULL) + { + err = natsArgs_GetInt(&n, args, 1); + } + if ((err == NULL) && + (strcmp(function, "factorial") != 0) && + (strcmp(function, "power2") != 0) && + (strcmp(function, "fibonacci") != 0)) + { + err = nats_NewMicroserviceError( + NATS_INVALID_ARG, 400, + "Invalid function name, must be 'factorial', 'power2', or 'fibonacci'"); + } + if ((err == NULL) && + (n < 1)) + { + err = nats_NewMicroserviceError( + NATS_INVALID_ARG, 400, + "Invalid number of iterations, must be at least 1"); + } + + for (i = 1; (err == NULL) && (i <= n); i++) + { + err = call_function(&denominator, nc, function, i); + if (err == NULL && denominator == 0) + { + err = nats_NewMicroserviceError(0, 500, "division by zero"); + } + if (err == NULL) + { + value = value + initialValue / denominator; + } + } + + if (err == NULL) + { + snprintf(result, sizeof(result), "%Lf", value); + err = natsMicroserviceRequest_Respond(req, result, strlen(result)); + } + + if (err != NULL) + { + natsMicroserviceRequest_Error(req, &err); + } + natsArgs_Destroy(args); +} + +static void *run_arithmetics(void *closure) +{ + natsConnection *conn = (natsConnection *)closure; + natsMicroserviceConfig cfg = { + .description = "Arithmetic operations - NATS microservice example in C", + .name = "c-arithmetics", + .version = "1.0.0", + }; + natsMicroserviceEndpointConfig add_cfg = { + .subject = "add", + .handler = handle_add, + .closure = &fakeClosure, + .schema = NULL, + }; + natsMicroserviceEndpointConfig divide_cfg = { + .subject = "divide", + .handler = handle_divide, .closure = &fakeClosure, .schema = NULL, }; + natsMicroserviceEndpointConfig multiply_cfg = { + .subject = "multiply", + .handler = handle_multiply, + .closure = &fakeClosure, + .schema = NULL, + }; + natsMicroserviceEndpointConfig *endpoints[] = + {&add_cfg, ÷_cfg, &multiply_cfg}; + const char *names[] = + {"add", "divide", "multiply"}; + + return run_service(conn, &cfg, endpoints, names, 3); +} +static void *run_functions(void *closure) +{ + natsConnection *conn = (natsConnection *)closure; natsMicroserviceConfig cfg = { - .description = "NATS microservice example in C", - .name = "c-example-microservice", + .description = "Functions - NATS microservice example in C", + .name = "c-functions", .version = "1.0.0", - .endpoint = &default_endpoint_cfg, }; + natsMicroserviceEndpointConfig factorial_cfg = { + .subject = "factorial", + .handler = handle_factorial, + .closure = &fakeClosure, + .schema = NULL, + }; + natsMicroserviceEndpointConfig fibonacci_cfg = { + .subject = "fibonacci", + .handler = handle_fibonacci, + .closure = &fakeClosure, + .schema = NULL, + }; + natsMicroserviceEndpointConfig power2_cfg = { + .subject = "power2", + .handler = handle_power2, + .closure = &fakeClosure, + .schema = NULL, + }; + natsMicroserviceEndpointConfig *endpoints[] = + {&factorial_cfg, &fibonacci_cfg, &power2_cfg}; + const char *names[] = + {"factorial", "fibonacci", "power2"}; - opts = parseArgs(argc, argv, ""); + return run_service(conn, &cfg, endpoints, names, 3); +} - s = natsOptions_SetErrorHandler(opts, asyncCb, NULL); - if (s == NATS_OK) +static void +handle_arithmetics_op(natsMicroserviceRequest *req, arithmeticsOP op) +{ + natsMicroserviceError *err = NULL; + natsArgs *args = NULL; + long double a1, a2, result; + char buf[1024]; + int len; + + err = natsParseAsArgs(&args, natsMicroserviceRequest_GetData(req), natsMicroserviceRequest_GetDataLength(req)); + if ((err == NULL) && (natsArgs_Count(args) != 2)) { - s = natsConnection_Connect(&conn, opts); + err = nats_NewMicroserviceError(NATS_INVALID_ARG, 400, "Invalid number of arguments, must be 2"); } - if (s == NATS_OK) + if (err == NULL) { - s = nats_AddMicroservice(&m, conn, &cfg); + err = natsArgs_GetFloat(&a1, args, 0); } - if (s == NATS_OK) + if (err == NULL) { - s = natsMicroservice_Run(m); + err = natsArgs_GetFloat(&a2, args, 1); } - if (s == NATS_OK) + if (err == NULL) + { + err = op(&result, natsMicroserviceRequest_GetConnection(req), a1, a2); + } + if (err == NULL) { - // Destroy all our objects to avoid report of memory leak - natsMicroservice_Destroy(m); - natsConnection_Destroy(conn); - natsOptions_Destroy(opts); + len = snprintf(buf, sizeof(buf), "%Lf", result); + err = natsMicroserviceRequest_Respond(req, buf, len); + } - // To silence reports of memory still in used with valgrind - nats_Close(); + if (err != NULL) + { + natsMicroserviceRequest_Error(req, &err); + } + natsArgs_Destroy(args); +} - return 0; +static void +handle_function_op(natsMicroserviceRequest *req, functionOP op) +{ + natsMicroserviceError *err = NULL; + natsArgs *args = NULL; + int n; + long double result; + char buf[1024]; + int len; + + err = natsParseAsArgs(&args, natsMicroserviceRequest_GetData(req), natsMicroserviceRequest_GetDataLength(req)); + if ((err == NULL) && (natsArgs_Count(args) != 1)) + { + err = nats_NewMicroserviceError(NATS_INVALID_ARG, 400, "Invalid number of arguments, must be 1"); + } + if (err == NULL) + { + err = natsArgs_GetInt(&n, args, 0); + } + if (err == NULL) + { + err = op(&result, natsMicroserviceRequest_GetConnection(req), n); + } + if (err == NULL) + { + len = snprintf(buf, sizeof(buf), "%Lf", result); + err = natsMicroserviceRequest_Respond(req, buf, len); + } + + if (err != NULL) + { + natsMicroserviceRequest_Error(req, &err); + } + natsArgs_Destroy(args); +} + +static natsMicroserviceError * +call_arithmetics(long double *result, natsConnection *nc, const char *subject, long double a1, long double a2) +{ + natsMicroserviceError *err = NULL; + natsMicroserviceClient *client = NULL; + natsMsg *response = NULL; + natsArgs *args = NULL; + char buf[1024]; + int len; + + err = nats_NewMicroserviceClient(&client, nc, NULL); + if (err == NULL) + { + len = snprintf(buf, sizeof(buf), "%Lf %Lf", a1, a2); + err = natsMicroserviceClient_DoRequest(client, &response, subject, buf, len); + } + if (err == NULL) + { + err = natsParseAsArgs(&args, natsMsg_GetData(response), natsMsg_GetDataLength(response)); + } + if (err == NULL) + { + err = natsArgs_GetFloat(result, args, 0); + } + + natsMicroserviceClient_Destroy(client); + natsMsg_Destroy(response); + return err; +} + +static natsMicroserviceError * +call_function(long double *result, natsConnection *nc, const char *subject, int n) +{ + natsMicroserviceError *err = NULL; + natsMicroserviceClient *client = NULL; + natsMsg *response = NULL; + natsArgs *args = NULL; + char buf[1024]; + int len; + + err = nats_NewMicroserviceClient(&client, nc, NULL); + if (err == NULL) + { + len = snprintf(buf, sizeof(buf), "%d", n); + err = natsMicroserviceClient_DoRequest(client, &response, subject, buf, len); + } + if (err == NULL) + { + err = natsParseAsArgs(&args, natsMsg_GetData(response), natsMsg_GetDataLength(response)); + } + if (err == NULL) + { + err = natsArgs_GetFloat(result, args, 0); + } + + natsMicroserviceClient_Destroy(client); + natsMsg_Destroy(response); + return err; +} + +static void handle_stop(natsMicroservice *m, natsMicroserviceRequest *req) +{ + natsMicroserviceError *err; + + err = natsMicroservice_Stop(m); + if (err == NULL) + { + err = natsMicroserviceRequest_Respond(req, "OK", 2); } - printf("Error: %u - %s\n", s, natsStatus_GetText(s)); - nats_PrintLastErrorStack(stderr); - return 1; + if (err == NULL) + { + pthread_exit((void *)(NATS_OK)); + } + else + { + natsMicroserviceRequest_Error(req, &err); + pthread_exit((void *)(err->status)); + } +} + +static void *run_service(natsConnection *conn, natsMicroserviceConfig *svc, + natsMicroserviceEndpointConfig **endpoints, const char **ep_names, int len_endpoints) +{ + natsMicroserviceError *err = NULL; + natsMicroservice *m = NULL; + char errbuf[1024]; + int i; + + err = nats_AddMicroservice(&m, conn, svc); + for (i = 0; (err == NULL) && (i < len_endpoints); i++) + { + err = natsMicroservice_AddEndpoint(NULL, m, ep_names[i], endpoints[i]); + if (err != NULL) + { + break; + } + } + if (err == NULL) + { + err = natsMicroservice_AddEndpoint(NULL, m, "stop", &stop_cfg); + } + if (err == NULL) + { + err = natsMicroservice_Run(m); + } + + natsMicroservice_Destroy(m); + if (err != NULL) + { + printf("Error: %s\n", err->String(err, errbuf, sizeof(errbuf))); + return (void *)(err->status); + } + return (void *)NATS_OK; +} + +static natsMicroserviceError * +add(long double *result, natsConnection *nc, long double a1, long double a2) +{ + *result = a1 + a2; + return NULL; +} + +static natsMicroserviceError * +divide(long double *result, natsConnection *nc, long double a1, long double a2) +{ + *result = a1 / a2; + return NULL; +} + +static natsMicroserviceError *multiply(long double *result, natsConnection *nc, long double a1, long double a2) +{ + *result = a1 * a2; + return NULL; +} + +static natsMicroserviceError * +factorial(long double *result, natsConnection *nc, int n) +{ + natsMicroserviceError *err = NULL; + int i; + + if (n < 1) + return nats_NewMicroserviceError(NATS_INVALID_ARG, 400, "n must be greater than 0"); + + *result = 1; + for (i = 1; i <= n; i++) + { + err = call_arithmetics(result, nc, "multiply", *result, i); + if (err != NULL) + return err; + } + return NULL; +} + +static natsMicroserviceError * +fibonacci(long double *result, natsConnection *nc, int n) +{ + natsMicroserviceError *err = NULL; + int i; + long double n1, n2; + + if (n < 0) + return nats_NewMicroserviceError(NATS_INVALID_ARG, 400, "n must be greater than 0"); + + if (n < 2) + { + *result = n; + return NULL; + } + + for (i = 1, n1 = 0, n2 = 1; i <= n; i++) + { + err = call_arithmetics(result, nc, "add", n1, n2); + if (err != NULL) + return err; + n1 = n2; + n2 = *result; + } + return NULL; +} + +static natsMicroserviceError *power2(long double *result, natsConnection *nc, int n) +{ + natsMicroserviceError *err = NULL; + int i; + + if (n < 1) + return nats_NewMicroserviceError(NATS_INVALID_ARG, 400, "n must be greater than 0"); + + *result = 1; + for (i = 1; i <= n; i++) + { + err = call_arithmetics(result, nc, "multiply", *result, 2); + if (err != NULL) + return err; + } + return NULL; } diff --git a/src/mem.h b/src/mem.h index a73f43070..6ef6e2e66 100644 --- a/src/mem.h +++ b/src/mem.h @@ -27,6 +27,14 @@ #endif #define NATS_FREE(p) free((p)) +#define NATS_CALLOCS(dupe, count, size) ( \ + *(dupe) = NATS_CALLOC((count), (size)), \ + *(dupe) != NULL ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY)) + +#define NATS_STRDUPS(dupe, str) ( \ + *(dupe) = NATS_STRDUP(str), \ + *(dupe) != NULL ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY)) + // GNU C Library version 2.25 or later. #if defined(__GLIBC__) && \ (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 25)) diff --git a/src/micro.c b/src/micro.c index c8039aacc..2259c4aa4 100644 --- a/src/micro.c +++ b/src/micro.c @@ -11,64 +11,57 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO review includes -#include -#include - +// TODO <>/<> review includes +#include "micro.h" #include "microp.h" -#include "mem.h" #include "conn.h" +#include "mem.h" -static natsStatus +static natsMicroserviceError * _createMicroservice(natsMicroservice **new_microservice, natsConnection *nc, natsMicroserviceConfig *cfg); static void -_freeMicroservice(natsMicroservice *m); -static void _retainMicroservice(natsMicroservice *m); -static void +static natsMicroserviceError * _releaseMicroservice(natsMicroservice *m); +static void +_markMicroserviceStopped(natsMicroservice *m, bool stopped); -////////////////////////////////////////////////////////////////////////////// -// microservice management APIs -////////////////////////////////////////////////////////////////////////////// - -natsStatus +natsMicroserviceError * nats_AddMicroservice(natsMicroservice **new_m, natsConnection *nc, natsMicroserviceConfig *cfg) { - natsStatus s; - if ((new_m == NULL) || (nc == NULL) || (cfg == NULL)) - return nats_setDefaultError(NATS_INVALID_ARG); - - s = _createMicroservice(new_m, nc, cfg); - if (s != NATS_OK) - return NATS_UPDATE_ERR_STACK(s); + return natsMicroserviceErrorInvalidArg; - return NATS_OK; + return _createMicroservice(new_m, nc, cfg); } -void natsMicroservice_Destroy(natsMicroservice *m) +natsMicroserviceError * +natsMicroservice_Destroy(natsMicroservice *m) { - _releaseMicroservice(m); + return _releaseMicroservice(m); } -// TODO update from Go -natsStatus +// TODO <>/<> update from Go +natsMicroserviceError * natsMicroservice_Stop(natsMicroservice *m) { - if ((m == NULL) || (m->mu == NULL)) - return nats_setDefaultError(NATS_INVALID_ARG); - - natsMutex_Lock(m->mu); - - if (m->stopped) - goto OK; + natsStatus s = NATS_OK; - m->stopped = true; + if (m == NULL) + return natsMicroserviceErrorInvalidArg; + if (natsMicroservice_IsStopped(m)) + return NULL; -OK: - natsMutex_Unlock(m->mu); - return NATS_OK; + s = natsMicroserviceEndpoint_Stop(m->root); + if (s == NATS_OK) + { + _markMicroserviceStopped(m, true); + return NULL; + } + else + { + return nats_NewMicroserviceError(NATS_UPDATE_ERR_STACK(s), 500, "failed to stop microservice"); + } } bool natsMicroservice_IsStopped(natsMicroservice *m) @@ -84,38 +77,71 @@ bool natsMicroservice_IsStopped(natsMicroservice *m) return stopped; } -// TODO: eliminate sleep -natsStatus +// TODO: <>/<> eliminate sleep +natsMicroserviceError * natsMicroservice_Run(natsMicroservice *m) { if ((m == NULL) || (m->mu == NULL)) - return nats_setDefaultError(NATS_INVALID_ARG); + return natsMicroserviceErrorInvalidArg; for (; !natsMicroservice_IsStopped(m);) { nats_Sleep(50); } - return NATS_OK; + + return NULL; } -natsStatus -natsMicroservice_Respond(natsMicroservice *m, natsMicroserviceRequest *r, const char *data, int len) +NATS_EXTERN natsConnection * +natsMicroservice_GetConnection(natsMicroservice *m) { - natsStatus s = NATS_OK; + if (m == NULL) + return NULL; + return m->nc; +} - if ((m == NULL) || (m->mu == NULL)) - return nats_setDefaultError(NATS_INVALID_ARG); +natsMicroserviceError * +natsMicroservice_AddEndpoint(natsMicroserviceEndpoint **new_ep, natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg) +{ + natsStatus s = NATS_OK; + if ((m == NULL) || (m->root == NULL)) + { + return natsMicroserviceErrorInvalidArg; + } + s = natsMicroserviceEndpoint_AddEndpoint(new_ep, m->root, name, cfg); - s = natsConnection_Publish(m->nc, natsMsg_GetReply(r->msg), data, len); - return NATS_UPDATE_ERR_STACK(s); + if (s == NATS_OK) + return NULL; + else + return nats_NewMicroserviceError(NATS_UPDATE_ERR_STACK(s), 500, "failed to add endpoint"); } -// Implementation functions. +static natsMicroserviceError * +_destroyMicroservice(natsMicroservice *m) +{ + natsStatus s = NATS_OK; -static natsStatus + if ((m == NULL) || (m->root == NULL)) + return NULL; + + s = natsMicroserviceEndpoint_Destroy(m->root); + if (s != NATS_OK) + { + return nats_NewMicroserviceError(NATS_UPDATE_ERR_STACK(s), 500, "failed to destroy microservice"); + } + + natsConn_release(m->nc); + natsMutex_Destroy(m->mu); + NATS_FREE(m->identity.id); + NATS_FREE(m); + return NULL; +} + +static natsMicroserviceError * _createMicroservice(natsMicroservice **new_microservice, natsConnection *nc, natsMicroserviceConfig *cfg) { natsStatus s = NATS_OK; + natsMicroserviceError *err = NULL; natsMicroservice *m = NULL; natsMutex *mu = NULL; char tmpNUID[NUID_BUFFER_LEN + 1]; @@ -123,9 +149,12 @@ _createMicroservice(natsMicroservice **new_microservice, natsConnection *nc, nat // Make a microservice object, with a reference to a natsConnection. m = (natsMicroservice *)NATS_CALLOC(1, sizeof(natsMicroservice)); if (m == NULL) - return nats_setDefaultError(NATS_NO_MEMORY); + return natsMicroserviceErrorOutOfMemory; + + // TODO: <>/<> separate PR, make a natsConn_retain return a natsConnection* natsConn_retain(nc); m->nc = nc; + m->refs = 1; // Need a mutex for concurrent operations. s = natsMutex_Create(&mu); @@ -136,56 +165,46 @@ _createMicroservice(natsMicroservice **new_microservice, natsConnection *nc, nat // Generate a unique ID for this microservice. IFOK(s, natsNUID_Next(tmpNUID, NUID_BUFFER_LEN + 1)); - IF_OK_DUP_STRING(s, m->identity.id, tmpNUID); + IF_OK_DUP_STRING(s, m->identity.id, tmpNUID); // Copy the config data. if (s == NATS_OK) { m->identity.name = cfg->name; m->identity.version = cfg->version; - m->cfg = *cfg; + m->cfg = cfg; + } + + // Setup the root endpoint. + if (s == NATS_OK) + { + s = _newMicroserviceEndpoint(&m->root, m, "", NULL); } // Set up the default endpoint. if (s == NATS_OK && cfg->endpoint != NULL) { - s = natsMicroservice_AddEndpoint(m, natsMicroserviceDefaultEndpointName, cfg->endpoint); + err = natsMicroservice_AddEndpoint(NULL, m, natsMicroserviceDefaultEndpointName, cfg->endpoint); + if (err != NULL) + { + // status is always set in AddEndpoint errors. + s = err->status; + } } // Set up monitoring (PING, STATS, etc.) responders. IFOK(s, _initMicroserviceMonitoring(m)); if (s == NATS_OK) - *new_microservice = m; - else { - _freeMicroservice(m); + *new_microservice = m; + return NULL; } - return NATS_UPDATE_ERR_STACK(s); -} - -static void -_freeMicroservice(natsMicroservice *m) -{ - int i; - natsStatus s; - - if (m == NULL) - return; - - for (i = 0; i < m->num_endpoints; i++) + else { - s = _stopMicroserviceEndpoint(m->endpoints[i]); - - if(s == NATS_OK) - _freeMicroserviceEndpoint(m->endpoints[i]); + _destroyMicroservice(m); + return nats_NewMicroserviceError(NATS_UPDATE_ERR_STACK(s), 500, "failed to create microservice"); } - NATS_FREE(m->endpoints); - - natsConn_release(m->nc); - natsMutex_Destroy(m->mu); - NATS_FREE(m->identity.id); - NATS_FREE(m); } static void @@ -196,19 +215,28 @@ _retainMicroservice(natsMicroservice *m) natsMutex_Unlock(m->mu); } -static void +static natsMicroserviceError * _releaseMicroservice(natsMicroservice *m) { bool doFree; if (m == NULL) - return; + return NULL; natsMutex_Lock(m->mu); doFree = (--(m->refs) == 0); natsMutex_Unlock(m->mu); - if (doFree) - _freeMicroservice(m); + if (!doFree) + return NULL; + + return _destroyMicroservice(m); } +static void +_markMicroserviceStopped(natsMicroservice *m, bool stopped) +{ + natsMutex_Lock(m->mu); + m->stopped = stopped; + natsMutex_Unlock(m->mu); +} diff --git a/src/micro.h b/src/micro.h index 4807325f6..26d715aff 100644 --- a/src/micro.h +++ b/src/micro.h @@ -14,8 +14,14 @@ #ifndef MICRO_H_ #define MICRO_H_ +#include "nats.h" + #define natsMicroserviceAPIPrefix "$SRV" +#define NATS_MICROSERVICE_STATUS_HDR "Nats-Status" +#define NATS_MICROSERVICE_ERROR_HDR "Nats-Service-Error" +#define NATS_MICROSERVICE_ERROR_CODE_HDR "Nats-Service-Error-Code" + enum natsMicroserviceVerb { natsMicroserviceVerbPing = 0, @@ -27,32 +33,271 @@ enum natsMicroserviceVerb typedef enum natsMicroserviceVerb natsMicroserviceVerb; -static natsStatus -natsMicroserviceVerb_String(const char **new_subject, natsMicroserviceVerb verb) +/** + * The Microservice error. Note that code and description do not have a + * well-defined lifespan and should be copied if needed to be used later. + */ +typedef struct natsMicroserviceError +{ + natsStatus status; + int code; + const char *description; + + const char *(*String)(struct natsMicroserviceError *err, char *buf, int size); +} natsMicroserviceError; + +/** + * The Microservice request object. + */ +typedef struct __microserviceRequest natsMicroserviceRequest; + +/** + * The Microservice object. Create and start with #nats_AddMicroservice. + */ +typedef struct __microservice natsMicroservice; + +/** + * The Microservice configuration object. + */ +typedef struct natsMicroserviceConfig +{ + const char *name; + const char *version; + const char *description; + struct natsMicroserviceEndpointConfig *endpoint; +} natsMicroserviceConfig; + +/** + * The Microservice endpoint object. + * TODO document the interface. + */ +typedef struct __microserviceEndpoint natsMicroserviceEndpoint; + +/** \brief Callback used to deliver messages to a microservice. + * + * This is the callback that one provides when creating a microservice endpoint. + * The library will invoke this callback for each message arriving to the + * specified subject. + * + * @see natsMicroservice_AddEndpoint() + */ +typedef void (*natsMicroserviceRequestHandler)(natsMicroservice *m, natsMicroserviceRequest *req); + +/** + * The Microservice endpoint configuration object. + */ +typedef struct natsMicroserviceEndpointConfig +{ + const char *subject; + natsMicroserviceRequestHandler handler; + void *closure; + struct natsMicroserviceSchema *schema; +} natsMicroserviceEndpointConfig; + +/** + * The Microservice endpoint stats struct. + */ +typedef struct natsMicroserviceEndpointStats { - if (new_subject == NULL) - return nats_setDefaultError(NATS_INVALID_ARG); - - switch (verb) - { - case natsMicroserviceVerbPing: - *new_subject = "PING"; - return NATS_OK; - - case natsMicroserviceVerbStats: - *new_subject = "STATS"; - return NATS_OK; - - case natsMicroserviceVerbInfo: - *new_subject = "INFO"; - return NATS_OK; - - case natsMicroserviceVerbSchema: - *new_subject = "SCHEMA"; - return NATS_OK; - default: - return nats_setError(NATS_INVALID_ARG, "Invalid microservice verb %d", verb); - } -} + const char *name; + const char *subject; + int num_requests; + int num_errors; + char *last_error; + int processing_time_ms; + int average_processing_time_ms; + bool stopped; +} natsMicroserviceEndpointStats; + +/** + * The Microservice endpoint schema object. + */ +typedef struct natsMicroserviceSchema +{ + const char *request; + const char *response; +} natsMicroserviceSchema; + +/** + * Request unmarshaled as "arguments", a space-separated list of numbers and strings. + * TODO document the interface. + */ +typedef struct __args_t natsArgs; + +/** + * The Microservice client. Initialize with #nats_NewMicroserviceClient. + */ +typedef struct __microserviceClient natsMicroserviceClient; + +/** + * The Microservice configuration object. Initialize with #natsMicroserviceConfig_Init. + */ +typedef struct natsMicroserviceClientConfig +{ + // TBD in the future. + int dummy; +} natsMicroserviceClientConfig; + +/** \defgroup microserviceGroup Microservice support + * + * A simple NATS-based microservice implementation framework. + * + * \warning EXPERIMENTAL FEATURE! We reserve the right to change the API without + * necessarily bumping the major version of the library. + * + * @{ + */ + +/** \brief Makes a new error. + * + * @param new_microservice the location where to store the newly created + * #natsMicroservice object. + * @param nc the pointer to the #natsCOnnection object on which the service will listen on. + * @param cfg the pointer to the #natsMicroserviceConfig configuration + * information used to create the #natsMicroservice object. + */ +NATS_EXTERN natsMicroserviceError * +nats_NewMicroserviceError(natsStatus s, int code, const char *desc); + +NATS_EXTERN void +natsMicroserviceError_Destroy(natsMicroserviceError *err); + +/** \brief Adds a microservice with a given configuration. + * + * Adds a microservice with a given configuration. + * + * \note The return #natsMicroservice object needs to be destroyed using + * #natsMicroservice_Destroy when no longer needed to free allocated memory. + * + * @param new_microservice the location where to store the newly created + * #natsMicroservice object. + * @param nc the pointer to the #natsCOnnection object on which the service will listen on. + * @param cfg the pointer to the #natsMicroserviceConfig configuration + * information used to create the #natsMicroservice object. + */ +NATS_EXTERN natsMicroserviceError * +nats_AddMicroservice(natsMicroservice **new_microservice, natsConnection *nc, natsMicroserviceConfig *cfg); + +/** \brief Waits for the microservice to stop. + */ +NATS_EXTERN natsMicroserviceError * +natsMicroservice_Run(natsMicroservice *m); + +/** \brief Stops a running microservice. + */ +NATS_EXTERN natsMicroserviceError * +natsMicroservice_Stop(natsMicroservice *m); + +/** \brief Checks if a microservice is stopped. + */ +NATS_EXTERN bool +natsMicroservice_IsStopped(natsMicroservice *m); + +/** \brief Destroys a microservice object. + * + * Destroys a microservice object; frees all memory. The service must be stopped + * first, this function does not check if it is. + */ +NATS_EXTERN natsMicroserviceError * +natsMicroservice_Destroy(natsMicroservice *m); + +/** \brief Adds (and starts) a microservice endpoint. + */ +NATS_EXTERN natsMicroserviceError * +natsMicroservice_AddEndpoint(natsMicroserviceEndpoint **new_endpoint, natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg); + +/** \brief Returns the microservice's NATS connection. + */ +NATS_EXTERN natsConnection * +natsMicroservice_GetConnection(natsMicroservice *m); + +/** \brief Responds to a microservice request. + */ +NATS_EXTERN natsMicroserviceError * +natsMicroserviceRequest_Respond(natsMicroserviceRequest *req, const char *data, int len); + +/** \brief Responds to a microservice request with an error, does NOT free the + * error. + */ +NATS_EXTERN natsMicroserviceError * +natsMicroserviceRequest_RespondError(natsMicroserviceRequest *req, natsMicroserviceError *err, const char *data, int len); + +/** \brief A convenience method to respond to a microservice request with a + * simple error (no data), and free the error. + */ +NATS_EXTERN void +natsMicroserviceRequest_Error(natsMicroserviceRequest *req, natsMicroserviceError **err); + +/** \brief Returns the original NATS message underlying the request. + */ +NATS_EXTERN natsMsg * +natsMicroserviceRequest_GetMsg(natsMicroserviceRequest *req); + +#define natsMicroserviceRequestHeader_Set(req, key, value) \ + natsMsgHeader_Set(natsMicroserviceRequest_GetMsg(req), (key), (value)) +#define natsMicroserviceRequestHeader_Add(req, key, value) \ + natsMsgHeader_Add(natsMicroserviceRequest_GetMsg(req), (key), (value)) +#define natsMicroserviceRequestHeader_Get(req, key, value) \ + natsMsgHeader_Get(natsMicroserviceRequest_GetMsg(req), (key), (value)) +#define natsMicroserviceRequestHeader_Values(req, key, values, count) \ + natsMsgHeader_Values(natsMicroserviceRequest_GetMsg(req), (key), (values), (count)) +#define natsMicroserviceRequestHeader_Keys(req, key, keys, count) \ + natsMsgHeader_Keys(natsMicroserviceRequest_GetMsg(req), (key), (keys), (count)) +#define natsMicroserviceRequestHeader_Delete(req, key) \ + natsMsgHeader_Delete(natsMicroserviceRequest_GetMsg(req), (key)) + +#define natsMicroserviceRequest_GetSubject(req) \ + natsMsg_GetSubject(natsMicroserviceRequest_GetMsg(req)) +#define natsMicroserviceRequest_GetReply(req) \ + natsMsg_GetReply(natsMicroserviceRequest_GetMsg(req)) +#define natsMicroserviceRequest_GetData(req) \ + natsMsg_GetData(natsMicroserviceRequest_GetMsg(req)) +#define natsMicroserviceRequest_GetDataLength(req) \ + natsMsg_GetDataLength(natsMicroserviceRequest_GetMsg(req)) +#define natsMicroserviceRequest_GetSequence(req) \ + natsMsg_GetSequence(natsMicroserviceRequest_GetMsg(req)) +#define natsMicroserviceRequest_GetTime(req) \ + natsMsg_GetTime(natsMicroserviceRequest_GetMsg(req)) + +NATS_EXTERN natsMicroserviceEndpoint * +natsMicroserviceRequest_GetEndpoint(natsMicroserviceRequest *req); + +NATS_EXTERN natsMicroservice * +natsMicroserviceRequest_GetMicroservice(natsMicroserviceRequest *req); + +NATS_EXTERN natsConnection * +natsMicroserviceRequest_GetConnection(natsMicroserviceRequest *req); + +NATS_EXTERN natsMicroserviceError * +nats_MicroserviceErrorFromMsg(natsStatus s, natsMsg *msg); + +NATS_EXTERN natsMicroserviceError * +natsParseAsArgs(natsArgs **args, const char *data, int data_len); + +NATS_EXTERN int +natsArgs_Count(natsArgs *args); + +NATS_EXTERN natsMicroserviceError * +natsArgs_GetInt(int *val, natsArgs *args, int index); + +NATS_EXTERN natsMicroserviceError * +natsArgs_GetFloat(long double *val, natsArgs *args, int index); + +NATS_EXTERN natsMicroserviceError * +natsArgs_GetString(const char **val, natsArgs *args, int index); + +NATS_EXTERN void +natsArgs_Destroy(natsArgs *args); + +natsMicroserviceError * +nats_NewMicroserviceClient(natsMicroserviceClient **new_client, natsConnection *nc, natsMicroserviceClientConfig *cfg); + +void +natsMicroserviceClient_Destroy(natsMicroserviceClient *client); + +natsMicroserviceError * +natsMicroserviceClient_DoRequest(natsMicroserviceClient *client, natsMsg **reply, const char *subject, const char *data, int data_len); + +/** @} */ // end of microserviceGroup #endif /* MICRO_H_ */ diff --git a/src/micro_args_parser.c b/src/micro_args_parser.c new file mode 100644 index 000000000..f3743087e --- /dev/null +++ b/src/micro_args_parser.c @@ -0,0 +1,369 @@ +// Copyright 2021-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "micro.h" +#include "microp.h" +#include "mem.h" + +static natsMicroserviceError *parse(void **args, int *args_len, int *i, const char *data, int data_len); + +natsMicroserviceError * +natsParseAsArgs(natsArgs **new_args, const char *data, int data_len) +{ + natsMicroserviceError *err = NULL; + int n; + int i = 0; + natsArgs *args = NULL; + + if ((new_args == NULL) || (data == NULL) || (data_len < 0)) + return nats_NewMicroserviceError(NATS_INVALID_ARG, 500, "Invalid function argument"); + + // parse the number of arguments without allocating. + err = parse(NULL, &n, &i, data, data_len); + if (err == NULL) + { + args = NATS_CALLOC(1, sizeof(natsArgs)); + if (args == NULL) + err = natsMicroserviceErrorOutOfMemory; + } + if (err == NULL) + { + args->args = NATS_CALLOC(n, sizeof(void *)); + if (args == NULL) + err = natsMicroserviceErrorOutOfMemory; + else + args->count = n; + } + if (err == NULL) + { + i = 0; + err = parse(args->args, &n, &i, data, data_len); + } + + if (err == NULL) + { + *new_args = args; + } + else + { + natsArgs_Destroy(args); + } + + return err; +} + +void natsArgs_Destroy(natsArgs *args) +{ + int i; + + if (args == NULL) + return; + + for (i = 0; i < args->count; i++) + { + NATS_FREE(args->args[i]); + } + NATS_FREE(args->args); + NATS_FREE(args); +} + +int natsArgs_Count(natsArgs *args) +{ + if (args == NULL) + return 0; + + return args->count; +} + +natsMicroserviceError * +natsArgs_GetInt(int *val, natsArgs *args, int index) +{ + if ((args == NULL) || (index < 0) || (index >= args->count) || (val == NULL)) + return natsMicroserviceErrorInvalidArg; + + *val = *((int *)args->args[index]); + return NULL; +} + +natsMicroserviceError * +natsArgs_GetFloat(long double *val, natsArgs *args, int index) +{ + if ((args == NULL) || (index < 0) || (index >= args->count) || (val == NULL)) + return natsMicroserviceErrorInvalidArg; + + *val = *((long double *)args->args[index]); + return NULL; +} + +natsMicroserviceError * +natsArgs_GetString(const char **val, natsArgs *args, int index) +{ + if ((args == NULL) || (index < 0) || (index >= args->count) || (val == NULL)) + return natsMicroserviceErrorInvalidArg; + + *val = (const char *)args->args[index]; + return NULL; +} + +/// @brief decodes the rest of a string into a pre-allocated buffer of +/// sufficient length, or just calculates the needed buffer size. The opening +/// quote must have been already processed by the caller (parse). +/// +/// @param dup receives the parsed string, must be freed by the caller. pass NULL to just get the length. +/// @param data raw message data +/// @param data_len length of data +/// @return error in case the string is not properly terminated. +static natsMicroserviceError * +decode_rest_of_string(char *dup, int *decoded_len, int *i, const char *data, int data_len) +{ + char c; + int len = 0; + bool terminated = false; + bool escape = false; + + for (; !terminated && *i < data_len; (*i)++) + { + c = data[*i]; + switch (c) + { + case '"': + if (escape) + { + // include the escaped quote. + if (dup != NULL) + { + dup[len] = c; + } + len++; + escape = false; + } + else + { + // end of quoted string. + terminated = true; + } + break; + + case '\\': + if (!escape) + { + escape = true; + } + else + { + // include the escaped backslash. + if (dup != NULL) + { + dup[len] = c; + } + len++; + escape = false; + } + break; + + default: + if (dup != NULL) + { + dup[len] = c; + } + len++; + escape = false; + break; + } + } + if (!terminated) + { + nats_NewMicroserviceError(NATS_INVALID_ARG, 400, "a quoted string is not properly terminated"); + } + + *decoded_len = len; + return NULL; +} + +/// @brief decodes and duplicates the rest of a string, as the name says. +/// @param dup receives the parsed string, must be freed by the caller. pass +/// NULL to just get the length. +/// @param data raw message data +/// @param data_len length of data +/// @return error. +static natsMicroserviceError * +decode_and_dupe_rest_of_string(char **dup, int *i, const char *data, int data_len) +{ + natsMicroserviceError *err = NULL; + int start = *i; + int decoded_len; + + err = decode_rest_of_string(NULL, &decoded_len, i, data, data_len); + if (err != NULL) + { + return err; + } + if (dup == NULL) + { + // nothing else to do - the string has been scanned and validated. + return NULL; + } + + *i = start; + + *dup = NATS_CALLOC(decoded_len + 1, sizeof(char)); + if (*dup == NULL) + { + return natsMicroserviceErrorOutOfMemory; + } + + // no need to check for error the 2nd time, we already know the string is + // valid. + decode_rest_of_string(*dup, &decoded_len, i, data, data_len); + (*dup)[decoded_len] = 0; + return NULL; +} + +typedef enum parserState +{ + NewArg = 0, + NumberArg, +} parserState; + +static natsMicroserviceError * +parse(void **args, int *args_len, int *i, const char *data, int data_len) +{ + natsMicroserviceError *err = NULL; + char c; + int n = 0; + parserState state = NewArg; + char errbuf[1024]; + char numbuf[64]; + int num_len = 0; + bool is_float = false; + +#define EOS 0 + for (; *i < data_len + 1;) + { + c = (*i < data_len) ? data[*i] : EOS; + + switch (state) + { + case NewArg: + switch (c) + { + case EOS: + case ' ': + (*i)++; + break; + + case '"': + (*i)++; // consume the opening quote. + err = decode_and_dupe_rest_of_string((char **)(&args[n]), i, data, data_len); + if (err != NULL) + { + return err; + } + n++; + break; + + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + case '-': + case '+': + case '.': + state = NumberArg; + num_len = 0; + numbuf[num_len++] = c; + is_float = (c == '.'); + (*i)++; + break; + + default: + snprintf(errbuf, sizeof(errbuf), "unexpected '%c', an argument must be a number or a quoted string", c); + return nats_NewMicroserviceError(NATS_ERR, 400, errbuf); + } + break; + + case NumberArg: + switch (c) + { + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + case '-': + case '+': + case '.': + case 'e': + case 'E': + case ',': + numbuf[num_len] = c; + num_len++; + is_float = is_float || (c == '.') || (c == 'e') || (c == 'E'); + (*i)++; + break; + + case EOS: + case ' ': + if (args != NULL) + { + numbuf[num_len] = 0; + if (is_float) + { + args[n] = NATS_CALLOC(1, sizeof(long double)); + if (args[n] == NULL) + { + return natsMicroserviceErrorOutOfMemory; + } + *(long double *)args[n] = strtold(numbuf, NULL); + } + else + { + args[n] = NATS_CALLOC(1, sizeof(int)); + if (args[n] == NULL) + { + return natsMicroserviceErrorOutOfMemory; + } + *(int *)args[n] = atoi(numbuf); + } + } + n++; + (*i)++; + state = NewArg; + break; + + default: + snprintf(errbuf, sizeof(errbuf), "unexpected '%c', a number must be followed by a space", c); + return nats_NewMicroserviceError(NATS_ERR, 400, errbuf); + } + break; + + default: + snprintf(errbuf, sizeof(errbuf), "unreachable: wrong state for a ' ', expected NewArg or NumberArg, got %d", state); + return nats_NewMicroserviceError(NATS_ERR, 500, errbuf); + } + } + + *args_len = n; + return NULL; +} diff --git a/src/micro_client.c b/src/micro_client.c new file mode 100644 index 000000000..61cd2170c --- /dev/null +++ b/src/micro_client.c @@ -0,0 +1,67 @@ +// Copyright 2015-2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "micro.h" +#include "microp.h" +#include "mem.h" +#include "conn.h" + +natsMicroserviceError * +nats_NewMicroserviceClient(natsMicroserviceClient **new_client, natsConnection *nc, natsMicroserviceClientConfig *cfg) +{ + natsMicroserviceClient *client = NULL; + + if (new_client == NULL) + return natsMicroserviceErrorInvalidArg; + + client = NATS_CALLOC(1, sizeof(struct __microserviceClient)); + if (client == NULL) + return natsMicroserviceErrorOutOfMemory; + + natsConn_retain(nc); + client->nc = nc; + *new_client = client; + return NULL; +} + +void +natsMicroserviceClient_Destroy(natsMicroserviceClient *client) +{ + if (client == NULL) + return; + + natsConn_release(client->nc); + NATS_FREE(client); +} + +natsMicroserviceError * +natsMicroserviceClient_DoRequest(natsMicroserviceClient *client, natsMsg **reply, const char *subject, const char *data, int data_len) +{ + natsStatus s = NATS_OK; + natsMicroserviceError *err = NULL; + natsMsg *msg = NULL; + + if ((client == NULL ) || (reply == NULL)) + return natsMicroserviceErrorInvalidArg; + + s = natsConnection_Request(&msg, client->nc, subject, data, data_len, 5000); + if (s != NATS_OK) + return nats_NewMicroserviceError(s, 500, "request failed"); + + err = nats_MicroserviceErrorFromMsg(s, msg); + if (err == NULL) + { + *reply = msg; + } + return err; +} diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index 6c0f9e720..6d0234e6a 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -11,88 +11,132 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO review includes #include -#include +#include "micro.h" #include "microp.h" #include "mem.h" -#include "conn.h" + +static natsStatus +_newStats(natsMicroserviceEndpointStats **new_stats, const char *name, const char *subject); +static natsStatus +_setLastError(natsMicroserviceEndpointStats *stats, const char *error); +static void +_freeStats(natsMicroserviceEndpointStats *stats); static bool _isValidName(const char *name); static bool _isValidSubject(const char *subject); - -static natsStatus -_newEndpoint(natsMicroserviceEndpoint **__new_endpoint, natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg); static natsStatus -_startEndpoint(natsMicroserviceEndpoint *endpoint); - +_lazyInitChildren(natsMicroserviceEndpoint *ep); static natsStatus -_newRequest(natsMicroserviceRequest **new_request, natsMsg *msg); -static void -_freeRequest(natsMicroserviceRequest *req); +_findAndRemovePreviousChild(int *found_index, natsMicroserviceEndpoint *ep, const char *name); static void _handleEndpointRequest(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure); natsStatus -natsMicroservice_AddEndpoint(natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg) +natsMicroserviceEndpoint_AddEndpoint(natsMicroserviceEndpoint **new_ep, natsMicroserviceEndpoint *parent, const char *name, natsMicroserviceEndpointConfig *cfg) { natsStatus s = NATS_OK; - int i; - int add = -1; - natsMicroserviceEndpoint **new_endpoints = m->endpoints; - int new_num_endpoints = m->num_endpoints; - natsMicroserviceEndpoint *new_endpoint = NULL; - - // TODO return more comprehensive errors - if ((m == NULL) || (m->mu == NULL) || (cfg == NULL) || (cfg->subject == NULL) || - (!_isValidName(name)) || (!_isValidSubject(cfg->subject))) - { - return nats_setDefaultError(NATS_INVALID_ARG); - } - - // This is a rare call, usually happens at the initialization of the - // microservice, so it's ok to lock for the duration of the function, may - // not be necessary at all but will not hurt. - natsMutex_Lock(m->mu); + int index = -1; + natsMicroserviceEndpoint *ep = NULL; - for (i = 0; i < m->num_endpoints; i++) + // TODO <>/<> return more comprehensive errors + if (parent == NULL) { - if (strcmp(m->endpoints[i]->config.subject, cfg->subject) == 0) - { - add = i; - break; - } + return nats_setDefaultError(NATS_INVALID_ARG); } - if (add == -1) + if (cfg != NULL) { - new_endpoints = (natsMicroserviceEndpoint **) - NATS_REALLOC(m->endpoints, sizeof(natsMicroserviceEndpoint *) * (m->num_endpoints + 1)); - s = (new_endpoints != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); - if (s == NATS_OK) + if ((cfg->subject == NULL) || (!_isValidName(name)) || (!_isValidSubject(cfg->subject))) { - add = m->num_endpoints; - new_num_endpoints = m->num_endpoints + 1; + return nats_setDefaultError(NATS_INVALID_ARG); } } - IFOK(s, _newEndpoint(&new_endpoint, m, name, cfg)); - IFOK(s, _startEndpoint(new_endpoint)); + IFOK(s, _lazyInitChildren(parent)); + IFOK(s, _newMicroserviceEndpoint(&ep, parent->m, name, cfg)); + IFOK(s, _findAndRemovePreviousChild(&index, parent, name)); + IFOK(s, _microserviceEndpointList_Put(parent->children, index, ep)); + IFOK(s, natsMicroserviceEndpoint_Start(ep)); if (s == NATS_OK) { - new_endpoints[add] = new_endpoint; - m->endpoints = new_endpoints; - m->num_endpoints = new_num_endpoints; + if (new_ep != NULL) + *new_ep = ep; } else { - _freeMicroserviceEndpoint(new_endpoint); + natsMicroserviceEndpoint_Destroy(ep); + } + return NATS_UPDATE_ERR_STACK(s); +} + +natsStatus +natsMicroserviceEndpoint_Start(natsMicroserviceEndpoint *ep) +{ + if ((ep->subject == NULL) || (ep->config == NULL) || (ep->config->handler == NULL)) + // nothing to do + return NATS_OK; + + return natsConnection_QueueSubscribe(&ep->sub, ep->m->nc, ep->subject, + natsMicroserviceQueueGroup, _handleEndpointRequest, ep); +} + +// TODO <>/<> COPY FROM GO +natsStatus +natsMicroserviceEndpoint_Stop(natsMicroserviceEndpoint *ep) +{ + natsStatus s = NATS_OK; + + // TODO <>/<> review locking for modifying endpoints, may not be necessary + // or ep may need its own lock (stats). + + // This is a rare call, usually happens at the initialization of the + // microservice, so it's ok to lock for the duration of the function, may + // not be necessary at all but will not hurt. + natsMutex_Lock(ep->m->mu); + + if (ep->sub != NULL) + { + s = natsSubscription_Drain(ep->sub); } + if (s == NATS_OK) + { + ep->sub = NULL; + ep->stopped = true; + // TODO <>/<> unsafe + ep->stats->stopped = true; + } + + natsMutex_Unlock(ep->m->mu); + return NATS_UPDATE_ERR_STACK(s); +} + +static natsStatus +_freeMicroserviceEndpoint(natsMicroserviceEndpoint *ep) +{ + // ignore ep->children, must be taken care of by the caller. + _freeStats(ep->stats); + NATS_FREE(ep->name); + NATS_FREE(ep->subject); + NATS_FREE(ep); + return NATS_OK; +} + +natsStatus +natsMicroserviceEndpoint_Destroy(natsMicroserviceEndpoint *ep) +{ + natsStatus s = NATS_OK; + + if (ep == NULL) + return NATS_OK; + + IFOK(s, _destroyMicroserviceEndpointList(ep->children)); + IFOK(s, natsMicroserviceEndpoint_Stop(ep)); + IFOK(s, _freeMicroserviceEndpoint(ep)); - natsMutex_Unlock(m->mu); return NATS_UPDATE_ERR_STACK(s); } @@ -135,110 +179,68 @@ _isValidSubject(const char *subject) } static natsStatus -_newEndpoint(natsMicroserviceEndpoint **__new_endpoint, natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg) +_lazyInitChildren(natsMicroserviceEndpoint *ep) { - natsStatus s = NATS_OK; - natsMicroserviceEndpoint *ep = NULL; - char *dup_name = NULL; - char *dup_subject = NULL; - - ep = (natsMicroserviceEndpoint *)NATS_CALLOC(1, sizeof(natsMicroserviceEndpoint)); - s = (ep != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); - if (s == NATS_OK) + if (ep->children == NULL) { - dup_name = NATS_STRDUP(name); - s = (dup_name != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); - } - if (s == NATS_OK) - { - dup_subject = NATS_STRDUP(cfg->subject); - s = (dup_subject != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); - } - if (s == NATS_OK) - { - ep->m = m; - ep->stats.name = dup_name; - ep->stats.subject = dup_subject; - ep->config = *cfg; - ep->config.subject = dup_subject; - } - if (s == NATS_OK) - { - *__new_endpoint = ep; - return NATS_OK; + return _newMicroserviceEndpointList(&ep->children); } - - NATS_FREE(dup_name); - NATS_FREE(dup_subject); - return NATS_UPDATE_ERR_STACK(s); + return NATS_OK; } static natsStatus -_startEndpoint(natsMicroserviceEndpoint *endpoint) +_findAndRemovePreviousChild(int *found_index, natsMicroserviceEndpoint *ep, const char *name) { - if (endpoint->config.handler == NULL) - // nothing to do - return NATS_OK; - - return natsConnection_QueueSubscribe(&endpoint->sub, endpoint->m->nc, endpoint->config.subject, - natsMicroserviceQueueGroup, _handleEndpointRequest, endpoint); + natsMicroserviceEndpoint *found = _microserviceEndpointList_Find(ep->children, name, found_index); + if (found != NULL) + { + return natsMicroserviceEndpoint_Destroy(found); + } + return NATS_OK; } -// TODO COPY FROM GO natsStatus -_stopMicroserviceEndpoint(natsMicroserviceEndpoint *endpoint) +_newMicroserviceEndpoint(natsMicroserviceEndpoint **new_endpoint, natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg) { natsStatus s = NATS_OK; + natsMicroserviceEndpoint *ep = NULL; + // TODO <>/<> do I really need to duplicate name, subject? + char *dup_name = NULL; + char *dup_subject = NULL; - // TODO review locking for modifying endpoints, may not be necessary or - // endpoint may need its own lock (stats). - - // This is a rare call, usually happens at the initialization of the - // microservice, so it's ok to lock for the duration of the function, may - // not be necessary at all but will not hurt. - natsMutex_Lock(endpoint->m->mu); + IFOK(s, NATS_CALLOCS(&ep, 1, sizeof(natsMicroserviceEndpoint))); + IFOK(s, NATS_STRDUPS(&dup_name, name)); + if ((cfg != NULL) && (cfg->subject != NULL)) + IFOK(s, NATS_STRDUPS(&dup_subject, cfg->subject)); + IFOK(s, _newStats(&ep->stats, dup_name, dup_subject)); - if (endpoint->sub != NULL) + if (s == NATS_OK) { - s = natsSubscription_Drain(endpoint->sub); + ep->m = m; + ep->name = dup_name; + ep->subject = dup_subject; + ep->config = cfg; + *new_endpoint = ep; } - if (s == NATS_OK) + else { - endpoint->sub = NULL; - endpoint->stopped = true; - endpoint->stats.stopped = true; + _freeStats(ep->stats); + NATS_FREE(ep); + NATS_FREE(dup_name); + NATS_FREE(dup_subject); } - - natsMutex_Unlock(endpoint->m->mu); return NATS_UPDATE_ERR_STACK(s); } -void _freeMicroserviceEndpoint(natsMicroserviceEndpoint *endpoint) -{ - if (endpoint == NULL) - return; - - // The struct fields are declared as const char * for the external API, but - // we know that the strings were duplicated and need to be freed. - // endpoint->config.name is the same as endpoint->stats.name, no need to - // free it. - NATS_FREE((char *)endpoint->stats.name); - NATS_FREE((char *)endpoint->stats.subject); - - NATS_FREE(endpoint); -} - static void _handleEndpointRequest(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { natsStatus s = NATS_OK; - natsMicroserviceEndpoint *endpoint = (natsMicroserviceEndpoint *)closure; - // natsMicroserviceEndpointStats *stats = &endpoint->stats; - natsMicroserviceEndpointConfig *cfg = &endpoint->config; + natsMicroserviceEndpoint *ep = (natsMicroserviceEndpoint *)closure; + // natsMicroserviceEndpointStats *stats = &ep->stats; + natsMicroserviceEndpointConfig *cfg = ep->config; natsMicroserviceRequest *req = NULL; natsMicroserviceRequestHandler handler = cfg->handler; - void *handlerClosure = cfg->closure; - // const char *errorString = ""; if (handler == NULL) { @@ -248,57 +250,71 @@ _handleEndpointRequest(natsConnection *nc, natsSubscription *sub, natsMsg *msg, return; } - s = _newRequest(&req, msg); + s = _newMicroserviceRequest(&req, msg); if (s == NATS_OK) { - handler(endpoint->m, req, handlerClosure); - // errorString = req->err; - } - else - { - // errorString = natsStatus_GetText(s); + req->ep = ep; + handler(ep->m, req); } // Update stats - // natsMutex_Lock(endpoint->mu); + // natsMutex_Lock(ep->mu); // stats->numRequests++; - // natsMutex_Unlock(endpoint->mu); + // natsMutex_Unlock(ep->mu); - _freeRequest(req); + _freeMicroserviceRequest(req); natsMsg_Destroy(msg); } static natsStatus -_newRequest(natsMicroserviceRequest **new_request, natsMsg *msg) +_newStats(natsMicroserviceEndpointStats **new_stats, const char *name, const char *subject) { natsStatus s = NATS_OK; - natsMicroserviceRequest *req = NULL; + natsMicroserviceEndpointStats *stats = NULL; - req = (natsMicroserviceRequest *)NATS_CALLOC(1, sizeof(natsMicroserviceRequest)); - s = (req != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); + stats = (natsMicroserviceEndpointStats *)NATS_CALLOC(1, sizeof(natsMicroserviceEndpointStats)); + s = (stats != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); if (s == NATS_OK) { - req->msg = msg; - *new_request = req; + stats->name = name; + stats->subject = subject; + *new_stats = stats; return NATS_OK; } - _freeRequest(req); + NATS_FREE(stats); return NATS_UPDATE_ERR_STACK(s); } -static void -_freeRequest(natsMicroserviceRequest *req) +// All locking is to be done by the caller. +static natsStatus +_setLastError(natsMicroserviceEndpointStats *stats, const char *error) { - if (req == NULL) - return; + natsStatus s = NATS_OK; + + if (stats->last_error != NULL) + NATS_FREE(stats->last_error); + + if (nats_IsStringEmpty(error)) + { + stats->last_error = NULL; + return NATS_OK; + } + + stats->last_error = NATS_STRDUP(error); + s = (stats->last_error != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); - NATS_FREE(req->err); - NATS_FREE(req); + return NATS_UPDATE_ERR_STACK(s); } -natsMsg* -natsMicroserviceRequest_GetMsg(natsMicroserviceRequest *req) +static void +_freeStats(natsMicroserviceEndpointStats *stats) { - return req != NULL ? req->msg : NULL; + if (stats == NULL) + return; + + if (stats->last_error != NULL) + NATS_FREE(stats->last_error); + + NATS_FREE(stats); } diff --git a/src/micro_endpoint_list.c b/src/micro_endpoint_list.c new file mode 100644 index 000000000..191e5a721 --- /dev/null +++ b/src/micro_endpoint_list.c @@ -0,0 +1,163 @@ +// Copyright 2021-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "micro.h" +#include "microp.h" +#include "mem.h" + + +natsStatus + _destroyMicroserviceEndpointList(__microserviceEndpointList *list) +{ + natsStatus s = NATS_OK; + if (list == NULL) + return NATS_OK; + + for (int i = 0; i < list->len; i++) + { + s = natsMicroserviceEndpoint_Destroy(list->endpoints[i]); + if (s != NATS_OK) + { + return NATS_UPDATE_ERR_STACK(s); + } + } + + natsMutex_Destroy(list->mu); + NATS_FREE(list->endpoints); + NATS_FREE(list); + return NATS_OK; +} + +natsStatus +_newMicroserviceEndpointList(__microserviceEndpointList **new_list) +{ + natsStatus s = NATS_OK; + __microserviceEndpointList *list = NULL; + + list = NATS_CALLOC(1, sizeof(__microserviceEndpointList)); + if (list == NULL) + { + s = nats_setDefaultError(NATS_NO_MEMORY); + } + IFOK(s, natsMutex_Create(&list->mu)); + + if (s == NATS_OK) + { + *new_list = list; + } + else + { + _destroyMicroserviceEndpointList(list); + } + return NATS_UPDATE_ERR_STACK(s); +} + +natsMicroserviceEndpoint * +_microserviceEndpointList_Find(__microserviceEndpointList *list, const char *name, int *index) +{ + natsMicroserviceEndpoint *ep = NULL; + int i; + + natsMutex_Lock(list->mu); + for (i = 0; i < list->len; i++) + { + if (strcmp(list->endpoints[i]->name, name) == 0) + { + if (index != NULL) + { + *index = i; + } + ep = list->endpoints[i]; + break; + } + } + natsMutex_Unlock(list->mu); + return ep; +} + +natsMicroserviceEndpoint * +_microserviceEndpointList_Get(__microserviceEndpointList *list, const char *name, int *index) +{ + natsMicroserviceEndpoint *ep = NULL; + int i; + + natsMutex_Lock(list->mu); + for (i = 0; i < list->len; i++) + { + if (strcmp(list->endpoints[i]->name, name) == 0) + { + if (index != NULL) + { + *index = i; + } + ep = list->endpoints[i]; + break; + } + } + natsMutex_Unlock(list->mu); + return ep; +} + + +natsStatus +_microserviceEndpointList_Put(__microserviceEndpointList *list, int index, natsMicroserviceEndpoint *ep) +{ + natsStatus s = NATS_OK; + + natsMutex_Lock(list->mu); + + if (index >= list->len) + { + return nats_setDefaultError(NATS_INVALID_ARG); + } + else if (index == -1) + { + list->endpoints = (natsMicroserviceEndpoint **) + NATS_REALLOC(list->endpoints, sizeof(natsMicroserviceEndpoint *) * (list->len + 1)); + if (list->endpoints == NULL) + { + s = nats_setDefaultError(NATS_NO_MEMORY); + } + if (s == NATS_OK) + { + list->endpoints[list->len] = ep; + list->len++; + } + } + else + { + list->endpoints[index] = ep; + } + + natsMutex_Unlock(list->mu); + return NATS_UPDATE_ERR_STACK(s); +} + +natsStatus +_microserviceEndpointList_Remove(__microserviceEndpointList *list, int index) +{ + natsMutex_Lock(list->mu); + + if (index >= list->len || index < 0) + { + return nats_setDefaultError(NATS_INVALID_ARG); + } + if (index < list->len - 1) + { + memmove(&list->endpoints[index], &list->endpoints[index + 1], (list->len - index - 1) * sizeof(natsMicroserviceEndpoint *)); + } + list->len--; + + natsMutex_Unlock(list->mu); + return NATS_OK; +} diff --git a/src/micro_error.c b/src/micro_error.c new file mode 100644 index 000000000..865c15091 --- /dev/null +++ b/src/micro_error.c @@ -0,0 +1,130 @@ +// Copyright 2015-2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "micro.h" +#include "microp.h" +#include "mem.h" + +static natsMicroserviceError _errorOutOfMemory = { + .status = NATS_NO_MEMORY, + .code = 500, + .description = "Out of memory", +}; + +static natsMicroserviceError _errorInvalidArg = { + .status = NATS_INVALID_ARG, + .code = 400, + .description = "Invalid function argument", +}; + +static natsMicroserviceError *knownErrors[] = { + &_errorOutOfMemory, + &_errorInvalidArg, + NULL, +}; + +natsMicroserviceError *natsMicroserviceErrorOutOfMemory = &_errorOutOfMemory; +natsMicroserviceError *natsMicroserviceErrorInvalidArg = &_errorInvalidArg; + +static const char * +_string(natsMicroserviceError *err, char *buf, int size) { + if (err == NULL || buf == NULL) + return ""; + if (err->status == NATS_OK) + snprintf(buf, size, "%d: %s", err->code, err->description); + else + snprintf(buf, size, "%d:%d: %s", err->status, err->code, err->description); + return buf; +} + +natsMicroserviceError * +nats_NewMicroserviceError(natsStatus s, int code, const char *description) +{ + natsMicroserviceError *err = NULL; + + if (s == NATS_OK) + return NULL; + + err = NATS_CALLOC(1, sizeof(natsMicroserviceError)); + if (err == NULL) + return &_errorOutOfMemory; + + err->status = s; + err->code = code; + err->description = NATS_STRDUP(description); // it's ok if NULL + err->String = _string; + return err; +} + +void natsMicroserviceError_Destroy(natsMicroserviceError *err) +{ + int i; + + if (err == NULL) + return; + + for (i = 0; knownErrors[i] != NULL; i++) + { + if (err == knownErrors[i]) + return; + } + + // description is declared const for the users, but is strdup-ed on + // creation. + NATS_FREE((void *)err->description); + NATS_FREE(err); +} + +natsMicroserviceError * +nats_MicroserviceErrorFromMsg(natsStatus status, natsMsg *msg) +{ + natsMicroserviceError *err = NULL; + const char *c = NULL, *d = NULL; + bool is_error; + int code = 0; + + if (msg == NULL) + return NULL; + + natsMsgHeader_Get(msg, NATS_MICROSERVICE_ERROR_CODE_HDR, &c); + natsMsgHeader_Get(msg, NATS_MICROSERVICE_ERROR_HDR, &d); + + is_error = (status != NATS_OK) || !nats_IsStringEmpty(c) || !nats_IsStringEmpty(d); + if (!is_error) + return NULL; + + err = nats_NewMicroserviceError(status, 0, ""); + if (err == NULL) + { + // This is not 100% correct - returning an OOM error that was not in the + // message, but since it is usually a fatal condition, it is ok. + return &_errorOutOfMemory; + } + + if (nats_IsStringEmpty(d) && (status != NATS_OK)) + { + d = natsStatus_GetText(status); + if (d == NULL) + d = ""; + } + + if (!nats_IsStringEmpty(c)) + { + code = atoi(c); + } + err->status = status; + err->code = code; + err->description = d; + + return err; +} diff --git a/src/micro_monitoring.c b/src/micro_monitoring.c index 395f9eae8..fd8d2f50b 100644 --- a/src/micro_monitoring.c +++ b/src/micro_monitoring.c @@ -13,9 +13,9 @@ #include -#include "natsp.h" -#include "mem.h" +#include "micro.h" #include "microp.h" +#include "mem.h" static void _handleMicroservicePing(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure); @@ -32,7 +32,37 @@ static natsStatus _newControlSubject(char **newSubject, natsMicroserviceVerb ver static natsStatus _newDottedSubject(char **new_subject, int count, ...); static bool _isEmpty(const char *s); -natsStatus _initMicroserviceMonitoring(natsMicroservice *m) { +static natsStatus +natsMicroserviceVerb_String(const char **new_subject, natsMicroserviceVerb verb) +{ + if (new_subject == NULL) + return nats_setDefaultError(NATS_INVALID_ARG); + + switch (verb) + { + case natsMicroserviceVerbPing: + *new_subject = "PING"; + return NATS_OK; + + case natsMicroserviceVerbStats: + *new_subject = "STATS"; + return NATS_OK; + + case natsMicroserviceVerbInfo: + *new_subject = "INFO"; + return NATS_OK; + + case natsMicroserviceVerbSchema: + *new_subject = "SCHEMA"; + return NATS_OK; + default: + return nats_setError(NATS_INVALID_ARG, "Invalid microservice verb %d", verb); + } +} + +natsStatus +_initMicroserviceMonitoring(natsMicroservice *m) +{ natsStatus s = NATS_OK; IFOK(s, _addVerbHandlers(m, natsMicroserviceVerbPing, _handleMicroservicePing)); @@ -53,12 +83,9 @@ _handleMicroservicePing(natsConnection *nc, natsSubscription *sub, natsMsg *msg, s = _marshalPing(&buf, m); if (s == NATS_OK) { - s = natsMicroservice_Respond(m, &req, natsBuf_Data(buf), natsBuf_Len(buf)); - } - if (buf != NULL) - { - natsBuf_Destroy(buf); + natsMicroserviceRequest_Respond(&req, natsBuf_Data(buf), natsBuf_Len(buf)); } + natsBuf_Destroy(buf); } static bool diff --git a/src/micro_request.c b/src/micro_request.c new file mode 100644 index 000000000..c3b38fdd6 --- /dev/null +++ b/src/micro_request.c @@ -0,0 +1,131 @@ +// Copyright 2015-2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "micro.h" +#include "microp.h" +#include "mem.h" + +natsMsg * +natsMicroserviceRequest_GetMsg(natsMicroserviceRequest *req) +{ + return req != NULL ? req->msg : NULL; +} + +natsMicroserviceEndpoint * +natsMicroserviceRequest_GetEndpoint(natsMicroserviceRequest *req) +{ + return (req != NULL) ? req->ep : NULL; +} + +natsMicroservice * +natsMicroserviceRequest_GetMicroservice(natsMicroserviceRequest *req) +{ + return ((req != NULL) && (req->ep != NULL)) ? req->ep->m : NULL; +} + +natsConnection * +natsMicroserviceRequest_GetConnection(natsMicroserviceRequest *req) +{ + return ((req != NULL) && (req->ep != NULL) && (req->ep->m != NULL)) ? req->ep->m->nc : NULL; +} + +natsMicroserviceError * +natsMicroserviceRequest_Respond(natsMicroserviceRequest *req, const char *data, int len) +{ + natsStatus s = NATS_OK; + + if ((req == NULL) || (req->msg == NULL) || (req->msg->sub == NULL) || (req->msg->sub->conn == NULL)) + return natsMicroserviceErrorInvalidArg; + + s = natsConnection_Publish(req->msg->sub->conn, natsMsg_GetReply(req->msg), data, len); + if (s == NATS_OK) + return NULL; + else + return nats_NewMicroserviceError(s, 500, "failed to respond to message"); +} + +natsMicroserviceError * +natsMicroserviceRequest_RespondError(natsMicroserviceRequest *req, natsMicroserviceError *err, const char *data, int len) +{ + natsMsg *msg = NULL; + natsStatus s = NATS_OK; + char buf[64]; + + if ((req == NULL) || (req->msg == NULL) || (req->msg->sub == NULL) || (req->msg->sub->conn == NULL) || (err == NULL)) + { + return natsMicroserviceErrorInvalidArg; + } + + IFOK(s, natsMsg_Create(&msg, natsMsg_GetReply(req->msg), NULL, data, len)); + + if ((s == NATS_OK) && (err->status != NATS_OK)) + { + s = natsMsgHeader_Set(msg, NATS_MICROSERVICE_STATUS_HDR, natsStatus_GetText(err->status)); + } + if (s == NATS_OK) + { + s = natsMsgHeader_Set(msg, NATS_MICROSERVICE_ERROR_HDR, err->description); + } + if (s == NATS_OK) + { + snprintf(buf, sizeof(buf), "%d", err->code); + s = natsMsgHeader_Set(msg, NATS_MICROSERVICE_ERROR_CODE_HDR, buf); + } + IFOK(s, natsConnection_PublishMsg(req->msg->sub->conn, msg)); + + natsMsg_Destroy(msg); + + if (s == NATS_OK) + return NULL; + else + return nats_NewMicroserviceError(s, 500, "failed to respond to message"); +} + +void +natsMicroserviceRequest_Error(natsMicroserviceRequest *req, natsMicroserviceError **err) +{ + if (err == NULL) + return; + natsMicroserviceRequest_RespondError(req, *err, NULL, 0); + natsMicroserviceError_Destroy(*err); + *err = NULL; +} + + +void _freeMicroserviceRequest(natsMicroserviceRequest *req) +{ + if (req == NULL) + return; + + NATS_FREE(req->err); + NATS_FREE(req); +} + +natsStatus +_newMicroserviceRequest(natsMicroserviceRequest **new_request, natsMsg *msg) +{ + natsStatus s = NATS_OK; + natsMicroserviceRequest *req = NULL; + + req = (natsMicroserviceRequest *)NATS_CALLOC(1, sizeof(natsMicroserviceRequest)); + s = (req != NULL) ? NATS_OK : nats_setDefaultError(NATS_NO_MEMORY); + if (s == NATS_OK) + { + req->msg = msg; + *new_request = req; + return NATS_OK; + } + + _freeMicroserviceRequest(req); + return NATS_UPDATE_ERR_STACK(s); +} diff --git a/src/microp.h b/src/microp.h index b6a02868f..80987bd48 100644 --- a/src/microp.h +++ b/src/microp.h @@ -15,7 +15,6 @@ #define MICROP_H_ #include "natsp.h" -#include "micro.h" #define natsMicroserviceInfoResponseType "io.nats.micro.v1.info_response" #define natsMicroservicePingResponseType "io.nats.micro.v1.ping_response" @@ -26,13 +25,126 @@ #define natsMicroserviceDefaultEndpointName "default" -extern natsStatus +struct __microserviceClient +{ + natsConnection *nc; +}; + +struct __microserviceConfig +{ + const char *name; + const char *version; + const char *description; +}; + +struct __microserviceIdentity +{ + const char *name; + const char *version; + char *id; +}; + +typedef struct __microserviceEndpointList +{ + natsMutex *mu; + int len; + struct __microserviceEndpoint **endpoints; +} __microserviceEndpointList; + +struct __microserviceEndpoint +{ + // The name of the endpoint, uniquely identifies the endpoint. The name "" + // is reserved for the top level endpoint of the service. + char *name; + + // Indicates if the endpoint is stopped, or is actively subscribed to a + // subject. + bool stopped; + + // The subject that the endpoint is listening on. The subject is also used + // as the prefix for the children endpoints. + char *subject; + natsSubscription *sub; + + // Children endpoints. Their subscriptions are prefixed with the parent's + // subject. Their stats are summarized in the parent's stats when requested. + __microserviceEndpointList *children; + int len_children; + + // Endpoint stats. These are initialized only for running endpoints, and are + // cleared if the endpoint is stopped. + natsMutex *stats_mu; + natsMicroserviceEndpointStats *stats; + + // References to other entities. + natsMicroservice *m; + natsMicroserviceEndpointConfig *config; +}; + +struct __microservice +{ + natsConnection *nc; + int refs; + natsMutex *mu; + struct __microserviceIdentity identity; + struct natsMicroserviceConfig *cfg; + bool stopped; + struct __microserviceEndpoint *root; +}; + +struct __microserviceRequest +{ + natsMsg *msg; + struct __microserviceEndpoint *ep; + char *err; + void *closure; +}; + +typedef struct __args_t +{ + void **args; + int count; +} __args_t; + +extern natsMicroserviceError *natsMicroserviceErrorOutOfMemory; +extern natsMicroserviceError *natsMicroserviceErrorInvalidArg; + +natsStatus _initMicroserviceMonitoring(natsMicroservice *m); -extern natsStatus -_stopMicroserviceEndpoint(natsMicroserviceEndpoint *endpoint); -extern void -_freeMicroserviceEndpoint(natsMicroserviceEndpoint *endpoint); +natsStatus +_newMicroserviceEndpoint(natsMicroserviceEndpoint **new_endpoint, natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg); + +natsStatus +natsMicroserviceEndpoint_AddEndpoint(natsMicroserviceEndpoint **new_ep, natsMicroserviceEndpoint *parent, const char *name, natsMicroserviceEndpointConfig *cfg); + +natsStatus +natsMicroserviceEndpoint_Start(natsMicroserviceEndpoint *ep); + +natsStatus +natsMicroserviceEndpoint_Stop(natsMicroserviceEndpoint *ep); + +natsStatus +natsMicroserviceEndpoint_Destroy(natsMicroserviceEndpoint *ep); + +natsStatus +_destroyMicroserviceEndpointList(__microserviceEndpointList *list); + +natsStatus +_newMicroserviceEndpointList(__microserviceEndpointList **new_list); + +natsMicroserviceEndpoint * +_microserviceEndpointList_Find(__microserviceEndpointList *list, const char *name, int *index); + +natsStatus +_microserviceEndpointList_Put(__microserviceEndpointList *list, int index, natsMicroserviceEndpoint *ep); + +natsStatus +_microserviceEndpointList_Remove(__microserviceEndpointList *list, int index); + +natsStatus +_newMicroserviceRequest(natsMicroserviceRequest **new_request, natsMsg *msg); +void _freeMicroserviceRequest(natsMicroserviceRequest *req); #endif /* MICROP_H_ */ diff --git a/src/nats.h b/src/nats.h index b2ec5290b..5b53a29a5 100644 --- a/src/nats.h +++ b/src/nats.h @@ -1158,81 +1158,6 @@ typedef struct jsOptions } jsOptions; -/** - * The Microservice object. Initialize with #nats_AddMicroservice. - * TODO document the interface. - */ -typedef struct __microservice natsMicroservice; - -/** - * The Microservice request object. - * TODO document the interface. - */ -typedef struct __microserviceRequest natsMicroserviceRequest; - -/** - * The Microservice endpoint object. - * TODO document the interface. - */ -typedef struct __microserviceEndpoint natsMicroserviceEndpoint; - -/** \brief Callback used to deliver messages to a microservice. - * - * This is the callback that one provides when creating a microservice endpoint. - * The library will invoke this callback for each message arriving to the - * specified subject. - * - * @see natsMicroservice_AddEndpoint() - * @see natsMicroserviceEndpoint_AddEndpoint() - */ -typedef void (*natsMicroserviceRequestHandler)(natsMicroservice *m, natsMicroserviceRequest *req, void *closure); - -/** - * The Microservice schema object. - */ -typedef struct natsMicroserviceSchema -{ - const char *request; - const char *response; -} natsMicroserviceSchema; - -/** - * The Microservice endpoint configuration object. - */ -typedef struct natsMicroserviceEndpointConfig -{ - const char *subject; - natsMicroserviceRequestHandler handler; - void *closure; - natsMicroserviceSchema *schema; -} natsMicroserviceEndpointConfig; - -/** - * The Microservice endpoint stats struct. - */ -typedef struct natsMicroserviceEndpointStats -{ - char *name; - char *subject; - int num_requests; - int num_errors; - char *last_error; - int processing_time_ms; - int average_processing_time_ms; - bool stopped; -} natsMicroserviceEndpointStats; - -/** - * The Microservice configuration object. Initialize with #natsMicroserviceConfig_Init. - */ -typedef struct natsMicroserviceConfig -{ - const char *name; - const char *version; - const char *description; - natsMicroserviceEndpointConfig *endpoint; -} natsMicroserviceConfig; - /** * The KeyValue store object. */ @@ -7177,112 +7102,6 @@ kvStatus_Destroy(kvStatus *sts); /** @} */ // end of kvGroup -/** \defgroup microserviceGroup Microservice support - * - * A simple NATS-based microservice implementation framework. - * - * \warning EXPERIMENTAL FEATURE! We reserve the right to change the API without - * necessarily bumping the major version of the library. - * - * @{ - */ - -// /** \brief Initializes a Microservice configuration structure. -// * -// * Use this before setting specific #natsMicroserviceConfig options and passing it -// * to #nats_AddMicroservice. -// * -// * @see nats_AddMicroservice -// * -// * @param cfg the pointer to the stack variable #natsMicroserviceConfig to -// * initialize. -// */ -// NATS_EXTERN natsStatus -// natsMicroserviceConfig_Init(natsMicroserviceConfig *cfg); - -/** \brief Adds a microservice with a given configuration. - * - * Adds a microservice with a given configuration. - * - * \note The return #natsMicroservice object needs to be destroyed using - * #natsMicroservice_Destroy when no longer needed to free allocated memory. - * - * @param new_microservice the location where to store the newly created - * #natsMicroservice object. - * @param nc the pointer to the #natsCOnnection object on which the service will listen on. - * @param cfg the pointer to the #natsMicroserviceConfig configuration - * information used to create the #natsMicroservice object. - */ -NATS_EXTERN natsStatus -nats_AddMicroservice(natsMicroservice **new_microservice, natsConnection *nc, natsMicroserviceConfig *cfg); - -/** \brief Waits for the microservice to stop. - */ -NATS_EXTERN natsStatus -natsMicroservice_Run(natsMicroservice *m); - -/** \brief Stops a running microservice. - */ -NATS_EXTERN natsStatus -natsMicroservice_Stop(natsMicroservice *m); - -/** \brief Checks if a microservice is stopped. - */ -NATS_EXTERN bool -natsMicroservice_IsStopped(natsMicroservice *m); - -/** \brief Destroys a microservice object. - * - * Destroys a microservice object; frees all memory. The service must be stopped - * first, this function does not check if it is. - */ -NATS_EXTERN void -natsMicroservice_Destroy(natsMicroservice *m); - -/** \brief Adds (and starts) a microservice endpoint. - */ -NATS_EXTERN natsStatus -natsMicroservice_AddEndpoint(natsMicroservice *m, const char *name, natsMicroserviceEndpointConfig *cfg); - -/** \brief Adds (and starts) a microservice endpoint. - */ -NATS_EXTERN natsStatus -natsMicroservice_Respond(natsMicroservice *m, natsMicroserviceRequest *r, const char *data, int len); - -/** \brief Returns the original NATS message underlying the request. - */ -NATS_EXTERN natsMsg* -natsMicroserviceRequest_GetMsg(natsMicroserviceRequest *req); - -#define natsMicroserviceRequestHeader_Set(req, key, value) \ - natsMsgHeader_Set(natsMicroserviceRequest_GetMsg(req), (key), (value)) -#define natsMicroserviceRequestHeader_Add(req, key, value) \ - natsMsgHeader_Add(natsMicroserviceRequest_GetMsg(req), (key), (value)) -#define natsMicroserviceRequestHeader_Get(req, key, value) \ - natsMsgHeader_Get(natsMicroserviceRequest_GetMsg(req), (key), (value)) -#define natsMicroserviceRequestHeader_Values(req, key, values, count) \ - natsMsgHeader_Values(natsMicroserviceRequest_GetMsg(req), (key), (values), (count)) -#define natsMicroserviceRequestHeader_Keys(req, key, keys, count) \ - natsMsgHeader_Keys(natsMicroserviceRequest_GetMsg(req), (key), (keys), (count)) -#define natsMicroserviceRequestHeader_Delete(req, key) \ - natsMsgHeader_Delete(natsMicroserviceRequest_GetMsg(req), (key)) - -#define natsMicroserviceRequest_GetSubject(req) \ - natsMsg_GetSubject(natsMicroserviceRequest_GetMsg(req)) -#define natsMicroserviceRequest_GetReply(req) \ - natsMsg_GetReply(natsMicroserviceRequest_GetMsg(req)) -#define natsMicroserviceRequest_GetData(req) \ - natsMsg_GetData(natsMicroserviceRequest_GetMsg(req)) -#define natsMicroserviceRequest_GetDataLength(req) \ - natsMsg_GetDataLength(natsMicroserviceRequest_GetMsg(req)) -#define natsMicroserviceRequest_GetSequence(req) \ - natsMsg_GetSequence(natsMicroserviceRequest_GetMsg(req)) -#define natsMicroserviceRequest_GetTime(req) \ - natsMsg_GetTime(natsMicroserviceRequest_GetMsg(req)) - -/** @} */ // end of microserviceGroup - - /** @} */ // end of funcGroup /** \defgroup wildcardsGroup Wildcards diff --git a/src/natsp.h b/src/natsp.h index acef7ecf7..21c71e8a4 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -132,6 +132,8 @@ #endif #define IFOK(s, c) if (s == NATS_OK) { s = (c); } +// TODO <>/<> do I really need it? +#define IFOK_DO(s, c) if (s == NATS_OK) { c; } #define NATS_MILLIS_TO_NANOS(d) (((int64_t)d)*(int64_t)1E6) #define NATS_SECONDS_TO_NANOS(d) (((int64_t)d)*(int64_t)1E9) @@ -432,47 +434,6 @@ typedef struct __jsSub } jsSub; -struct __microserviceConfig -{ - const char *name; - const char *version; - const char *description; -}; - -struct __microserviceIdentity -{ - const char *name; - const char *version; - char *id; -}; - -struct __microservice -{ - natsMutex *mu; - natsConnection *nc; - int refs; - struct __microserviceIdentity identity; - struct natsMicroserviceConfig cfg; - bool stopped; - struct __microserviceEndpoint **endpoints; - int num_endpoints; -}; - -struct __microserviceRequest -{ - natsMsg *msg; - char *err; -}; - -struct __microserviceEndpoint -{ - natsMicroservice *m; - bool stopped; - natsMicroserviceEndpointConfig config; - natsMicroserviceEndpointStats stats; - natsSubscription *sub; -}; - struct __kvStore { natsMutex *mu;