Skip to content

Commit

Permalink
feat(adb): support delayed ack
Browse files Browse the repository at this point in the history
  • Loading branch information
yume-chan committed Feb 1, 2024
1 parent 59d78da commit ac6dc1e
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 61 deletions.
17 changes: 15 additions & 2 deletions libraries/adb/src/adb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export interface AdbTransport extends Closeable {

readonly disconnected: Promise<void>;

readonly clientFeatures: readonly AdbFeature[];

connect(service: string): ValueOrPromise<AdbSocket>;

addReverseTunnel(
Expand Down Expand Up @@ -71,6 +73,14 @@ export class Adb implements Closeable {
return this.transport.disconnected;
}

public get clientFeatures() {
return this.transport.clientFeatures;
}

public get deviceFeatures() {
return this.banner.features;
}

readonly subprocess: AdbSubprocess;
readonly power: AdbPower;
readonly reverse: AdbReverseCommand;
Expand All @@ -85,8 +95,11 @@ export class Adb implements Closeable {
this.tcpip = new AdbTcpIpCommand(this);
}

supportsFeature(feature: AdbFeature): boolean {
return this.banner.features.includes(feature);
canUseFeature(feature: AdbFeature): boolean {
return (
this.clientFeatures.includes(feature) &&
this.deviceFeatures.includes(feature)
);
}

async createSocket(service: string): Promise<AdbSocket> {
Expand Down
2 changes: 1 addition & 1 deletion libraries/adb/src/commands/subprocess/protocols/shell.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type AdbShellProtocolPacket = StructValueType<typeof AdbShellProtocolPacket>;
*/
export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol {
static isSupported(adb: Adb) {
return adb.supportsFeature(AdbFeature.ShellV2);
return adb.canUseFeature(AdbFeature.ShellV2);
}

static async pty(adb: Adb, command: string) {
Expand Down
11 changes: 5 additions & 6 deletions libraries/adb/src/commands/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,15 @@ export class AdbSync extends AutoDisposable {
this._adb = adb;
this._socket = new AdbSyncSocket(socket, adb.maxPayloadSize);

this.#supportsStat = adb.supportsFeature(AdbFeature.StatV2);
this.#supportsListV2 = adb.supportsFeature(AdbFeature.ListV2);
this.#fixedPushMkdir = adb.supportsFeature(AdbFeature.FixedPushMkdir);
this.#supportsSendReceiveV2 = adb.supportsFeature(
this.#supportsStat = adb.canUseFeature(AdbFeature.StatV2);
this.#supportsListV2 = adb.canUseFeature(AdbFeature.ListV2);
this.#fixedPushMkdir = adb.canUseFeature(AdbFeature.FixedPushMkdir);
this.#supportsSendReceiveV2 = adb.canUseFeature(
AdbFeature.SendReceiveV2,
);
// https://android.googlesource.com/platform/packages/modules/adb/+/91768a57b7138166e0a3d11f79cd55909dda7014/client/file_sync_client.cpp#1361
this.#needPushMkdirWorkaround =
this._adb.supportsFeature(AdbFeature.ShellV2) &&
!this.fixedPushMkdir;
this._adb.canUseFeature(AdbFeature.ShellV2) && !this.fixedPushMkdir;
}

/**
Expand Down
101 changes: 88 additions & 13 deletions libraries/adb/src/daemon/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
ConsumableWritableStream,
WritableStream,
} from "@yume-chan/stream-extra";
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct";
import { EMPTY_UINT8_ARRAY, NumberFieldType } from "@yume-chan/struct";

import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js";
import { decodeUtf8, encodeUtf8 } from "../utils/index.js";
Expand All @@ -32,13 +32,25 @@ export interface AdbPacketDispatcherOptions {
*/
appendNullToServiceString: boolean;
maxPayloadSize: number;
/**
* The number of bytes the device can send before receiving an ack packet.
* Set to 0 or any negative value to disable delayed ack.
* Otherwise the value must be in the range of unsigned 32-bit integer.
*/
initialDelayedAckBytes: number;
/**
* Whether to preserve the connection open after the `AdbPacketDispatcher` is closed.
*/
preserveConnection?: boolean | undefined;
debugSlowRead?: boolean | undefined;
}

interface SocketOpenResult {
remoteId: number;
availableWriteBytes: number;
}

/**
* The dispatcher is the "dumb" part of the connection handling logic.
*
Expand Down Expand Up @@ -79,6 +91,10 @@ export class AdbPacketDispatcher implements Closeable {
options: AdbPacketDispatcherOptions,
) {
this.options = options;
// Don't allow negative values in dispatcher
if (this.options.initialDelayedAckBytes < 0) {
this.options.initialDelayedAckBytes = 0;
}

connection.readable
.pipeTo(
Expand Down Expand Up @@ -169,15 +185,39 @@ export class AdbPacketDispatcher implements Closeable {
}

#handleOkay(packet: AdbPacketData) {
if (this.#initializers.resolve(packet.arg1, packet.arg0)) {
let ackBytes: number;
if (this.options.initialDelayedAckBytes !== 0) {
if (packet.payload.byteLength !== 4) {
throw new Error(
"Invalid OKAY packet. Payload size should be 4",
);
}
ackBytes = NumberFieldType.Uint32.deserialize(packet.payload, true);
} else {
if (packet.payload.byteLength !== 0) {
throw new Error(
"Invalid OKAY packet. Payload size should be 0",
);
}
ackBytes = Infinity;
}

if (
this.#initializers.resolve(packet.arg1, {
remoteId: packet.arg0,
availableWriteBytes: ackBytes,
} satisfies SocketOpenResult)
) {
// Device successfully created the socket
return;
}

const socket = this.#sockets.get(packet.arg1);
if (socket) {
// Device has received last `WRTE` to the socket
socket.ack();
// When delayed ack is enabled, device has received `ackBytes` from the socket.
// When delayed ack is disabled, device has received last `WRTE` packet from the socket,
// `ackBytes` is `Infinity` in this case.
socket.ack(ackBytes);
return;
}

Expand All @@ -186,16 +226,39 @@ export class AdbPacketDispatcher implements Closeable {
void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
}

#sendOkay(localId: number, remoteId: number, ackBytes: number) {
let payload: Uint8Array;
if (this.options.initialDelayedAckBytes !== 0) {
payload = new Uint8Array(4);
new DataView(payload.buffer).setUint32(0, ackBytes, true);
} else {
payload = EMPTY_UINT8_ARRAY;
}

return this.sendPacket(AdbCommand.Okay, localId, remoteId, payload);
}

async #handleOpen(packet: AdbPacketData) {
// `AsyncOperationManager` doesn't support skipping IDs
// Use `add` + `resolve` to simulate this behavior
const [localId] = this.#initializers.add<number>();
this.#initializers.resolve(localId, undefined);

const remoteId = packet.arg0;
let service = decodeUtf8(packet.payload);
if (service.endsWith("\0")) {
service = service.substring(0, service.length - 1);
let initialDelayedAckBytes = packet.arg1;
const service = decodeUtf8(packet.payload);

if (this.options.initialDelayedAckBytes === 0) {
if (initialDelayedAckBytes !== 0) {
throw new Error("Invalid OPEN packet. arg1 should be 0");
}
initialDelayedAckBytes = Infinity;
} else {
if (initialDelayedAckBytes === 0) {
throw new Error(
"Invalid OPEN packet. arg1 should be greater than 0",
);
}
}

const handler = this.#incomingSocketHandlers.get(service);
Expand All @@ -211,11 +274,16 @@ export class AdbPacketDispatcher implements Closeable {
localCreated: false,
service,
});
controller.ack(initialDelayedAckBytes);

try {
await handler(controller.socket);
this.#sockets.set(localId, controller);
await this.sendPacket(AdbCommand.Okay, localId, remoteId);
await this.#sendOkay(
localId,
remoteId,
this.options.initialDelayedAckBytes,
);
} catch (e) {
await this.sendPacket(AdbCommand.Close, 0, remoteId);
}
Expand All @@ -238,10 +306,10 @@ export class AdbPacketDispatcher implements Closeable {
}),
(async () => {
await socket.enqueue(packet.payload);
await this.sendPacket(
AdbCommand.Okay,
await this.#sendOkay(
packet.arg1,
packet.arg0,
packet.payload.length,
);
handled = true;
})(),
Expand All @@ -255,18 +323,25 @@ export class AdbPacketDispatcher implements Closeable {
service += "\0";
}

const [localId, initializer] = this.#initializers.add<number>();
await this.sendPacket(AdbCommand.Open, localId, 0, service);
const [localId, initializer] =
this.#initializers.add<SocketOpenResult>();
await this.sendPacket(
AdbCommand.Open,
localId,
this.options.initialDelayedAckBytes,
service,
);

// Fulfilled by `handleOk`
const remoteId = await initializer;
const { remoteId, availableWriteBytes } = await initializer;
const controller = new AdbDaemonSocketController({
dispatcher: this,
localId,
remoteId,
localCreated: true,
service,
});
controller.ack(availableWriteBytes);
this.#sockets.set(localId, controller);

return controller.socket;
Expand Down
52 changes: 42 additions & 10 deletions libraries/adb/src/daemon/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export class AdbDaemonSocketController
return this.#readable;
}

#writePromise: PromiseResolver<void> | undefined;
#writableController!: WritableStreamDefaultController;
readonly writable: WritableStream<Consumable<Uint8Array>>;

Expand All @@ -65,6 +64,23 @@ export class AdbDaemonSocketController
return this.#socket;
}

#availableWriteBytesChanged: PromiseResolver<void> | undefined;
/**
* When delayed ack is disabled, can be `Infinity` if the socket is ready to write.
* Exactly one packet can be written no matter how large it is. Or `-1` if the socket
* is waiting for ack.
*
* When delayed ack is enabled, a non-negative finite number indicates the number of
* bytes that can be written to the socket before receiving an ack.
*/
#availableWriteBytes = 0;
/**
* Gets the number of bytes that can be written to the socket without blocking.
*/
public get availableWriteBytes() {
return this.#availableWriteBytes;
}

constructor(options: AdbDaemonSocketConstructionOptions) {
this.#dispatcher = options.dispatcher;
this.localId = options.localId;
Expand All @@ -88,17 +104,30 @@ export class AdbDaemonSocketController
start < size;
start = end, end += chunkSize
) {
this.#writePromise = new PromiseResolver();
const chunk = data.subarray(start, end);
const length = chunk.byteLength;
while (this.#availableWriteBytes < length) {
// Only one lock is required because Web Streams API guarantees
// that `write` is not reentrant.
this.#availableWriteBytesChanged =
new PromiseResolver();
await raceSignal(
() => this.#availableWriteBytesChanged!.promise,
controller.signal,
);
}

if (this.#availableWriteBytes === Infinity) {
this.#availableWriteBytes = -1;
} else {
this.#availableWriteBytes -= length;
}

await this.#dispatcher.sendPacket(
AdbCommand.Write,
this.localId,
this.remoteId,
data.subarray(start, end),
);
// Wait for ack packet
await raceSignal(
() => this.#writePromise!.promise,
controller.signal,
chunk,
);
}
},
Expand All @@ -124,8 +153,9 @@ export class AdbDaemonSocketController
}
}

ack() {
this.#writePromise?.resolve();
public ack(bytes: number) {
this.#availableWriteBytes += bytes;
this.#availableWriteBytesChanged?.resolve();
}

async close(): Promise<void> {
Expand All @@ -134,6 +164,8 @@ export class AdbDaemonSocketController
}
this.#closed = true;

this.#availableWriteBytesChanged?.reject(new Error("Socket closed"));

try {
this.#writableController.error(new Error("Socket closed"));
} catch {
Expand Down
Loading

0 comments on commit ac6dc1e

Please sign in to comment.