Skip to content
This repository has been archived by the owner on Mar 24, 2023. It is now read-only.

[Feature] Recurring / rescheduling tasks #15

Closed
wants to merge 577 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
577 commits
Select commit Hold shift + click to select a range
f1c9c03
Add custom octopoes health checks
jpbruinsslot Jun 23, 2022
6cf5760
Blackify
jpbruinsslot Jun 23, 2022
bdeaa41
add initial deb files
errieman Jun 26, 2022
b87e9a1
update workflow file
errieman Jun 26, 2022
84a7ce6
set debhelper-compat version to 12
errieman Jun 26, 2022
3a8d9c4
Continue with tests
jpbruinsslot Jun 27, 2022
424ad72
Blackify
jpbruinsslot Jun 27, 2022
8576de5
Update API docs
jpbruinsslot Jun 27, 2022
33c6396
Fix mypy issue
jpbruinsslot Jun 27, 2022
8e20ecf
Revert to abc.ABC
jpbruinsslot Jun 27, 2022
11df294
Rename to openapi.json
jpbruinsslot Jun 27, 2022
b8ac7fe
Use regular def functions instead of async
jpbruinsslot Jun 27, 2022
750381c
Let InvalidPrioritizedItemError subclass from ValueError
jpbruinsslot Jun 27, 2022
92a4dd2
Blackify
jpbruinsslot Jun 27, 2022
6401c52
Merge branch 'fix/health-checks' into chore/code-style
jpbruinsslot Jun 27, 2022
4121e75
build test package
errieman Jun 27, 2022
b2189f5
remove service dependency on socket
errieman Jun 27, 2022
a88650c
Start with pylint suggestions
jpbruinsslot Jun 27, 2022
777f8f4
Update scheduler/server/server.py
jpbruinsslot Jun 28, 2022
8d9022f
also cleanup logging config
errieman Jun 28, 2022
8a06a7e
Implement pylint, mypy suggestions
jpbruinsslot Jun 28, 2022
15de053
Merge pull request #80 from minvws/feature/scheduler-api-endpoint
jpbruinsslot Jun 29, 2022
c930750
Squash
Donnype Jun 29, 2022
65b0854
Update scheduler/connectors/services/octopoes.py
jpbruinsslot Jun 29, 2022
7db00d5
Merge pull request #82 from minvws/fix/health-checks
jpbruinsslot Jun 29, 2022
e16da84
Merge branch 'develop' into chore/code-style
jpbruinsslot Jun 29, 2022
1367361
add default config to package
errieman Jun 29, 2022
7039d4f
Merge pull request #85 from minvws/chore/code-style
Donnype Jun 29, 2022
9906c73
Merge pull request #88 from minvws/refactor/non-root-docker
Donnype Jun 29, 2022
68594cd
Merge branch 'develop' into feature/deb-installer
errieman Jun 29, 2022
5f8fbbb
fix permissions and default config
errieman Jun 29, 2022
b6adfe9
fix run user of mula
errieman Jun 29, 2022
1076699
remove nginx from dependencies
errieman Jun 29, 2022
d03f136
add kat user in postinst
errieman Jun 29, 2022
ff33801
fix ownership of /usr/share/kat-mula enable mula after install
errieman Jun 29, 2022
206954a
fix format of copyright file
errieman Jun 29, 2022
28797da
fix name of service started
errieman Jun 29, 2022
ae0653e
queue vhost name, permissions
errieman Jun 29, 2022
b7276e1
fix syntax in postinst
errieman Jun 30, 2022
51ea073
fix if statement to check if kat vhost exists
errieman Jun 30, 2022
4e5e1e7
set boefje populate default to true
errieman Jul 1, 2022
e6b82b8
remove test code
errieman Jul 1, 2022
5734fad
Merge pull request #84 from minvws/feature/deb-installer
underdarknl Jul 1, 2022
cd3cd39
Add start time to the expiring dict
jpbruinsslot Jul 4, 2022
158e26a
Blackify
jpbruinsslot Jul 4, 2022
57899f1
Re-add sqalchemy code
jpbruinsslot Jul 4, 2022
5660479
configure rabbitmq user
errieman Jul 7, 2022
ca03e1d
Merge branch 'feature/sqlalchemy' into feature/job-status
jpbruinsslot Jul 12, 2022
f43c0a1
Introducing datastore persistance
jpbruinsslot Jul 12, 2022
e00881f
Fix checking on started_at from boefje_meta
jpbruinsslot Jul 13, 2022
f1dcc81
Continue work on job status persistence
jpbruinsslot Jul 13, 2022
55397c3
Merge pull request #96 from minvws/fix/last-run-boefje
jpbruinsslot Jul 14, 2022
a1cd60a
Implement the post execution hooks
jpbruinsslot Jul 14, 2022
28bd875
Integrate and fix datastore connection
jpbruinsslot Jul 18, 2022
2312e5d
Update models and migration
jpbruinsslot Jul 19, 2022
ba89ec5
Refactor/Restructure dispatching
jpbruinsslot Jul 19, 2022
6aaf6d0
Solve id problems
jpbruinsslot Jul 20, 2022
5c99ce0
Update id and fix hashing
jpbruinsslot Jul 25, 2022
9bd2915
Fix id issue, update rest api views
jpbruinsslot Jul 26, 2022
92c564a
Update integration tests
jpbruinsslot Jul 26, 2022
2466c22
Implement sqlite
jpbruinsslot Jul 27, 2022
8118dfe
Implement sqlite, and implement completed normalizer task
jpbruinsslot Jul 27, 2022
3cb0f46
Making tests work again
jpbruinsslot Jul 28, 2022
952ff81
Merge pull request #91 from minvws/chore/expiring-dict
jpbruinsslot Aug 1, 2022
dace2ce
Fix api tests
jpbruinsslot Aug 1, 2022
c98cb2f
Merge pull request #94 from minvws/feature/deb-config-queue
errieman Aug 1, 2022
0081303
Implement mypy suggestions
jpbruinsslot Aug 1, 2022
05b21ce
Mypy, pylint, black
jpbruinsslot Aug 1, 2022
0856387
Merge branch 'develop' into feature/job-status
jpbruinsslot Aug 1, 2022
ab9e438
Fix datetime
jpbruinsslot Aug 1, 2022
f4ca702
Fix tests
jpbruinsslot Aug 2, 2022
9739808
Update reference
jpbruinsslot Aug 2, 2022
1b53309
templated repos url in changelog
errieman Aug 3, 2022
359ca8e
Merge pull request #100 from minvws/deb-reposurl
jpbruinsslot Aug 3, 2022
408250c
Provide current user id to docker builds, defaulting to 1000
Donnype Aug 3, 2022
8996fb4
Update design document
jpbruinsslot Aug 3, 2022
08450bb
Merge pull request #101 from minvws/fix/use-host-user-id-in-container
Donnype Aug 7, 2022
65f103e
Correctly use sessions
jpbruinsslot Aug 8, 2022
d82050c
Remove unused test
jpbruinsslot Aug 8, 2022
5c0a311
Blackify
jpbruinsslot Aug 8, 2022
8b49417
Update docs
jpbruinsslot Aug 8, 2022
2267c6c
Merge branch 'feature/job-status' into docs/update
jpbruinsslot Aug 8, 2022
ec3905f
Merge pull request #97 from minvws/feature/job-status
jpbruinsslot Aug 8, 2022
64ffbcd
Update design
jpbruinsslot Aug 8, 2022
f0082bd
Update design and api spec
jpbruinsslot Aug 8, 2022
ee45a1e
Merge branch 'develop' into docs/update
jpbruinsslot Aug 9, 2022
ebb8ab0
Grammar, and fix diagram
jpbruinsslot Aug 9, 2022
6a8ecd5
Boefje error handling
jpbruinsslot Aug 9, 2022
18f481e
Set default ordering on task list
jpbruinsslot Aug 9, 2022
a272dd6
Fix ordering issue
jpbruinsslot Aug 9, 2022
c5ef7b1
Make null safe
jpbruinsslot Aug 9, 2022
2713486
Blackify
jpbruinsslot Aug 9, 2022
d9f3598
Merge pull request #106 from minvws/feature/error-handling
underdarknl Aug 9, 2022
b9f09c5
Merge pull request #105 from minvws/docs/update
underdarknl Aug 9, 2022
ab252b9
Add tests for updating status boefje tasks
jpbruinsslot Aug 9, 2022
aed5555
Merge branch 'develop' into feature/error-handling-tests
jpbruinsslot Aug 9, 2022
86810e0
Update docstring
jpbruinsslot Aug 9, 2022
1c5f51d
Blackify
jpbruinsslot Aug 9, 2022
bd72433
Check on completed and failed if a tasks is still running
jpbruinsslot Aug 10, 2022
121cb44
Add comments
jpbruinsslot Aug 10, 2022
d3336a3
Fix exclusive OR problem
jpbruinsslot Aug 10, 2022
137ee94
Fix syntax error
jpbruinsslot Aug 10, 2022
8e4eb00
Fix status
jpbruinsslot Aug 10, 2022
314fa56
Add dispatched
jpbruinsslot Aug 10, 2022
04c1126
Blackify
jpbruinsslot Aug 10, 2022
c932d63
Fix spelling
jpbruinsslot Aug 10, 2022
7c8ea27
Set database session to default
jpbruinsslot Aug 10, 2022
8591eb9
Merge pull request #110 from minvws/feature/error-handling
jpbruinsslot Aug 10, 2022
6005eeb
Fix pylint suggestions
jpbruinsslot Aug 11, 2022
ce6e115
Update datastore update statement
jpbruinsslot Aug 16, 2022
7b45fa8
Make thread-safe
jpbruinsslot Aug 16, 2022
dc2ec5c
Blackify
jpbruinsslot Aug 16, 2022
e99dee2
Merge branch 'develop' into feature/error-handling-tests
jpbruinsslot Aug 16, 2022
5768631
Merge pull request #108 from minvws/feature/error-handling-tests
jpbruinsslot Aug 17, 2022
3bad0b8
Using a global session for sqlalchmy
jpbruinsslot Aug 17, 2022
35970d5
Merge pull request #117 from minvws/fix/transaction-rollback
noamblitz Aug 18, 2022
65526d4
Merge branch 'develop' into chore/pylint
jpbruinsslot Aug 29, 2022
1b7ecdb
Fix type
jpbruinsslot Aug 29, 2022
4aa7e27
Blackify
jpbruinsslot Aug 29, 2022
d6ce8ea
Start with a pq implementation with a database backend
jpbruinsslot Aug 29, 2022
a0bb726
Fix normalizer jobs started when boefje tasks failed
jpbruinsslot Aug 31, 2022
9fc8db1
Merge pull request #112 from minvws/chore/pylint
jpbruinsslot Aug 31, 2022
9116ab2
Add commit to update
jpbruinsslot Aug 31, 2022
b29681d
Merge pull request #120 from minvws/fix/update-commit
underdarknl Aug 31, 2022
63d1f75
Start session for every method
jpbruinsslot Sep 1, 2022
179c689
Use sessionmaker.begin()
jpbruinsslot Sep 1, 2022
8d7d727
Blackify
jpbruinsslot Sep 1, 2022
31347cb
Refactoring to support database backed priority queue
jpbruinsslot Sep 2, 2022
caca6e6
Merge pull request #121 from minvws/fix/database-session
jpbruinsslot Sep 2, 2022
8536223
Build production suitable container images in CI
dekkers Sep 5, 2022
e4c81fc
Match all methods with pq
jpbruinsslot Sep 5, 2022
cc4b77b
Remove obsolete sesssion
jpbruinsslot Sep 6, 2022
5b30f5a
Merge pull request #123 from minvws/fix/remove-obsolute-session
jpbruinsslot Sep 6, 2022
ee69ab9
Add more debugging statements
jpbruinsslot Sep 6, 2022
97bcdae
Merge pull request #124 from minvws/container-image
dekkers Sep 7, 2022
2a0db70
Remove references to dispatcher
jpbruinsslot Sep 7, 2022
c2c4358
Support using Postgres as database (#125)
dekkers Sep 7, 2022
0657e99
Merge branch 'develop' into fix/dispatched-status
jpbruinsslot Sep 8, 2022
22a3a22
Restructure hashing / id of items on priority queue
jpbruinsslot Sep 8, 2022
4a45f00
Fix upgrading deb package (#118)
errieman Sep 8, 2022
a8fa2e8
Tie up datastore backend
jpbruinsslot Sep 8, 2022
de447e5
Blackify
jpbruinsslot Sep 12, 2022
1e40a9a
Restructuring
jpbruinsslot Sep 13, 2022
3df9ab3
Making it work
jpbruinsslot Sep 14, 2022
b19275e
Merge pull request #126 from minvws/chore/remove-dispatcher
jpbruinsslot Sep 15, 2022
e5ed70b
Merge branch 'develop' into fix/dispatched-status
jpbruinsslot Sep 15, 2022
55826a9
Merge branch 'fix/dispatched-status' into poc/pq-filtering
jpbruinsslot Sep 15, 2022
ca4e8c1
Fix integration
jpbruinsslot Sep 19, 2022
8222267
Continue work
jpbruinsslot Sep 20, 2022
2d1b8e7
Integration test
Donnype Sep 20, 2022
65af25f
Update tests
jpbruinsslot Sep 21, 2022
c4f1cff
Writing tests
jpbruinsslot Sep 23, 2022
8f20e9e
Add interval for monitor orgs through configuration
jpbruinsslot Sep 28, 2022
0932caa
Update reference configuration
jpbruinsslot Sep 28, 2022
e31f1ce
Blackify
jpbruinsslot Sep 28, 2022
a56ca66
Merge pull request #129 from minvws/feat/monitor-org-timeout
jpbruinsslot Sep 28, 2022
e64c485
Access queues directly from schedulers dict
jpbruinsslot Sep 30, 2022
adb3e09
Correct misuse of value in for loop
jpbruinsslot Sep 30, 2022
33835a3
Add start and stop thread when orgs are added/removed
jpbruinsslot Sep 30, 2022
4f2b6da
Make sure when rabbitmq isnt present that we continue checking octopoes
jpbruinsslot Sep 30, 2022
3d13236
blacken
noamblitz Oct 3, 2022
fe13e13
Update scheduler/schedulers/boefje.py
jpbruinsslot Oct 3, 2022
d5765ed
Update scheduler/schedulers/boefje.py
jpbruinsslot Oct 3, 2022
23ced42
Fix expiring dict
jpbruinsslot Oct 3, 2022
8ecdb72
Revert
jpbruinsslot Oct 3, 2022
4bef739
Blackify
jpbruinsslot Oct 3, 2022
37a5bec
Tests and fixes
jpbruinsslot Oct 5, 2022
50630ec
Merge pull request #130 from minvws/feat/monitor-org-timeout
jpbruinsslot Oct 5, 2022
a0e6117
Merge branch 'develop' into poc/pq-filtering
jpbruinsslot Oct 5, 2022
56da5a7
Docstrings
jpbruinsslot Oct 6, 2022
327d198
Merge branch 'develop' into poc/pq-filtering
jpbruinsslot Oct 6, 2022
ddd0336
Make rest api work
jpbruinsslot Oct 11, 2022
96923a5
Docs, tests, mypy
jpbruinsslot Oct 13, 2022
e118721
Blackify
jpbruinsslot Oct 17, 2022
0654987
Pylint, mypy, black
jpbruinsslot Oct 17, 2022
8d3e475
Code review implementation
jpbruinsslot Oct 18, 2022
daaa6fd
Explicit fixes
jpbruinsslot Oct 20, 2022
149d496
Update api docs
jpbruinsslot Oct 20, 2022
a196f45
Create dependabot.yml
sigio Oct 21, 2022
2dbed3d
Automated update: change workflows to use checkout@v3
sigio Nov 1, 2022
7e6ce6a
Manual cleanups
sigio Nov 1, 2022
5b930c6
Merge pull request #136 from minvws/rdo-workflow-tune
ammar92 Nov 1, 2022
101636c
Add migrations
jpbruinsslot Nov 2, 2022
87c9899
Blackify
jpbruinsslot Nov 8, 2022
c6b6f8e
Fix spelling mistake
jpbruinsslot Nov 8, 2022
a291780
Merge pull request #135 from minvws/sigio-patch-1
ammar92 Nov 8, 2022
53ee167
Bump alembic from 1.7.6 to 1.8.1
dependabot[bot] Nov 8, 2022
9dea9ef
Bump psutil from 5.9.1 to 5.9.4
dependabot[bot] Nov 8, 2022
88f6f43
Bump psycopg2-binary from 2.9.3 to 2.9.5
dependabot[bot] Nov 8, 2022
6e5e53c
Merge pull request #138 from minvws/dependabot/pip/alembic-1.8.1
ammar92 Nov 10, 2022
fde13bb
Merge pull request #139 from minvws/dependabot/pip/psutil-5.9.4
ammar92 Nov 10, 2022
b688dd3
Merge pull request #142 from minvws/dependabot/pip/psycopg2-binary-2.9.5
ammar92 Nov 10, 2022
df0f95c
Bump celery from 5.2.3 to 5.2.7
dependabot[bot] Nov 10, 2022
9d4e742
Merge pull request #141 from minvws/dependabot/pip/celery-5.2.7
ammar92 Nov 10, 2022
bd18903
Bump fastapi from 0.73.0 to 0.86.0
dependabot[bot] Nov 10, 2022
9dcff68
Merge pull request #140 from minvws/dependabot/pip/fastapi-0.86.0
ammar92 Nov 10, 2022
c84d721
Fix scheduler missing bytes task creation
jpbruinsslot Nov 14, 2022
9a3f755
Blackify
jpbruinsslot Nov 14, 2022
4b2feee
Fix server response
jpbruinsslot Nov 14, 2022
2814002
Fix decoding error for boefjes
jpbruinsslot Nov 14, 2022
b69e793
Refactor scan profile / ooi
jpbruinsslot Nov 14, 2022
f7e6541
Update tests
jpbruinsslot Nov 14, 2022
6e33cec
Merge branch 'fix/octopoes-object-type' into fix/task-not-found-bytes
jpbruinsslot Nov 14, 2022
e77455a
Add extra logging
jpbruinsslot Nov 14, 2022
e3dcfea
Blackify
jpbruinsslot Nov 14, 2022
6994ea0
Merge branch 'fix/task-not-found-bytes' into poc/pq-filtering
jpbruinsslot Nov 15, 2022
5613364
Start with keeping state of ooi in scheduler
jpbruinsslot Oct 25, 2022
e815088
Restructure
jpbruinsslot Oct 27, 2022
0f0f951
Adding, and refactoring tests
jpbruinsslot Nov 7, 2022
ef4ea43
Update test
jpbruinsslot Nov 7, 2022
38f9042
Remove random endpoint calls
jpbruinsslot Nov 8, 2022
dc562f6
Fixes for integration
jpbruinsslot Nov 8, 2022
19f4541
Change to ooi_type
jpbruinsslot Nov 10, 2022
b0a44ff
Remove git artifacts
jpbruinsslot Nov 15, 2022
e8f35a1
Fix merge conflicts
jpbruinsslot Nov 15, 2022
71284c8
Merge branch 'poc/pq-filtering' into feature/ooi-state
jpbruinsslot Nov 15, 2022
9c682b2
Fix merge conflicts
jpbruinsslot Nov 15, 2022
7ae8e91
Blackify
jpbruinsslot Nov 15, 2022
1c6bb2f
Fix database dsn name in env.py (PR from open) (#151)
dekkers Nov 17, 2022
98efa0a
Merge pull request #148 from minvws/fix/task-not-found-bytes
jpbruinsslot Nov 21, 2022
b0121a6
Integrate with octopoes changes
jpbruinsslot Nov 21, 2022
f3899d8
Scheduler id should be optional because of rocky
jpbruinsslot Nov 21, 2022
25aaec0
Intergrate with octopoes
jpbruinsslot Nov 22, 2022
f75b6e7
Tests, mypy, pylint
jpbruinsslot Nov 23, 2022
e00dbc9
Docs
jpbruinsslot Nov 28, 2022
d58dec3
Implement suggestions code review
jpbruinsslot Nov 29, 2022
4aae22e
Point to raw_data in normalizer tasks.
Donnype Dec 5, 2022
174e396
Debian package for mula (#133)
errieman Dec 5, 2022
2a18ae7
Draft of implementing scheduled jobs
jpbruinsslot Dec 5, 2022
b8f9140
Merge branch 'develop' into poc/pq-filtering
jpbruinsslot Dec 6, 2022
82bfb3c
Merge pull request #122 from minvws/poc/pq-filtering
jpbruinsslot Dec 6, 2022
ff8bb8d
Break stuff
jpbruinsslot Dec 6, 2022
38823ce
Fix for hash of normalizer task
Donnype Dec 7, 2022
530c325
Merge branch 'develop' into feature/dispatch-normalizers-with-raw-dat…
jpbruinsslot Dec 7, 2022
d303ac6
Remove git artifacts
jpbruinsslot Dec 7, 2022
9c190b3
Structure
jpbruinsslot Dec 7, 2022
d0b04fa
Do not always overwrite the uuid field
Donnype Dec 12, 2022
eb6be85
Fix duplicate tasks
jpbruinsslot Dec 12, 2022
6d36e3f
Blackify
jpbruinsslot Dec 12, 2022
fe675ed
Merge pull request #155 from minvws/feature/dispatch-normalizers-with…
noamblitz Dec 13, 2022
d3e0d47
Fix version
jpbruinsslot Dec 13, 2022
7793f97
Merge pull request #161 from minvws/fix/version-fix
noamblitz Dec 13, 2022
52882d7
Merge branch 'develop' into feature/scheduled-jobs
jpbruinsslot Dec 14, 2022
258c2c3
Job store
jpbruinsslot Dec 14, 2022
9c16a56
Merge remote-tracking branch 'old/feature/scheduled-jobs' into featur…
jpbruinsslot Dec 15, 2022
196c336
Fix merge artifacts
jpbruinsslot Dec 15, 2022
f061836
Fix url's and merge artifacts
jpbruinsslot Dec 15, 2022
d93d83e
Implement scheduled job store
jpbruinsslot Dec 15, 2022
280ea36
Filtering scheduled jobs
jpbruinsslot Dec 19, 2022
afaeaa1
Update boefje scheduler for scheduled jobs
jpbruinsslot Dec 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ to use whatever you want.

### Prerequisites

By the use of environment variables we load in the configuration of the
By the use of environment variables we load in the configuration of the
scheduler. Look at the [.env-dist](.env-dist) file for the application
configuration settings, to build a `.env` file. Refer to the
[`configuration.md`](docs/configuration.md) file for more information on the
Expand Down
43 changes: 27 additions & 16 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,13 @@ graph TB
Scheduler["Scheduler<br/>[system]"]
TaskRunner["Task Runner<br/>[software system]"]

Rocky--"Create object"-->Octopoes
Rocky--"Create scan job<br/>HTTP POST"--->Scheduler

Octopoes--"Get random oois<br/>HTTP GET"-->Scheduler
Octopoes--"Scan Profile Mutation<br/>AMQP"-->RabbitMQ

RabbitMQ--"Get latest created oois<br/>Get latest raw files<br/>AMQP"-->Scheduler
RabbitMQ--"1. Get scan profile mutation<br/>2. Get latest raw files<br/>AMQP"-->Scheduler

Katalogus--"Get available plugins<br/>HTTP GET"-->Scheduler
Katalogus--"1. Get organizations</br>2. Get available plugins<br/>HTTP GET"-->Scheduler
Bytes--"Get last run boefje<br/>HTTP GET"-->Scheduler

Scheduler--"Pop task of queue"-->TaskRunner
Expand All @@ -73,11 +72,19 @@ Following we review how different dataflows, from the `boefjes` and the
`normalizers` are implemented within the `Scheduler` system. The following
events within a KAT installation will trigger dataflows in the `Scheduler`:

App:

* When a plugin is enabled or disabled (`monitor_organisations`)

* When an organisation is created or deleted (`monitor_organisations`)

* When a scan level is increased (`get_latest_object`)
Boefje scheduler:

* When a new boefje has been added (`get_new_boefjes_by_org_id`)

* When a scan level is increased (`get_scan_profile_mutation`)

Normalizer scheduler:

* When a raw file is created (`get_latest_raw_data`)

Expand Down Expand Up @@ -172,21 +179,25 @@ flowchart TB
* The `BoefjeScheduler` implementation of the `populate_queue()` method will:

- Continuously get the latest scan level changes of ooi's from a message
queue that was sent by octopoes (`get_latest_objects()`). The tasks
created from these ooi's (`tasks = ooi * boefjes`) from this queue will
queue that is sent from octopoes received on a rabbitmq message queue
(`get_scan_profile_mutations`). The tasks created from these ooi's
(`tasks = ooi * boefjes`) from this queue will
get the priority of 2.

- To fill up the queue, and to enforce that we reschedule tasks we get
random ooi's from octopoes (`get_random_objects`). The tasks of from these
ooi's (`tasks = ooi * boefjes`) will get the priority that has been
- Check newly enabled/added boefjes, and check the ooi's in the datastore
that match the boefje and tasks that need to be created.

- To fill up the queue, and to enforce that we reschedule tasks we reference
the scheduler internal ooi datastore. We only consider oois that have been
processed by the scheduler after the set grace period. The tasks of from
these ooi's (`tasks = ooi * boefjes`) will get the priority that has been
calculated by the ranker. At the moment a task will get the priority of 3,
when 7 days have gone by (e.g. how longer it hasn't been checked the
higher the priority it will get). For everything that hasn't been check
before the 7 days it will scale the priority appropriately.
when 7 days have gone by (e.g. how longer it hasn't been checked the higher
the priority it will get). For everything that hasn't been check before the
7 days it will scale the priority appropriately.

- In order for a created tasks from `get_latest_objects()` and
`get_random_objects()` to be elligible for execution, the task adhere to
the following (`create_tasks_for_ooi()`):
- In order for a created tasks o be elligible for execution, the tasks adhere
to the following (`create_tasks_for_ooi()`):

* Should not have run within the 'grace period', meaning a task should not
be scheduled again within the last 24 hours (can be configured).
Expand Down
1 change: 1 addition & 0 deletions scheduler/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Settings(BaseSettings):
host_bytes_password: str = Field(..., env="BYTES_PASSWORD")
host_octopoes: str = Field(..., env="OCTOPOES_API")
host_scan_profile: str = Field(..., env="SCHEDULER_RABBITMQ_DSN")
host_mutation: str = Field(..., env="SCHEDULER_RABBITMQ_DSN")
host_raw_data: str = Field(..., env="SCHEDULER_RABBITMQ_DSN")
host_normalizer_meta: str = Field(..., env="SCHEDULER_RABBITMQ_DSN")

Expand Down
2 changes: 1 addition & 1 deletion scheduler/connectors/listeners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .listeners import Listener, RabbitMQ
from .normalizer_meta import NormalizerMeta
from .ooi import ScanProfileMutation
from .raw_data import RawData
from .scan_profile import ScanProfile
31 changes: 31 additions & 0 deletions scheduler/connectors/listeners/ooi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import List, Optional

from scheduler.connectors.errors import exception_handler
from scheduler.models import ScanProfileMutation as ScanProfileMutationModel

from .listeners import RabbitMQ


class ScanProfileMutation(RabbitMQ):
name = "scan_profile_mutation"

@exception_handler
def get_scan_profile_mutation(self, queue: str) -> Optional[ScanProfileMutationModel]:
response = self.get(queue)
if response is None:
return None

return ScanProfileMutationModel(**response)

@exception_handler
def get_scan_profile_mutations(self, queue: str, n: int) -> Optional[List[ScanProfileMutationModel]]:
oois: List[ScanProfileMutationModel] = []

for _ in range(n):
ooi = self.get_scan_profile_mutation(queue=queue)
if ooi is None:
break

oois.append(ooi)

return oois
64 changes: 52 additions & 12 deletions scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import Dict, List

from scheduler.connectors.errors import exception_handler
from scheduler.models import Boefje, Organisation, Plugin
Expand All @@ -13,10 +13,37 @@ class Katalogus(HTTPService):
def __init__(self, host: str, source: str, timeout: int = 5):
super().__init__(host, source, timeout)

# Example:
#
# {
# "organisation_id": {
# "plugin-id": {}
# }
# }
self.organisations_plugin_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=30)

# Example:
#
# {
# "organisation_id": {
# "plugin-type": {
# "plugin-id": {}
# }
# }
# }
self.organisations_boefje_type_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=30)
self.organisations_normalizer_type_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=30)

# Example:
#
# {
# "organisation_id": {
# "plugin-id": {}
# }
# }
self.organisations_new_boefjes_cache: Dict = {}

# Initialize the caches
self._flush_organisations_plugin_cache()
self._flush_organisations_normalizer_type_cache()
self._flush_organisations_boefje_type_cache()
Expand All @@ -26,9 +53,19 @@ def _flush_organisations_plugin_cache(self) -> None:
orgs = self.get_organisations()

for org in orgs:
self.organisations_plugin_cache[org.id] = {
plugin.id: plugin for plugin in self.get_plugins_by_organisation(org.id)
}
if org.id not in self.organisations_plugin_cache:
self.organisations_plugin_cache[org.id] = {}

plugins = self.get_plugins_by_organisation(org.id)
for plugin in plugins:
if plugin.id in self.organisations_plugin_cache[org.id]:
continue

# Add new boefje to organisation plugin cache and new boefjes cache
self.organisations_plugin_cache[org.id][plugin.id] = plugin

# Add new boefje to new boefjes cache
self.organisations_new_boefjes_cache.setdefault(org.id, {})[plugin.id] = plugin

def _flush_organisations_boefje_type_cache(self) -> None:
"""boefje.consumes -> plugin type boefje"""
Expand All @@ -39,18 +76,14 @@ def _flush_organisations_boefje_type_cache(self) -> None:
self.organisations_boefje_type_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
continue

# NOTE: backwards compatability, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
self.organisations_boefje_type_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
if plugin.type != "boefje" and plugin.enabled is False: # TODO: CHECK THIS enabled
continue

for type_ in plugin.consumes:
self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin)

