-
Notifications
You must be signed in to change notification settings - Fork 34
/
PaymentObserverBeans.java
102 lines (92 loc) · 4.04 KB
/
PaymentObserverBeans.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package org.stellar.anchor.platform.component.observer;
import java.util.List;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.stellar.anchor.api.exception.ServerErrorException;
import org.stellar.anchor.api.sep.AssetInfo;
import org.stellar.anchor.apiclient.PlatformApiClient;
import org.stellar.anchor.asset.AssetService;
import org.stellar.anchor.config.AppConfig;
import org.stellar.anchor.platform.config.PaymentObserverConfig;
import org.stellar.anchor.platform.config.RpcConfig;
import org.stellar.anchor.platform.data.JdbcSep24TransactionStore;
import org.stellar.anchor.platform.data.JdbcSep31TransactionStore;
import org.stellar.anchor.platform.data.JdbcSep6TransactionStore;
import org.stellar.anchor.platform.observer.PaymentListener;
import org.stellar.anchor.platform.observer.stellar.PaymentObservingAccountsManager;
import org.stellar.anchor.platform.observer.stellar.StellarPaymentObserver;
import org.stellar.anchor.platform.observer.stellar.StellarPaymentStreamerCursorStore;
import org.stellar.anchor.platform.service.PaymentOperationToEventListener;
@Configuration
public class PaymentObserverBeans {
@Bean
@SneakyThrows
public StellarPaymentObserver stellarPaymentObserver(
AssetService assetService,
List<PaymentListener> paymentListeners,
StellarPaymentStreamerCursorStore stellarPaymentStreamerCursorStore,
PaymentObservingAccountsManager paymentObservingAccountsManager,
AppConfig appConfig,
PaymentObserverConfig paymentObserverConfig) {
// validate assetService
if (assetService == null || assetService.listAllAssets() == null) {
throw new ServerErrorException("Asset service cannot be empty.");
}
List<AssetInfo> stellarAssets =
assetService.listAllAssets().stream()
.filter(asset -> asset.getSchema().equals(AssetInfo.Schema.stellar))
.collect(Collectors.toList());
if (stellarAssets.size() == 0) {
throw new ServerErrorException("Asset service should contain at least one Stellar asset.");
}
// validate paymentListeners
if (paymentListeners == null || paymentListeners.size() == 0) {
throw new ServerErrorException(
"The stellar payment observer service needs at least one listener.");
}
// validate paymentStreamerCursorStore
if (stellarPaymentStreamerCursorStore == null) {
throw new ServerErrorException("Payment streamer cursor store cannot be empty.");
}
// validate appConfig
if (appConfig == null) {
throw new ServerErrorException("AppConfig cannot be empty.");
}
if (paymentObserverConfig == null) {
throw new ServerErrorException("PaymentObserverConfig cannot be empty.");
}
StellarPaymentObserver stellarPaymentObserver =
new StellarPaymentObserver(
appConfig.getHorizonUrl(),
paymentObserverConfig.getStellar(),
paymentListeners,
paymentObservingAccountsManager,
stellarPaymentStreamerCursorStore);
// Add distribution wallet to the observing list as type RESIDENTIAL
for (AssetInfo assetInfo : stellarAssets) {
if (!paymentObservingAccountsManager.lookupAndUpdate(assetInfo.getDistributionAccount())) {
paymentObservingAccountsManager.upsert(
assetInfo.getDistributionAccount(),
PaymentObservingAccountsManager.AccountType.RESIDENTIAL);
}
}
stellarPaymentObserver.start();
return stellarPaymentObserver;
}
@Bean
public PaymentOperationToEventListener paymentOperationToEventListener(
JdbcSep31TransactionStore sep31TransactionStore,
JdbcSep24TransactionStore sep24TransactionStore,
JdbcSep6TransactionStore sep6TransactionStore,
PlatformApiClient platformApiClient,
RpcConfig rpcConfig) {
return new PaymentOperationToEventListener(
sep31TransactionStore,
sep24TransactionStore,
sep6TransactionStore,
platformApiClient,
rpcConfig);
}
}