From 71a9dea076531981cde7be52693cd98be1e34318 Mon Sep 17 00:00:00 2001 From: Harshil Shukla Date: Mon, 14 Nov 2022 11:12:34 -0800 Subject: [PATCH 1/2] Add support for post datastream create,update,delete and state change action (#915) * Add support for post datastream create,update,delete and state change action * Update comment * Add unit test * Update unit test to fix the style check * Update comments * Update comments * Update tests * Remove flaky test and rely on newly added complete test * Refactor postDSStateChangeAction to get called as part of Rest.li, Add unit test * Reformt the imports * Update gradle version to 6.9.3 * Revert gradle version * Update datastream status for delete, bump version * Call postdatastreamstatechangeaction for stopped status too, update unit test * Update comment * Address comment and update test * Address comment and update test * Update test to fix race condition * Address review comment Co-authored-by: Harshil Shukla --- .../server/api/connector/Connector.java | 16 ++++- .../server/dms/DatastreamResources.java | 40 ++++++++++- .../datastream/server/TestCoordinator.java | 69 +++++++++++++++++++ .../server/dms/TestDatastreamResources.java | 67 +++++++++++++++++- .../datastream/server/ConnectorWrapper.java | 18 +++++ .../datastream/server/Coordinator.java | 24 +++++++ .../datastream/connectors/DummyConnector.java | 14 ++++ gradle/maven.gradle | 2 +- 8 files changed, 244 insertions(+), 6 deletions(-) diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java index a73f3acf4..7dee429be 100644 --- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java +++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java @@ -15,6 +15,7 @@ import com.linkedin.datastream.common.Datastream; import 
com.linkedin.datastream.common.DatastreamConstants; +import com.linkedin.datastream.common.DatastreamException; import com.linkedin.datastream.common.DatastreamMetadataConstants; import com.linkedin.datastream.metrics.MetricsAware; import com.linkedin.datastream.server.DatastreamTask; @@ -131,7 +132,7 @@ default String getDestinationName(Datastream datastream) { * with its set of initializations. * * NOTE: This method is called by the Rest.li service before the datastream is written to ZooKeeper, so please make - * sure this call doesn't block for more then few seconds otherwise the REST call will timeout. + * sure this call doesn't block for more than a few seconds otherwise the REST call will timeout. * @param stream Datastream being initialized * @param allDatastreams all existing datastreams in the system of connector type of the datastream that is being * initialized. @@ -139,4 +140,17 @@ default String getDestinationName(Datastream datastream) { default void postDatastreamInitialize(Datastream stream, List allDatastreams) throws DatastreamValidationException { } + + /** + * Hook that can be used to do any additional operations once the datastream has been created, updated or deleted + * successfully on the ZooKeeper. This method will be invoked for datastream state change too. + * + * NOTE: This method is called by the Rest.li service after the datastream is written to/deleted from ZooKeeper. So + * please make sure this call doesn't block for more than a few seconds otherwise the REST call will timeout. If you + * have non-critical work, see if that can be done as an async operation or on a separate thread. + * @param stream the datastream + * @throws DatastreamException on failure to perform post datastream state change operations successfully. 
+ */ + default void postDatastreamStateChangeAction(Datastream stream) throws DatastreamException { + } } diff --git a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java index 65daf0ef0..31f2e7fea 100644 --- a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java +++ b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java @@ -303,6 +303,9 @@ private void doUpdateDatastreams(Map datastreamMap) { // are updated before we touch the "assignments" node to avoid race condition for (String key : datastreamMap.keySet()) { _store.updateDatastream(key, datastreamMap.get(key), false); + + // invoke post datastream state change action for recently updated datastream + invokePostDSStateChangeAction(datastreamMap.get(key)); } _coordinator.broadcastDatastreamUpdate(); } catch (DatastreamException e) { @@ -359,6 +362,8 @@ public ActionResult pause(@PathKeysParam PathKeys pathKeys, if (DatastreamStatus.READY.equals(datastream.getStatus())) { d.setStatus(DatastreamStatus.PAUSED); _store.updateDatastream(d.getName(), d, true); + // invoke post datastream state change action for recently paused datastream + invokePostDSStateChangeAction(d); } else { LOG.warn("Cannot pause datastream {}, as it is not in READY state. 
State: {}", d, datastream.getStatus()); } @@ -467,12 +472,16 @@ public ActionResult stop(@PathKeysParam PathKeys pathKeys, d.setStatus(DatastreamStatus.STOPPING); _store.updateDatastream(d.getName(), d, true); _store.deleteDatastreamNumTasks(d.getName()); + // invoke post datastream state change action for recently stopped datastream + invokePostDSStateChangeAction(d); } else if (DatastreamStatus.STOPPING.equals(d.getStatus())) { // this check helps in preventing any datastream from being stuck in STOPPING state indefinitely LOG.warn("Datastream {} is already in {} state. Notifying leader to initiate transition", d, d.getStatus()); _store.updateDatastream(d.getName(), d, true); _store.deleteDatastreamNumTasks(d.getName()); + // invoke post datastream state change action for recently stopped datastream + invokePostDSStateChangeAction(d); } else { LOG.warn("Cannot stop datastream {}, as it is not in READY/PAUSED state. State: {}", d, d.getStatus()); } @@ -491,6 +500,14 @@ public ActionResult stop(@PathKeysParam PathKeys pathKeys, LOG.info("Completed request for stopping datastream {}", _store.getDatastream(datastream.getName())); + // above check guarantees us that datastreams will be in STOPPED state only, so no need to fetch the latest state + // from store explicitly + datastreamsToStop.forEach(ds -> { + // set the status as STOPPED as the original status would be READY, PAUSED or STOPPING + ds.setStatus(DatastreamStatus.STOPPED); + invokePostDSStateChangeAction(ds); + }); + return new ActionResult<>(HttpStatus.S_200_OK); } @@ -528,6 +545,8 @@ public ActionResult resume(@PathKeysParam PathKeys pathKeys, DatastreamStatus.STOPPED.equals(datastream.getStatus())) { d.setStatus(DatastreamStatus.READY); _store.updateDatastream(d.getName(), d, true); + // invoke post datastream state change action for recently resumed datastream + invokePostDSStateChangeAction(d); } else { LOG.warn("Will not resume datastream {}, as it is not already in PAUSED/STOPPED state", d); } @@ 
-712,7 +731,8 @@ public ActionResult resumeSourcePartitions(@PathKeysParam PathKeys pathKey @Override public UpdateResponse delete(String datastreamName) { - if (null == _store.getDatastream(datastreamName)) { + final Datastream datastream = _store.getDatastream(datastreamName); + if (null == datastream) { _errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Datastream requested to be deleted does not exist: " + datastreamName); } @@ -725,6 +745,9 @@ public UpdateResponse delete(String datastreamName) { _store.deleteDatastream(datastreamName); DELETE_CALL_LATENCY_MS.set(Duration.between(startTime, Instant.now()).toMillis()); + // invoke post datastream state change action for recently deleted datastream + invokePostDSStateChangeAction(datastream); + return new UpdateResponse(HttpStatus.S_200_OK); } catch (Exception e) { _dynamicMetricsManager.createOrUpdateMeter(CLASS_NAME, CALL_ERROR, 1); @@ -893,6 +916,10 @@ public CreateResponse create(Datastream datastream) { CREATE_CALL_LATENCY_MS.set(delta.toMillis()); LOG.info("Datastream persisted to zookeeper, total time used: {} ms", delta.toMillis()); + + // invoke post datastream state change action for recently created datastream + invokePostDSStateChangeAction(datastream); + return new CreateResponse(datastream.getName(), HttpStatus.S_201_CREATED); } catch (IllegalArgumentException e) { _dynamicMetricsManager.createOrUpdateMeter(CLASS_NAME, CALL_ERROR, 1); @@ -920,6 +947,17 @@ public CreateResponse create(Datastream datastream) { return null; } + private void invokePostDSStateChangeAction(Datastream datastream) { + try { + LOG.debug("Invoke post datastream state change action datastream={}", datastream); + _coordinator.invokePostDataStreamStateChangeAction(datastream); + LOG.info("Invoked post datastream state change action datastream={}", datastream); + } catch (DatastreamException e) { + _errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, + "Failed to perform 
post datastream state change action datastream=" + datastream, e); + } + } + /** * Get the list of metrics emitted by this class */ diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index aa43c5a0d..6e631cf89 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -1155,6 +1155,64 @@ public void testDatastreamWithBYOT() throws Exception { zkClient.close(); } + @Test + public void testInvokePostDataStreamStateChangeAction() throws Exception { + String testCluster = "testDatastreamCreateUpdateDelete"; + String connectorType = "connectorType"; + + TestHookConnector connector1 = new TestHookConnector("connector1", connectorType); + Coordinator coordinator1 = createCoordinator(_zkConnectionString, testCluster); + coordinator1.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()), + false, new SourceBasedDeduper(), null); + coordinator1.start(); + + TestHookConnector connector2 = new TestHookConnector("connector2", connectorType); + Coordinator coordinator2 = createCoordinator(_zkConnectionString, testCluster); + coordinator2.addConnector(connectorType, connector2, new BroadcastStrategy(Optional.empty()), + false, new SourceBasedDeduper(), null); + coordinator2.start(); + ZkClient zkClient = new ZkClient(_zkConnectionString); + + // create datastream + Datastream[] list = DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, + "datastream1", "datastream2"); + Datastream datastream1 = list[0]; + LOG.info("Created datastream1: {}", datastream1); + Datastream datastream2 = list[1]; + LOG.info("Created datastream2: {}", datastream2); + + coordinator1.invokePostDataStreamStateChangeAction(datastream1); + 
Assert.assertTrue(coordinator1.getIsLeader().getAsBoolean()); + // post datastream state change action method should be invoked for created datastream + Assert.assertEquals(connector1.getPostDSStatechangeActionInvokeCount(), 1); + Assert.assertEquals(connector2.getPostDSStatechangeActionInvokeCount(), 0); + + // wait for datastream to be READY + PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, "datastream1") + .getStatus() + .equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS); + datastream1 = DatastreamTestUtils.getDatastream(zkClient, testCluster, datastream1.getName()); + + // update datastream + datastream1.getMetadata().put("key", "value"); + datastream1.getSource().setConnectionString("newSource"); + + LOG.info("Updating datastream: {}", datastream1); + CachedDatastreamReader datastreamCache = new CachedDatastreamReader(zkClient, testCluster); + ZookeeperBackedDatastreamStore dsStore = new ZookeeperBackedDatastreamStore(datastreamCache, zkClient, testCluster); + DatastreamResources datastreamResources = new DatastreamResources(dsStore, coordinator1); + datastreamResources.update(datastream1.getName(), datastream1); + + Assert.assertEquals(connector1.getPostDSStatechangeActionInvokeCount(), 2); + Assert.assertEquals(connector2.getPostDSStatechangeActionInvokeCount(), 0); + + coordinator1.stop(); + coordinator1.getDatastreamCache().getZkclient().close(); + coordinator2.stop(); + coordinator2.getDatastreamCache().getZkclient().close(); + zkClient.close(); + } + /** * Test datastream creation with Connector-managed destination; coordinator should not create or delete topics. 
*/ @@ -3110,6 +3168,7 @@ public class TestHookConnector implements Connector { List _tasks = new ArrayList<>(); String _instance = ""; String _name; + private int _postDSStatechangeActionInvokeCount; /** * Constructor for TestHookConnector @@ -3141,6 +3200,10 @@ public List getTasks() { return _tasks; } + public int getPostDSStatechangeActionInvokeCount() { + return _postDSStatechangeActionInvokeCount; + } + @Override public void start(CheckpointProvider checkpointProvider) { _isStarted = true; @@ -3178,6 +3241,12 @@ public void validateUpdateDatastreams(List datastreams, List resource1.get(datastreamName).getStatus() == DatastreamStatus.READY, 100, 10000); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Mock PathKeys PathKeys pathKey = Mockito.mock(PathKeys.class); @@ -226,6 +230,7 @@ public void testPauseDatastream() { Assert.assertEquals(resource1.get(datastreamName).getStatus(), DatastreamStatus.READY); ActionResult pauseResponse = resource1.pause(pathKey, false); Assert.assertEquals(pauseResponse.getStatus(), HttpStatus.S_200_OK); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 2); // Retrieve datastream and check that is in pause state. Datastream ds = resource2.get(datastreamName); @@ -235,6 +240,7 @@ public void testPauseDatastream() { // Resume datastream. ActionResult resumeResponse = resource1.resume(pathKey, false); Assert.assertEquals(resumeResponse.getStatus(), HttpStatus.S_200_OK); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 3); // Retrieve datastream and check that is not paused. 
Datastream ds2 = resource2.get(datastreamName); @@ -246,6 +252,8 @@ public void testPauseDatastream() { public void testStopDatastream() { DatastreamResources resource1 = new DatastreamResources(_datastreamKafkaCluster.getPrimaryDatastreamServer()); DatastreamResources resource2 = new DatastreamResources(_datastreamKafkaCluster.getPrimaryDatastreamServer()); + DummyConnector connector = (DummyConnector) _datastreamKafkaCluster.getPrimaryDatastreamServer().getCoordinator() + .getConnector(DUMMY_CONNECTOR); // Create a Datastream. Datastream datastreamToCreate = generateDatastream(0); @@ -258,6 +266,7 @@ public void testStopDatastream() { PollUtils.poll(() -> resource1.get(datastreamName).getStatus() == DatastreamStatus.READY, 100, 10000); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Mock PathKeys PathKeys pathKey = Mockito.mock(PathKeys.class); @@ -272,6 +281,8 @@ public void testStopDatastream() { Datastream ds = resource2.get(datastreamName); Assert.assertNotNull(ds); Assert.assertEquals(ds.getStatus(), DatastreamStatus.STOPPED); + // postDatastreamStateChangeAction should be invoked for status STOPPING and STOPPED + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 3); } @Test @@ -377,6 +388,8 @@ public void testStopDatastreamWhenRequestHandlerHostDies() { public void testPerformingAllActionsOnStoppingDatastream() throws DatastreamException { DatastreamResources resource1 = new DatastreamResources(_datastreamKafkaCluster.getPrimaryDatastreamServer()); DatastreamStore store = _datastreamKafkaCluster.getPrimaryDatastreamServer().getDatastreamStore(); + DummyConnector connector = (DummyConnector) _datastreamKafkaCluster.getPrimaryDatastreamServer().getCoordinator() + .getConnector(DUMMY_CONNECTOR); // Create a Datastream Datastream datastreamToCreate = generateDatastream(0); @@ -389,6 +402,7 @@ public 
void testPerformingAllActionsOnStoppingDatastream() throws DatastreamExce PollUtils.poll(() -> resource1.get(datastreamName).getStatus() == DatastreamStatus.READY, 100, 10000); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Mock PathKeys PathKeys pathKey = Mockito.mock(PathKeys.class); @@ -397,25 +411,34 @@ public void testPerformingAllActionsOnStoppingDatastream() throws DatastreamExce // Setting status to STOPPING explicitly to perform testing. datastreamToCreate.setStatus(DatastreamStatus.STOPPING); store.updateDatastream(datastreamName, datastreamToCreate, false); + // as we are updating datastream directly on store, postDatastreamStateChangeAction should not be invoked + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Retrieve datastream and check that is in STOPPING state. Datastream ds = resource1.get(datastreamName); Assert.assertNotNull(ds); Assert.assertEquals(ds.getStatus(), DatastreamStatus.STOPPING); + // datastream get should not invoke postDatastreamStateChangeAction + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Attempting to pause a datastream in stopping state, which should raise an exception. Assert.assertEquals( Assert.expectThrows(RestLiServiceException.class, () -> resource1.pause(pathKey, false)).getStatus(), HttpStatus.S_405_METHOD_NOT_ALLOWED); + // postDatastreamStateChangeAction should not be invoked + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Attempting to resume a datastream in stopping state, which should raise an exception. 
Assert.assertEquals( Assert.expectThrows(RestLiServiceException.class, () -> resource1.resume(pathKey, false)).getStatus(), HttpStatus.S_405_METHOD_NOT_ALLOWED); + // postDatastreamStateChangeAction should not be invoked + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Attempting to stop a datastream in stopping state, which should get executed without exception. Assert.assertEquals(resource1.stop(pathKey, false).getStatus(), HttpStatus.S_200_OK); Assert.assertEquals(resource1.get(datastreamName).getStatus(), DatastreamStatus.STOPPED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 3); // Setting status to STOPPING again explicitly to perform testing. datastreamToCreate.setStatus(DatastreamStatus.STOPPING); @@ -429,11 +452,13 @@ public void testPerformingAllActionsOnStoppingDatastream() throws DatastreamExce // Attempting to delete a datastream in stopping state, which should get executed without exception. Assert.assertEquals(resource1.delete(datastreamName).getStatus(), HttpStatus.S_200_OK); Assert.assertEquals(resource1.get(datastreamName).getStatus(), DatastreamStatus.DELETING); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 4); } @Test public void testStopRequestTimeoutScenarioWithConfigurableTimeouts() { DatastreamServer testDatastreamServer = _datastreamKafkaCluster.getPrimaryDatastreamServer(); + DummyConnector connector = (DummyConnector) testDatastreamServer.getCoordinator().getConnector(DUMMY_CONNECTOR); // Configuring small timeouts to mock timeout scenario Properties testProperties = new Properties(); @@ -452,6 +477,7 @@ public void testStopRequestTimeoutScenarioWithConfigurableTimeouts() { PollUtils.poll(() -> resource1.get(datastreamName).getStatus() == DatastreamStatus.READY, 100, 10000); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 
1); // Mock PathKeys PathKeys pathKey = Mockito.mock(PathKeys.class); @@ -464,12 +490,16 @@ public void testStopRequestTimeoutScenarioWithConfigurableTimeouts() { Assert.assertEquals( Assert.expectThrows(RestLiServiceException.class, () -> resource1.stop(pathKey, false)).getStatus(), HttpStatus.S_408_REQUEST_TIMEOUT); + // postDatastreamStateChangeAction should be invoked only for STOPPING and not for STOPPED + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 2); } @Test public void testStopRequestTimeoutWithBusyLeader() throws DatastreamException { - DatastreamStore testDatastreamStore = _datastreamKafkaCluster.getPrimaryDatastreamServer().getDatastreamStore(); + DatastreamServer testDatastreamServer = _datastreamKafkaCluster.getPrimaryDatastreamServer(); + DatastreamStore testDatastreamStore = testDatastreamServer.getDatastreamStore(); + DummyConnector connector = (DummyConnector) testDatastreamServer.getCoordinator().getConnector(DUMMY_CONNECTOR); // Attaching mock spies to the test instances of DatastreamCluster, DatastreamServer and DatastreamStore EmbeddedDatastreamCluster mockDatastreamCluster = Mockito.spy(_datastreamKafkaCluster); @@ -501,6 +531,7 @@ public void testStopRequestTimeoutWithBusyLeader() throws DatastreamException { PollUtils.poll(() -> resource1.get(datastreamName).getStatus() == DatastreamStatus.READY, 100, 10000); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Mock PathKeys PathKeys pathKey = Mockito.mock(PathKeys.class); @@ -513,6 +544,8 @@ public void testStopRequestTimeoutWithBusyLeader() throws DatastreamException { Assert.assertEquals( Assert.expectThrows(RestLiServiceException.class, () -> resource1.stop(pathKey, false)).getStatus(), HttpStatus.S_408_REQUEST_TIMEOUT); + // postDatastreamStateChangeAction should be invoked only for STOPPING and not for STOPPED + 
Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 2); } @Test @@ -778,41 +811,51 @@ private Datastream createAndWaitUntilInitialized(DatastreamResources resources, @Test public void testUpdateDatastream() throws Exception { DatastreamResources resource = new DatastreamResources(_datastreamKafkaCluster.getPrimaryDatastreamServer()); + DummyConnector connector = (DummyConnector) _datastreamKafkaCluster.getPrimaryDatastreamServer().getCoordinator() + .getConnector(DUMMY_CONNECTOR); Datastream originalDatastream1 = generateDatastream(1); checkBadRequest(() -> resource.update("none", originalDatastream1), HttpStatus.S_400_BAD_REQUEST); checkBadRequest(() -> resource.update(originalDatastream1.getName(), originalDatastream1), HttpStatus.S_404_NOT_FOUND); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 0); Datastream datastream1 = createAndWaitUntilInitialized(resource, originalDatastream1); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Cant update destination / status / connector / transport provider Datastream modifyDestination = generateDatastream(1); modifyDestination.getDestination().setConnectionString("updated"); checkBadRequest(() -> resource.update(modifyDestination.getName(), modifyDestination), HttpStatus.S_400_BAD_REQUEST); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); Datastream modifyStatus = generateDatastream(1); modifyStatus.setStatus(DatastreamStatus.PAUSED); checkBadRequest(() -> resource.update(modifyStatus.getName(), modifyStatus), HttpStatus.S_400_BAD_REQUEST); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); Datastream modifyConnector = generateDatastream(1); modifyStatus.setConnectorName("Random"); checkBadRequest(() -> resource.update(modifyConnector.getName(), modifyConnector), HttpStatus.S_400_BAD_REQUEST); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); Datastream modifyTransport 
= generateDatastream(1); modifyStatus.setTransportProviderName("Random"); checkBadRequest(() -> resource.update(modifyTransport.getName(), modifyTransport), HttpStatus.S_400_BAD_REQUEST); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); Datastream modifyNumTasks = generateDatastream(1); modifyNumTasks.getMetadata().put("numTasks", "10"); checkBadRequest(() -> resource.update(modifyNumTasks.getName(), modifyNumTasks), HttpStatus.S_400_BAD_REQUEST); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Create another datastream that gets deduped to datastream1. Datastream originalDatastream2 = generateDatastream(2); originalDatastream2.getDestination().setConnectionString("a different destination"); final Datastream datastream2 = createAndWaitUntilInitialized(resource, originalDatastream2); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 2); // Update metadata for all streams in the group. It is up to the connector to validate if metadata updates are // allowed. 
Dummy connector allows it @@ -825,6 +868,7 @@ public void testUpdateDatastream() throws Exception { BatchUpdateResult response = resource.batchUpdate(batchRequest); Assert.assertTrue( response.getResults().values().stream().allMatch(res -> res.getStatus().equals(HttpStatus.S_200_OK))); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 4); } catch (RestLiServiceException e) { Assert.fail("Valid batch update request failed"); } @@ -846,6 +890,8 @@ public void testUpdateDatastream() throws Exception { } catch (RestLiServiceException e) { // do nothing } + // post datastream state change should not get called on update failure + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 4); // make sure that on a failed batch update even the valid datastream update doesn't go through Thread.sleep(200); @@ -856,14 +902,16 @@ public void testUpdateDatastream() throws Exception { } @Test - public void testCreateEncryptedDatastream() throws Exception { + public void testCreateEncryptedDatastream() { DatastreamResources resource = new DatastreamResources(_datastreamKafkaCluster.getPrimaryDatastreamServer()); - + DummyConnector connector = (DummyConnector) _datastreamKafkaCluster.getPrimaryDatastreamServer().getCoordinator() + .getConnector(DUMMY_CONNECTOR); // Happy Path Datastream encryptedDS = generateEncryptedDatastream(1, true, true); CreateResponse response = resource.create(encryptedDS); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // Regression for Byot Datastream justByotDS = generateEncryptedDatastream(3, false, true); @@ -872,18 +920,22 @@ public void testCreateEncryptedDatastream() throws Exception { response = resource.create(justByotDS); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + 
Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 2); } @Test public void testCreateDatastream() throws Exception { DatastreamResources resource = new DatastreamResources(_datastreamKafkaCluster.getPrimaryDatastreamServer()); Set missingFields = new HashSet<>(); + DummyConnector connector = (DummyConnector) _datastreamKafkaCluster.getPrimaryDatastreamServer().getCoordinator() + .getConnector(DUMMY_CONNECTOR); // happy path Datastream fullDatastream = generateDatastream(0); CreateResponse response = resource.create(fullDatastream); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 1); // datastream names with leading and/or trailing whitespace are trimmed Datastream whitespaceDatastream = generateDatastream(1); @@ -895,12 +947,14 @@ public void testCreateDatastream() throws Exception { Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); Assert.assertEquals(response.getId(), originalName); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 2); missingFields.add("target"); Datastream allRequiredFields = generateDatastream(2, missingFields); response = resource.create(allRequiredFields); Assert.assertNull(response.getError()); Assert.assertEquals(response.getStatus(), HttpStatus.S_201_CREATED); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 3); // missing necessary fields missingFields.clear(); @@ -953,6 +1007,8 @@ public void testCreateDatastream() throws Exception { Datastream badDatastream = generateDatastream(0); badDatastream.getMetadata().put("numTasks", "100"); checkBadRequest(() -> resource.create(badDatastream)); + System.out.println("KHS11= " + connector.getPostDSStatechangeActionInvokeCount()); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 3); } private Datastream 
createDatastream(DatastreamResources resource, String name, int seed) { @@ -971,11 +1027,14 @@ private List createDatastreams(DatastreamResources resource, String @Test public void testCreateGetAllDatastreams() throws Exception { DatastreamResources resource = new DatastreamResources(_datastreamKafkaCluster.getPrimaryDatastreamServer()); + DummyConnector connector = (DummyConnector) _datastreamKafkaCluster.getPrimaryDatastreamServer().getCoordinator() + .getConnector(DUMMY_CONNECTOR); Assert.assertEquals(resource.getAll(NO_PAGING).size(), 0); String datastreamName = "TestDatastream-"; List datastreams = createDatastreams(resource, datastreamName, 10); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 10); // Get All Optional> result = @@ -994,12 +1053,14 @@ public void testCreateGetAllDatastreams() throws Exception { // Delete one entry Datastream removed = queryStreams.remove(0); Assert.assertEquals(resource.delete(removed.getName()).getStatus(), HttpStatus.S_200_OK); + Assert.assertEquals(connector.getPostDSStatechangeActionInvokeCount(), 11); // Get All List remainingQueryStreams = resource.getAll(NO_PAGING) .stream() .filter(x -> x.getStatus() != DatastreamStatus.DELETING) .collect(Collectors.toList()); + // getAll should not invoke postDatastreamStateChangeAction // Compare datastreams set only by name since destination is empty upon creation and later populated Assert.assertEquals(queryStreams.stream().map(Datastream::getName).collect(Collectors.toSet()), diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/ConnectorWrapper.java b/datastream-server/src/main/java/com/linkedin/datastream/server/ConnectorWrapper.java index 37cd14cab..056ddb643 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/ConnectorWrapper.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/ConnectorWrapper.java @@ -15,6 +15,7 @@ import com.linkedin.datastream.common.Datastream; import 
com.linkedin.datastream.common.DatastreamConstants; import com.linkedin.datastream.common.DatastreamDestination; +import com.linkedin.datastream.common.DatastreamException; import com.linkedin.datastream.server.api.connector.Connector; import com.linkedin.datastream.server.api.connector.DatastreamValidationException; import com.linkedin.datastream.server.providers.CheckpointProvider; @@ -249,4 +250,21 @@ public void postDatastreamInitialize(Datastream stream, List allData logApiEnd("postDatastreamInitialize"); } + + /** + * Hook that can be used to perform action for post datastream upsert/deletion or state change. This will be called + * as part of the Rest.li call once the ZooKeeper has been updated. + * + * @param stream the modified(created, updated, deleted, state changed) Datastream + */ + public void postDatastreamStateChangeAction(Datastream stream) throws DatastreamException { + logApiStart("postDatastreamStateChangeAction"); + try { + _connector.postDatastreamStateChangeAction(stream); + } catch (DatastreamException e) { + logErrorAndException("postDatastreamStateChangeAction", e); + throw e; + } + logApiEnd("postDatastreamStateChangeAction"); + } } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 833b33d47..3477e0479 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -49,6 +49,7 @@ import com.linkedin.datastream.common.DatastreamAlreadyExistsException; import com.linkedin.datastream.common.DatastreamConstants; import com.linkedin.datastream.common.DatastreamDestination; +import com.linkedin.datastream.common.DatastreamException; import com.linkedin.datastream.common.DatastreamMetadataConstants; import com.linkedin.datastream.common.DatastreamRuntimeException; import 
com.linkedin.datastream.common.DatastreamStatus; @@ -1089,6 +1090,28 @@ private void hardDeleteDatastream(Datastream ds, List activeStreams) _adapter.deleteDatastream(ds.getName()); } + /** + * Invokes the post datastream state change action of the connector for the given datastream. + * @param datastream the datastream + * @throws DatastreamException if it fails to perform the post datastream state change action + */ + public void invokePostDataStreamStateChangeAction(final Datastream datastream) throws DatastreamException { + _log.info("Invoke post datastream state change action"); + try { + Datastream datastreamCopy = datastream.copy(); + final String connectorName = datastreamCopy.getConnectorName(); + final ConnectorInfo connectorInfo = _connectors.get(connectorName); + connectorInfo.getConnector().postDatastreamStateChangeAction(datastreamCopy); + } catch (CloneNotSupportedException e) { + _log.error("Failed to copy object for datastream={}", datastream.getName()); + throw new DatastreamException("Failed to copy datastream object", e); + } catch (DatastreamException e) { + _log.error("Failed to perform post datastream state change action datastream={}", datastream.getName()); + _metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.POST_DATASTREAMS_STATE_CHANGE_ACTION_NUM_ERRORS, 1); + throw e; + } + } + private void createTopic(Datastream datastream) { _transportProviderAdmins.get(datastream.getTransportProviderName()).createDestination(datastream); @@ -2157,6 +2180,7 @@ public enum KeyedMeter { IS_PARTITION_ASSIGNMENT_SUPPORTED_NUM_ERRORS("isPartitionAssignmentSupported", NUM_ERRORS), IS_DATASTREAM_UPDATE_TYPE_SUPPORTED_NUM_ERRORS("isDatastreamUpdateTypeSupported", NUM_ERRORS), INITIALIZE_DATASTREAM_NUM_ERRORS("initializeDatastream", NUM_ERRORS), + POST_DATASTREAMS_STATE_CHANGE_ACTION_NUM_ERRORS("postDatastreamStateChangeAction", NUM_ERRORS), /* Coordinator event metrics */ LEADER_DO_ASSIGNMENT_NUM_ERRORS(HANDLE_EVENT_PREFIX + EventType.LEADER_DO_ASSIGNMENT, NUM_ERRORS), 
LEADER_PARTITION_ASSIGNMENT_NUM_ERRORS(HANDLE_EVENT_PREFIX + EventType.LEADER_PARTITION_ASSIGNMENT, NUM_ERRORS), diff --git a/datastream-testcommon/src/main/java/com/linkedin/datastream/connectors/DummyConnector.java b/datastream-testcommon/src/main/java/com/linkedin/datastream/connectors/DummyConnector.java index b636109f9..6a7ae8fad 100644 --- a/datastream-testcommon/src/main/java/com/linkedin/datastream/connectors/DummyConnector.java +++ b/datastream-testcommon/src/main/java/com/linkedin/datastream/connectors/DummyConnector.java @@ -9,8 +9,11 @@ import java.util.Map; import java.util.Properties; +import org.testng.Assert; + import com.linkedin.datastream.common.Datastream; import com.linkedin.datastream.common.DatastreamConstants; +import com.linkedin.datastream.common.DatastreamException; import com.linkedin.datastream.common.DiagnosticsAware; import com.linkedin.datastream.metrics.BrooklinMetricInfo; import com.linkedin.datastream.server.DatastreamTask; @@ -28,6 +31,7 @@ public class DummyConnector implements Connector, DiagnosticsAware { public static final String CONNECTOR_TYPE = "DummyConnector"; private final Properties _properties; + private int _postDSStatechangeActionInvokeCount; /** * Constructor for DummyConnector @@ -43,6 +47,10 @@ public DummyConnector(Properties properties) throws Exception { } } + public int getPostDSStatechangeActionInvokeCount() { + return _postDSStatechangeActionInvokeCount; + } + @Override public void start(CheckpointProvider checkpointProvider) { } @@ -72,6 +80,12 @@ public void validateUpdateDatastreams(List datastreams, List getMetricInfos() { return null; diff --git a/gradle/maven.gradle b/gradle/maven.gradle index 8febd361d..05c4e1c07 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "4.0.0" + version = "4.1.0" } subprojects { From 4eafa7b9eb3a676b8a5206de22da910435aa5c1b Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Mon, 14 Nov 2022 14:38:26 -0600 Subject: [PATCH 
2/2] Version 4.2.0-SNAPSHOT (#914) * version 4.1.0 * Bumping version to 4.2.0-SNAPSHOT * Update changelog * Update README version badge Co-authored-by: Harshil Shukla --- CHANGELOG.md | 60 +++++++++++++++++++++++++++++++++++++++++++++ README.asciidoc | 2 +- gradle/maven.gradle | 2 +- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 920c82e6f..c53a0f324 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,66 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## 4.1.0 — 2022-09-29 + +- Refactored StickyPartitionAssignmentStrategy and implemented task estimation logic in LoadBasedPartitionAssignmentStrategy #835 +- Fix flaky test testConsumeFromMultipleTopics #838 +- Refactor Optional parameters in the constructor of strategy #836 +- Implemented logic to prevent tasks from having more than specified number of partitions #837 +- Publishing artifacts to JFrog #839 +- Make the Throughput based assignment and task estimation based on partition assignment configurable #841 +- Implemented metrics for LoadBasedPartitionAssignmentStrategy #840 +- Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler #843 +- Handle the new session after session expiry #770 +- Fixed issue with config key #847 +- Print count of orphaned tasks and orphaned task locks in the log message #844 +- Datastream stop transition redesign #842 +- Fix flaky stopping simultaneous datastream test #849 +- Update the DatabaseChunkedReader to take the Connection as input rather than the DataSource #850 +- Handle leaking TP exception in handleAssignmentChange #845 +- Migrating from AdminUtils with AdminClient #848 +- Fix running task data structure logic in AbstractKafkaConnector for failing to stop task #851 +- Removing partial Helix ZkClient dependency #852 +- Stop the tasks in parallel in AbstractKafkaConnector #853 +- Add additional log in 
LoadBasedPartitionAssignmentStrategy #856 +- Fixing flaky test testCreateDatastreamHappyPathDefaultRetention #854 +- Added toString() override in PartitionThroughputInfo #858 +- Add Stats to DatastreamTaskImpl #855 +- Make PartitionThroughputProvider metricsAware #859 +- Added base strategy metric info in LoadBasedPartitionAssignmentStrategy #857 +- Add alert metrics to identify that elastic task configurations require adjustment #860 +- Fix restartDeadTask logic when the task thread has died #861 +- Fix metric infos in PartitionThroughputProvider #862 +- Fix the metrics deregistration in AbstractKafkaConnector when multiple stop are called #865 +- Fix logging in LoadBasedPartitionAssignmentStrategy #866 +- Make Default byte in rate and Msg in rate configurable #864 +- Metrics are getting emitted in LoadBasedPartitionAssignmentStrategy only when it needs adjustment #867 +- Use topic level throughput information when partition level information is unavailable #871 +- Fix compilation error #874 +- Loadbased Partition assigner not using topic level metrics to recognize partitions #876 +- Flushless producer supporting both comparable and non comparable offsets #873 +- LiveInstanceProvider subscription should be done only by the leader coordinator #879 +- Fixed issue with missing exception message during task initialization #882 +- Kafka upgrade #881 +- Skipping onPartitionsRevoked during consumer.close() call #886 +- Scala 2.12 upgrade #895 +- Upgrade avro and move jackson from codehaus to fasterxml #894 +- Fix topic deletion when multiple duplicate streams expire at the same time #897 +- Use 2.4.1.57 kafka version #901 +- Tests for min/max partitions per task metrics and minor code quality improvements #887 +- Fix rebalancing-tasks bug and added tests #900 +- Refactor Stopping Tasks On Assignment Change of Tasks #868 +- Change python dependency in commit-msg git hook #904 +- Remove Scala Dependencies #905 +- Introduce broadcast API to TransportProvider #903 +- Dedupe 
tasks on LeaderDoAssignment #906 +- Fix Stopping Logic and Maintain Stopping Latch Counter #877 +- Fixing test OnAssignmentChangeMultipleReassignments #908 +- Update kafka version #910 +- Replace 101tec ZkClient with Helix ZkClient #909 +- Add retry to query retention time for destination topic #863 +- Upgrade Zookeeper version to 3.6.3 #913 + ## 1.0.2 — 2019-10-01 - Relax Kafka broker hostname validation checks (#656) diff --git a/README.asciidoc b/README.asciidoc index 907125d2c..476cc3067 100644 --- a/README.asciidoc +++ b/README.asciidoc @@ -1,7 +1,7 @@ = Brooklin image:https://img.shields.io/github/license/linkedin/brooklin.svg[link="https://github.com/linkedin/brooklin/blob/master/LICENSE"] -image:https://img.shields.io/badge/bintray-v1.0.2-blue.svg?style=popout[link="https://bintray.com/linkedin/maven/brooklin"] +image:https://img.shields.io/badge/bintray-v4.1.0-blue.svg?style=popout[link="https://bintray.com/linkedin/maven/brooklin"] image:https://img.shields.io/gitter/room/linkedin/kafka.svg?style=popout[link="https://gitter.im/linkedin/brooklin"] image:https://img.shields.io/github/last-commit/linkedin/brooklin.svg?style=popout[link="https://github.com/linkedin/brooklin/commits/master"] image:https://img.shields.io/github/issues/linkedin/brooklin/bug.svg?color=orange?style=popout[link="https://github.com/linkedin/brooklin/labels/bug"] diff --git a/gradle/maven.gradle b/gradle/maven.gradle index 05c4e1c07..7d6eac021 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "4.1.0" + version = "4.2.0-SNAPSHOT" } subprojects {