Skip to content

Commit

Permalink
Re-think the logger API and internal propagation between shared core …
Browse files Browse the repository at this point in the history
…and SDK (#465)

* Re-think the logging API, clearly separating the logger transport with the logger facade. Introduce a Logger interface, extending the Console interface, that will be used within the sdk by the shared core.
* Proper fix for log propagation from shared core to SDK. Now the logs are correctly propagated and they use the context aware logger created when the invocation starts.
  • Loading branch information
slinkydeveloper authored Dec 16, 2024
1 parent 759b32b commit 189153b
Show file tree
Hide file tree
Showing 20 changed files with 679 additions and 444 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
export function start(): void;
/**
* This will set the log level of the overall log subscriber.
* @param {LogLevel} level
*/
export function set_log_level(level: LogLevel): void;
Expand All @@ -22,6 +23,8 @@ export interface WasmFailure {
message: string;
}

export type WasmSendHandle = number;

export interface WasmExponentialRetryConfig {
initial_interval: number | undefined;
factor: number;
Expand All @@ -35,7 +38,7 @@ export interface WasmAwakeable {
handle: number;
}

export type WasmAsyncResultValue = "NotReady" | "Empty" | { Success: Uint8Array } | { Failure: WasmFailure } | { StateKeys: string[] } | { CombinatorResult: WasmAsyncResultHandle[] };
export type WasmAsyncResultValue = "NotReady" | "Empty" | { Success: Uint8Array } | { Failure: WasmFailure } | { StateKeys: string[] } | { InvocationId: string } | { CombinatorResult: WasmAsyncResultHandle[] };

export type WasmRunEnterResult = { ExecutedWithSuccess: Uint8Array } | { ExecutedWithFailure: WasmFailure } | "NotExecuted";

Expand Down Expand Up @@ -103,8 +106,10 @@ export class WasmVM {
free(): void;
/**
* @param {(WasmHeader)[]} headers
* @param {LogLevel} log_level
* @param {number} logger_id
*/
constructor(headers: (WasmHeader)[]);
constructor(headers: (WasmHeader)[], log_level: LogLevel, logger_id: number);
/**
* @returns {WasmResponseHead}
*/
Expand Down Expand Up @@ -172,18 +177,21 @@ export class WasmVM {
* @param {string} service
* @param {string} handler
* @param {Uint8Array} buffer
* @param {string | undefined} [key]
* @param {string | undefined} key
* @param {(WasmHeader)[]} headers
* @returns {number}
*/
sys_call(service: string, handler: string, buffer: Uint8Array, key?: string): number;
sys_call(service: string, handler: string, buffer: Uint8Array, key: string | undefined, headers: (WasmHeader)[]): number;
/**
* @param {string} service
* @param {string} handler
* @param {Uint8Array} buffer
* @param {string | undefined} [key]
* @param {string | undefined} key
* @param {(WasmHeader)[]} headers
* @param {bigint | undefined} [delay]
* @returns {WasmSendHandle}
*/
sys_send(service: string, handler: string, buffer: Uint8Array, key?: string, delay?: bigint): void;
sys_send(service: string, handler: string, buffer: Uint8Array, key: string | undefined, headers: (WasmHeader)[], delay?: bigint): WasmSendHandle;
/**
* @returns {WasmAwakeable}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,12 @@ export function start() {
wasm.start();
}

function getArrayU8FromWasm0(ptr, len) {
ptr = ptr >>> 0;
return getUint8ArrayMemory0().subarray(ptr / 1, ptr / 1 + len);
}
/**
* This will set the log level of the overall log subscriber.
* @param {LogLevel} level
*/
export function set_log_level(level) {
Expand Down Expand Up @@ -568,13 +573,15 @@ export class WasmVM {
}
/**
* @param {(WasmHeader)[]} headers
* @param {LogLevel} log_level
* @param {number} logger_id
*/
constructor(headers) {
constructor(headers, log_level, logger_id) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passArrayJsValueToWasm0(headers, wasm.__wbindgen_malloc);
const len0 = WASM_VECTOR_LEN;
wasm.wasmvm_new(retptr, ptr0, len0);
wasm.wasmvm_new(retptr, ptr0, len0, log_level, logger_id);
var r0 = getDataViewMemory0().getInt32(retptr + 4 * 0, true);
var r1 = getDataViewMemory0().getInt32(retptr + 4 * 1, true);
var r2 = getDataViewMemory0().getInt32(retptr + 4 * 2, true);
Expand Down Expand Up @@ -801,10 +808,11 @@ export class WasmVM {
* @param {string} service
* @param {string} handler
* @param {Uint8Array} buffer
* @param {string | undefined} [key]
* @param {string | undefined} key
* @param {(WasmHeader)[]} headers
* @returns {number}
*/
sys_call(service, handler, buffer, key) {
sys_call(service, handler, buffer, key, headers) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passStringToWasm0(service, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
Expand All @@ -813,7 +821,9 @@ export class WasmVM {
const len1 = WASM_VECTOR_LEN;
var ptr2 = isLikeNone(key) ? 0 : passStringToWasm0(key, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
var len2 = WASM_VECTOR_LEN;
wasm.wasmvm_sys_call(retptr, this.__wbg_ptr, ptr0, len0, ptr1, len1, addHeapObject(buffer), ptr2, len2);
const ptr3 = passArrayJsValueToWasm0(headers, wasm.__wbindgen_malloc);
const len3 = WASM_VECTOR_LEN;
wasm.wasmvm_sys_call(retptr, this.__wbg_ptr, ptr0, len0, ptr1, len1, addHeapObject(buffer), ptr2, len2, ptr3, len3);
var r0 = getDataViewMemory0().getInt32(retptr + 4 * 0, true);
var r1 = getDataViewMemory0().getInt32(retptr + 4 * 1, true);
var r2 = getDataViewMemory0().getInt32(retptr + 4 * 2, true);
Expand All @@ -829,10 +839,12 @@ export class WasmVM {
* @param {string} service
* @param {string} handler
* @param {Uint8Array} buffer
* @param {string | undefined} [key]
* @param {string | undefined} key
* @param {(WasmHeader)[]} headers
* @param {bigint | undefined} [delay]
* @returns {WasmSendHandle}
*/
sys_send(service, handler, buffer, key, delay) {
sys_send(service, handler, buffer, key, headers, delay) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passStringToWasm0(service, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
Expand All @@ -841,12 +853,16 @@ export class WasmVM {
const len1 = WASM_VECTOR_LEN;
var ptr2 = isLikeNone(key) ? 0 : passStringToWasm0(key, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
var len2 = WASM_VECTOR_LEN;
wasm.wasmvm_sys_send(retptr, this.__wbg_ptr, ptr0, len0, ptr1, len1, addHeapObject(buffer), ptr2, len2, !isLikeNone(delay), isLikeNone(delay) ? BigInt(0) : delay);
const ptr3 = passArrayJsValueToWasm0(headers, wasm.__wbindgen_malloc);
const len3 = WASM_VECTOR_LEN;
wasm.wasmvm_sys_send(retptr, this.__wbg_ptr, ptr0, len0, ptr1, len1, addHeapObject(buffer), ptr2, len2, ptr3, len3, !isLikeNone(delay), isLikeNone(delay) ? BigInt(0) : delay);
var r0 = getDataViewMemory0().getInt32(retptr + 4 * 0, true);
var r1 = getDataViewMemory0().getInt32(retptr + 4 * 1, true);
if (r1) {
throw takeObject(r0);
var r2 = getDataViewMemory0().getInt32(retptr + 4 * 2, true);
if (r2) {
throw takeObject(r1);
}
return takeObject(r0);
} finally {
wasm.__wbindgen_add_to_stack_pointer(16);
}
Expand Down Expand Up @@ -1229,8 +1245,8 @@ export class WasmVM {
}
}

export function __wbg_vmlog_13455a06b760bbc0(arg0, arg1, arg2) {
vm_log(arg0, getStringFromWasm0(arg1, arg2));
export function __wbg_vmlog_13455a06b760bbc0(arg0, arg1, arg2, arg3, arg4) {
vm_log(arg0, getArrayU8FromWasm0(arg1, arg2), arg3 === 0 ? undefined : arg4 >>> 0);
};

export function __wbindgen_object_drop_ref(arg0) {
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export function __wbg_wasminput_free(a: number, b: number): void;
export function __wbg_get_wasminput_headers(a: number, b: number): void;
export function __wbg_get_wasminput_input(a: number): number;
export function __wbg_wasmvm_free(a: number, b: number): void;
export function wasmvm_new(a: number, b: number, c: number): void;
export function wasmvm_new(a: number, b: number, c: number, d: number, e: number): void;
export function wasmvm_get_response_head(a: number): number;
export function wasmvm_notify_input(a: number, b: number, c: number): void;
export function wasmvm_notify_input_closed(a: number): void;
Expand All @@ -30,8 +30,8 @@ export function wasmvm_sys_set_state(a: number, b: number, c: number, d: number,
export function wasmvm_sys_clear_state(a: number, b: number, c: number, d: number): void;
export function wasmvm_sys_clear_all_state(a: number, b: number): void;
export function wasmvm_sys_sleep(a: number, b: number, c: number): void;
export function wasmvm_sys_call(a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number): void;
export function wasmvm_sys_send(a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number, j: number, k: number): void;
export function wasmvm_sys_call(a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number, j: number, k: number): void;
export function wasmvm_sys_send(a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number, j: number, k: number, l: number, m: number): void;
export function wasmvm_sys_awakeable(a: number, b: number): void;
export function wasmvm_sys_complete_awakeable_success(a: number, b: number, c: number, d: number, e: number): void;
export function wasmvm_sys_complete_awakeable_failure(a: number, b: number, c: number, d: number, e: number): void;
Expand Down
10 changes: 5 additions & 5 deletions packages/restate-sdk-examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
"test": "vitest run --silent",
"verify": "npm run format-check && npm run lint && npm run build",
"release": "",
"object": "RESTATE_JOURNAL_LOGGING=OFF tsx ./src/object.ts",
"greeter": "RESTATE_JOURNAL_LOGGING=OFF tsx ./src/greeter.ts",
"workflow": "RESTATE_JOURNAL_LOGGING=OFF tsx ./src/workflow.ts",
"workflow_client": "RESTATE_JOURNAL_LOGGING=OFF tsx ./src/workflow_client.ts",
"ingress": "RESTATE_JOURNAL_LOGGING=OFF tsx ./src/ingress_client.ts"
"object": "RESTATE_LOGGING=debug tsx ./src/object.ts",
"greeter": "RESTATE_LOGGING=debug tsx ./src/greeter.ts",
"workflow": "RESTATE_LOGGING=debug tsx ./src/workflow.ts",
"workflow_client": "RESTATE_LOGGING=debug tsx ./src/workflow_client.ts",
"ingress": "RESTATE_LOGGING=debug tsx ./src/ingress_client.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^1.4.0",
Expand Down
5 changes: 3 additions & 2 deletions packages/restate-sdk/src/common_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ export type {
export type { ServiceBundle, RestateEndpoint } from "./endpoint.js";
export { RestateError, TerminalError, TimeoutError } from "./types/errors.js";
export type {
Logger,
LoggerTransport,
LogMetadata,
LogParams,
RestateLogLevel,
LoggerContext,
LogSource,
} from "./logger.js";
} from "./logging/logger_transport.js";
12 changes: 6 additions & 6 deletions packages/restate-sdk/src/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type {
ServiceDefinition,
WorkflowDefinition,
} from "@restatedev/restate-sdk-core";
import type { Logger } from "./logger.js";
import type { LoggerTransport } from "./logging/logger_transport.js";

/**
* Utility interface for a bundle of one or more services belonging together
Expand Down Expand Up @@ -65,27 +65,27 @@ export interface RestateEndpointBase<E> {
withIdentityV1(...keys: string[]): E;

/**
* Replace the default console-based {@link Logger}
* Replace the default console-based {@link LoggerTransport}
* @param logger
* @example
* Using console:
* ```ts
* restate.setLogger((params, message, ...o) => {console.log(`${params.level}: `, message, ...o)})
* restate.setLogger((meta, message, ...o) => {console.log(`${meta.level}: `, message, ...o)})
* ```
* @example
* Using winston:
* ```ts
* const logger = createLogger({ ... })
* restate.setLogger((params, message, ...o) => {logger.log(params.level, {invocationId: params.context?.invocationId}, [message, ...o].join(' '))})
* restate.setLogger((meta, message, ...o) => {logger.log(meta.level, {invocationId: meta.context?.invocationId}, [message, ...o].join(' '))})
* ```
* @example
* Using pino:
* ```ts
* const logger = pino()
* restate.setLogger((params, message, ...o) => {logger[params.level]({invocationId: params.context?.invocationId}, [message, ...o].join(' '))})
* restate.setLogger((meta, message, ...o) => {logger[meta.level]({invocationId: meta.context?.invocationId}, [message, ...o].join(' '))})
* ```
*/
setLogger(logger: Logger): E;
setLogger(logger: LoggerTransport): E;
}

/**
Expand Down
18 changes: 9 additions & 9 deletions packages/restate-sdk/src/endpoint/endpoint_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import {
} from "../types/components.js";

import type * as discovery from "../types/discovery.js";
import { defaultLoggerTransport } from "../logging/console_logger_transport.js";
import {
type LoggerTransport,
LogSource,
type Logger,
createRestateConsole,
defaultLogger,
} from "../logger.js";
} from "../logging/logger_transport.js";
import { createLogger } from "../logging/logger.js";

function isServiceDefinition<P extends string, M>(
m: Record<string, any>
Expand All @@ -54,14 +54,14 @@ function isWorkflowDefinition<P extends string, M>(

export class EndpointBuilder {
private readonly services: Map<string, Component> = new Map();
public logger: Logger = defaultLogger;
public loggerTransport: LoggerTransport = defaultLoggerTransport;

/**
* This is a simple console without contextual info.
*
* This should be used only in cases where no contextual info is available.
*/
public rlog = createRestateConsole(this.logger, LogSource.SYSTEM);
public rlog = createLogger(this.loggerTransport, LogSource.SYSTEM);

private _keySet: string[] = [];

Expand Down Expand Up @@ -114,9 +114,9 @@ export class EndpointBuilder {
return this;
}

public setLogger(newLogger: Logger) {
this.logger = newLogger;
this.rlog = createRestateConsole(this.logger, LogSource.SYSTEM);
public setLogger(newLogger: LoggerTransport) {
this.loggerTransport = newLogger;
this.rlog = createLogger(this.loggerTransport, LogSource.SYSTEM);
return this;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/restate-sdk/src/endpoint/fetch_endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import type {
import { GenericHandler } from "./handlers/generic.js";
import { fetcher } from "./handlers/fetch.js";
import { ProtocolMode } from "../types/discovery.js";
import type { Logger } from "../logger.js";
import type { LoggerTransport } from "../logging/logger_transport.js";

/**
* Generic Fetch encapsulates all the Restate services served by this endpoint.
Expand Down Expand Up @@ -93,7 +93,7 @@ export class FetchEndpointImpl implements FetchEndpoint {
return this;
}

public setLogger(newLogger: Logger): FetchEndpoint {
public setLogger(newLogger: LoggerTransport): FetchEndpoint {
this.builder.setLogger(newLogger);
return this;
}
Expand Down
Loading

0 comments on commit 189153b

Please sign in to comment.