ADR for file uploads #207
Conversation
@aagm @kgajowy @Dyostiq @alexeh please see a draft ADR for file upload strategies, outlining the context. We currently support dispatching BullMQ job requests from the API service only, which is basically the source of this discussion: if we could (and wanted to) place jobs on a queue from the geoprocessing service (for itself), then we could simply reverse-proxy all requests to the geoprocessing service and let it deal with job dispatching, while already having the file locally (this is one of the options I outlined in the ADR, in fact).
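For reference, a minimal sketch of what self-dispatching could look like with BullMQ; the queue name, Redis connection, and upload handler below are hypothetical, not the current codebase:

import { Queue, Worker } from 'bullmq';

// Hypothetical Redis connection and queue name.
const connection = { host: 'redis', port: 6379 };
const geoQueue = new Queue('geoprocessing-jobs', { connection });

// If uploads are reverse-proxied to the geoprocessing service, the file is
// already on local storage by the time we enqueue, so the job payload only
// needs the local path.
export async function onUploadReceived(localPath: string): Promise<void> {
  await geoQueue.add('process-upload', { path: localPath });
}

// The same service consumes its own queue.
new Worker(
  'geoprocessing-jobs',
  async (job) => {
    // read job.data.path from local storage and process it
  },
  { connection },
);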
First, thank you for putting this ADR document together, it is amazing. Secondly, while waiting for Dominik's suggestions, IMO, as a safety net and last resort I would proxy directly to the geoprocessing service.
My suggestions:
Also, we can utilize a saga/process manager to decouple storage in the geoprocessing service. Pseudocode:

import { EventBus } from '@nestjs/cqrs';
import * as fs from 'fs';

// Illustrative decorator stubs; @On and @Schedule are pseudocode, not actual NestJS APIs.
declare const On: (event: unknown) => MethodDecorator;
declare const Schedule: (interval: string) => MethodDecorator;

declare const processingQueue: any;
declare const eventBus: EventBus;

class ProcessingRequested {
  constructor(public readonly id: string, public readonly data: unknown) {}
}

processingQueue.on('something', (id: string, data: unknown) => {
  eventBus.publish(new ProcessingRequested(id, data));
});

class FileUploaded {
  constructor(public readonly id: string, public readonly origin: string) {}
}

class ProcessingFailed {}

// object storage: translate new-object notifications into domain events
declare const newObjectEvents: any;
newObjectEvents.subscribe((id: string) => {
  eventBus.publish(new FileUploaded(id, 'object storage'));
});

// file storage: watch a directory for incoming files
fs.watch('...', () => {
  eventBus.publish(new FileUploaded('id', 'file storage'));
});

// http/grpc/whatever push/pull
class Controller {
  handleFile() {
    // will handle the same way as file storage
  }
}

// the saga: correlates ProcessingRequested and FileUploaded events by id,
// and only starts processing once both have arrived, wherever the file came from
class Saga {
  constructor(
    private readonly sagaStateRepo: any,
    private readonly finalizer: any,
    private readonly process: any,
    private readonly streamResolver: any,
  ) {}

  @On(ProcessingRequested)
  handleNewProcessing(event: ProcessingRequested) {
    const state = this.sagaStateRepo.get(event.id);
    state.handle(event);
    state.save();
  }

  @On(FileUploaded)
  handleFileUploaded(event: FileUploaded) {
    const state = this.sagaStateRepo.get(event.id);
    state.handle(event);
    if (state.readyForProcessing) {
      // the resolver hides where the file actually lives
      const stream = this.streamResolver(event);
      this.process(state, stream);
      this.finalizer.cleanup(state);
    }
    state.save();
  }

  // garbage-collect sagas whose file or processing request never arrived
  @Schedule('every 15 minutes')
  cleanup() {
    const states = this.sagaStateRepo.getAllStale();
    this.finalizer.cleanup(states);
    eventBus.publish(new ProcessingFailed());
  }
}

Actually, it may overcomplicate things. We need to be careful while designing it and be sure that we'll benefit from the decoupling.
Another idea: use HTTP pull, but provide a generic link, maybe even in the message in the queue. It can be a signed link to an S3 bucket, or to another service, or to an API instance. If we want to treat geoprocessing only as a processing service, we keep the entire file management on the API side; the geoprocessing service is not aware of where the file comes from, it just gets an HTTP link from which it can read the stream. Then we have geoprocessing and file storage decoupled. It all depends on where we want to assign the responsibility.
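A minimal sketch of the signed-link variant, assuming AWS SDK v3; the bucket name, queue, and payload shape below are hypothetical:

import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

const s3 = new S3Client({ region: 'eu-west-1' });

// API side: presign a short-lived GET link and put it in the job payload.
async function enqueueProcessingJob(queue: { add: Function }, key: string) {
  const fileUrl = await getSignedUrl(
    s3,
    new GetObjectCommand({ Bucket: 'marxan-uploads', Key: key }),
    { expiresIn: 900 }, // link valid for 15 minutes
  );
  await queue.add('process-upload', { fileUrl });
}

// Geoprocessing side: storage-agnostic, just reads a stream from whatever
// HTTP link arrives in the job.
async function handleJob(job: { data: { fileUrl: string } }) {
  const response = await fetch(job.data.fileUrl);
  const stream = response.body;
  // ...process the stream
}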
@Dyostiq thank you for writing up your suggestions - I have added them to the ADR options, except this one, as I am not sure I understand it:
do you mean to use an in-cluster (Compose or k8s) blob storage service such as MinIO, and use an S3 interface throughout? Or to use bucket storage via a FUSE interface (similar to your first suggestion, but in all cases, rather than using shared volumes in local dev environments and bucket storage via FUSE in Kubernetes clusters)? I haven't added the saga suggestion to the ADR: I think it could be useful as a reference for the implementation if relevant, but it may be orthogonal to the choice of backing storage.
@hotzevzl yes, the thing with the saga is an implementation detail, as I said, but I wrote the comment to keep the idea somewhere. With the second option, what I meant is to always use the S3 interface for files, and have storage in API/geoprocessing available through that interface; we can then switch seamlessly to regular S3/Azure/MinIO and so on.
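A sketch of what that could look like, assuming AWS SDK v3 (MinIO speaks the same S3 API, so only the endpoint differs); the bucket and environment variable names are hypothetical:

import { S3Client, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';

// One client works against AWS S3 or an in-cluster MinIO; for MinIO, point
// the endpoint at it and use path-style URLs.
const storage = new S3Client({
  region: 'us-east-1',
  endpoint: process.env.S3_ENDPOINT, // e.g. http://minio:9000 in local dev
  forcePathStyle: true,
  credentials: {
    accessKeyId: process.env.S3_ACCESS_KEY!,
    secretAccessKey: process.env.S3_SECRET_KEY!,
  },
});

export const putFile = (key: string, body: Buffer) =>
  storage.send(new PutObjectCommand({ Bucket: 'uploads', Key: key, Body: body }));

export const getFile = (key: string) =>
  storage.send(new GetObjectCommand({ Bucket: 'uploads', Key: key }));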
Force-pushed from 0f71560 to 558fcb4.
See https://vizzuality.atlassian.net/browse/MARXAN-438