self.logger.debug("flushed boefje cache [cache=%s]", self.organisations_boefje_type_cache.cache)

def _flush_organisations_normalizer_type_cache(self) -> None:
"""normalizer.consumes -> plugin type normalizer"""
self.logger.debug("flushing normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache)
Expand All @@ -60,12 +93,14 @@ def _flush_organisations_normalizer_type_cache(self) -> None:
self.organisations_normalizer_type_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
if plugin.type != "normalizer" and plugin.enabled is False: # TODO: CHECK THIS enabled
continue

for type_ in plugin.consumes:
self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin)

self.logger.debug("flushed normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache)

@exception_handler
def get_boefjes(self) -> List[Boefje]:
url = f"{self.host}/boefjes"
Expand Down Expand Up @@ -115,3 +150,8 @@ def get_normalizers_by_org_id_and_type(self, organisation_id: str, normalizer_ty
except dict_utils.ExpiredError:
self._flush_organisations_normalizer_type_cache()
return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type])

def get_new_boefjes_by_org_id(self, organisation_id: str) -> List[Plugin]:
new_boefjes = self.organisations_new_boefjes_cache[organisation_id].values()
self.organisations_new_boefjes_cache[organisation_id] = {}
return new_boefjes
7 changes: 4 additions & 3 deletions scheduler/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def __init__(self) -> None:
orgs=svc_katalogus.get_organisations(),
)

