Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8526
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Mar 14, 2023
1 parent feae75e commit 2a645b0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
7 changes: 6 additions & 1 deletion cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ func NewKafkaDDLSink(
// We must close adminClient when this func return cause by an error
// otherwise the adminClient will never be closed and lead to a goroutine leak.
defer func() {
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go
if err != nil {
if closeErr := adminClient.Close(); closeErr != nil {
log.Error("Close admin client failed in kafka "+
"DDL sink", zap.Error(closeErr))
}
=======
if err != nil && adminClient != nil {
adminClient.Close()
>>>>>>> f689c92f05 (kafka(ticdc): fix kafka sink panic when kill the downstream kafka (#8526)):cdc/sink/ddlsink/mq/kafka_ddl_sink.go
}
}()

Expand Down Expand Up @@ -89,7 +94,7 @@ func NewKafkaDDLSink(
// Preventing leaks when error occurs.
// This also closes the client in p.Close().
defer func() {
if err != nil {
if err != nil && p != nil {
p.Close()
}
}()
Expand Down
7 changes: 6 additions & 1 deletion cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ func NewKafkaDMLSink(
// We must close adminClient when this func return cause by an error
// otherwise the adminClient will never be closed and lead to a goroutine leak.
defer func() {
<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
if err != nil {
if closeErr := adminClient.Close(); closeErr != nil {
log.Error("Close admin client failed in kafka "+
"DML sink", zap.Error(closeErr))
}
=======
if err != nil && adminClient != nil {
adminClient.Close()
>>>>>>> f689c92f05 (kafka(ticdc): fix kafka sink panic when kill the downstream kafka (#8526)):cdc/sink/dmlsink/mq/kafka_dml_sink.go
}
}()

Expand Down Expand Up @@ -88,7 +93,7 @@ func NewKafkaDMLSink(
// Preventing leaks when error occurs.
// This also closes the client in p.Close().
defer func() {
if err != nil {
if err != nil && p != nil {
p.Close()
}
}()
Expand Down

0 comments on commit 2a645b0

Please sign in to comment.