Skip to content

Commit

Permalink
Started split of StreamPipelineService
Browse files Browse the repository at this point in the history
  • Loading branch information
c8y3 committed Nov 6, 2024
1 parent ac220d2 commit 62d664f
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (C) 2018 Airbus CyberSecurity (SAS)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/

package com.airbus_cyber_security.graylog.wizard.alert.business;

import com.airbus_cyber_security.graylog.wizard.alert.model.FieldRule;
import com.airbus_cyber_security.graylog.wizard.alert.rest.models.AlertRuleStream;
import com.airbus_cyber_security.graylog.wizard.database.Description;
import com.google.common.collect.Maps;
import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;
import org.bson.types.ObjectId;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.rest.resources.streams.requests.CreateStreamRequest;
import org.graylog2.streams.StreamRuleImpl;
import org.graylog2.streams.StreamRuleService;
import org.graylog2.streams.events.StreamsChangedEvent;
import org.graylog2.streams.StreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class StreamFacade {

private static final Logger LOG = LoggerFactory.getLogger(StreamFacade.class);

private final StreamService streamService;
private final String indexSetID;
private final StreamRuleService streamRuleService;
private final ClusterEventBus clusterEventBus;
private final FieldRulesUtilities fieldRulesUtilities;

@Inject
public StreamFacade(org.graylog2.streams.StreamService streamService,
StreamRuleService streamRuleService,
ClusterEventBus clusterEventBus,
IndexSetRegistry indexSetRegistry,
FieldRulesUtilities fieldRulesUtilities) {
this.streamService = streamService;
this.streamRuleService = streamRuleService;
this.clusterEventBus = clusterEventBus;
this.fieldRulesUtilities = fieldRulesUtilities;
this.indexSetID = indexSetRegistry.getDefault().getConfig().id();
}

public Stream createStream(Stream.MatchingType matchingType, String title, String userName, List<FieldRule> fieldRules) throws ValidationException {
Stream stream = this.createStream(matchingType, title, userName);
this.createStreamRule(fieldRules, stream.getId());
return stream;
}

public Stream createStream(Stream.MatchingType matchingType, String title, String userName) throws ValidationException {
LOG.debug("Create Stream: " + title);
CreateStreamRequest request = CreateStreamRequest.create(title, Description.COMMENT_ALERT_WIZARD,
Collections.emptyList(), "", matchingType.name(), false, indexSetID);
Stream stream = this.streamService.create(request, userName);
stream.setDisabled(false);

if (!stream.getIndexSet().getConfig().isWritable()) {
throw new BadRequestException("Assigned index set must be writable!");
}
this.streamService.save(stream);

return stream;
}

public void updateStream(Stream stream, AlertRuleStream alertRuleStream, String title) throws ValidationException {
LOG.debug("Update Stream: " + stream.getId());
stream.setTitle(title);
try {
stream.setMatchingType(alertRuleStream.getMatchingType());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid matching type '" + alertRuleStream.getMatchingType()
+ "' specified. Should be one of: " + Arrays.toString(Stream.MatchingType.values()));
}
this.streamService.save(stream);

//TODO do it better (don't destroy if update)
// Destroy existing stream rules
for (StreamRule streamRule: stream.getStreamRules()) {
this.streamRuleService.destroy(streamRule);
}
// Create stream rules.
createStreamRule(alertRuleStream.getFieldRules(), stream.getId());

this.clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
}

private void createStreamRule(List<FieldRule> fieldRules, String streamID) throws ValidationException {
List<FieldRule> streamFieldRules = this.getStreamFieldRules(fieldRules);
for (FieldRule fieldRule: streamFieldRules) {
Map<String, Object> streamRuleData = Maps.newHashMapWithExpectedSize(6);

if (fieldRule.getType() >= 0) {
streamRuleData.put(StreamRuleImpl.FIELD_TYPE, fieldRule.getType());
streamRuleData.put(StreamRuleImpl.FIELD_INVERTED, false);
} else {
streamRuleData.put(StreamRuleImpl.FIELD_TYPE, innerAbs(fieldRule.getType()));
streamRuleData.put(StreamRuleImpl.FIELD_INVERTED, true);
}
streamRuleData.put(StreamRuleImpl.FIELD_FIELD, fieldRule.getField());
streamRuleData.put(StreamRuleImpl.FIELD_VALUE, fieldRule.getValue());
streamRuleData.put(StreamRuleImpl.FIELD_STREAM_ID, new ObjectId(streamID));
streamRuleData.put(StreamRuleImpl.FIELD_DESCRIPTION, Description.COMMENT_ALERT_WIZARD);

StreamRule newStreamRule = this.streamRuleService.create(streamRuleData);
this.streamRuleService.save(newStreamRule);
}
}

private List<FieldRule> getStreamFieldRules(List<FieldRule> fieldRules) {
List<FieldRule> streamFieldRules = new ArrayList<FieldRule>();
for (FieldRule fieldRule: fieldRules) {
if (this.fieldRulesUtilities.isListFieldRule(fieldRule)) {
continue;
}
streamFieldRules.add(fieldRule);
}
return streamFieldRules;
}

