Skip to content

Commit

Permalink
#4158 - Exception when annotating something after a longer pause
Browse files Browse the repository at this point in the history
- Added new CAS Doctor check for unreachable feature structures
- Added new CAS Doctor repair for unreachable feature structures
- Improved logging in storage system
- Improves storage utilities
  • Loading branch information
reckart committed Sep 20, 2023
1 parent f80ea97 commit 45a95c3
Show file tree
Hide file tree
Showing 17 changed files with 438 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,6 @@ public static void clearCasMetadata(CAS aCas) throws IllegalStateException
cmds.forEach(aCas::removeFsFromIndexes);
}

public static long getLastChanged(CAS aCas)
{
Type casMetadataType = getType(aCas, CASMetadata.class);
Feature feature = casMetadataType.getFeatureByBaseName("lastChangedOnDisk");
return aCas.select(casMetadataType).map(cmd -> cmd.getLongValue(feature)).findFirst()
.orElse(-1l);
}

public static void addOrUpdateCasMetadata(CAS aCas, long aTimeStamp, SourceDocument aDocument,
String aUsername)
throws IOException
Expand Down Expand Up @@ -132,6 +124,41 @@ else if (cmds.size() == 1) {
aCas.addFsToIndexes(cmd);
}

public static Optional<FeatureStructure> getCasMetadataFS(CAS aCas)
{
return Optional.ofNullable(CasUtil.selectSingle(aCas, getType(aCas, CASMetadata.class)));
}

public static long getLastChanged(CAS aCas)
{
Type casMetadataType = getType(aCas, CASMetadata.class);
Feature feature = casMetadataType.getFeatureByBaseName("lastChangedOnDisk");
return aCas.select(casMetadataType).map(cmd -> cmd.getLongValue(feature)).findFirst()
.orElse(-1l);
}

public static Optional<String> getUsername(CAS aCas)
{
try {
FeatureStructure fs = CasUtil.selectSingle(aCas, getType(aCas, CASMetadata.class));
return Optional.ofNullable(FSUtil.getFeature(fs, "username", String.class));
}
catch (IllegalArgumentException e) {
return Optional.empty();
}
}

public static Optional<Long> getSourceDocumentId(CAS aCas)
{
try {
FeatureStructure fs = CasUtil.selectSingle(aCas, getType(aCas, CASMetadata.class));
return Optional.ofNullable(FSUtil.getFeature(fs, "sourceDocumentId", Long.class));
}
catch (IllegalArgumentException e) {
return Optional.empty();
}
}

public static Optional<String> getSourceDocumentName(CAS aCas)
{
try {
Expand All @@ -142,4 +169,26 @@ public static Optional<String> getSourceDocumentName(CAS aCas)
return Optional.empty();
}
}

public static Optional<Long> getProjectId(CAS aCas)
{
try {
FeatureStructure fs = CasUtil.selectSingle(aCas, getType(aCas, CASMetadata.class));
return Optional.ofNullable(FSUtil.getFeature(fs, "projectId", Long.class));
}
catch (IllegalArgumentException e) {
return Optional.empty();
}
}

