Skip to content

Commit

Permalink
Revert "fix and refactor deactivating unreported datasets (#4388)" (#…
Browse files Browse the repository at this point in the history
…4402)

This reverts commit 0df5d80.
  • Loading branch information
youri-k authored and fm3 committed Jan 9, 2020
1 parent 0df5d80 commit d1f7aa5
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 68 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.md).
- Changed the error message when importing a dataset without resolution directories. [#4389](https://github.com/scalableminds/webknossos/pull/4389)

### Fixed
- Fixed the deactivation of datasets if no datasets are present. [#4388](https://github.com/scalableminds/webknossos/pull/4388)
-

### Removed
-
Expand Down
4 changes: 2 additions & 2 deletions app/controllers/WKDataStoreController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class WKDataStoreController @Inject()(dataSetService: DataSetService,
request.body.validate[List[InboxDataSource]] match {
case JsSuccess(dataSources, _) =>
for {
existingIds <- dataSetService.updateDataSources(dataStore, dataSources)(GlobalAccessContext)
_ <- dataSetService.deactivateUnreportedDataSources(existingIds)(GlobalAccessContext)
_ <- dataSetService.deactivateUnreportedDataSources(dataStore.name, dataSources)(GlobalAccessContext)
_ <- dataSetService.updateDataSources(dataStore, dataSources)(GlobalAccessContext)
} yield {
JsonOk
}
Expand Down
33 changes: 12 additions & 21 deletions app/models/binary/DataSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,34 +279,25 @@ class DataSetDAO @Inject()(sqlClient: SQLClient,
_ <- dataSetDataLayerDAO.updateLayers(id, source)
} yield ()

def deactivateUnreported(existingDataSetIds: List[ObjectId], unreportedStatus: String): Fox[Unit] = {
def inclusionPredicate(columnName: String = "_id") =
if (existingDataSetIds.isEmpty) "true"
else s"$columnName not in ${writeStructTupleWithQuotes(existingDataSetIds.map(_.id))}"
val deleteResolutionsQuery =
sqlu"""delete from webknossos.dataSet_resolutions where #${inclusionPredicate("_dataSet")}"""
val deleteLayersQuery =
sqlu"""delete from webknossos.dataSet_layers where #${inclusionPredicate("_dataSet")}"""
val setToUnusableQuery =
sqlu"""update webknossos.datasets
set isUsable = false, status = $unreportedStatus, scale = NULL, inboxSourceHash = NULL
where #${inclusionPredicate()}"""
for {
_ <- run(DBIO.sequence(List(deleteResolutionsQuery, deleteLayersQuery, setToUnusableQuery)).transactionally)
} yield ()
}

def deactivateAllForDataStore(dataStoreName: String, unreportedStatus: String): Fox[Unit] = {
def deactivateUnreported(names: List[String],
organizationId: ObjectId,
dataStoreName: String,
unreportedStatus: String): Fox[Unit] = {
val inclusionPredicate =
if (names.isEmpty) "true" else s"name not in ${writeStructTupleWithQuotes(names.map(sanitize))}"
val deleteResolutionsQuery =
sqlu"""delete from webknossos.dataSet_resolutions where _dataSet in
(select _id from webknossos.dataSets where _dataStore = ${dataStoreName})"""
(select _id from webknossos.dataSets where _dataStore = ${dataStoreName} and _organization = ${organizationId}
and #${inclusionPredicate})"""
val deleteLayersQuery =
sqlu"""delete from webknossos.dataSet_layers where _dataSet in
(select _id from webknossos.dataSets where _dataStore = ${dataStoreName})"""
(select _id from webknossos.dataSets where _dataStore = ${dataStoreName} and _organization = ${organizationId}
and #${inclusionPredicate})"""
val setToUnusableQuery =
sqlu"""update webknossos.datasets
set isUsable = false, status = $unreportedStatus, scale = NULL, inboxSourceHash = NULL
where _dataStore = ${dataStoreName}"""
where _dataStore = ${dataStoreName} and _organization = ${organizationId}
and #${inclusionPredicate}"""
for {
_ <- run(DBIO.sequence(List(deleteResolutionsQuery, deleteLayersQuery, setToUnusableQuery)).transactionally)
} yield ()
Expand Down
99 changes: 55 additions & 44 deletions app/models/binary/DataSetService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class DataSetService @Inject()(organizationDAO: OrganizationDAO,
dataSource: InboxDataSource,
publication: Option[ObjectId] = None,
isActive: Boolean = false
): Fox[ObjectId] = {
): Fox[Unit] = {
implicit val ctx = GlobalAccessContext
val newId = ObjectId.generate
val details =
Expand Down Expand Up @@ -92,7 +92,7 @@ class DataSetService @Inject()(organizationDAO: OrganizationDAO,
))
_ <- dataSetDataLayerDAO.updateLayers(newId, dataSource)
_ <- dataSetAllowedTeamsDAO.updateAllowedTeamsForDataSet(newId, List())
} yield newId
} yield ()
}

def addForeignDataSet(dataStoreName: String, dataSetName: String, organizationName: String)(
Expand All @@ -116,41 +116,39 @@ class DataSetService @Inject()(organizationDAO: OrganizationDAO,
}

def updateDataSources(dataStore: DataStore, dataSources: List[InboxDataSource])(
implicit ctx: DBAccessContext): Fox[List[ObjectId]] = {
implicit ctx: DBAccessContext): Fox[List[Unit]] = {
logger.info(
s"[${dataStore.name}] Available datasets: " +
s"${dataSources.count(_.isUsable)} (usable), ${dataSources.count(!_.isUsable)} (unusable)")

val groupedByOrga = dataSources.groupBy(_.id.team).toList

Fox
.serialCombined(groupedByOrga) { orgaTuple: (String, List[InboxDataSource]) =>
organizationDAO
.findOneByName(orgaTuple._1)
.futureBox
.flatMap {
case Full(organization) =>
for {
foundDatasets <- dataSetDAO.findAllByNamesAndOrganization(orgaTuple._2.map(_.id.name), organization._id)
foundDatasetsByName = foundDatasets.groupBy(_.name)
existingIds <- Fox.serialCombined(orgaTuple._2)(dataSource =>
updateDataSource(dataStore, dataSource, foundDatasetsByName))
} yield existingIds
case _ =>
logger.info(
s"Ignoring ${orgaTuple._2.length} reported datasets for non-existing organization ${orgaTuple._1}")
Fox.successful(List.empty)
}
.toFox
}
.map(_.flatten)
Fox.serialCombined(groupedByOrga) { orgaTuple: (String, List[InboxDataSource]) =>
for {
organizationBox <- organizationDAO.findOneByName(orgaTuple._1).futureBox
result <- organizationBox match {
case Full(organization) =>
for {
foundDatasets <- dataSetDAO.findAllByNamesAndOrganization(orgaTuple._2.map(_.id.name), organization._id)
foundDatasetsByName = foundDatasets.groupBy(_.name)
_ <- Fox.serialSequence(orgaTuple._2)(dataSource =>
updateDataSource(dataStore, dataSource, foundDatasetsByName))
} yield ()
case _ =>
logger.info(
s"Ignoring ${orgaTuple._2.length} reported datasets for non-existing organization ${orgaTuple._1}")
Fox.successful(())
}

} yield ()
}
}

private def updateDataSource(
dataStore: DataStore,
dataSource: InboxDataSource,
foundDatasets: Map[String, List[DataSet]]
)(implicit ctx: DBAccessContext): Fox[ObjectId] = {
)(implicit ctx: DBAccessContext): Fox[Unit] = {
val foundDataSetOpt = foundDatasets.get(dataSource.id.name).flatMap(_.headOption)
foundDataSetOpt match {
case Some(foundDataSet) if foundDataSet._dataStore == dataStore.name =>
Expand All @@ -163,36 +161,32 @@ class DataSetService @Inject()(organizationDAO: OrganizationDAO,
}

private def updateKnownDataSource(foundDataSet: DataSet, dataSource: InboxDataSource, dataStore: DataStore)(
implicit ctx: DBAccessContext): Future[Box[ObjectId]] =
implicit ctx: DBAccessContext): Future[Box[Unit]] =
if (foundDataSet.inboxSourceHash.contains(dataSource.hashCode))
Fox.successful(foundDataSet._id)
Fox.successful(())
else
dataSetDAO
.updateDataSourceByNameAndOrganizationName(foundDataSet._id,
dataStore.name,
dataSource.hashCode,
dataSource,
dataSource.isUsable)(GlobalAccessContext)
.map(_ => foundDataSet._id)
dataSetDAO.updateDataSourceByNameAndOrganizationName(foundDataSet._id,
dataStore.name,
dataSource.hashCode,
dataSource,
dataSource.isUsable)(GlobalAccessContext)

private def updateDataSourceDifferentDataStore(
foundDataSet: DataSet,
dataSource: InboxDataSource,
dataStore: DataStore)(implicit ctx: DBAccessContext): Future[Box[ObjectId]] =
dataStore: DataStore)(implicit ctx: DBAccessContext): Future[Box[Unit]] =
// The dataSet is already present (belonging to the same organization), but reported from a different datastore
(for {
originalDataStore <- dataStoreDAO.findOneByName(foundDataSet._dataStore)
} yield {
if (originalDataStore.isScratch && !dataStore.isScratch) {
logger.info(
s"Replacing dataset ${foundDataSet.name} from scratch datastore ${originalDataStore.name} by the one from ${dataStore.name}")
dataSetDAO
.updateDataSourceByNameAndOrganizationName(foundDataSet._id,
dataStore.name,
dataSource.hashCode,
dataSource,
dataSource.isUsable)(GlobalAccessContext)
.map(_ => foundDataSet._id)
dataSetDAO.updateDataSourceByNameAndOrganizationName(foundDataSet._id,
dataStore.name,
dataSource.hashCode,
dataSource,
dataSource.isUsable)(GlobalAccessContext)
} else {
logger.info(
s"Dataset ${foundDataSet.name}, as reported from ${dataStore.name} is already present from datastore ${originalDataStore.name} and will not be replaced.")
Expand All @@ -215,8 +209,25 @@ class DataSetService @Inject()(organizationDAO: OrganizationDAO,
}
} else Fox.successful(None)

def deactivateUnreportedDataSources(existingDataSetIds: List[ObjectId])(implicit ctx: DBAccessContext): Fox[Unit] =
dataSetDAO.deactivateUnreported(existingDataSetIds, unreportedStatus)
def deactivateUnreportedDataSources(dataStoreName: String, dataSources: List[InboxDataSource])(
implicit ctx: DBAccessContext): Fox[List[Unit]] = {
val dataSourcesByOrganizationName: Map[String, List[InboxDataSource]] = dataSources.groupBy(_.id.team)
Fox.serialCombined(dataSourcesByOrganizationName.keys.toList) { organizationName =>
for {
organizationBox <- organizationDAO.findOneByName(organizationName).futureBox
_ <- organizationBox match {
case Full(organization) =>
dataSetDAO.deactivateUnreported(dataSourcesByOrganizationName(organizationName).map(_.id.name),
organization._id,
dataStoreName,
unreportedStatus)
case _ =>
logger.info(s"Ignoring reported dataset for non-existing organization $organizationName")
Fox.successful(())
}
} yield ()
}
}

def getSharingToken(dataSetName: String, organizationId: ObjectId)(implicit ctx: DBAccessContext) = {

Expand Down

0 comments on commit d1f7aa5

Please sign in to comment.