private int innerAbs(int value) {
if (value < 0) {
return -value;
} else {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

package com.airbus_cyber_security.graylog.wizard.alert.business;

import com.airbus_cyber_security.graylog.wizard.alert.rest.models.AlertRuleStream;
import com.airbus_cyber_security.graylog.wizard.alert.model.FieldRule;
import com.airbus_cyber_security.graylog.wizard.database.Description;
import com.airbus_cyber_security.graylog.wizard.database.LookupService;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.RandomStringUtils;
import org.bson.types.ObjectId;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
Expand All @@ -32,14 +29,8 @@
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
import org.graylog2.database.NotFoundException;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.rest.resources.streams.requests.CreateStreamRequest;
import org.graylog2.streams.StreamGuardException;
import org.graylog2.streams.StreamRuleImpl;
import org.graylog2.streams.StreamRuleService;
import org.graylog2.streams.StreamService;
import org.graylog2.streams.events.StreamDeletedEvent;
import org.graylog2.streams.events.StreamsChangedEvent;
Expand All @@ -49,27 +40,22 @@
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// TODO split into StreamService and PipelineService
// TODO move stream related code into StreamFacade
// and rename to PipelineFacade
public class StreamPipelineService {

private static final Logger LOG = LoggerFactory.getLogger(StreamPipelineService.class);
private static final String RANDOM_CHARS = "0123456789abcdef";
private static final int RANDOM_COUNT = 24;

private final StreamService streamService;
private final StreamRuleService streamRuleService;
private final ClusterEventBus clusterEventBus;
private final String indexSetID;
private final RuleService ruleService;
private final PipelineService pipelineService;
private final LookupService lookupService;
Expand All @@ -78,58 +64,21 @@ public class StreamPipelineService {

@Inject
public StreamPipelineService(StreamService streamService,
StreamRuleService streamRuleService,
ClusterEventBus clusterEventBus,
IndexSetRegistry indexSetRegistry,
RuleService ruleService,
PipelineService pipelineService,
LookupService lookupService,
PipelineStreamConnectionsService pipelineStreamConnectionsService,
FieldRulesUtilities fieldRulesUtilities) {
this.streamService = streamService;
this.streamRuleService = streamRuleService;
this.clusterEventBus = clusterEventBus;
this.indexSetID = indexSetRegistry.getDefault().getConfig().id();
this.ruleService = ruleService;
this.pipelineService = pipelineService;
this.pipelineStreamConnectionsService = pipelineStreamConnectionsService;
this.lookupService = lookupService;
this.fieldRulesUtilities = fieldRulesUtilities;
}

// TODO should have only non field rules here
private void createStreamRule(List<FieldRule> fieldRules, String streamID) throws ValidationException {
for (FieldRule fieldRule: fieldRules) {
if (this.fieldRulesUtilities.isListFieldRule(fieldRule)) {
continue;
}
Map<String, Object> streamRuleData = Maps.newHashMapWithExpectedSize(6);

if (fieldRule.getType() >= 0) {
streamRuleData.put(StreamRuleImpl.FIELD_TYPE, fieldRule.getType());
streamRuleData.put(StreamRuleImpl.FIELD_INVERTED, false);
} else {
streamRuleData.put(StreamRuleImpl.FIELD_TYPE, innerAbs(fieldRule.getType()));
streamRuleData.put(StreamRuleImpl.FIELD_INVERTED, true);
}
streamRuleData.put(StreamRuleImpl.FIELD_FIELD, fieldRule.getField());
streamRuleData.put(StreamRuleImpl.FIELD_VALUE, fieldRule.getValue());
streamRuleData.put(StreamRuleImpl.FIELD_STREAM_ID, new ObjectId(streamID));
streamRuleData.put(StreamRuleImpl.FIELD_DESCRIPTION, Description.COMMENT_ALERT_WIZARD);

StreamRule newStreamRule = this.streamRuleService.create(streamRuleData);
this.streamRuleService.save(newStreamRule);
}
}

private int innerAbs(int value) {
if (value < 0) {
return -value;
} else {
return value;
}
}

private String createStringField(FieldRule fieldRule, boolean negate) {
String rule = " (";
rule += "has_field(\"" + fieldRule.getField() + "\")";
Expand Down Expand Up @@ -214,49 +163,6 @@ public void deletePipeline(String pipelineID, String ruleID){
}
}

public Stream createStream(Stream.MatchingType matchingType, String title, String userName, List<FieldRule> fieldRules) throws ValidationException {
Stream stream = this.createStream(matchingType, title, userName);
this.createStreamRule(fieldRules, stream.getId());
return stream;
}

public Stream createStream(Stream.MatchingType matchingType, String title, String userName) throws ValidationException {
LOG.debug("Create Stream: " + title);
CreateStreamRequest request = CreateStreamRequest.create(title, Description.COMMENT_ALERT_WIZARD,
Collections.emptyList(), "", matchingType.name(), false, indexSetID);
Stream stream = this.streamService.create(request, userName);
stream.setDisabled(false);

if (!stream.getIndexSet().getConfig().isWritable()) {
throw new BadRequestException("Assigned index set must be writable!");
}
this.streamService.save(stream);

return stream;
}

public void updateStream(Stream stream, AlertRuleStream alertRuleStream, String title) throws ValidationException {
LOG.debug("Update Stream: " + stream.getId());
stream.setTitle(title);
try {
stream.setMatchingType(alertRuleStream.getMatchingType());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid matching type '" + alertRuleStream.getMatchingType()
+ "' specified. Should be one of: " + Arrays.toString(Stream.MatchingType.values()));
}
this.streamService.save(stream);

//TODO do it better (don't destroy if update)
// Destroy existing stream rules
for (StreamRule streamRule: stream.getStreamRules()) {
this.streamRuleService.destroy(streamRule);
}
// Create stream rules.
createStreamRule(alertRuleStream.getFieldRules(), stream.getId());

this.clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
}

public void deleteStreamFromIdentifier(String streamIdentifier){
try {
Stream stream = this.streamService.load(streamIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,24 @@
public class TriggeringConditionsService {

private final StreamPipelineService streamPipelineService;
private final StreamFacade streamService;
private final AlertListUtilsService alertListUtilsService;
private final FieldRulesUtilities fieldRulesUtilities;

@Inject
public TriggeringConditionsService(StreamPipelineService streamPipelineService,
public TriggeringConditionsService(StreamFacade streamService,
StreamPipelineService streamPipelineService,
AlertListUtilsService alertListUtilsService,
FieldRulesUtilities fieldRulesUtilities) {
this.streamService = streamService;
this.streamPipelineService = streamPipelineService;

this.alertListUtilsService = alertListUtilsService;
this.fieldRulesUtilities = fieldRulesUtilities;
}

public TriggeringConditions createTriggeringConditions(AlertRuleStream streamConfiguration, String title, String userName) throws ValidationException {
Stream filteringStream = this.streamPipelineService.createStream(streamConfiguration.getMatchingType(), title, userName, streamConfiguration.getFieldRules());
Stream filteringStream = this.streamService.createStream(streamConfiguration.getMatchingType(), title, userName, streamConfiguration.getFieldRules());
return createTriggeringConditionsFromStream(streamConfiguration, title, filteringStream.getId(), userName);
}

Expand All @@ -59,7 +63,7 @@ public TriggeringConditions updateTriggeringConditions(TriggeringConditions prev
// update filtering stream
String streamIdentifier = previousConditions.filteringStreamIdentifier();
Stream stream = this.streamPipelineService.loadStream(streamIdentifier);
this.streamPipelineService.updateStream(stream, streamConfiguration, alertTitle);
this.streamService.updateStream(stream, streamConfiguration, alertTitle);

if (!previousConditions.outputStreamIdentifier().equals(streamIdentifier)) {
this.streamPipelineService.deleteStreamFromIdentifier(previousConditions.outputStreamIdentifier());
Expand Down Expand Up @@ -135,7 +139,7 @@ private TriggeringConditions createTriggeringConditionsFromStream(AlertRuleStrea

if (!this.fieldRulesUtilities.hasStreamRules(streamConfiguration.getFieldRules())) {
PipelineDao graylogPipeline = this.streamPipelineService.createPipeline(title, matchingType, Stream.DEFAULT_STREAM_ID);
Stream outputStream = this.streamPipelineService.createStream(matchingType, title + " output", userName);
Stream outputStream = this.streamService.createStream(matchingType, title + " output", userName);
RuleDao pipelineRule = this.streamPipelineService.createPipelineRule(title, fieldRulesWithList, matchingType, outputStream.getId());
Pipeline pipeline = Pipeline.builder()
.identifier(graylogPipeline.id()).ruleIdentifier(pipelineRule.id()).fieldRules(fieldRulesWithList)
Expand All @@ -150,7 +154,7 @@ private TriggeringConditions createTriggeringConditionsFromStream(AlertRuleStrea
return builder.outputStreamIdentifier(filteringStreamIdentifier).pipeline(pipeline).build();
} else {
PipelineDao graylogPipeline = this.streamPipelineService.createPipeline(title, matchingType, filteringStreamIdentifier);
Stream outputStream = this.streamPipelineService.createStream(matchingType, title + " output", userName);
Stream outputStream = this.streamService.createStream(matchingType, title + " output", userName);
RuleDao pipelineRule = this.streamPipelineService.createPipelineRule(title, fieldRulesWithList, matchingType, outputStream.getId());
Pipeline pipeline = Pipeline.builder()
.identifier(graylogPipeline.id()).ruleIdentifier(pipelineRule.id()).fieldRules(fieldRulesWithList)
Expand Down

0 comments on commit 62d664f

Please sign in to comment.