Is it possible to modify a dataset once the server is started? #67

Open · josephnowak opened this issue Mar 9, 2021 · 4 comments
Labels: enhancement (New feature or request), help wanted (Extra attention is needed)

@josephnowak commented Mar 9, 2021
Hi, I'm interested in using xpublish to analyze some financial data that I have on a cluster. The data is a time series, so every day new data is concatenated to it, and when that happens the dataset I published does not show the new dates. I'd like to know how to update that dataset without killing the server and publishing the datasets again. I couldn't find a method for this in the documentation. Sorry to bother you with this, but I think this API is great for my use case.
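
(For context, a minimal sketch of the kind of daily update described above, assuming the data lives in a Zarr store with a time dimension; the store name and variable are illustrative:)

import numpy as np
import pandas as pd
import xarray as xr

# Existing store with a 'time' dimension (illustrative setup).
ds = xr.Dataset(
    {"price": ("time", np.random.rand(3))},
    coords={"time": pd.date_range("2021-03-01", periods=3)},
)
ds.to_zarr("prices.zarr", mode="w")

# Daily job: append the new day's data along 'time'.
new_day = xr.Dataset(
    {"price": ("time", np.random.rand(1))},
    coords={"time": pd.date_range("2021-03-04", periods=1)},
)
new_day.to_zarr("prices.zarr", append_dim="time")

# A dataset opened before the append keeps its old time index; the
# served copy has to be refreshed somehow, which is what this issue asks.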

@josephnowak (Author) commented Mar 9, 2021

I wrote some code that does exactly what I wanted, but it makes use of private attributes, so I suppose it's not a good idea to rely on it and it could cause other problems. Still, it illustrates what I want:

import os
import threading

import numpy as np
import xarray
import xpublish
from fsspec.implementations.http import HTTPFileSystem


def compare_dataset(a, b):
    # Check that two datasets have equal coordinates and (NaN-aware) equal values.
    a = a.to_array()
    b = b.to_array()
    equals = True
    for name, coord in a.coords.items():
        equals &= coord.equals(b.coords[name])
    equals &= np.allclose(a.values, b.values, equal_nan=True)
    return equals


def generate_zarr(file_name, shape):
    # Write a random 2-D dataset with a single variable 'test_arr' to Zarr.
    a = np.random.rand(*shape)
    b = xarray.DataArray(
        a,
        dims=['index', 'columns'],
        coords={'index': list(range(shape[0])), 'columns': list(range(shape[1]))}
    ).to_dataset(name='test_arr').chunk({'index': 5})
    b.to_zarr(file_name, consolidated=True, mode='w')


generate_zarr('testing.zarr', (10, 5))
generate_zarr('testing2.zarr', (15, 7))
generate_zarr('testing3.zarr', (4, 7))

d = xarray.open_zarr('testing.zarr', consolidated=True)
d2 = xarray.open_zarr('testing2.zarr', consolidated=True)
d3 = xarray.open_zarr('testing3.zarr', consolidated=True)

rest_collection = xpublish.Rest({'test': d})

# Serve in a background thread so the main thread can keep mutating the collection.
p = threading.Thread(target=rest_collection.serve, kwargs={'host': '127.0.0.1', 'port': 9000})
p.start()

fs = HTTPFileSystem()

while True:
    key = int(input())
    if key == 1:
        # Put back the original dataset.
        d.attrs['_xpublish_id'] = 'test'
        rest_collection._datasets['test'] = d

        # Validate the served copy against the local one.
        http_map = fs.get_mapper('http://127.0.0.1:9000/datasets/test')
        test_d = xarray.open_zarr(http_map, consolidated=True)
        print(compare_dataset(test_d, d))
    elif key == 2:
        # Replace the served dataset with a different one under the same name.
        d2.attrs['_xpublish_id'] = 'test'
        rest_collection._datasets['test'] = d2

        http_map = fs.get_mapper('http://127.0.0.1:9000/datasets/test')
        test_d = xarray.open_zarr(http_map, consolidated=True)
        print(compare_dataset(test_d, d2))
    elif key == 3:
        # Publish an additional dataset under a new name.
        d3.attrs['_xpublish_id'] = 'test_evo'
        rest_collection._datasets['test_evo'] = d3

        http_map = fs.get_mapper('http://127.0.0.1:9000/datasets/test_evo')
        test_d = xarray.open_zarr(http_map, consolidated=True)
        print(compare_dataset(test_d, d3))
    elif key == 4:
        # Kill the server and this process.
        os._exit(-1)

    # Show the registered datasets, then clear xpublish's cache so stale
    # zarr metadata is not served for a replaced dataset.
    print(rest_collection._datasets)
    rest_collection.cache.clear()

@jhamman (Contributor) commented Mar 10, 2021

I haven't done anything like this yet. I wonder if either @lsetiawan or @benbovy has?

@benbovy (Contributor) commented Mar 10, 2021

@josephnowak you could update the dataset being served using a custom API endpoint, as in the example below, where /add-random-var adds a new variable to the dataset and returns the variable name:

import random
import string

import numpy as np
import xarray as xr
import xpublish
from xpublish.dependencies import get_dataset
from xpublish.routers import base_router
from fastapi import APIRouter, Depends


update_router = APIRouter()


def create_random_var():
    # Return a random 7-letter name and a random 1-D variable.
    vname = ''.join(random.choice(string.ascii_lowercase) for _ in range(7))
    var = ('x', np.random.rand(100))
    return vname, var


@update_router.post("/add-random-var")
def add_random_var(dataset: xr.Dataset = Depends(get_dataset)) -> str:
    # Mutate the served dataset in place and return the new variable name.
    vname, var = create_random_var()
    dataset[vname] = var
    return vname


ds = xr.Dataset(dict([create_random_var()]))

# Register the built-in routes plus the custom update router, then serve.
ds.rest(routers=[base_router, update_router])
ds.rest.serve(host="127.0.0.1")

You could then have a service or a process that sends a request to this API endpoint every day.
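
(For instance, a minimal sketch of such a daily job, assuming the server above is running locally; schedule it with cron or any task runner:)

import requests

# Call the custom endpoint; the URL matches the example above.
resp = requests.post("http://127.0.0.1:9000/add-random-var")
resp.raise_for_status()
print("added variable:", resp.json())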

I guess you could use some other FastAPI features (authentication, allowed origins, etc.) to restrict access to this endpoint.
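
(As an illustration of the authentication idea, a minimal sketch using plain FastAPI, not xpublish API; the header name, key value, and check_api_key helper are all assumptions:)

from fastapi import APIRouter, Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

# Illustrative shared secret; in practice it would come from configuration.
API_KEY = "change-me"
api_key_header = APIKeyHeader(name="X-API-Key")


def check_api_key(api_key: str = Security(api_key_header)):
    # Reject requests that do not carry the expected key.
    if api_key != API_KEY:
        raise HTTPException(status_code=403, detail="Invalid API key")


# Every route registered on this router now requires the header.
update_router = APIRouter(dependencies=[Depends(check_api_key)])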

@benbovy (Contributor) commented Mar 10, 2021

The current limitation is that you cannot replace the whole dataset being served with another one; all updates have to be made in place.
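
(To make the distinction concrete, a minimal sketch with illustrative names: mutating the served object is visible to clients, while rebinding a name to a new object is not:)

import numpy as np
import xarray as xr

dataset = xr.Dataset({"a": ("x", np.random.rand(10))})

# In place: the object the server holds is mutated, so clients see the change.
dataset["b"] = ("x", np.random.rand(10))

# Not in place: xr.concat returns a brand-new Dataset. Rebinding the local
# name leaves the object the server holds untouched, so clients never see it.
extra = xr.Dataset({"a": ("x", np.random.rand(5)), "b": ("x", np.random.rand(5))})
dataset = xr.concat([dataset, extra], dim="x")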

Also, it's not possible yet to use the application to update a collection of datasets being served, but I think it would be straightforward to support it by adding a get_dataset_collection FastAPI dependency to Xpublish.
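
(A rough sketch of what such a dependency might look like; get_dataset_collection does not exist in xpublish at the time of writing, so everything below is hypothetical:)

import xarray as xr
from fastapi import APIRouter, Depends

# Hypothetical in-memory collection standing in for the served datasets.
_DATASETS: dict = {}


def get_dataset_collection() -> dict:
    # Placeholder dependency; xpublish would override this with the live
    # mapping of dataset IDs to datasets.
    return _DATASETS


collection_router = APIRouter()


@collection_router.put("/datasets/{dataset_id}")
def replace_dataset(dataset_id: str,
                    datasets: dict = Depends(get_dataset_collection)) -> str:
    # With the whole collection injected, an endpoint could swap a dataset
    # wholesale instead of mutating it in place.
    datasets[dataset_id] = xr.Dataset()  # placeholder for the new dataset
    return dataset_id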

@jhamman added the enhancement (New feature or request) and help wanted (Extra attention is needed) labels on Feb 10, 2022