Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use UDF specific names for server info file #35

Merged
merged 1 commit into from
Mar 28, 2024

Conversation

BulkBeing
Copy link
Collaborator

This change also makes the start_with_shutdown API for all servers a bit more nicer.

@BulkBeing BulkBeing requested review from yhl25 and vigith March 25, 2024 11:24
Copy link
Member

@vigith vigith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this now run against numaflow main branch?

let shutdown = async {
shutdown_rx.await.unwrap();
};
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await });

tokio::time::sleep(Duration::from_millis(50)).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i forget why we need this sleep.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done just to ensure the server is started completely before sending requests to it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we write a small utility func to do a ping and use that? else there could be flaky tests in the future. we do not have to do this in this PR.

@BulkBeing
Copy link
Collaborator Author

does this now run against numaflow main branch?

I haven't tried running a full pipeline since the only change was the server info filename. I copied it from numaflow-go.

@BulkBeing
Copy link
Collaborator Author

Tested with v1.2.0-rc1 release of numaflow:

kubectl apply -n numaflow-system -f https://github.com/numaproj/numaflow/releases/download/v1.2.0-rc1/install.yaml

And ran a source that pulls from kafka:

➜  kubectl logs -f kafka-source-rust-in-0-rzxng -c numa
{"level":"info","ts":"2024-03-28T06:51:36.390248207Z","logger":"numaflow.Source-processor","caller":"commands/processor.go:48","msg":"Starting vertex data processor","version":"Version: v1.2.0-rc1, BuildDate: 2024-03-26T16:15:00Z, GitCommit: 0a1a2e8b85f98e15d3e6a71eb8c07f591e796ea6, GitTag: v1.2.0-rc1, GitTreeState: clean, GoVersion: go1.21.8, Compiler: gc, Platform: linux/arm64"}
{"level":"info","ts":"2024-03-28T06:51:36.39428027Z","logger":"numaflow.Source-processor","caller":"fetch/source_fetcher.go:43","msg":"Creating a new source watermark fetcher","pipeline":"kafka-source-rust","vertex":"in"}
{"level":"info","ts":"2024-03-28T06:51:36.394451894Z","logger":"numaflow.Source-processor","caller":"fetch/processor_manager.go:143","msg":"Refreshing ActiveProcessors ticker started","pipeline":"kafka-source-rust","vertex":"in"}
{"level":"info","ts":"2024-03-28T06:51:36.395726262Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:264","msg":"Successfully created watcher","pipeline":"kafka-source-rust","vertex":"in","kvName":"default-kafka-source-rust-in_SOURCE_PROCESSORS","watcher":"default-kafka-source-rust-in_SOURCE_PROCESSORS"}
{"level":"info","ts":"2024-03-28T06:51:36.395787054Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:199","msg":"Watcher initialization and subscription got nil value","pipeline":"kafka-source-rust","vertex":"in","kvName":"default-kafka-source-rust-in_SOURCE_PROCESSORS"}
{"level":"info","ts":"2024-03-28T06:51:36.396084427Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:264","msg":"Successfully created watcher","pipeline":"kafka-source-rust","vertex":"in","kvName":"default-kafka-source-rust-in_SOURCE_OT","watcher":"default-kafka-source-rust-in_SOURCE_OT"}
{"level":"info","ts":"2024-03-28T06:51:36.39611551Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:199","msg":"Watcher initialization and subscription got nil value","pipeline":"kafka-source-rust","vertex":"in","kvName":"default-kafka-source-rust-in_SOURCE_OT"}
2024/03/28 06:51:36 Server info file /var/run/numaflow/sourcer-server-info is not ready...
2024/03/28 06:51:37 ServerInfo: &{uds rust 0.0.1 map[]}
2024/03/28 06:51:37 UDS Client: unix:/var/run/numaflow/source.sock
{"level":"info","ts":"2024-03-28T06:51:37.400538181Z","logger":"numaflow.Source-processor","caller":"sources/source.go:250","msg":"Start processing source messages","pipeline":"kafka-source-rust","vertex":"in","isbs":"jetstream","to":["default-kafka-source-rust-out-0"]}
{"level":"info","ts":"2024-03-28T06:51:37.400588097Z","logger":"numaflow.Source-processor","caller":"udsource/user_defined_source.go:166","msg":"Starting user-defined source...","pipeline":"kafka-source-rust","vertex":"in"}
{"level":"info","ts":"2024-03-28T06:51:37.400618014Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:237","msg":"Generating self-signed certificate","pipeline":"kafka-source-rust","vertex":"in"}
{"level":"info","ts":"2024-03-28T06:51:37.400637264Z","logger":"numaflow.Source-processor","caller":"forward/data_forward.go:132","msg":"Starting forwarder...","pipeline":"kafka-source-rust","vertex":"in"}
{"level":"info","ts":"2024-03-28T06:51:37.401199844Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:269","msg":"Not enabling pprof debug endpoints","pipeline":"kafka-source-rust","vertex":"in"}
{"level":"info","ts":"2024-03-28T06:51:37.40123476Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:282","msg":"Starting metrics HTTPS server","pipeline":"kafka-source-rust","vertex":"in"}

@vigith vigith merged commit 819ddad into numaproj:main Mar 28, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants