Skip to content

Commit

Permalink
Long polling (#541)
Browse files Browse the repository at this point in the history
* JST-83: Add long-polling mechanism to payments, demand and activity
* JST-167: Disabled axios timeout. Fixed destroying httpAgent in payments.
* JST-83: Fixed lint
* JST-83: Added tsconfig to tests (for ts-jest)
* JST-83: Fixed unit tests with mocked long polling
* JST-83: Fixed error handling for long polling requests
* JST-83: Fixed logging
  • Loading branch information
mgordel authored Aug 16, 2023
1 parent 62276b1 commit d82e8cb
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 113 deletions.
8 changes: 6 additions & 2 deletions src/activity/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export class Activity {
const maxRetries = 5;
const { id: activityId, agreementId } = this;
const isRunning = () => this.isRunning;
const { activityExecuteTimeout, api, activityExeBatchResultsFetchInterval, eventTarget } = this.options;
const { activityExecuteTimeout, api, eventTarget } = this.options;
const handleError = this.handleError.bind(this);
return new Readable({
objectMode: true,
Expand All @@ -191,6 +191,11 @@ export class Activity {
const { data: results }: { data: Result[] } = (await api.control.getExecBatchResults(
activityId,
batchId,
undefined,
activityExecuteTimeout / 1000,
{
timeout: 0,
},
)) as unknown as { data: Result[] };
retryCount = 0;
const newResults = results.slice(lastIndex + 1);
Expand All @@ -201,7 +206,6 @@ export class Activity {
lastIndex = result.index;
});
}
if (!isBatchFinished) await sleep(activityExeBatchResultsFetchInterval, true);
} catch (error) {
try {
retryCount = await handleError(error, lastIndex, retryCount, maxRetries);
Expand Down
2 changes: 1 addition & 1 deletion src/activity/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Agent } from "http";
const DEFAULTS = {
activityRequestTimeout: 10000,
activityExecuteTimeout: 1000 * 60 * 5, // 5 min,
activityExeBatchResultsFetchInterval: 3000,
activityExeBatchResultsFetchInterval: 20000,
};

/**
Expand Down
3 changes: 1 addition & 2 deletions src/market/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const DEFAULTS = {
subnetTag: "public",
marketTimeout: 1000 * 60 * 3, // 3 min,
maxOfferEvents: 10,
offerFetchingInterval: 10000,
offerFetchingInterval: 20000,
marketOfferExpiration: 1000 * 60 * 30, // 30 min
debitNotesAcceptanceTimeout: 30,
proposalFilter: acceptAllProposalFilter(),
Expand All @@ -28,7 +28,6 @@ export class DemandConfig {
public readonly subnetTag: string;
public readonly maxOfferEvents: number;
public readonly offerFetchingInterval: number;
public readonly proposalTimeout?: number;
public readonly logger?: Logger;
public readonly eventTarget?: EventTarget;
public readonly httpAgent: Agent;
Expand Down
23 changes: 16 additions & 7 deletions src/market/demand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,14 @@ export class Demand extends EventTarget {
private async subscribe() {
while (this.isRunning) {
try {
const { data: events } = await this.options.api.collectOffers(this.id, 3, this.options.maxOfferEvents, {
timeout: 5000,
});
const { data: events } = await this.options.api.collectOffers(
this.id,
this.options.offerFetchingInterval / 1000,
this.options.maxOfferEvents,
{
timeout: 0,
},
);
for (const event of events as Array<ProposalEvent & ProposalRejectedEvent>) {
if (event.eventType === "ProposalRejectedEvent") {
this.logger?.debug(`Proposal rejected. Reason: ${event.reason?.message}`);
Expand Down Expand Up @@ -141,14 +146,18 @@ export class Demand extends EventTarget {
const reason = error.response?.data?.message || error;
this.options.eventTarget?.dispatchEvent(new Events.CollectFailed({ id: this.id, reason }));
this.logger?.warn(`Unable to collect offers. ${reason}`);
if (error.response?.status === 404) {
if (error.code === "ECONNREFUSED" || error.response?.status === 404) {
this.dispatchEvent(
new DemandEvent(DemandEventType, undefined, new Error(`Subscription expired. ${reason}`)),
new DemandEvent(
DemandEventType,
undefined,
new Error(`${error.code === "ECONNREFUSED" ? "Yagna connection error." : "Demand expired."} ${reason}`),
),
);
break;
}
await sleep(2);
}
} finally {
await sleep(this.options.offerFetchingInterval, true);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/market/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ export class MarketService {

private demandEventListener(event: Event) {
const proposal = (event as DemandEvent).proposal;
if ((event as DemandEvent).error) {
this.logger?.error("Subscription expired. Trying to subscribe a new one...");
const error = (event as DemandEvent).error;
if (error) {
this.logger?.error("Subscription failed. Trying to subscribe a new one...");
this.resubscribeDemand().catch((e) => this.logger?.warn(e));
return;
}
Expand Down
12 changes: 3 additions & 9 deletions src/payment/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ import { Agent } from "http";
const DEFAULTS = Object.freeze({
payment: { network: "goerli", driver: "erc20" },
budget: 1.0,
paymentTimeout: 1000 * 60 * 2, // 2 min
paymentTimeout: 1000 * 60, // 1 min
allocationExpires: 1000 * 60 * 60, // 60 min
invoiceReceiveTimeout: 1000 * 60 * 5, // 5 min
maxInvoiceEvents: 500,
maxDebitNotesEvents: 500,
invoiceFetchingInterval: 2000,
debitNotesFetchingInterval: 2000,
payingInterval: 2000,
paymentRequestTimeout: 10000,
invoiceFetchingInterval: 20000,
debitNotesFetchingInterval: 20000,
debitNoteFilter: acceptAllDebitNotesFilter(),
invoiceFilter: acceptAllInvoicesFilter(),
});
Expand All @@ -43,7 +41,6 @@ abstract class BaseConfig {
public readonly logger?: Logger;
public readonly eventTarget?: EventTarget;
public readonly payment: { driver: string; network: string };
public readonly paymentRequestTimeout: number;
public readonly httpAgent: Agent;

constructor(public readonly options?: BasePaymentOptions) {
Expand All @@ -66,7 +63,6 @@ abstract class BaseConfig {
};
this.logger = options?.logger;
this.eventTarget = options?.eventTarget;
this.paymentRequestTimeout = options?.paymentRequestTimeout || DEFAULTS.paymentRequestTimeout;
}
}
/**
Expand All @@ -75,7 +71,6 @@ abstract class BaseConfig {
export class PaymentConfig extends BaseConfig {
public readonly invoiceFetchingInterval: number;
public readonly debitNotesFetchingInterval: number;
public readonly payingInterval: number;
public readonly maxInvoiceEvents: number;
public readonly maxDebitNotesEvents: number;
public readonly debitNoteFilter: DebitNoteFilter;
Expand All @@ -87,7 +82,6 @@ export class PaymentConfig extends BaseConfig {
this.debitNotesFetchingInterval = options?.debitNotesFetchingInterval ?? DEFAULTS.debitNotesFetchingInterval;
this.maxInvoiceEvents = options?.maxInvoiceEvents ?? DEFAULTS.maxInvoiceEvents;
this.maxDebitNotesEvents = options?.maxDebitNotesEvents ?? DEFAULTS.maxDebitNotesEvents;
this.payingInterval = options?.payingInterval ?? DEFAULTS.payingInterval;
this.debitNoteFilter = options?.debitNotesFilter ?? DEFAULTS.debitNoteFilter;
this.invoiceFilter = options?.invoiceFilter ?? DEFAULTS.invoiceFilter;
}
Expand Down
123 changes: 64 additions & 59 deletions src/payment/payments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { Events } from "../events";
export interface PaymentOptions extends BasePaymentOptions {
invoiceFetchingInterval?: number;
debitNotesFetchingInterval?: number;
payingInterval?: number;
maxInvoiceEvents?: number;
maxDebitNotesEvents?: number;
}
Expand All @@ -21,7 +20,7 @@ export class Payments extends EventTarget {
private lastInvoiceFetchingTime: string = new Date().toISOString();
private lastDebitNotesFetchingTime: string = new Date().toISOString();
static async create(options?: PaymentOptions) {
return new Payments(new PaymentConfig(options));
return new Payments(options);
}

constructor(options?: PaymentOptions) {
Expand All @@ -36,85 +35,91 @@ export class Payments extends EventTarget {
*/
async unsubscribe() {
this.isRunning = false;
this.removeEventListener(PaymentEventType, null);
this.options.httpAgent.destroy?.();
this.logger?.debug(`Payments unsubscribed`);
}

private async subscribe() {
this.subscribeForInvoices().catch(
(e) => this.logger?.error(`Unable to collect invoices. ${e?.response?.data?.message || e}`),
(e) => this.logger?.debug(`Unable to collect invoices. ${e?.response?.data?.message || e}`),
);
this.subscribeForDebitNotes().catch(
(e) => this.logger?.error(`Unable to collect debit notes. ${e?.response?.data?.message || e}`),
(e) => this.logger?.debug(`Unable to collect debit notes. ${e?.response?.data?.message || e}`),
);
}

private async subscribeForInvoices() {
while (this.isRunning) {
const { data: invoiceEvents } = await this.options.api
.getInvoiceEvents(
this.options.paymentRequestTimeout / 1000,
try {
const { data: invoiceEvents } = await this.options.api.getInvoiceEvents(
this.options.invoiceFetchingInterval / 1000,
this.lastInvoiceFetchingTime,
this.options.maxInvoiceEvents,
)
.catch((e) => {
this.logger?.error(`Unable to collect invoices. ${e?.response?.data?.message || e}`);
return { data: [] };
});
for (const event of invoiceEvents) {
if (event.eventType !== "InvoiceReceivedEvent") continue;
const invoice = await Invoice.create(event["invoiceId"], { ...this.options.options }).catch(
(e) =>
this.logger?.error(
`Unable to create invoice ID: ${event["invoiceId"]}. ${e?.response?.data?.message || e}`,
),
undefined,
{ timeout: 0 },
);
if (!invoice) continue;
this.dispatchEvent(new InvoiceEvent(PaymentEventType, invoice));
this.lastInvoiceFetchingTime = event.eventDate;
this.options.eventTarget?.dispatchEvent(new Events.InvoiceReceived(invoice));
this.logger?.debug(`New Invoice received for agreement ${invoice.agreementId}. Amount: ${invoice.amount}`);
for (const event of invoiceEvents) {
if (event.eventType !== "InvoiceReceivedEvent") continue;
const invoice = await Invoice.create(event["invoiceId"], { ...this.options.options }).catch(
(e) =>
this.logger?.error(
`Unable to create invoice ID: ${event["invoiceId"]}. ${e?.response?.data?.message || e}`,
),
);
if (!invoice) continue;
this.dispatchEvent(new InvoiceEvent(PaymentEventType, invoice));
this.lastInvoiceFetchingTime = event.eventDate;
this.options.eventTarget?.dispatchEvent(new Events.InvoiceReceived(invoice));
this.logger?.debug(`New Invoice received for agreement ${invoice.agreementId}. Amount: ${invoice.amount}`);
}
} catch (error) {
const reason = error.response?.data?.message || error;
this.logger?.debug(`Unable to get invoices. ${reason}`);
await sleep(2);
}
await sleep(this.options.invoiceFetchingInterval, true);
}
}

private async subscribeForDebitNotes() {
while (this.isRunning) {
const { data: debitNotesEvents } = await this.options.api
.getDebitNoteEvents(
this.options.paymentRequestTimeout / 1000,
this.lastDebitNotesFetchingTime,
this.options.maxDebitNotesEvents,
)
.catch((e) => {
this.logger?.error(`Unable to collect debit notes. ${e?.response?.data?.message || e}`);
return { data: [] };
});
for (const event of debitNotesEvents) {
if (event.eventType !== "DebitNoteReceivedEvent") continue;
const debitNote = await DebitNote.create(event["debitNoteId"], { ...this.options.options }).catch(
(e) =>
this.logger?.error(
`Unable to create debit note ID: ${event["debitNoteId"]}. ${e?.response?.data?.message || e}`,
),
);
if (!debitNote) continue;
this.dispatchEvent(new DebitNoteEvent(PaymentEventType, debitNote));
this.lastDebitNotesFetchingTime = event.eventDate;
this.options.eventTarget?.dispatchEvent(
new Events.DebitNoteReceived({
id: debitNote.id,
agreementId: debitNote.agreementId,
activityId: debitNote.activityId,
amount: debitNote.totalAmountDue,
}),
);
this.logger?.debug(
`New Debit Note received for agreement ${debitNote.agreementId}. Amount: ${debitNote.totalAmountDue}`,
);
try {
const { data: debitNotesEvents } = await this.options.api
.getDebitNoteEvents(
this.options.debitNotesFetchingInterval / 1000,
this.lastDebitNotesFetchingTime,
this.options.maxDebitNotesEvents,
undefined,
{ timeout: 0 },
)
.catch(() => ({ data: [] }));
for (const event of debitNotesEvents) {
if (event.eventType !== "DebitNoteReceivedEvent") continue;
const debitNote = await DebitNote.create(event["debitNoteId"], { ...this.options.options }).catch(
(e) =>
this.logger?.error(
`Unable to create debit note ID: ${event["debitNoteId"]}. ${e?.response?.data?.message || e}`,
),
);
if (!debitNote) continue;
this.dispatchEvent(new DebitNoteEvent(PaymentEventType, debitNote));
this.lastDebitNotesFetchingTime = event.eventDate;
this.options.eventTarget?.dispatchEvent(
new Events.DebitNoteReceived({
id: debitNote.id,
agreementId: debitNote.agreementId,
activityId: debitNote.activityId,
amount: debitNote.totalAmountDue,
}),
);
this.logger?.debug(
`New Debit Note received for agreement ${debitNote.agreementId}. Amount: ${debitNote.totalAmountDue}`,
);
}
} catch (error) {
const reason = error.response?.data?.message || error;
this.logger?.debug(`Unable to get debit notes. ${reason}`);
await sleep(2);
}
await sleep(this.options.debitNotesFetchingInterval, true);
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/payment/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ export interface PaymentOptions extends BasePaymentOptions {
invoiceFetchingInterval?: number;
/** Interval for checking new debit notes */
debitNotesFetchingInterval?: number;
/** Interval for processing payments */
payingInterval?: number;
/** Maximum number of invoice events per one fetching */
maxInvoiceEvents?: number;
/** Maximum number of debit notes events per one fetching */
Expand Down
2 changes: 1 addition & 1 deletion tests/mock/rest/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { RequestorApi } from "ya-ts-client/dist/ya-market/src/api/requestor-api"
import { AgreementProposal } from "ya-ts-client/dist/ya-market/src/models";
import { AxiosError, AxiosRequestConfig, AxiosResponse } from "axios";
import { v4 as uuidv4 } from "uuid";
import { DemandOfferBase, Event, ProposalEvent } from "ya-ts-client/dist/ya-market/src/models";
import { DemandOfferBase, Event } from "ya-ts-client/dist/ya-market/src/models";
import { agreementsApproved, proposalsDraft, proposalsInitial } from "../fixtures";
import { sleep } from "../../../src/utils";

Expand Down
11 changes: 9 additions & 2 deletions tests/mock/rest/payment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from "ya-ts-client/dist/ya-payment/src/models";
import { allocations, debitNotesEvents, debitNotes, invoiceEvents, invoices } from "../fixtures";
import { Rejection } from "ya-ts-client/dist/ya-payment/src/models";
import { sleep } from "../../../src/utils";

global.expectedEvents = [];
global.expectedInvoices = [];
Expand Down Expand Up @@ -57,7 +58,10 @@ export class PaymentApiMock extends RequestorApi {
appSessionId?: string,
options?: AxiosRequestConfig,
): Promise<import("axios").AxiosResponse<InvoiceEvent[]>> {
return new Promise((res) => res({ data: global.expectedEvents } as AxiosResponse));
return new Promise(async (res) => {
await sleep(100, true);
res({ data: global.expectedEvents } as AxiosResponse);
});
}

// @ts-ignore
Expand All @@ -73,7 +77,10 @@ export class PaymentApiMock extends RequestorApi {
appSessionId?: string,
options?: AxiosRequestConfig,
): Promise<import("axios").AxiosResponse<DebitNoteEvent[]>> {
return new Promise((res) => res({ data: global.expectedEvents } as AxiosResponse));
return new Promise(async (res) => {
await sleep(100, true);
res({ data: global.expectedEvents } as AxiosResponse);
});
}

// @ts-ignore
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/market_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe("Market Service", () => {
setExpectedProposals(proposalsDraft);
await logger.expectToInclude("Proposal has been confirmed", 10);
const addedProposalsIds = agreementPoolServiceMock["getProposals"]().map((p) => p.id);
expect(addedProposalsIds).toEqual(proposalsDraft.map((p) => p.proposal.proposalId));
expect(addedProposalsIds).toEqual(expect.arrayContaining(proposalsDraft.map((p) => p.proposal.proposalId)));
await marketService.end();
});

Expand Down
Loading

0 comments on commit d82e8cb

Please sign in to comment.