Skip to content

Commit

Permalink
Make Searchpe work with minimun resources (#240)
Browse files Browse the repository at this point in the history
* Limit resources configuration

* Change properties
  • Loading branch information
carlosthe19916 authored May 7, 2022
1 parent aa8e94a commit 82e1ed7
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 68 deletions.
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-narayana-jta</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
package io.github.project.openubl.searchpe.services;

import io.github.project.openubl.searchpe.utils.FileHelper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.util.UUID;

Expand All @@ -51,20 +54,33 @@ public File downloadFile() throws IOException {
Path workingDirectoryPath = workingDirectoryFile.toPath();

if (!workingDirectoryFile.exists()) {
workingDirectoryFile.mkdir();
LOGGER.infof("Directory %s created", workingDirectoryFile.toString());
boolean directoryCreated = workingDirectoryFile.mkdir();
if (directoryCreated) {
LOGGER.infof("Directory %s created", workingDirectoryFile.toString());
}
}

// Downloading
String zipFileName = UUID.randomUUID().toString() + ".zip";
File zipFile = workingDirectoryPath.resolve(zipFileName).toFile();
URL zipFileURL = new URL(zipURL);

LOGGER.infof("Downloading %s into %s", zipFileURL.toString(), zipFile);
FileUtils.copyURLToFile(zipFileURL, zipFile, connectionTimeout, readTimeout);
URL sourceZipFileURL = new URL(zipURL);

String targetZipFileName = UUID.randomUUID() + ".zip";
File targetZipFile = workingDirectoryPath.resolve(targetZipFileName).toFile();

LOGGER.infof("Downloading %s into %s", sourceZipFileURL.toString(), targetZipFile);
try (
FileOutputStream fileOutputStream = new FileOutputStream(targetZipFile);
ReadableByteChannel readableByteChannel = Channels.newChannel(sourceZipFileURL.openStream());
FileChannel fileChannel = fileOutputStream.getChannel();
) {
final long chunkSize = 1024 * 50;
long position = 0;
while (fileChannel.transferFrom(readableByteChannel, position, chunkSize) > 0) {
position += chunkSize;
}
}
LOGGER.infof("Download finished successfully");

return zipFile;
return targetZipFile;
}

public File unzip(File file) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,28 @@
import io.github.project.openubl.searchpe.models.jpa.entity.Status;
import io.github.project.openubl.searchpe.models.jpa.entity.VersionEntity;
import io.github.project.openubl.searchpe.utils.DataHelper;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.RunOptions;
import org.apache.commons.io.FileUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

@ApplicationScoped
public class UpgradeDataService {
Expand All @@ -60,12 +59,6 @@ public class UpgradeDataService {
@Inject
FileService fileService;

@Inject
UserTransaction tx;

@Inject
EntityManager entityManager;

@Inject
Event<VersionEvent.DownloadingEvent> downloadingVersionEvent;

Expand All @@ -75,9 +68,6 @@ public class UpgradeDataService {
@Inject
Event<VersionEvent.ImportingDataEvent> importingVersionEvent;

@Inject
Event<VersionEvent.RecordsDataEvent> recordsEvent;

public void upgrade(Long versionId) {
File downloadedFile;
File unzippedFolder;
Expand Down Expand Up @@ -125,13 +115,15 @@ public void createContribuyentesFromFile(Long versionId, File file) throws IOExc
int totalCount = 0;
int batchCount = 0;

try (BufferedReader br = new BufferedReader(new FileReader(file, StandardCharsets.ISO_8859_1))) {
List<ContribuyenteEntity> contribuyentes = new ArrayList<>();
try (
FileReader fileReader = new FileReader(file, StandardCharsets.ISO_8859_1);
BufferedReader br = new BufferedReader(fileReader)
) {
String line;
boolean skip = true;

int batchSize = jdbcBatchSize;

tx.begin();
int batchSize = jdbcBatchSize * 10;

while ((line = br.readLine()) != null) {
if (skip) {
Expand All @@ -154,55 +146,65 @@ public void createContribuyentesFromFile(Long versionId, File file) throws IOExc
}
}

entityManager.persist(contribuyente);
contribuyentes.add(contribuyente);

totalCount = totalCount + 1;
batchCount = batchCount + 1;

// Time to save data
if (batchCount >= batchSize) {
batchCount = 0;

entityManager.flush();
entityManager.clear();
tx.commit();

recordsEvent.fire(new VersionEvent.DefaultRecordsDataEvent(versionId, totalCount));
saveProgress(versionId, contribuyentes).toCompletableFuture().get();

tx.begin();
// Reset
batchCount = 0;
contribuyentes.clear();
}
}

tx.commit();
} catch (NotSupportedException | HeuristicRollbackException | SystemException | RollbackException |
HeuristicMixedException e) {
LOGGER.error(e);
return;
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}


// Save remaining data
try {
tx.begin();
if (!contribuyentes.isEmpty()) {
saveProgress(versionId, contribuyentes).toCompletableFuture().get();
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

// Save final state of version
int records = totalCount;
QuarkusTransaction.run(QuarkusTransaction.runOptions()
.exceptionHandler((throwable) -> RunOptions.ExceptionResult.ROLLBACK)
.semantic(RunOptions.Semantic.DISALLOW_EXISTING), () -> {

VersionEntity version = VersionEntity.findById(versionId);
version.status = Status.COMPLETED;
version.updatedAt = new Date();
version.records = totalCount;

VersionEntity.persist(version);

tx.commit();
} catch (NotSupportedException | HeuristicRollbackException | HeuristicMixedException | RollbackException |
SystemException e) {
try {
tx.rollback();
} catch (SystemException se) {
LOGGER.error(se);
}
return;
}
version.records = records;
version.persist();
});

long endTime = Calendar.getInstance().getTimeInMillis();
LOGGER.infof("Import contribuyentes finished successfully in " + (endTime - startTime) + " milliseconds.");
}

private CompletionStage<Void> saveProgress(Long versionId, List<ContribuyenteEntity> contribuyentes) {
return CompletableFuture.runAsync(() -> {
QuarkusTransaction.run(QuarkusTransaction.runOptions()
.exceptionHandler((throwable) -> RunOptions.ExceptionResult.ROLLBACK)
.semantic(RunOptions.Semantic.DISALLOW_EXISTING), () -> {
VersionEntity version = VersionEntity.findById(versionId);
version.records = version.records + contribuyentes.size();
version.updatedAt = new Date();
version.persist();

ContribuyenteEntity.persist(contribuyentes);
});

LOGGER.infof("Chunk processed size=" + contribuyentes.size());
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static String[] readLine(String line, int size) {
}

public static Optional<ContribuyenteEntity> buildContribuyenteEntity(Long versionId, String[] columns) {
if (columns[0] == null || columns[1] == null) {
if (columns[0] == null || columns[1] == null || !columns[0].matches("^[0-9]{11}")) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ public static void unzipFile(File zipFile, Path destinationPath) throws IOExcept
}

byte[] buffer = new byte[1024];
try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFile))) {
try (
FileInputStream is = new FileInputStream(zipFile);
ZipInputStream zis = new ZipInputStream(is)
) {
ZipEntry zipEntry = zis.getNextEntry();
while (zipEntry != null) {
File newFile = destinationPath.resolve(zipEntry.getName()).toFile();
Expand Down
2 changes: 1 addition & 1 deletion src/main/kubernetes/pvc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ spec:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
storage: 10Gi
14 changes: 10 additions & 4 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ quarkus.datasource.jdbc.additional-jdbc-properties.reWriteBatchedInserts=true
quarkus.hibernate-orm.database.generation=none
quarkus.hibernate-orm.sql-load-script=no-file
quarkus.hibernate-orm.log.sql=false
quarkus.hibernate-orm.jdbc.statement-batch-size=5000
quarkus.hibernate-orm.jdbc.statement-batch-size=50

# Quartz
quarkus.quartz.clustered=false
Expand Down Expand Up @@ -93,9 +93,9 @@ quarkus.kubernetes.part-of=openubl
quarkus.kubernetes.name=searchpe

quarkus.kubernetes.resources.requests.memory=64Mi
quarkus.kubernetes.resources.requests.cpu=250m
quarkus.kubernetes.resources.requests.cpu=50m
quarkus.kubernetes.resources.limits.memory=512Mi
quarkus.kubernetes.resources.limits.cpu=1000m
quarkus.kubernetes.resources.limits.cpu=250m

quarkus.kubernetes.env.vars.QUARKUS_DATASOURCE_USERNAME=db_username
quarkus.kubernetes.env.vars.QUARKUS_DATASOURCE_PASSWORD=db_password
Expand All @@ -110,12 +110,17 @@ quarkus.kubernetes.env.vars.QUARKUS_HIBERNATE_SEARCH_ORM_ELASTICSEARCH_HOSTS=ela
quarkus.kubernetes.env.vars.QUARKUS_HIBERNATE_SEARCH_ORM_ELASTICSEARCH_VERSION=7
quarkus.kubernetes.env.vars.QUARKUS_PROFILE=prod

# Memory
quarkus.jib.native-argument=-Xmx384m,-Xms384m
quarkus.jib.jvm-arguments=-Xmx384m,-Xms384m

# Searchpe
searchpe.disable.authorization=true

searchpe.workspace.directory=workspace

searchpe.sunat.padronReducidoUrl=https://raw.githubusercontent.com/project-openubl/searchpe/master/src/test/resources/padron_reducido_ruc.zip
searchpe.sunat.filter=ACTIVO
searchpe.sunat.workspace.directory=searchpe/workspace
searchpe.sunat.workspace.connectionTimeout=100000
searchpe.sunat.workspace.readTimeout=100000

Expand All @@ -128,6 +133,7 @@ searchpe.oidc.ui-client-id=searchpe-ui
%test.quarkus.hibernate-search-orm.automatic-indexing.synchronization.strategy=sync

%prod.searchpe.sunat.padronReducidoUrl=http://www2.sunat.gob.pe/padron_reducido_ruc.zip
#%prod.searchpe.sunat.padronReducidoUrl=https://gitlab.com/carlosthe19916/test/-/raw/main/padron_reducido_ruc.zip?inline=false

%basic.searchpe.disable.authorization=false

Expand Down

0 comments on commit 82e1ed7

Please sign in to comment.