public static Optional<String> getProjectName(CAS aCas)
{
try {
FeatureStructure fs = CasUtil.selectSingle(aCas, getType(aCas, CASMetadata.class));
return Optional.ofNullable(FSUtil.getFeature(fs, "projectName", String.class));
}
catch (IllegalArgumentException e) {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import static de.tudarmstadt.ukp.clarin.webanno.api.casstorage.CasAccessMode.UNMANAGED_NON_INITIALIZING_ACCESS;
import static de.tudarmstadt.ukp.inception.annotation.storage.CasStorageServiceImpl.RepairAndUpgradeFlags.ISOLATED_SESSION;
import static java.lang.System.currentTimeMillis;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.newSetFromMap;
import static java.util.Collections.synchronizedSet;
import static org.apache.commons.lang3.ArrayUtils.contains;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -44,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultEvictionPolicy;
Expand All @@ -70,6 +73,7 @@
import de.tudarmstadt.ukp.clarin.webanno.api.CasProvider;
import de.tudarmstadt.ukp.clarin.webanno.api.CasStorageService;
import de.tudarmstadt.ukp.clarin.webanno.api.CasUpgradeMode;
import de.tudarmstadt.ukp.clarin.webanno.api.annotation.util.WebAnnoCasUtil;
import de.tudarmstadt.ukp.clarin.webanno.api.casstorage.CasAccessMode;
import de.tudarmstadt.ukp.clarin.webanno.api.casstorage.CasSessionException;
import de.tudarmstadt.ukp.clarin.webanno.api.casstorage.CasStorageServiceAction;
Expand All @@ -84,6 +88,8 @@
import de.tudarmstadt.ukp.inception.annotation.storage.config.CasStorageCacheProperties;
import de.tudarmstadt.ukp.inception.annotation.storage.config.CasStorageServiceAutoConfiguration;
import de.tudarmstadt.ukp.inception.annotation.storage.driver.CasStorageDriver;
import de.tudarmstadt.ukp.inception.annotation.storage.driver.filesystem.CasPersistenceUtils;
import de.tudarmstadt.ukp.inception.annotation.storage.driver.filesystem.FileSystemCasStorageDriver;
import de.tudarmstadt.ukp.inception.schema.AnnotationSchemaService;

/**
Expand Down Expand Up @@ -295,16 +301,18 @@ public CAS readOrCreateCas(SourceDocument aDocument, String aUsername,
CasUpgradeMode aUpgradeMode, CasProvider aSupplier, CasAccessMode aAccessMode)
throws IOException, CasSessionException
{
LOG.debug("Reading annotations for [{}]@{} in {} with {} using {}", aUsername, aDocument,
aDocument.getProject(), aAccessMode, aUpgradeMode);

try (var logCtx = withProjectLogger(aDocument.getProject())) {
CasStorageSession session = CasStorageSession.get();

LOG.debug(
"CAS storage session [{}]: reading annotations for [{}]@{} in {} with {} using {}",
session.hashCode(), aUsername, aDocument, aDocument.getProject(), aAccessMode,
aUpgradeMode);

// If the CAS is already present in the current session and the access mode is
// compatible
// with the requested access mode, then we can return it immediately
// THOUGHT: As it is written now - if the access more already recorded in the session
// compatible with the requested access mode, then we can return it immediately
// THOUGHT: As it is written now - if the access mode already recorded in the session
// is insufficient, the access mode is upgraded because we simply continue after this
// IF-clause. I am not entirely sure this is valid.
// Case 1) CAS was added during the current session - the holder in the session is
Expand All @@ -316,6 +324,10 @@ public CAS readOrCreateCas(SourceDocument aDocument, String aUsername,
Optional<SessionManagedCas> mCas = session.getManagedState(aDocument.getId(),
aUsername);
if (mCas.isPresent() && mCas.get().getMode().alsoPermits(aAccessMode)) {
LOG.debug(
"CAS storage session [{}]: session already contains CAS [{}] for [{}]@{} with mode {}",
session.hashCode(), mCas.get().getCas().hashCode(), aUsername, aDocument,
mCas.get().getMode());
return mCas.get().getCas();
}

Expand Down Expand Up @@ -566,7 +578,16 @@ private CAS readOrCreateUnmanagedCas(SourceDocument aDocument, String aUsername,
CAS cas;
String source;

LOG.trace("Loading CAS [{}]@{} [{}]", aUsername, aDocument, aUpgradeMode);
if (LOG.isTraceEnabled()) {
if (CasStorageSession.exists()) {
var session = CasStorageSession.get();
LOG.trace("CAS storage session [{}]: loading CAS [{}]@{} [{}]", session.hashCode(),
aUsername, aDocument, aUpgradeMode);
}
else {
LOG.trace("Loading CAS [{}]@{} [{}]", aUsername, aDocument, aUpgradeMode);
}
}

// If the CAS exists on disk already, load it from there
if (driver.existsCas(aDocument, aUsername)) {
Expand Down Expand Up @@ -598,8 +619,17 @@ else if (aSupplier != null) {
var duration = currentTimeMillis() - start;

if (LOG.isDebugEnabled()) {
LOG.debug("Loaded CAS [{}] for [{}]@{} from {} in {}ms [{}]", cas.hashCode(), aUsername,
aDocument, source, duration, aUpgradeMode);
if (CasStorageSession.exists()) {
var session = CasStorageSession.get();
LOG.debug(
"CAS storage session [{}]: loaded CAS [{}] for [{}]@{} from {} in {}ms [{}]",
session.hashCode(), cas.hashCode(), aUsername, aDocument, source, duration,
aUpgradeMode);
}
else {
LOG.debug("Loaded CAS [{}] for [{}]@{} from {} in {}ms [{}]", cas.hashCode(),
aUsername, aDocument, source, duration, aUpgradeMode);
}
}

return cas;
Expand Down Expand Up @@ -1067,8 +1097,16 @@ private void realWriteCas(SourceDocument aDocument, String aUserName, CAS aCas)
{
analyze(aDocument.getProject(), aDocument.getName(), aDocument.getId(), aUserName, aCas);

LOG.debug("Writing annotations for [{}]@{} in {}", aUserName, aDocument,
aDocument.getProject());
if (CasStorageSession.exists()) {
var session = CasStorageSession.get();
LOG.debug("CAS storage session [{}]: writing annotations for [{}]@{} in {}",
session.hashCode(), aUserName, aDocument, aDocument.getProject());

}
else {
LOG.debug("Writing annotations for [{}]@{} in {}", aUserName, aDocument,
aDocument.getProject());
}

driver.writeCas(aDocument, aUserName, aCas);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static CasStorageSession openNested(boolean aIsolated)
}

/**
* @return the current session. Returns {@code null} if there is no current session.
* @return the current session.
* @throws CasSessionException
* if no session is available.
*/
Expand All @@ -143,6 +143,14 @@ public static CasStorageSession get() throws CasSessionException
return session;
}

/**
* @return if a session exists.
*/
public static boolean exists()
{
return activeSession.get() != null;
}

/**
* Closes this session.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.uima.cas.impl.Serialization.serializeCASComplete;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -103,8 +104,8 @@ public static void writeSerializedCasParanoid(CAS aCas, File aFile) throws IOExc
// Safeguard that we do NOT write a CAS which can afterwards not be read and thus
// would render the document broken within the project
// Reason we do this: https://issues.apache.org/jira/browse/UIMA-6162
CAS dummy = WebAnnoCasUtil.createCas();
deserializeCASComplete(serializer, (CASImpl) getRealCas(dummy));
CAS dummy = getRealCas(WebAnnoCasUtil.createCas());
deserializeCASComplete(serializer, (CASImpl) dummy);
// END SAFEGUARD --------------
}
catch (Exception e) {
Expand All @@ -120,6 +121,22 @@ public static void writeSerializedCasParanoid(CAS aCas, File aFile) throws IOExc
}
}

public static byte[] writeToByteArray(CAS aCas) throws IOException
{
try (var bos = new ByteArrayOutputStream()) {
write(bos, aCas);
return bos.toByteArray();
}
}

public static byte[] writeToCompressedByteArray(CAS aCas) throws IOException
{
try (var bos = new ByteArrayOutputStream()) {
writeSnappyCompressed(bos, aCas);
return bos.toByteArray();
}
}

private static void write(OutputStream aOut, CAS aCas) throws IOException, FileNotFoundException
{
var serializer = serializeCASComplete((CASImpl) getRealCas(aCas));
Expand Down Expand Up @@ -184,7 +201,7 @@ public static void readSerializedCas(CAS aCas, File aFile) throws IOException
}
}

private static void readSerializedCas(CAS aCas, InputStream is) throws IOException
public static void readSerializedCas(CAS aCas, InputStream is) throws IOException
{
try (var ois = new ObjectInputStream(is)) {
ois.setObjectInputFilter(SERIALIZED_CAS_INPUT_FILTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ public void importCas(SourceDocument aDocument, String aUser, InputStream aStrea
}
}

private File getCasFile(long aProjectId, long aDocumentId, String aUser) throws IOException
public File getCasFile(long aProjectId, long aDocumentId, String aUser) throws IOException
{
return new File(getAnnotationFolder(aProjectId, aDocumentId), aUser + SER_CAS_EXTENSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package de.tudarmstadt.ukp.clarin.webanno.api.annotation.util;

import static java.io.ObjectInputFilter.Config.createFilter;
import static java.lang.String.join;
import static org.apache.uima.cas.CAS.FEATURE_BASE_NAME_BEGIN;
import static org.apache.uima.cas.CAS.FEATURE_BASE_NAME_END;
import static org.apache.uima.cas.CAS.FEATURE_BASE_NAME_LANGUAGE;
Expand All @@ -28,6 +30,12 @@
import static org.apache.uima.fit.util.CasUtil.selectCovering;
import static org.apache.uima.fit.util.CasUtil.selectSingle;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -45,6 +53,8 @@
import org.apache.uima.cas.Type;
import org.apache.uima.cas.impl.CASCompleteSerializer;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.cas.impl.CASMgrSerializer;
import org.apache.uima.cas.impl.CASSerializer;
import org.apache.uima.cas.text.AnnotationFS;
import org.apache.uima.cas.text.AnnotationIndex;
import org.apache.uima.fit.util.CasUtil;
Expand All @@ -69,6 +79,13 @@ public class WebAnnoCasUtil
private static final boolean ENFORCE_CAS_THREAD_LOCK = System
.getProperty(PROP_ENFORCE_CAS_THREAD_LOCK, "true").equals("true");

private final static ObjectInputFilter SERIALIZED_CAS_INPUT_FILTER = createFilter(join(";", //
CASCompleteSerializer.class.getName(), //
CASSerializer.class.getName(), //
CASMgrSerializer.class.getName(), //
String.class.getName(), //
"!*"));

public static CAS createCas(TypeSystemDescription aTSD) throws ResourceInitializationException
{
CAS cas = CasCreationUtils.createCas(aTSD, null, null);
Expand Down Expand Up @@ -721,4 +738,49 @@ public static Set<FeatureStructure> findAllFeatureStructures(CAS aCas)
((CASImpl) aCas).walkReachablePlusFSsSorted(allFSes::add, null, null, null);
return allFSes;
}

public static byte[] casToByteArray(CASCompleteSerializer aSer) throws IOException
{
try (var bos = new ByteArrayOutputStream()) {
try (var oos = new ObjectOutputStream(bos)) {
oos.writeObject(aSer);
}
return bos.toByteArray();
}
}

public static byte[] casToByteArray(CAS aCas) throws IOException
{
// Index annotation document
var realCas = (CASImpl) getRealCas(aCas);
// UIMA-6162 Workaround: synchronize CAS during de/serialization
synchronized (realCas.getBaseCAS()) {
return casToByteArray(serializeCASComplete(realCas));
}
}

public static CAS byteArrayToCas(byte[] aByteArray) throws IOException
{
CAS cas;
try {
cas = createCas();
}
catch (ResourceInitializationException e) {
throw new IOException(e);
}

var realCas = (CASImpl) getRealCas(cas);
synchronized (realCas.getBaseCAS()) {
try (var ois = new ObjectInputStream(new ByteArrayInputStream(aByteArray))) {
ois.setObjectInputFilter(SERIALIZED_CAS_INPUT_FILTER);
var casCompleteSerializer = (CASCompleteSerializer) ois.readObject();
realCas.reinit(casCompleteSerializer);
}
catch (ClassNotFoundException e) {
throw new IOException(e);
}
}

return cas;
}
}
Loading

0 comments on commit 45a95c3

Please sign in to comment.