diff --git a/libraries/adb/src/adb.ts b/libraries/adb/src/adb.ts index bdb384d27..3ccace618 100644 --- a/libraries/adb/src/adb.ts +++ b/libraries/adb/src/adb.ts @@ -40,6 +40,8 @@ export interface AdbTransport extends Closeable { readonly disconnected: Promise; + readonly clientFeatures: readonly AdbFeature[]; + connect(service: string): ValueOrPromise; addReverseTunnel( @@ -71,6 +73,14 @@ export class Adb implements Closeable { return this.transport.disconnected; } + public get clientFeatures() { + return this.transport.clientFeatures; + } + + public get deviceFeatures() { + return this.banner.features; + } + readonly subprocess: AdbSubprocess; readonly power: AdbPower; readonly reverse: AdbReverseCommand; @@ -85,8 +95,11 @@ export class Adb implements Closeable { this.tcpip = new AdbTcpIpCommand(this); } - supportsFeature(feature: AdbFeature): boolean { - return this.banner.features.includes(feature); + canUseFeature(feature: AdbFeature): boolean { + return ( + this.clientFeatures.includes(feature) && + this.deviceFeatures.includes(feature) + ); } async createSocket(service: string): Promise { diff --git a/libraries/adb/src/commands/subprocess/protocols/shell.ts b/libraries/adb/src/commands/subprocess/protocols/shell.ts index a97e78e96..476eb1c76 100644 --- a/libraries/adb/src/commands/subprocess/protocols/shell.ts +++ b/libraries/adb/src/commands/subprocess/protocols/shell.ts @@ -47,7 +47,7 @@ type AdbShellProtocolPacket = StructValueType; */ export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol { static isSupported(adb: Adb) { - return adb.supportsFeature(AdbFeature.ShellV2); + return adb.canUseFeature(AdbFeature.ShellV2); } static async pty(adb: Adb, command: string) { diff --git a/libraries/adb/src/commands/sync/sync.ts b/libraries/adb/src/commands/sync/sync.ts index 300b6755c..d3595adbc 100644 --- a/libraries/adb/src/commands/sync/sync.ts +++ b/libraries/adb/src/commands/sync/sync.ts @@ -74,16 +74,15 @@ export class AdbSync extends AutoDisposable { this._adb = adb; this._socket = new AdbSyncSocket(socket, adb.maxPayloadSize); - this.#supportsStat = adb.supportsFeature(AdbFeature.StatV2); - this.#supportsListV2 = adb.supportsFeature(AdbFeature.ListV2); - this.#fixedPushMkdir = adb.supportsFeature(AdbFeature.FixedPushMkdir); - this.#supportsSendReceiveV2 = adb.supportsFeature( + this.#supportsStat = adb.canUseFeature(AdbFeature.StatV2); + this.#supportsListV2 = adb.canUseFeature(AdbFeature.ListV2); + this.#fixedPushMkdir = adb.canUseFeature(AdbFeature.FixedPushMkdir); + this.#supportsSendReceiveV2 = adb.canUseFeature( AdbFeature.SendReceiveV2, ); // https://android.googlesource.com/platform/packages/modules/adb/+/91768a57b7138166e0a3d11f79cd55909dda7014/client/file_sync_client.cpp#1361 this.#needPushMkdirWorkaround = - this._adb.supportsFeature(AdbFeature.ShellV2) && - !this.fixedPushMkdir; + this._adb.canUseFeature(AdbFeature.ShellV2) && !this.fixedPushMkdir; } /** diff --git a/libraries/adb/src/daemon/dispatcher.ts b/libraries/adb/src/daemon/dispatcher.ts index 8645fe339..eef6ac49a 100644 --- a/libraries/adb/src/daemon/dispatcher.ts +++ b/libraries/adb/src/daemon/dispatcher.ts @@ -13,7 +13,7 @@ import { ConsumableWritableStream, WritableStream, } from "@yume-chan/stream-extra"; -import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct"; +import { EMPTY_UINT8_ARRAY, NumberFieldType } from "@yume-chan/struct"; import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js"; import { decodeUtf8, encodeUtf8 } from "../utils/index.js"; @@ -32,6 +32,13 @@ export interface AdbPacketDispatcherOptions { */ appendNullToServiceString: boolean; maxPayloadSize: number; + /** + * The number of bytes the device can send before receiving an ack packet. + + * Set to 0 or any negative value to disable delayed ack. + * Otherwise the value must be in the range of unsigned 32-bit integer. + */ + initialDelayedAckBytes: number; /** * Whether to preserve the connection open after the `AdbPacketDispatcher` is closed. */ @@ -39,6 +46,11 @@ export interface AdbPacketDispatcherOptions { debugSlowRead?: boolean | undefined; } +interface SocketOpenResult { + remoteId: number; + availableWriteBytes: number; +} + /** * The dispatcher is the "dumb" part of the connection handling logic. * @@ -79,6 +91,10 @@ export class AdbPacketDispatcher implements Closeable { options: AdbPacketDispatcherOptions, ) { this.options = options; + // Don't allow negative values in dispatcher + if (this.options.initialDelayedAckBytes < 0) { + this.options.initialDelayedAckBytes = 0; + } connection.readable .pipeTo( @@ -169,15 +185,39 @@ export class AdbPacketDispatcher implements Closeable { } #handleOkay(packet: AdbPacketData) { - if (this.#initializers.resolve(packet.arg1, packet.arg0)) { + let ackBytes: number; + if (this.options.initialDelayedAckBytes !== 0) { + if (packet.payload.byteLength !== 4) { + throw new Error( + "Invalid OKAY packet. Payload size should be 4", + ); + } + ackBytes = NumberFieldType.Uint32.deserialize(packet.payload, true); + } else { + if (packet.payload.byteLength !== 0) { + throw new Error( + "Invalid OKAY packet. Payload size should be 0", + ); + } + ackBytes = Infinity; + } + + if ( + this.#initializers.resolve(packet.arg1, { + remoteId: packet.arg0, + availableWriteBytes: ackBytes, + } satisfies SocketOpenResult) + ) { // Device successfully created the socket return; } const socket = this.#sockets.get(packet.arg1); if (socket) { - // Device has received last `WRTE` to the socket - socket.ack(); + // When delayed ack is enabled, device has received `ackBytes` from the socket. + // When delayed ack is disabled, device has received last `WRTE` packet from the socket, + // `ackBytes` is `Infinity` in this case. + socket.ack(ackBytes); return; } @@ -186,6 +226,18 @@ export class AdbPacketDispatcher implements Closeable { void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0); } + #sendOkay(localId: number, remoteId: number, ackBytes: number) { + let payload: Uint8Array; + if (this.options.initialDelayedAckBytes !== 0) { + payload = new Uint8Array(4); + new DataView(payload.buffer).setUint32(0, ackBytes, true); + } else { + payload = EMPTY_UINT8_ARRAY; + } + + return this.sendPacket(AdbCommand.Okay, localId, remoteId, payload); + } + async #handleOpen(packet: AdbPacketData) { // `AsyncOperationManager` doesn't support skipping IDs // Use `add` + `resolve` to simulate this behavior @@ -193,9 +245,20 @@ export class AdbPacketDispatcher implements Closeable { this.#initializers.resolve(localId, undefined); const remoteId = packet.arg0; - let service = decodeUtf8(packet.payload); - if (service.endsWith("\0")) { - service = service.substring(0, service.length - 1); + let initialDelayedAckBytes = packet.arg1; + const service = decodeUtf8(packet.payload); + + if (this.options.initialDelayedAckBytes === 0) { + if (initialDelayedAckBytes !== 0) { + throw new Error("Invalid OPEN packet. arg1 should be 0"); + } + initialDelayedAckBytes = Infinity; + } else { + if (initialDelayedAckBytes === 0) { + throw new Error( + "Invalid OPEN packet. arg1 should be greater than 0", + ); + } } const handler = this.#incomingSocketHandlers.get(service); @@ -211,11 +274,16 @@ export class AdbPacketDispatcher implements Closeable { localCreated: false, service, }); + controller.ack(initialDelayedAckBytes); try { await handler(controller.socket); this.#sockets.set(localId, controller); - await this.sendPacket(AdbCommand.Okay, localId, remoteId); + await this.#sendOkay( + localId, + remoteId, + this.options.initialDelayedAckBytes, + ); } catch (e) { await this.sendPacket(AdbCommand.Close, 0, remoteId); } @@ -238,10 +306,10 @@ export class AdbPacketDispatcher implements Closeable { }), (async () => { await socket.enqueue(packet.payload); - await this.sendPacket( - AdbCommand.Okay, + await this.#sendOkay( packet.arg1, packet.arg0, + packet.payload.length, ); handled = true; })(), @@ -255,11 +323,17 @@ export class AdbPacketDispatcher implements Closeable { service += "\0"; } - const [localId, initializer] = this.#initializers.add(); - await this.sendPacket(AdbCommand.Open, localId, 0, service); + const [localId, initializer] = + this.#initializers.add(); + await this.sendPacket( + AdbCommand.Open, + localId, + this.options.initialDelayedAckBytes, + service, + ); // Fulfilled by `handleOk` - const remoteId = await initializer; + const { remoteId, availableWriteBytes } = await initializer; const controller = new AdbDaemonSocketController({ dispatcher: this, localId, @@ -267,6 +341,7 @@ export class AdbPacketDispatcher implements Closeable { localCreated: true, service, }); + controller.ack(availableWriteBytes); this.#sockets.set(localId, controller); return controller.socket; diff --git a/libraries/adb/src/daemon/socket.ts b/libraries/adb/src/daemon/socket.ts index 5d76c6e2e..42608620f 100644 --- a/libraries/adb/src/daemon/socket.ts +++ b/libraries/adb/src/daemon/socket.ts @@ -49,7 +49,6 @@ export class AdbDaemonSocketController return this.#readable; } - #writePromise: PromiseResolver | undefined; #writableController!: WritableStreamDefaultController; readonly writable: WritableStream>; @@ -65,6 +64,23 @@ export class AdbDaemonSocketController return this.#socket; } + #availableWriteBytesChanged: PromiseResolver | undefined; + /** + * When delayed ack is disabled, can be `Infinity` if the socket is ready to write. + * Exactly one packet can be written no matter how large it is. Or `-1` if the socket + * is waiting for ack. + * + * When delayed ack is enabled, a non-negative finite number indicates the number of + * bytes that can be written to the socket before receiving an ack. + */ + #availableWriteBytes = 0; + /** + * Gets the number of bytes that can be written to the socket without blocking. + */ + public get availableWriteBytes() { + return this.#availableWriteBytes; + } + constructor(options: AdbDaemonSocketConstructionOptions) { this.#dispatcher = options.dispatcher; this.localId = options.localId; @@ -88,17 +104,30 @@ export class AdbDaemonSocketController start < size; start = end, end += chunkSize ) { - this.#writePromise = new PromiseResolver(); + const chunk = data.subarray(start, end); + const length = chunk.byteLength; + while (this.#availableWriteBytes < length) { + // Only one lock is required because Web Streams API guarantees + // that `write` is not reentrant. + this.#availableWriteBytesChanged = + new PromiseResolver(); + await raceSignal( + () => this.#availableWriteBytesChanged!.promise, + controller.signal, + ); + } + + if (this.#availableWriteBytes === Infinity) { + this.#availableWriteBytes = -1; + } else { + this.#availableWriteBytes -= length; + } + await this.#dispatcher.sendPacket( AdbCommand.Write, this.localId, this.remoteId, - data.subarray(start, end), - ); - // Wait for ack packet - await raceSignal( - () => this.#writePromise!.promise, - controller.signal, + chunk, ); } }, @@ -124,8 +153,9 @@ export class AdbDaemonSocketController } } - ack() { - this.#writePromise?.resolve(); + public ack(bytes: number) { + this.#availableWriteBytes += bytes; + this.#availableWriteBytesChanged?.resolve(); } async close(): Promise { @@ -134,6 +164,8 @@ export class AdbDaemonSocketController } this.#closed = true; + this.#availableWriteBytesChanged?.reject(new Error("Socket closed")); + try { this.#writableController.error(new Error("Socket closed")); } catch { diff --git a/libraries/adb/src/daemon/transport.ts b/libraries/adb/src/daemon/transport.ts index cdb6f2ab8..daca69962 100644 --- a/libraries/adb/src/daemon/transport.ts +++ b/libraries/adb/src/daemon/transport.ts @@ -26,6 +26,30 @@ import type { AdbPacketData, AdbPacketInit } from "./packet.js"; import { AdbCommand, calculateChecksum } from "./packet.js"; export const ADB_DAEMON_VERSION_OMIT_CHECKSUM = 0x01000001; +// https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252 +// There are some other feature constants, but some of them are only used by ADB server, not devices (daemons). +export const ADB_DAEMON_DEFAULT_FEATURES = [ + AdbFeature.ShellV2, + AdbFeature.Cmd, + AdbFeature.StatV2, + AdbFeature.ListV2, + AdbFeature.FixedPushMkdir, + "apex", + AdbFeature.Abb, + // only tells the client the symlink timestamp issue in `adb push --sync` has been fixed. + // No special handling required. + "fixed_push_symlink_timestamp", + AdbFeature.AbbExec, + "remount_shell", + "track_app", + AdbFeature.SendReceiveV2, + "sendrecv_v2_brotli", + "sendrecv_v2_lz4", + "sendrecv_v2_zstd", + "sendrecv_v2_dry_run_send", + AdbFeature.DelayedAck, +] as AdbFeature[]; +export const ADB_DAEMON_DEFAULT_INITIAL_PAYLOAD_SIZE = 32 * 1024 * 1024; export type AdbDaemonConnection = ReadableWritablePair< AdbPacketData, @@ -37,6 +61,16 @@ interface AdbDaemonAuthenticationOptions { connection: AdbDaemonConnection; credentialStore: AdbCredentialStore; authenticators?: AdbAuthenticator[]; + features?: readonly AdbFeature[]; + /** + * The number of bytes the device can send before receiving an ack packet. + * + * Set to 0 or any negative value to disable delayed ack in handshake. + * Otherwise the value must be in the range of unsigned 32-bit integer. + * + * Delayed ack requires Android 14, this option is ignored on older versions. + */ + initialDelayedAckBytes?: number; /** * Whether to preserve the connection open after the `AdbDaemonTransport` is closed. */ @@ -50,6 +84,16 @@ interface AdbDaemonSocketConnectorConstructionOptions { version: number; maxPayloadSize: number; banner: string; + features?: readonly AdbFeature[]; + /** + * The number of bytes the device can send before receiving an ack packet. + * + * Set to 0 or any negative value to disable delayed ack in handshake. + * Otherwise the value must be in the range of unsigned 32-bit integer. + * + * Delayed ack requires Android 14, this option is ignored on older versions. + */ + initialDelayedAckBytes?: number; /** * Whether to preserve the connection open after the `AdbDaemonTransport` is closed. */ @@ -71,6 +115,8 @@ export class AdbDaemonTransport implements AdbTransport { connection, credentialStore, authenticators = ADB_DEFAULT_AUTHENTICATORS, + features = ADB_DAEMON_DEFAULT_FEATURES, + initialDelayedAckBytes = ADB_DAEMON_DEFAULT_INITIAL_PAYLOAD_SIZE, ...options }: AdbDaemonAuthenticationOptions): Promise { // Initially, set to highest-supported version and payload size. @@ -144,38 +190,25 @@ export class AdbDaemonTransport implements AdbTransport { await ConsumableWritableStream.write(writer, init as AdbPacketInit); } + const actualFeatures = features.slice(); + if (initialDelayedAckBytes <= 0) { + const index = features.indexOf(AdbFeature.DelayedAck); + if (index !== -1) { + actualFeatures.splice(index, 1); + } + } + let banner: string; try { - // https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252 - // There are some other feature constants, but some of them are only used by ADB server, not devices (daemons). - const features = [ - AdbFeature.ShellV2, - AdbFeature.Cmd, - AdbFeature.StatV2, - AdbFeature.ListV2, - AdbFeature.FixedPushMkdir, - "apex", - AdbFeature.Abb, - // only tells the client the symlink timestamp issue in `adb push --sync` has been fixed. - // No special handling required. - "fixed_push_symlink_timestamp", - AdbFeature.AbbExec, - "remount_shell", - "track_app", - AdbFeature.SendReceiveV2, - "sendrecv_v2_brotli", - "sendrecv_v2_lz4", - "sendrecv_v2_zstd", - "sendrecv_v2_dry_run_send", - ].join(","); - await sendPacket({ command: AdbCommand.Connect, arg0: version, arg1: maxPayloadSize, // The terminating `;` is required in formal definition // But ADB daemon (all versions) can still work without it - payload: encodeUtf8(`host::features=${features}`), + payload: encodeUtf8( + `host::features=${actualFeatures.join(",")}`, + ), }); banner = await resolver.promise; @@ -195,6 +228,8 @@ export class AdbDaemonTransport implements AdbTransport { version, maxPayloadSize, banner, + features: actualFeatures, + initialDelayedAckBytes, ...options, }); } @@ -229,16 +264,38 @@ export class AdbDaemonTransport implements AdbTransport { return this.#dispatcher.disconnected; } + #clientFeatures: readonly AdbFeature[]; + get clientFeatures() { + return this.#clientFeatures; + } + constructor({ serial, connection, version, banner, + features = ADB_DAEMON_DEFAULT_FEATURES, + initialDelayedAckBytes = ADB_DAEMON_DEFAULT_INITIAL_PAYLOAD_SIZE, ...options }: AdbDaemonSocketConnectorConstructionOptions) { this.#serial = serial; this.#connection = connection; this.#banner = AdbBanner.parse(banner); + this.#clientFeatures = features; + + if (features.includes(AdbFeature.DelayedAck)) { + if (initialDelayedAckBytes <= 0) { + throw new Error( + "`initialDelayedAckBytes` must be greater than 0 when DelayedAck feature is enabled.", + ); + } + + if (!this.#banner.features.includes(AdbFeature.DelayedAck)) { + initialDelayedAckBytes = 0; + } + } else { + initialDelayedAckBytes = 0; + } let calculateChecksum: boolean; let appendNullToServiceString: boolean; @@ -253,6 +310,7 @@ export class AdbDaemonTransport implements AdbTransport { this.#dispatcher = new AdbPacketDispatcher(connection, { calculateChecksum, appendNullToServiceString, + initialDelayedAckBytes, ...options, }); diff --git a/libraries/adb/src/features.ts b/libraries/adb/src/features.ts index 758ca5ec1..707c212c5 100644 --- a/libraries/adb/src/features.ts +++ b/libraries/adb/src/features.ts @@ -1,5 +1,5 @@ // The order follows -// https://android.googlesource.com/platform/packages/modules/adb/+/79010dc6d5ca7490c493df800d4421730f5466ca/transport.cpp#1252 +// https://cs.android.com/android/platform/superproject/+/master:packages/modules/adb/transport.cpp;l=77;drc=6d14d35d0241f6fee145f8e54ffd77252e8d29fd export enum AdbFeature { ShellV2 = "shell_v2", Cmd = "cmd", @@ -9,4 +9,5 @@ export enum AdbFeature { Abb = "abb", AbbExec = "abb_exec", SendReceiveV2 = "sendrecv_v2", + DelayedAck = "delayed_ack", } diff --git a/libraries/adb/src/server/transport.ts b/libraries/adb/src/server/transport.ts index dadb64b47..2f9ac9644 100644 --- a/libraries/adb/src/server/transport.ts +++ b/libraries/adb/src/server/transport.ts @@ -8,9 +8,31 @@ import type { AdbTransport, } from "../adb.js"; import type { AdbBanner } from "../banner.js"; +import { AdbFeature } from "../features.js"; import type { AdbServerClient } from "./client.js"; +export const ADB_SERVER_DEFAULT_FEATURES = [ + AdbFeature.ShellV2, + AdbFeature.Cmd, + AdbFeature.StatV2, + AdbFeature.ListV2, + AdbFeature.FixedPushMkdir, + "apex", + AdbFeature.Abb, + // only tells the client the symlink timestamp issue in `adb push --sync` has been fixed. + // No special handling required. + "fixed_push_symlink_timestamp", + AdbFeature.AbbExec, + "remount_shell", + "track_app", + AdbFeature.SendReceiveV2, + "sendrecv_v2_brotli", + "sendrecv_v2_lz4", + "sendrecv_v2_zstd", + "sendrecv_v2_dry_run_send", +] as AdbFeature[]; + export class AdbServerTransport implements AdbTransport { #client: AdbServerClient; @@ -26,6 +48,12 @@ export class AdbServerTransport implements AdbTransport { #waitAbortController = new AbortController(); readonly disconnected: Promise; + get clientFeatures() { + // No need to get host features (features supported by ADB server) + // Because we create all ADB packets ourselves + return ADB_SERVER_DEFAULT_FEATURES; + } + constructor( client: AdbServerClient, serial: string, diff --git a/libraries/android-bin/src/cmd.ts b/libraries/android-bin/src/cmd.ts index bd7fe07e1..7bcf5d4c6 100644 --- a/libraries/android-bin/src/cmd.ts +++ b/libraries/android-bin/src/cmd.ts @@ -35,10 +35,10 @@ export class Cmd extends AdbCommandBase { constructor(adb: Adb) { super(adb); - this.#supportsShellV2 = adb.supportsFeature(AdbFeature.ShellV2); - this.#supportsCmd = adb.supportsFeature(AdbFeature.Cmd); - this.#supportsAbb = adb.supportsFeature(AdbFeature.Abb); - this.#supportsAbbExec = adb.supportsFeature(AdbFeature.AbbExec); + this.#supportsShellV2 = adb.canUseFeature(AdbFeature.ShellV2); + this.#supportsCmd = adb.canUseFeature(AdbFeature.Cmd); + this.#supportsAbb = adb.canUseFeature(AdbFeature.Abb); + this.#supportsAbbExec = adb.canUseFeature(AdbFeature.AbbExec); } async spawn(