Skip to content

Commit

Permalink
[homematic] Solves stability issues with HmIP devices (openhab#7902)
Browse files Browse the repository at this point in the history
* Solves stability issues with HmIP devices

The CCU gets unresponsive if several HmIP devices are installed and
always "non-blocking" requests are used. By reducing the use to what is
absolutely necessary, the problem can be avoide.

Fixes openhab#7762

Signed-off-by: Martin Herbst <[email protected]>
  • Loading branch information
MHerbst authored and andrewfg committed Aug 31, 2020
1 parent 62e50ff commit 4e2fd4a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
*/
public abstract class RpcClient<T> {
private final Logger logger = LoggerFactory.getLogger(RpcClient.class);
protected static final int MAX_RPC_RETRY = 1;
protected static final int MAX_RPC_RETRY = 3;
protected static final int RESP_BUFFER_SIZE = 8192;

protected HomematicConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.xml.parsers.ParserConfigurationException;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
Expand All @@ -31,6 +37,7 @@
import org.openhab.binding.homematic.internal.communicator.parser.RpcResponseParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

/**
* Client implementation for sending messages via XML-RPC to the Homematic server.
Expand Down Expand Up @@ -68,57 +75,74 @@ protected synchronized Object[] sendMessage(int port, RpcRequest<String> request
if (logger.isTraceEnabled()) {
logger.trace("Client XmlRpcRequest (port {}):\n{}", port, request);
}
return sendMessage(port, request, 0);
IOException reason = new IOException();
for (int rpcRetryCounter = 1; rpcRetryCounter <= MAX_RPC_RETRY; rpcRetryCounter++) {
try {
byte[] response = send(port, request);
Object[] data = new XmlRpcResponse(new ByteArrayInputStream(response), config.getEncoding())
.getResponseData();
return new RpcResponseParser(request).parse(data);
} catch (UnknownRpcFailureException | UnknownParameterSetException ex) {
throw ex;
} catch (SAXException | ParserConfigurationException ex) {
throw new IOException(ex);
} catch (IOException ex) {
reason = ex;
if ("init".equals(request.getMethodName())) { // no retries for "init" request
break;
}
logger.debug("XmlRpcMessage failed, sending message again {}/{}", rpcRetryCounter, MAX_RPC_RETRY);
}
}
throw reason;
}

/**
* Sends the message, retries if there was an error.
*/
private synchronized Object[] sendMessage(int port, RpcRequest<String> request, int rpcRetryCounter)
throws IOException {
private byte[] send(int port, RpcRequest<String> request) throws IOException {
byte[] ret = new byte[0];
try {
BytesContentProvider content = new BytesContentProvider(
request.createMessage().getBytes(config.getEncoding()));
String url = String.format("http://%s:%s", config.getGatewayAddress(), port);
if (port == config.getGroupPort()) {
url += "/groups";
}
InputStreamResponseListener respListener = new InputStreamResponseListener();
httpClient.POST(url).content(content)
.header(HttpHeader.CONTENT_TYPE, "text/xml;charset=" + config.getEncoding()).send(respListener);
Response resp = respListener.get(config.getTimeout(), TimeUnit.SECONDS);
ByteArrayOutputStream respData = new ByteArrayOutputStream(RESP_BUFFER_SIZE);
int httpStatus = resp.getStatus();
if (httpStatus == HttpStatus.OK_200) {
byte[] recvBuffer = new byte[RESP_BUFFER_SIZE];
InputStream input = respListener.getInputStream();
while (true) {
int read = input.read(recvBuffer);
if (read == -1) {
break;
Request req = httpClient.POST(url).content(content).timeout(config.getTimeout(), TimeUnit.SECONDS)
.header(HttpHeader.CONTENT_TYPE, "text/xml;charset=" + config.getEncoding());
try {
ret = req.send().getContent();
} catch (IllegalArgumentException e) { // Returned buffer too large
logger.info("Blocking XmlRpcRequest failed: {}, trying non-blocking request", e.getMessage());
InputStreamResponseListener respListener = new InputStreamResponseListener();
req.send(respListener);
Response resp = respListener.get(config.getTimeout(), TimeUnit.SECONDS);
ByteArrayOutputStream respData = new ByteArrayOutputStream(RESP_BUFFER_SIZE);

int httpStatus = resp.getStatus();
if (httpStatus == HttpStatus.OK_200) {
byte[] recvBuffer = new byte[RESP_BUFFER_SIZE];
try (InputStream input = respListener.getInputStream()) {
while (true) {
int read = input.read(recvBuffer);
if (read == -1) {
break;
}
respData.write(recvBuffer, 0, read);
}
ret = respData.toByteArray();
}
respData.write(recvBuffer, 0, read);
} else {
logger.warn("Non-blocking XmlRpcRequest failed, status code: {} / request: {}", httpStatus,
request);
resp.abort(new IOException());
}
} else {
logger.warn("XmlRpcRequest failure, status code: {} / request was: {}", httpStatus, request);
resp.abort(new Exception());
}
if (logger.isTraceEnabled()) {
logger.trace("Client XmlRpcResponse: (port {}):\n{}", port, respData.toString(config.getEncoding()));
}
Object[] data = new XmlRpcResponse(new ByteArrayInputStream(respData.toByteArray()), config.getEncoding())
.getResponseData();
return new RpcResponseParser(request).parse(data);
} catch (UnknownRpcFailureException | UnknownParameterSetException ex) {
throw ex;
} catch (Exception ex) {
if ("init".equals(request.getMethodName()) || rpcRetryCounter >= MAX_RPC_RETRY) {
throw new IOException(ex.getMessage(), ex);
} else {
rpcRetryCounter++;
logger.debug("XmlRpcMessage failure, sending message again {}/{}", rpcRetryCounter, MAX_RPC_RETRY);
return sendMessage(port, request, rpcRetryCounter);
String result = new String(ret, config.getEncoding());
logger.trace("Client XmlRpcResponse (port {}):\n{}", port, result);
}
} catch (UnsupportedEncodingException | ExecutionException | TimeoutException | InterruptedException e) {
throw new IOException(e);
}
return ret;
}
}

0 comments on commit 4e2fd4a

Please sign in to comment.