Skip to content

Commit

Permalink
KAFKA-2795: fix potential NPE in GroupMetadataManager.addGroup
Browse files Browse the repository at this point in the history
Author: Jason Gustafson <[email protected]>

Reviewers: Onur Karaman, Guozhang Wang

Closes #488 from hachikuji/KAFKA-2795
  • Loading branch information
hachikuji authored and Geoff Anderson committed Nov 18, 2015
1 parent 9c97206 commit 9b331c9
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ class GroupMetadataManager(val brokerId: Int,
/* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
private val offsetExpireLock = new ReentrantReadWriteLock()

/* lock for removing offsets of a range partition, it should be always called BEFORE the group lock if needed */
private val offsetRemoveLock = new ReentrantReadWriteLock()

/* shutting down flag */
private val shuttingDown = new AtomicBoolean(false)

Expand Down Expand Up @@ -116,12 +113,12 @@ class GroupMetadataManager(val brokerId: Int,
* Add a group or get the group associated with the given groupId if it already exists
*/
def addGroup(groupId: String, protocolType: String): GroupMetadata = {
addGroup(groupId, new GroupMetadata(groupId, protocolType))
}

private def addGroup(groupId: String, group: GroupMetadata): GroupMetadata = {
groupsCache.putIfNotExists(groupId, group)
groupsCache.get(groupId)
val newGroup = new GroupMetadata(groupId, protocolType)
val currentGroup = groupsCache.putIfNotExists(groupId, newGroup)
if (currentGroup != null)
currentGroup
else
newGroup
}

/**
Expand Down

0 comments on commit 9b331c9

Please sign in to comment.