Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat(security): handle list mechanism response #136

Merged
merged 31 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,35 @@
*/
package com.xiaomi.infra.pegasus.rpc.async;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved

import com.xiaomi.infra.pegasus.apps.negotiation_request;
import com.xiaomi.infra.pegasus.apps.negotiation_response;
import com.xiaomi.infra.pegasus.apps.negotiation_status;
import com.xiaomi.infra.pegasus.base.blob;
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.operator.negotiation_operator;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import org.slf4j.Logger;

public class Negotiation {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class);
// Because negotiation message is always the first rpc sent to pegasus server,
// which will cost much more time. so we set negotiation timeout to 10s here
private static final int negotiationTimeoutMS = 10000;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Negotiation should be set dynamically by the user timeout. It's under the user's control that how long an RPC should last.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it must under the user's control. The negotiation process should be not visable to users

private static final List<String> expectedMechanisms =
new ArrayList<>(Collections.singletonList("GSSAPI"));

private negotiation_status status;
private ReplicaSession session;
private String serviceName; // used for SASL authentication
private String serviceFqdn; // name used for SASL authentication
private final HashMap<String, Object> props = new HashMap<String, Object>();
private final Subject subject;
public SaslWrapper saslWrapper;

public Negotiation(
ReplicaSession session, Subject subject, String serviceName, String serviceFqdn) {
ReplicaSession session, Subject subject, String serviceName, String serviceFQDN) {
this.saslWrapper = new SaslWrapper(subject, serviceName, serviceFQDN);
this.session = session;
this.subject = subject;
this.serviceName = serviceName;
this.serviceFqdn = serviceFqdn;
this.props.put(Sasl.QOP, "auth");
}

