From 66977c8628e2e6c45843bfcd734070fcfc5005ea Mon Sep 17 00:00:00 2001 From: Vladimir Date: Tue, 31 May 2022 16:36:50 +0300 Subject: [PATCH] Fixed reverted PR: Choose optimal sync modes by default on UI (#12770) * Revert "Revert "Choose optimal sync modes by default on UI (#12411)" (#12583)" This reverts commit 9789ffdaf2f6d8f7b7e4a07910ad8322ab46595d. * get optimal sync mode only in creation mode * sort sync mode options by priority * add tests for calculateInitialCatalog * remove duplicated tests * fixed one-line 'if' * move calculationInitialCatalog function to file and export it as default * - fix merge conflicts - fix Type * update tests --- airbyte-webapp/package.json | 1 + .../components/TransformationView.tsx | 4 +- .../Connection/CatalogTree/CatalogSection.tsx | 1 + .../CatalogTree/StreamFieldTable.tsx | 2 +- .../calculateInitialCatalog.test.ts | 394 ++++++++++++++++++ .../ConnectionForm/calculateInitialCatalog.ts | 111 +++++ .../Connection/ConnectionForm/formConfig.tsx | 75 +--- 7 files changed, 524 insertions(+), 64 deletions(-) create mode 100644 airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts create mode 100644 airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts diff --git a/airbyte-webapp/package.json b/airbyte-webapp/package.json index b69544559f9a..8d41ad3b300b 100644 --- a/airbyte-webapp/package.json +++ b/airbyte-webapp/package.json @@ -9,6 +9,7 @@ "start": "react-scripts start", "build": "react-scripts build", "test": "react-scripts test", + "test:coverage": "npm test -- --coverage --watchAll=false", "format": "prettier --write 'src/**/*.{ts,tsx}'", "storybook": "start-storybook -p 9009 -s public --quiet", "lint": "eslint --ext js,ts,tsx src", diff --git a/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/TransformationView.tsx b/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/TransformationView.tsx index fe3d5901845a..ee638c22a019 100644 --- a/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/TransformationView.tsx +++ b/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/TransformationView.tsx @@ -50,7 +50,7 @@ const NoSupportedTransformationCard = styled(ContentCard)` `; const CustomTransformationsCard: React.FC<{ - operations?: OperationRead[]; + operations?: OperationCreate[]; onSubmit: FormikOnSubmit<{ transformations?: OperationRead[] }>; mode: ConnectionFormMode; }> = ({ operations, onSubmit, mode }) => { @@ -58,7 +58,7 @@ const CustomTransformationsCard: React.FC<{ const initialValues = useMemo( () => ({ - transformations: getInitialTransformations(operations), + transformations: getInitialTransformations(operations || []), }), [operations] ); diff --git a/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx b/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx index bdfc3f506de3..6bb927875cce 100644 --- a/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx +++ b/airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx @@ -26,6 +26,7 @@ import { flatten, getPathType } from "./utils"; const Section = styled.div<{ error?: boolean; isSelected: boolean }>` border: 1px solid ${(props) => (props.error ? props.theme.dangerColor : "none")}; background: ${({ theme, isSelected }) => (isSelected ? "rgba(97, 94, 255, 0.1);" : theme.greyColor0)}; + padding: 2px; &:first-child { border-radius: 8px 8px 0 0; diff --git a/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx b/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx index 36b975d18cfe..ed04071a5032 100644 --- a/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx +++ b/airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx @@ -39,7 +39,7 @@ export const StreamFieldTable: React.FC = (props) => { {props.syncSchemaFields.map((field) => ( - + { + test("should assign ids to all streams", () => { + const { id, ...restProps } = mockSyncSchemaStream; + + const values = calculateInitialCatalog( + { + streams: [restProps], + } as unknown as SyncSchema, + [], + false + ); + + values.streams.forEach((stream) => { + expect(stream).toHaveProperty("id", "0"); + }); + }); + + test("should set default 'FullRefresh' if 'supportedSyncModes' in stream is empty(or null)", () => { + const { config, stream } = mockSyncSchemaStream; + + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...stream, + supportedSyncModes: null, + }, + config, + }, + ], + } as unknown as SyncSchema, + [], + false + ); + + values.streams.forEach((stream) => { + expect(stream).toHaveProperty("stream.supportedSyncModes", [SyncMode.full_refresh]); + }); + }); + + test("should select 'Incremental(cursor defined) => Append Dedup'", () => { + const { config, stream } = mockSyncSchemaStream; + + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...stream, + name: "test", + sourceDefinedCursor: true, + defaultCursorField: ["id"], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + syncMode: SyncMode.full_refresh, + }, + }, + { + id: "2", + stream: { + ...stream, + name: "test", + sourceDefinedCursor: true, + defaultCursorField: ["updated_at"], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.full_refresh, + }, + }, + { + id: "3", + stream: { + ...stream, + name: "test", + sourceDefinedCursor: true, + defaultCursorField: ["name"], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append, + syncMode: SyncMode.full_refresh, + }, + }, + ], + }, + [DestinationSyncMode.append_dedup], + false + ); + + values.streams.forEach((stream) => { + expect(stream).toHaveProperty("config.syncMode", SyncMode.incremental); + expect(stream).toHaveProperty("config.destinationSyncMode", DestinationSyncMode.append_dedup); + }); + }); + + test("should select 'Full Refresh => Overwrite'", () => { + const { config, stream } = mockSyncSchemaStream; + + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + syncMode: SyncMode.incremental, + }, + }, + { + id: "2", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.full_refresh, + }, + }, + { + id: "3", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append, + syncMode: SyncMode.full_refresh, + }, + }, + ], + }, + [DestinationSyncMode.overwrite], + false + ); + + values.streams.forEach((stream) => { + expect(stream).toHaveProperty("config.syncMode", SyncMode.full_refresh); + expect(stream).toHaveProperty("config.destinationSyncMode", DestinationSyncMode.overwrite); + }); + }); + + test("should select 'Incremental => Append'", () => { + const { config, stream } = mockSyncSchemaStream; + + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + syncMode: SyncMode.incremental, + }, + }, + { + id: "2", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.full_refresh, + }, + }, + { + id: "3", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append, + syncMode: SyncMode.full_refresh, + }, + }, + ], + }, + [DestinationSyncMode.append], + false + ); + + values.streams.forEach((stream) => { + expect(stream).toHaveProperty("config.syncMode", SyncMode.incremental); + expect(stream).toHaveProperty("config.destinationSyncMode", DestinationSyncMode.append); + }); + }); + + test("should select 'Full Refresh => Append'", () => { + const { config, stream } = mockSyncSchemaStream; + + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.full_refresh], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + syncMode: SyncMode.incremental, + }, + }, + { + id: "2", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.full_refresh], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.incremental, + }, + }, + { + id: "3", + stream: { + ...stream, + name: "test", + supportedSyncModes: [SyncMode.full_refresh], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append, + syncMode: SyncMode.incremental, + }, + }, + ], + }, + [DestinationSyncMode.append], + false + ); + + values.streams.forEach((stream) => { + expect(stream).toHaveProperty("config.syncMode", SyncMode.full_refresh); + expect(stream).toHaveProperty("config.destinationSyncMode", DestinationSyncMode.append); + }); + }); + + test("should not change syncMode, destinationSyncMode and cursorField in EditMode", () => { + const { config, stream } = mockSyncSchemaStream; + + const { streams } = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...stream, + name: "test", + sourceDefinedCursor: true, + defaultCursorField: ["id"], + supportedSyncModes: [], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append, + syncMode: SyncMode.full_refresh, + }, + }, + ], + }, + [DestinationSyncMode.append_dedup], + true + ); + + expect(streams[0]).toHaveProperty("stream.supportedSyncModes", [SyncMode.full_refresh]); + + expect(streams[0]).toHaveProperty("config.cursorField", []); + expect(streams[0]).toHaveProperty("config.syncMode", SyncMode.full_refresh); + expect(streams[0]).toHaveProperty("config.destinationSyncMode", DestinationSyncMode.append); + }); + + test("should set the default cursorField value when it's available and no cursorField is selected", () => { + const { stream, config } = mockSyncSchemaStream; + + const { streams } = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...stream, + name: "test", + defaultCursorField: ["default_path"], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + cursorField: [], + syncMode: SyncMode.full_refresh, + }, + }, + { + id: "2", + stream: { + ...stream, + name: "test", + defaultCursorField: ["default_path"], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + cursorField: ["selected_path"], + syncMode: SyncMode.full_refresh, + }, + }, + { + id: "3", + stream: { + ...stream, + name: "test", + defaultCursorField: [], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + cursorField: [], + syncMode: SyncMode.full_refresh, + }, + }, + ], + }, + [DestinationSyncMode.append_dedup], + false + ); + + expect(streams[0]).toHaveProperty("config.cursorField", ["default_path"]); + expect(streams[1]).toHaveProperty("config.cursorField", ["selected_path"]); + expect(streams[2]).toHaveProperty("config.cursorField", []); + }); +}); diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts new file mode 100644 index 000000000000..e12b6c1d786e --- /dev/null +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts @@ -0,0 +1,111 @@ +import { SyncSchema, SyncSchemaStream } from "core/domain/catalog"; +import { DestinationSyncMode, SyncMode, AirbyteStreamConfiguration } from "core/request/AirbyteClient"; + +const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => { + if (streamNode.stream?.defaultCursorField?.length) { + return streamNode.stream.defaultCursorField; + } + return streamNode.config?.cursorField || []; +}; + +const verifySupportedSyncModes = (streamNode: SyncSchemaStream): SyncSchemaStream => { + if (!streamNode.stream) { + return streamNode; + } + const { + stream: { supportedSyncModes }, + } = streamNode; + + if (supportedSyncModes?.length) { + return streamNode; + } + return { ...streamNode, stream: { ...streamNode.stream, supportedSyncModes: [SyncMode.full_refresh] } }; +}; + +const verifyConfigCursorField = (streamNode: SyncSchemaStream): SyncSchemaStream => { + if (!streamNode.config) { + return streamNode; + } + const { config } = streamNode; + + return { + ...streamNode, + config: { + ...config, + cursorField: config.cursorField?.length ? config.cursorField : getDefaultCursorField(streamNode), + }, + }; +}; + +const getOptimalSyncMode = ( + streamNode: SyncSchemaStream, + supportedDestinationSyncModes: DestinationSyncMode[] +): SyncSchemaStream => { + const updateStreamConfig = ( + config: Pick + ): SyncSchemaStream => ({ + ...streamNode, + config: { ...streamNode.config, ...config }, + }); + if (!streamNode.stream?.supportedSyncModes) { + return streamNode; + } + const { + stream: { supportedSyncModes, sourceDefinedCursor }, + } = streamNode; + + if ( + supportedSyncModes.includes(SyncMode.incremental) && + supportedDestinationSyncModes.includes(DestinationSyncMode.append_dedup) && + sourceDefinedCursor + ) { + return updateStreamConfig({ + syncMode: SyncMode.incremental, + destinationSyncMode: DestinationSyncMode.append_dedup, + }); + } + + if (supportedDestinationSyncModes.includes(DestinationSyncMode.overwrite)) { + return updateStreamConfig({ + syncMode: SyncMode.full_refresh, + destinationSyncMode: DestinationSyncMode.overwrite, + }); + } + + if ( + supportedSyncModes.includes(SyncMode.incremental) && + supportedDestinationSyncModes.includes(DestinationSyncMode.append) + ) { + return updateStreamConfig({ + syncMode: SyncMode.incremental, + destinationSyncMode: DestinationSyncMode.append, + }); + } + + if (supportedDestinationSyncModes.includes(DestinationSyncMode.append)) { + return updateStreamConfig({ + syncMode: SyncMode.full_refresh, + destinationSyncMode: DestinationSyncMode.append, + }); + } + return streamNode; +}; + +const calculateInitialCatalog = ( + schema: SyncSchema, + supportedDestinationSyncModes: DestinationSyncMode[], + isEditMode?: boolean +): SyncSchema => ({ + streams: schema.streams.map((apiNode, id) => { + const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() }; + const nodeStream = verifySupportedSyncModes(nodeWithId); + + if (isEditMode) { + return nodeStream; + } + + return getOptimalSyncMode(verifyConfigCursorField(nodeStream), supportedDestinationSyncModes); + }), +}); + +export default calculateInitialCatalog; diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx index 71c11f5f2f68..3f30bc0c3686 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx @@ -1,4 +1,3 @@ -import { setIn } from "formik"; import { useMemo } from "react"; import { useIntl } from "react-intl"; import * as yup from "yup"; @@ -6,18 +5,14 @@ import * as yup from "yup"; import { DropDownRow } from "components"; import FrequencyConfig from "config/FrequencyConfig.json"; -import { SyncSchema, SyncSchemaStream } from "core/domain/catalog"; +import { SyncSchema } from "core/domain/catalog"; import { isDbtTransformation, isNormalizationTransformation, NormalizationType, } from "core/domain/connection/operation"; import { SOURCE_NAMESPACE_TAG } from "core/domain/connector/source"; -import { ValuesProps } from "hooks/services/useConnectionHook"; -import { useCurrentWorkspace } from "services/workspaces/WorkspacesService"; - import { - AirbyteStreamConfiguration, ConnectionSchedule, DestinationDefinitionSpecificationRead, DestinationSyncMode, @@ -27,7 +22,11 @@ import { OperatorType, SyncMode, WebBackendConnectionRead, -} from "../../../core/request/AirbyteClient"; +} from "core/request/AirbyteClient"; +import { ValuesProps } from "hooks/services/useConnectionHook"; +import { useCurrentWorkspace } from "services/workspaces/WorkspacesService"; + +import calculateInitialCatalog from "./calculateInitialCatalog"; type FormikConnectionFormValues = { name?: string; @@ -43,10 +42,10 @@ type FormikConnectionFormValues = { type ConnectionFormValues = ValuesProps; const SUPPORTED_MODES: [SyncMode, DestinationSyncMode][] = [ + [SyncMode.incremental, DestinationSyncMode.append_dedup], [SyncMode.full_refresh, DestinationSyncMode.overwrite], - [SyncMode.full_refresh, DestinationSyncMode.append], [SyncMode.incremental, DestinationSyncMode.append], - [SyncMode.incremental, DestinationSyncMode.append_dedup], + [SyncMode.full_refresh, DestinationSyncMode.append], ]; const DEFAULT_SCHEDULE: ConnectionSchedule = { @@ -201,57 +200,7 @@ function mapFormPropsToOperation( return newOperations; } -function getDefaultCursorField(streamNode: SyncSchemaStream) { - if (streamNode.stream?.defaultCursorField?.length) { - return streamNode.stream.defaultCursorField; - } - return streamNode.config?.cursorField; -} - -const useInitialSchema = (schema: SyncSchema): SyncSchema => - useMemo( - () => ({ - streams: schema.streams.map((apiNode, id) => { - const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() }; - - // If the value in supportedSyncModes is empty assume the only supported sync mode is FULL_REFRESH. - // Otherwise, it supports whatever sync modes are present. - const streamNode = nodeWithId.stream?.supportedSyncModes?.length - ? nodeWithId - : setIn(nodeWithId, "stream.supportedSyncModes", [SyncMode.full_refresh]); - - // If syncMode isn't null - don't change item - if (streamNode.config.syncMode) { - return streamNode; - } - - const updateStreamConfig = (config: Partial): SyncSchemaStream => ({ - ...streamNode, - config: { ...streamNode.config, ...config }, - }); - - const supportedSyncModes = streamNode.stream.supportedSyncModes; - - // Prefer INCREMENTAL sync mode over other sync modes - if (supportedSyncModes.includes(SyncMode.incremental)) { - return updateStreamConfig({ - cursorField: streamNode.config.cursorField.length - ? streamNode.config.cursorField - : getDefaultCursorField(streamNode), - syncMode: SyncMode.incremental, - }); - } - - // If source don't support INCREMENTAL and FULL_REFRESH - set first value from supportedSyncModes list - return updateStreamConfig({ - syncMode: streamNode.stream.supportedSyncModes[0], - }); - }), - }), - [schema.streams] - ); - -const getInitialTransformations = (operations?: OperationCreate[]): OperationRead[] => +const getInitialTransformations = (operations: OperationCreate[]): OperationRead[] => operations?.filter(isDbtTransformation) ?? []; const getInitialNormalization = ( @@ -275,7 +224,11 @@ const useInitialValues = ( destDefinition: DestinationDefinitionSpecificationRead, isEditMode?: boolean ): FormikConnectionFormValues => { - const initialSchema = useInitialSchema(connection.syncCatalog); + const initialSchema = useMemo( + () => + calculateInitialCatalog(connection.syncCatalog, destDefinition?.supportedDestinationSyncModes || [], isEditMode), + [connection.syncCatalog, destDefinition, isEditMode] + ); return useMemo(() => { const initialValues: FormikConnectionFormValues = {