
Merge branch 'master' into zk-change
Suraj Nagaraja Kasi authored Nov 22, 2022
2 parents ccd4f7a + 4eafa7b commit 4d0b312
Showing 10 changed files with 305 additions and 7 deletions.
60 changes: 60 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,66 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## 4.1.0 — 2022-09-29

- Refactored StickyPartitionAssignmentStrategy and implemented task estimation logic in LoadBasedPartitionAssignmentStrategy #835
- Fix flaky test testConsumeFromMultipleTopics #838
- Refactor Optional parameters in the constructor of strategy #836
- Implemented logic to prevent tasks from having more than a specified number of partitions #837
- Publishing artifacts to JFrog #839
- Make the Throughput based assignment and task estimation based on partition assignment configurable #841
- Implemented metrics for LoadBasedPartitionAssignmentStrategy #840
- Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler #843
- Handle the new session after session expiry #770
- Fixed issue with config key #847
- Print count of orphaned tasks and orphaned task locks in the log message #844
- Datastream stop transition redesign #842
- Fix flaky stopping simultaneous datastream test #849
- Update the DatabaseChunkedReader to take the Connection as input rather than the DataSource #850
- Handle leaking TP exception in handleAssignmentChange #845
- Migrating from AdminUtils to AdminClient #848
- Fix running task data structure logic in AbstractKafkaConnector for failing to stop task #851
- Removing partial Helix ZkClient dependency #852
- Stop the tasks in parallel in AbstractKafkaConnector #853
- Add additional log in LoadBasedPartitionAssignmentStrategy #856
- Fixing flaky test testCreateDatastreamHappyPathDefaultRetention #854
- Added toString() override in PartitionThroughputInfo #858
- Add Stats to DatastreamTaskImpl #855
- Make PartitionThroughputProvider metricsAware #859
- Added base strategy metric info in LoadBasedPartitionAssignmentStrategy #857
- Add alert metrics to identify that elastic task configurations require adjustment #860
- Fix restartDeadTask logic when the task thread has died #861
- Fix metric infos in PartitionThroughputProvider #862
- Fix the metrics deregistration in AbstractKafkaConnector when multiple stops are called #865
- Fix logging in LoadBasedPartitionAssignmentStrategy #866
- Make Default byte in rate and Msg in rate configurable #864
- Metrics are getting emitted in LoadBasedPartitionAssignmentStrategy only when it needs adjustment #867
- Use topic level throughput information when partition level information is unavailable #871
- Fix compilation error #874
- Loadbased Partition assigner not using topic level metrics to recognize partitions #876
- Flushless producer supporting both comparable and non comparable offsets #873
- LiveInstanceProvider subscription should be done only by the leader coordinator #879
- Fixed issue with missing exception message during task initialization #882
- Kafka upgrade #881
- Skipping onPartitionsRevoked during consumer.close() call #886
- Scala 2.12 upgrade #895
- Upgrade avro and move jackson from codehaus to fasterxml #894
- Fix topic deletion when multiple duplicate streams expire at the same time #897
- Use 2.4.1.57 kafka version #901
- Tests for min/max partitions per task metrics and minor code quality improvements #887
- Fix rebalancing-tasks bug and added tests #900
- Refactor Stopping Tasks On Assignment Change of Tasks #868
- Change python dependency in commit-msg git hook #904
- Remove Scala Dependencies #905
- Introduce broadcast API to TransportProvider #903
- Dedupe tasks on LeaderDoAssignment #906
- Fix Stopping Logic and Maintain Stopping Latch Counter #877
- Fixing test OnAssignmentChangeMultipleReassignments #908
- Update kafka version #910
- Replace 101tec ZkClient with Helix ZkClient #909
- Add retry to query retention time for destination topic #863
- Upgrade Zookeeper version to 3.6.3 #913

## 1.0.2 — 2019-10-01

- Relax Kafka broker hostname validation checks (#656)
2 changes: 1 addition & 1 deletion README.asciidoc
@@ -1,7 +1,7 @@
= Brooklin

image:https://img.shields.io/github/license/linkedin/brooklin.svg[link="https://github.com/linkedin/brooklin/blob/master/LICENSE"]
image:https://img.shields.io/badge/bintray-v1.0.2-blue.svg?style=popout[link="https://bintray.com/linkedin/maven/brooklin"]
image:https://img.shields.io/badge/bintray-v4.1.0-blue.svg?style=popout[link="https://bintray.com/linkedin/maven/brooklin"]
image:https://img.shields.io/gitter/room/linkedin/kafka.svg?style=popout[link="https://gitter.im/linkedin/brooklin"]
image:https://img.shields.io/github/last-commit/linkedin/brooklin.svg?style=popout[link="https://github.com/linkedin/brooklin/commits/master"]
image:https://img.shields.io/github/issues/linkedin/brooklin/bug.svg?color=orange?style=popout[link="https://github.com/linkedin/brooklin/labels/bug"]
@@ -15,6 +15,7 @@

import com.linkedin.datastream.common.Datastream;
import com.linkedin.datastream.common.DatastreamConstants;
import com.linkedin.datastream.common.DatastreamException;
import com.linkedin.datastream.common.DatastreamMetadataConstants;
import com.linkedin.datastream.metrics.MetricsAware;
import com.linkedin.datastream.server.DatastreamTask;
@@ -131,12 +132,25 @@ default String getDestinationName(Datastream datastream) {
* with its set of initializations.
*
* NOTE: This method is called by the Rest.li service before the datastream is written to ZooKeeper, so please make
* sure this call doesn't block for more then few seconds otherwise the REST call will timeout.
* sure this call doesn't block for more than few seconds otherwise the REST call will timeout.
* @param stream Datastream being initialized
 * @param allDatastreams all existing datastreams in the system with the same connector type as the datastream that is
 *                       being initialized.
*/
default void postDatastreamInitialize(Datastream stream, List<Datastream> allDatastreams)
throws DatastreamValidationException {
}

/**
 * Hook that can be used to do any additional operations once the datastream has been created, updated or deleted
 * successfully in ZooKeeper. This method will also be invoked for datastream state changes.
*
* NOTE: This method is called by the Rest.li service after the datastream is written to/deleted from ZooKeeper. So
 * please make sure this call doesn't block for more than a few seconds, otherwise the REST call will time out. If you
 * have non-critical work, see if it can be done as an async operation or on a separate thread.
* @param stream the datastream
 * @throws DatastreamException if the post datastream state change operations fail.
*/
default void postDatastreamStateChangeAction(Datastream stream) throws DatastreamException {
}
}
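
For illustration, here is a minimal sketch of how a connector might satisfy the non-blocking advice in the javadoc above. The class, parameter types, and helper names below are assumptions for the example only, not part of this change, and the sketch deliberately does not implement the rest of the Connector interface:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: mirrors the shape of postDatastreamStateChangeAction.
public class PostStateChangeActionSketch {
  // A single worker thread keeps state-change bookkeeping off the Rest.li request thread.
  private final ExecutorService postChangeExecutor = Executors.newSingleThreadExecutor();

  public void postDatastreamStateChangeAction(String datastreamName, String newStatus) {
    // Cheap validation stays on the caller's thread so failures surface immediately.
    if (datastreamName == null || datastreamName.isEmpty()) {
      throw new IllegalArgumentException("Datastream name is missing");
    }
    // Non-critical work (audit logging, cache refresh, notifications) is offloaded.
    postChangeExecutor.submit(() -> recordStateChange(datastreamName, newStatus));
  }

  private void recordStateChange(String name, String status) {
    // Placeholder for connector-specific bookkeeping.
    System.out.println("Datastream " + name + " moved to state " + status);
  }
}

In the actual change, the hook is a default no-op on the interface, so existing connectors keep compiling unchanged and only connectors that need post-state-change behavior override it.
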
@@ -303,6 +303,9 @@ private void doUpdateDatastreams(Map<String, Datastream> datastreamMap) {
// are updated before we touch the "assignments" node to avoid race condition
for (String key : datastreamMap.keySet()) {
_store.updateDatastream(key, datastreamMap.get(key), false);

// invoke post datastream state change action for recently updated datastream
invokePostDSStateChangeAction(datastreamMap.get(key));
}
_coordinator.broadcastDatastreamUpdate();
} catch (DatastreamException e) {
@@ -359,6 +362,8 @@ public ActionResult<Void> pause(@PathKeysParam PathKeys pathKeys,
if (DatastreamStatus.READY.equals(datastream.getStatus())) {
d.setStatus(DatastreamStatus.PAUSED);
_store.updateDatastream(d.getName(), d, true);
// invoke post datastream state change action for recently paused datastream
invokePostDSStateChangeAction(d);
} else {
LOG.warn("Cannot pause datastream {}, as it is not in READY state. State: {}", d, datastream.getStatus());
}
@@ -467,12 +472,16 @@ public ActionResult<Void> stop(@PathKeysParam PathKeys pathKeys,
d.setStatus(DatastreamStatus.STOPPING);
_store.updateDatastream(d.getName(), d, true);
_store.deleteDatastreamNumTasks(d.getName());
// invoke post datastream state change action for recently stopped datastream
invokePostDSStateChangeAction(d);
} else if (DatastreamStatus.STOPPING.equals(d.getStatus())) {
// this check helps in preventing any datastream from being stuck in STOPPING state indefinitely
LOG.warn("Datastream {} is already in {} state. Notifying leader to initiate transition", d,
d.getStatus());
_store.updateDatastream(d.getName(), d, true);
_store.deleteDatastreamNumTasks(d.getName());
// invoke post datastream state change action for recently stopped datastream
invokePostDSStateChangeAction(d);
} else {
LOG.warn("Cannot stop datastream {}, as it is not in READY/PAUSED state. State: {}", d, d.getStatus());
}
@@ -491,6 +500,14 @@ public ActionResult<Void> stop(@PathKeysParam PathKeys pathKeys,

LOG.info("Completed request for stopping datastream {}", _store.getDatastream(datastream.getName()));

    // the above check guarantees that the datastreams will be in STOPPED state only, so there is no need to fetch the
    // latest state from the store explicitly
datastreamsToStop.forEach(ds -> {
// set the status as STOPPED as the original status would be READY, PAUSED or STOPPING
ds.setStatus(DatastreamStatus.STOPPED);
invokePostDSStateChangeAction(ds);
});

return new ActionResult<>(HttpStatus.S_200_OK);
}

@@ -528,6 +545,8 @@ public ActionResult<Void> resume(@PathKeysParam PathKeys pathKeys,
DatastreamStatus.STOPPED.equals(datastream.getStatus())) {
d.setStatus(DatastreamStatus.READY);
_store.updateDatastream(d.getName(), d, true);
// invoke post datastream state change action for recently resumed datastream
invokePostDSStateChangeAction(d);
} else {
LOG.warn("Will not resume datastream {}, as it is not already in PAUSED/STOPPED state", d);
}
@@ -712,7 +731,8 @@ public ActionResult<Void> resumeSourcePartitions(@PathKeysParam PathKeys pathKey

@Override
public UpdateResponse delete(String datastreamName) {
if (null == _store.getDatastream(datastreamName)) {
final Datastream datastream = _store.getDatastream(datastreamName);
if (null == datastream) {
_errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND,
"Datastream requested to be deleted does not exist: " + datastreamName);
}
@@ -725,6 +745,9 @@ public UpdateResponse delete(String datastreamName) {
_store.deleteDatastream(datastreamName);
DELETE_CALL_LATENCY_MS.set(Duration.between(startTime, Instant.now()).toMillis());

// invoke post datastream state change action for recently deleted datastream
invokePostDSStateChangeAction(datastream);

return new UpdateResponse(HttpStatus.S_200_OK);
} catch (Exception e) {
_dynamicMetricsManager.createOrUpdateMeter(CLASS_NAME, CALL_ERROR, 1);
@@ -893,6 +916,10 @@ public CreateResponse create(Datastream datastream) {
CREATE_CALL_LATENCY_MS.set(delta.toMillis());

LOG.info("Datastream persisted to zookeeper, total time used: {} ms", delta.toMillis());

// invoke post datastream state change action for recently created datastream
invokePostDSStateChangeAction(datastream);

return new CreateResponse(datastream.getName(), HttpStatus.S_201_CREATED);
} catch (IllegalArgumentException e) {
_dynamicMetricsManager.createOrUpdateMeter(CLASS_NAME, CALL_ERROR, 1);
@@ -920,6 +947,17 @@ public CreateResponse create(Datastream datastream) {
return null;
}

private void invokePostDSStateChangeAction(Datastream datastream) {
try {
LOG.debug("Invoke post datastream state change action datastream={}", datastream);
_coordinator.invokePostDataStreamStateChangeAction(datastream);
LOG.info("Invoked post datastream state change action datastream={}", datastream);
} catch (DatastreamException e) {
_errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
"Failed to perform post datastream state change action datastream=" + datastream, e);
}
}

/**
* Get the list of metrics emitted by this class
*/
@@ -1155,6 +1155,64 @@ public void testDatastreamWithBYOT() throws Exception {
zkClient.close();
}

@Test
public void testInvokePostDataStreamStateChangeAction() throws Exception {
String testCluster = "testDatastreamCreateUpdateDelete";
String connectorType = "connectorType";

TestHookConnector connector1 = new TestHookConnector("connector1", connectorType);
Coordinator coordinator1 = createCoordinator(_zkConnectionString, testCluster);
coordinator1.addConnector(connectorType, connector1, new BroadcastStrategy(Optional.empty()),
false, new SourceBasedDeduper(), null);
coordinator1.start();

TestHookConnector connector2 = new TestHookConnector("connector2", connectorType);
Coordinator coordinator2 = createCoordinator(_zkConnectionString, testCluster);
coordinator2.addConnector(connectorType, connector2, new BroadcastStrategy(Optional.empty()),
false, new SourceBasedDeduper(), null);
coordinator2.start();
ZkClient zkClient = new ZkClient(_zkConnectionString);

// create datastream
Datastream[] list = DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType,
"datastream1", "datastream2");
Datastream datastream1 = list[0];
LOG.info("Created datastream1: {}", datastream1);
Datastream datastream2 = list[1];
LOG.info("Created datastream2: {}", datastream2);

coordinator1.invokePostDataStreamStateChangeAction(datastream1);
Assert.assertTrue(coordinator1.getIsLeader().getAsBoolean());
// post datastream state change action method should be invoked for created datastream
Assert.assertEquals(connector1.getPostDSStatechangeActionInvokeCount(), 1);
Assert.assertEquals(connector2.getPostDSStatechangeActionInvokeCount(), 0);

// wait for datastream to be READY
PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, "datastream1")
.getStatus()
.equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS);
datastream1 = DatastreamTestUtils.getDatastream(zkClient, testCluster, datastream1.getName());

// update datastream
datastream1.getMetadata().put("key", "value");
datastream1.getSource().setConnectionString("newSource");

LOG.info("Updating datastream: {}", datastream1);
CachedDatastreamReader datastreamCache = new CachedDatastreamReader(zkClient, testCluster);
ZookeeperBackedDatastreamStore dsStore = new ZookeeperBackedDatastreamStore(datastreamCache, zkClient, testCluster);
DatastreamResources datastreamResources = new DatastreamResources(dsStore, coordinator1);
datastreamResources.update(datastream1.getName(), datastream1);

Assert.assertEquals(connector1.getPostDSStatechangeActionInvokeCount(), 2);
Assert.assertEquals(connector2.getPostDSStatechangeActionInvokeCount(), 0);

coordinator1.stop();
coordinator1.getDatastreamCache().getZkclient().close();
coordinator2.stop();
coordinator2.getDatastreamCache().getZkclient().close();
zkClient.close();
}

/**
* Test datastream creation with Connector-managed destination; coordinator should not create or delete topics.
*/
@@ -3110,6 +3168,7 @@ public class TestHookConnector implements Connector {
List<DatastreamTask> _tasks = new ArrayList<>();
String _instance = "";
String _name;
private int _postDSStatechangeActionInvokeCount;

/**
* Constructor for TestHookConnector
@@ -3141,6 +3200,10 @@ public List<DatastreamTask> getTasks() {
return _tasks;
}

public int getPostDSStatechangeActionInvokeCount() {
return _postDSStatechangeActionInvokeCount;
}

@Override
public void start(CheckpointProvider checkpointProvider) {
_isStarted = true;
@@ -3178,6 +3241,12 @@ public void validateUpdateDatastreams(List<Datastream> datastreams, List<Datastr
}
}

@Override
public void postDatastreamStateChangeAction(Datastream stream) throws DatastreamException {
++_postDSStatechangeActionInvokeCount;
Assert.assertNotNull(stream);
}

@Override
public String toString() {
return "Connector " + _name + ", StatusId: " + _connectorType + ", Instance: " + _instance;