lst_scan_profile = listeners.ScanProfile(
dsn=self.config.host_scan_profile,
lst_mutations = listeners.ScanProfileMutation(
dsn=self.config.host_mutation,
)

lst_raw_data = listeners.RawData(
Expand All @@ -71,7 +71,7 @@ def __init__(self) -> None:
services.Katalogus.name: svc_katalogus,
services.Octopoes.name: svc_octopoes,
services.Bytes.name: svc_bytes,
listeners.ScanProfile.name: lst_scan_profile,
listeners.ScanProfileMutation.name: lst_mutations,
listeners.RawData.name: lst_raw_data,
listeners.NormalizerMeta.name: lst_normalizer_meta,
}
Expand All @@ -83,3 +83,4 @@ def __init__(self) -> None:
datastore = sqlalchemy.SQLAlchemy(self.config.database_dsn)
self.task_store: stores.TaskStorer = sqlalchemy.TaskStore(datastore)
self.pq_store: stores.PriorityQueueStorer = sqlalchemy.PriorityQueueStore(datastore)
self.job_store: stores.JobStorer = sqlalchemy.JobStore(datastore)
3 changes: 2 additions & 1 deletion scheduler/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from .boefje import Boefje, BoefjeMeta
from .events import NormalizerMetaReceivedEvent, RawData, RawDataReceivedEvent
from .health import ServiceHealth
from .jobs import ScheduledJob, ScheduledJobORM
from .normalizer import Normalizer
from .ooi import OOI
from .ooi import OOI, OOIORM, MutationOperationType, ScanProfileMutation
from .organisation import Organisation
from .plugin import Plugin
from .queue import Filter, PrioritizedItem, PrioritizedItemORM, Queue
Expand Down
52 changes: 52 additions & 0 deletions scheduler/models/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import datetime
import uuid
from typing import List, Optional

from pydantic import BaseModel, Field
from sqlalchemy import JSON, Boolean, Column, DateTime, String
from sqlalchemy.orm import relationship

from scheduler.utils import GUID

from .base import Base
from .queue import PrioritizedItem
from .tasks import Task


class ScheduledJob(BaseModel):
id: uuid.UUID
hash: str
enabled: bool
crontab: Optional[str]
scheduler_id: str
p_item: PrioritizedItem
tasks: List[Task] = []

next_check: Optional[datetime.datetime] = None

checked_at: Optional[datetime.datetime] = None

created_at: datetime.datetime = Field(
default_factory=datetime.datetime.utcnow)

modified_at: datetime.datetime = Field(
default_factory=datetime.datetime.utcnow)

class Config:
orm_mode = True


class ScheduledJobORM(Base):
__tablename__ = "scheduled_jobs"

id = Column(GUID, primary_key=True, default=uuid.uuid4)
hash = Column(String, nullable=False) # TODO: unique
enabled = Column(Boolean, nullable=False)
crontab = Column(String, nullable=True)
scheduler_id = Column(String, nullable=False)
p_item = Column(JSON, nullable=False)
tasks = relationship("TaskORM", back_populates="scheduled_job")

checked_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, nullable=False)
modified_at = Column(DateTime, nullable=False)
Loading