Skip to content

Commit

Permalink
Merge pull request #22 from delta-mpc/0.3.5
Browse files Browse the repository at this point in the history
add dst to subscribe stream for debug
  • Loading branch information
mh739025250 authored Mar 23, 2022
2 parents 2975e7d + 0f6278c commit c6cbd14
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/impl/chain/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class _Impl implements Impl {
await this.hflContract.init();
}

subscribe(timeout: number): Readable {
subscribe(address: string, timeout: number): Readable {
const src = this.hflContract.subscribe();
src.on("data", (event: EventData) => {
const res = event.returnValues;
Expand Down Expand Up @@ -133,7 +133,7 @@ class _Impl implements Impl {
break;
}
});
const res = this.subscriber.subscribe(timeout);
const res = this.subscriber.subscribe(address, timeout);
this.subscribeMap.set(res, src);
return res;
}
Expand Down
61 changes: 40 additions & 21 deletions src/impl/event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PassThrough, Readable } from "stream";
import log from "~/log";

export interface TaskCreatedEvent {
type: "TaskCreated";
Expand Down Expand Up @@ -62,39 +63,57 @@ export type Event =
| TaskFinishedEvent
| HeartBeatEvent;

export class Subscriber {
private streams: PassThrough[] = [];
private timers: (NodeJS.Timer | null)[] = [];
class EventStream {
public readonly stream: PassThrough;
private dst: string;
private timer: NodeJS.Timer | null;

subscribe(timeout: number): Readable {
const stream = new PassThrough({ objectMode: true });
this.streams.push(stream);
constructor(dst: string, timeout: number = 0) {
this.stream = new PassThrough({ objectMode: true });
this.dst = dst;
if (timeout > 0) {
const timer = setInterval(() => {
stream.write({ type: "Heartbeat" });
this.timer = setInterval(() => {
this.stream.write({ type: "Heartbeat" });
}, timeout * 1000);
this.timers.push(timer);
} else {
this.timers.push(null);
this.timer = null;
}
}

write(event: Event) {
log.debug(`send event ${event.type} to ${this.dst}`);
this.stream.write(event);
}

destroy() {
if (this.timer) {
clearInterval(this.timer);
}
return stream;
this.stream.destroy();
log.debug(`destroy event stream for ${this.dst}`);
}
}

export class Subscriber {
private streamMap: Map<Readable, EventStream> = new Map<Readable, EventStream>();

subscribe(address: string, timeout: number): Readable {
const stream = new EventStream(address, timeout);
this.streamMap.set(stream.stream, stream);
return stream.stream;
}

unsubscribe(stream: Readable): void {
if (this.streams.length > 0) {
const i = this.streams.indexOf(stream as PassThrough);
this.streams.splice(i);
const timer = this.timers[i];
if (timer) {
clearInterval(timer);
}
this.timers.splice(i);
const eventStream = this.streamMap.get(stream);
if (eventStream) {
eventStream.destroy();
this.streamMap.delete(stream);
}
}

publish(event: Event): void {
for (const stream of this.streams) {
stream.write(event);
for (const eventStrem of this.streamMap.values()) {
eventStrem.write(event);
}
}
}
4 changes: 2 additions & 2 deletions src/impl/monkey/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -802,8 +802,8 @@ class _Impl implements Impl {
return randomHex(32);
}

subscribe(timeout: number): Readable {
return this.subscriber.subscribe(timeout);
subscribe(address: string, timeout: number): Readable {
return this.subscriber.subscribe(address, timeout);
}

unsubscribe(stream: Readable) {
Expand Down
2 changes: 1 addition & 1 deletion src/impl/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,6 @@ export interface Impl {
receiver: string
): Promise<SecretShareData[]>;
endRound(address: string, taskID: string, round: number): Promise<string>;
subscribe(timeout: number): Readable;
subscribe(address: string, timeout: number): Readable;
unsubscribe(stream: Readable): void;
}
2 changes: 1 addition & 1 deletion src/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ export const chainService: ChainHandlers = {
const address = call.request.address;
const timeout = call.request.timeout;
log.info(`node ${address} subscribe`);
const stream = impl.subscribe(timeout);
const stream = impl.subscribe(address, timeout);
stream.on("data", (event: ImplEvent) => {
switch (event.type) {
case "TaskCreated":
Expand Down

0 comments on commit c6cbd14

Please sign in to comment.