Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(scheduler): scheduled tasks does not persist on server restart #7959

Merged
merged 6 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions kolibri/core/analytics/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,18 @@ def schedule_ping(
server=DEFAULT_SERVER_URL,
checkrate=DEFAULT_PING_CHECKRATE,
interval=DEFAULT_PING_INTERVAL,
job_id=None,
):
started = local_now()
scheduler.schedule(
started,
_ping,
interval=interval * 60,
repeat=None,
started=started,
server=server,
checkrate=checkrate,
)
# If pinging is not disabled by the environment
if not conf.OPTIONS["Deployment"]["DISABLE_PING"]:
started = local_now()
scheduler.schedule(
started,
_ping,
interval=interval * 60,
repeat=None,
started=started,
server=server,
checkrate=checkrate,
job_id=job_id,
)
6 changes: 4 additions & 2 deletions kolibri/core/deviceadmin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,13 @@ def perform_vacuum(database=db.DEFAULT_DB_ALIAS):
logger.info("Sqlite database Vacuum finished.")


def schedule_vacuum():
def schedule_vacuum(job_id=None):
current_dt = local_now()
vacuum_time = current_dt.replace(hour=3, minute=0, second=0, microsecond=0)
if vacuum_time < current_dt:
# If it is past 3AM, change the day to tomorrow.
vacuum_time = vacuum_time + timedelta(days=1)
# Repeat indefinitely
scheduler.schedule(vacuum_time, perform_vacuum, repeat=None, interval=24 * 60 * 60)
scheduler.schedule(
vacuum_time, perform_vacuum, repeat=None, interval=24 * 60 * 60, job_id=job_id
)
7 changes: 6 additions & 1 deletion kolibri/core/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ def __init__(self, func, *args, **kwargs):
kwargs["cancellable"] = func.cancellable
kwargs["extra_metadata"] = func.extra_metadata.copy()
func = func.func
self.job_id = uuid.uuid4().hex

job_id = kwargs.pop("job_id", None)
if job_id is None:
job_id = uuid.uuid4().hex

self.job_id = job_id
self.state = kwargs.pop("state", State.QUEUED)
self.traceback = ""
self.exception = None
Expand Down
23 changes: 13 additions & 10 deletions kolibri/utils/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from .system import pid_exists
from kolibri.core.content.utils import paths
from kolibri.core.content.zip_wsgi import get_application
from kolibri.core.deviceadmin.utils import schedule_vacuum
from kolibri.core.tasks.main import initialize_workers
from kolibri.core.tasks.main import scheduler
from kolibri.utils import conf
Expand Down Expand Up @@ -67,6 +66,10 @@
# Currently non-configurable until we know how to properly handle this
LISTEN_ADDRESS = "0.0.0.0"

# Constant job_id for scheduled jobs that we want to keep track of across server restarts
SCH_PING_JOB_ID = 0
SCH_VACUUM_JOB_ID = 1


class NotRunning(Exception):
"""
Expand All @@ -86,22 +89,22 @@ def __init__(self, bus, port):
self.workers = None

def start(self):
# Initialize the iceqube scheduler to handle scheduled tasks
scheduler.clear_scheduler()

if not conf.OPTIONS["Deployment"]["DISABLE_PING"]:

# schedule the pingback job
# schedule the pingback job if not already scheduled
if SCH_PING_JOB_ID not in scheduler:
from kolibri.core.analytics.utils import schedule_ping

schedule_ping()
schedule_ping(job_id=SCH_PING_JOB_ID)

# schedule the vacuum job if not already scheduled
if SCH_VACUUM_JOB_ID not in scheduler:
from kolibri.core.deviceadmin.utils import schedule_vacuum

# schedule the vacuum job
schedule_vacuum()
schedule_vacuum(job_id=SCH_VACUUM_JOB_ID)

# Initialize the iceqube engine to handle queued tasks
self.workers = initialize_workers()

# Initialize the iceqube scheduler to handle scheduled tasks
scheduler.start_scheduler()

# Register the Kolibri zeroconf service so it will be discoverable on the network
Expand Down