public void start() {
Expand All @@ -53,11 +55,14 @@ public void start() {
}

public void send(negotiation_status status, blob msg) {
// TODO: send negotiation message, using RecvHandler to handle the corresponding response.
negotiation_request request = new negotiation_request(status, msg);
negotiation_operator operator = new negotiation_operator(request);
session.asyncSend(
operator, new RecvHandler(operator), negotiationTimeoutMS, /* isBackupRequest */ false);
}

private class RecvHandler implements Runnable {
negotiation_operator op;
private negotiation_operator op;

RecvHandler(negotiation_operator op) {
this.op = op;
Expand All @@ -72,6 +77,7 @@ public void run() {
handleResponse();
} catch (Exception e) {
logger.error("Negotiation failed", e);
negotiationFailed();
}
}

Expand All @@ -81,19 +87,58 @@ private void handleResponse() throws Exception {
throw new Exception("RecvHandler received a null response, abandon it");
}

switch (resp.status) {
case SASL_LIST_MECHANISMS_RESP:
case SASL_SELECT_MECHANISMS_RESP:
case SASL_CHALLENGE:
case SASL_SUCC:
switch (status) {
case SASL_LIST_MECHANISMS:
onRecvMechanisms(resp);
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
break;
case SASL_SELECT_MECHANISMS:
case SASL_INITIATE:
case SASL_CHALLENGE_RESP:
// TBD(zlw):
break;
default:
throw new Exception("Received an unexpected response, status " + resp.status);
throw new Exception("unexpected negotiation status: " + resp.status);
}
}
}

public void onRecvMechanisms(negotiation_response response) throws Exception {
checkStatus(response.status, negotiation_status.SASL_LIST_MECHANISMS_RESP);

String[] matchMechanism = new String[1];
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
matchMechanism[0] = getMatchMechanism(new String(response.msg.data));
blob msg = new blob(saslWrapper.init(matchMechanism));

status = negotiation_status.SASL_SELECT_MECHANISMS;
send(status, msg);
}

public String getMatchMechanism(String respString) {
String matchMechanism = new String();
String[] serverSupportMechanisms = respString.split(",");
for (String serverSupportMechanism : serverSupportMechanisms) {
if (expectedMechanisms.contains(serverSupportMechanism)) {
matchMechanism = serverSupportMechanism;
break;
}
}

return matchMechanism;
}

public void checkStatus(negotiation_status status, negotiation_status expected_status)
throws Exception {
if (status != expected_status) {
throw new Exception("status is " + status + " while expect " + expected_status);
}
}

private void negotiationFailed() {
status = negotiation_status.SASL_AUTH_FAIL;
session.closeSession();
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

public negotiation_status get_status() {
public negotiation_status getStatus() {
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTa
}

private void write(final RequestEntry entry, VolatileFields cache) {
if (!interceptorManager.onSendMessage(this, entry)) {
return;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

cache
.nettyChannel
.writeAndFlush(entry)
Expand Down Expand Up @@ -381,6 +385,26 @@ public void run() {
TimeUnit.MILLISECONDS);
}

// return value:
// true - pend succeed
// false - pend failed
public boolean tryPendRequest(RequestEntry entry) {
// double check. the first one doesn't lock the lock.
// Because negotiationSucceed only transfered from false to true.
// So if it is true now, it will not change in the later.
// But if it is false now, maybe it will change soon. So we should use lock to protect it.
if (!this.negotiationSucceed) {
synchronized (negotiationPendingSend) {
if (!this.negotiationSucceed) {
negotiationPendingSend.offer(entry);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
}
}

return false;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Your design here binds the responsibility of negotiation to ReplicaSession again. Why couldn't ReplicaSessionInterceptor hold the pending queue?

  2. negotiationPendingSend is declared final, but you offer items here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to hold pending queue by ReplicaSessionInterceptor.

  1. ReplicaSessionInterceptor is a singleton, which will serves all of the ReplicaSession. So if we do this like you said, we must use a coarse-grained lock, which will produce low effiency
  2. If we use ReplicaSessionInterceptor to do this, we must implement a lot of logic in it. For example timeout handling, which will mantain a timeout pendingResponse queue and a PendingSend, and so on. It's too complicated, and there is a lot of overlap code with ReplicaSession. And we should modify ReplicaSession to support it.


final class DefaultHandler extends SimpleChannelInboundHandler<RequestEntry> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Expand Down Expand Up @@ -445,6 +469,8 @@ static final class VolatileFields {
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private ReplicaSessionInterceptorManager interceptorManager;
private boolean negotiationSucceed;
final Queue<RequestEntry> negotiationPendingSend = new LinkedList<>();

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/SaslWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.xiaomi.infra.pegasus.rpc.async;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved

import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;

public class SaslWrapper {
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
private SaslClient saslClient;
private Subject subject;
private String serviceName;
private String serviceFQDN;
private HashMap<String, Object> properties = new HashMap<>();

public SaslWrapper(Subject subject, String serviceName, String serviceFQDN) {
this.subject = subject;
this.serviceName = serviceName;
this.serviceFQDN = serviceFQDN;
this.properties.put(Sasl.QOP, "auth");
}

public byte[] init(String[] mechanims) throws PrivilegedActionException {
return Subject.doAs(
subject,
(PrivilegedExceptionAction<byte[]>)
() -> {
saslClient =
Sasl.createSaslClient(
mechanims, null, serviceName, serviceFQDN, properties, null);
return saslClient.getMechanismName().getBytes();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@
public interface ReplicaSessionInterceptor {
// The behavior when a rpc session is connected.
void onConnected(ReplicaSession session);

// The behavior when rpc session is sending a message.
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
boolean onSendMessage(ReplicaSession session, final ReplicaSession.RequestEntry entry);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,12 @@ public void onConnected(ReplicaSession session) {
interceptor.onConnected(session);
}
}

public boolean onSendMessage(ReplicaSession session, final ReplicaSession.RequestEntry entry) {
boolean ret = true;
for (ReplicaSessionInterceptor interceptor : interceptors) {
ret &= interceptor.onSendMessage(session, entry);
}
return ret;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.sun.security.auth.callback.TextCallbackHandler;
import com.xiaomi.infra.pegasus.operator.negotiation_operator;
import com.xiaomi.infra.pegasus.rpc.async.Negotiation;
import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession;
import javax.security.auth.Subject;
Expand All @@ -41,7 +42,15 @@ public SecurityReplicaSessionInterceptor(String serviceName, String serviceFqdn)
this.serviceFqdn = serviceFqdn;

try {
// The LoginContext class provides the basic methods used to authenticate subjects, and
// provides a way to develop an application independent of the underlying authentication
// technology. For more details:
// https://docs.oracle.com/javase/7/docs/technotes/guides/security/jaas/JAASRefGuide.html#LoginContext

// let the LoginContext instantiate a new Subject
loginContext = new LoginContext("client", new TextCallbackHandler());

// authenticate the Subject
loginContext.login();

subject = loginContext.getSubject();
Expand All @@ -59,4 +68,13 @@ public void onConnected(ReplicaSession session) {
Negotiation negotiation = new Negotiation(session, subject, serviceName, serviceFqdn);
negotiation.start();
}

public boolean onSendMessage(ReplicaSession session, final ReplicaSession.RequestEntry entry) {
// tryPendRequest returns false means that the negotiation is succeed now
return isNegotiationRequest(entry) || !session.tryPendRequest(entry);
}

private boolean isNegotiationRequest(final ReplicaSession.RequestEntry entry) {
return entry.op instanceof negotiation_operator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,82 @@
*/
package com.xiaomi.infra.pegasus.rpc.async;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved

import static com.xiaomi.infra.pegasus.apps.negotiation_status.SASL_LIST_MECHANISMS;
import static org.mockito.ArgumentMatchers.any;

import com.xiaomi.infra.pegasus.apps.negotiation_response;
import com.xiaomi.infra.pegasus.apps.negotiation_status;
import com.xiaomi.infra.pegasus.base.blob;
import javax.security.auth.Subject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;

public class NegotiationTest {
private Negotiation negotiation = new Negotiation(null, new Subject(), "", "");

@Test
public void testStart() {
Negotiation negotiation = new Negotiation(null, null, "", "");
Negotiation mockNegotiation = Mockito.spy(negotiation);

Mockito.doNothing().when(mockNegotiation).send(any(), any());
mockNegotiation.start();
Assert.assertEquals(mockNegotiation.get_status(), SASL_LIST_MECHANISMS);
Assert.assertEquals(mockNegotiation.getStatus(), negotiation_status.SASL_LIST_MECHANISMS);
}

@Test
public void tetGetMatchMechanism() {
String matchMechanism = negotiation.getMatchMechanism("GSSAPI,ABC");
Assert.assertEquals(matchMechanism, "GSSAPI");

matchMechanism = negotiation.getMatchMechanism("TEST,ABC");
Assert.assertEquals(matchMechanism, "");
}

@Test
public void testCheckStatus() {
negotiation_status expectedStatus = negotiation_status.SASL_LIST_MECHANISMS;

Assertions.assertDoesNotThrow(
() -> negotiation.checkStatus(negotiation_status.SASL_LIST_MECHANISMS, expectedStatus));

Assertions.assertThrows(
Exception.class,
() ->
negotiation.checkStatus(negotiation_status.SASL_LIST_MECHANISMS_RESP, expectedStatus));
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

@Test
public void testRecvMechanisms() {
Negotiation mockNegotiation = Mockito.spy(negotiation);
SaslWrapper mockSaslWrapper = Mockito.mock(SaslWrapper.class);
mockNegotiation.saslWrapper = mockSaslWrapper;

Mockito.doNothing().when(mockNegotiation).send(any(), any());
Assertions.assertDoesNotThrow(
() -> {
Mockito.when(mockNegotiation.saslWrapper.init(any())).thenReturn(new byte[0]);
});

// normal case
Assertions.assertDoesNotThrow(
() -> {
negotiation_response response =
new negotiation_response(
negotiation_status.SASL_LIST_MECHANISMS_RESP, new blob(new byte[0]));
mockNegotiation.onRecvMechanisms(response);
Assert.assertEquals(
mockNegotiation.getStatus(), negotiation_status.SASL_SELECT_MECHANISMS);
});

// deal with wrong response.status
Assertions.assertThrows(
Exception.class,
() -> {
negotiation_response response =
new negotiation_response(
negotiation_status.SASL_LIST_MECHANISMS, new blob(new byte[0]));
mockNegotiation.onRecvMechanisms(response);
});
}
}