Initial pass at proposal to change subscribers #727
Conversation
@eriknelson @jmrodri initial proposal based on our conversations.
@maleck13 would it be possible to draw a picture of what it will look like? Even a picture on paper would work. Otherwise, I'll look at it tomorrow and see if I can get a picture done.
Grammar-checking review. Another review is coming regarding the actual implementation proposal.
@@ -0,0 +1,189 @@
# Switch to using Fan in Fan out / Observer pattern for subscribers
Fan in Fan out -> Fan-in & Fan-out
@@ -0,0 +1,189 @@
# Switch to using Fan in Fan out / Observer pattern for subscribers

This is not quite the same as a fan in fan out pattern but not a
fan in fan out -> fan-in / fan-out
**Observer:** suggestion to change the name from subscriber to observer
as I think it more clearly conveys what they will be doing.
For this doc I will continue to call them subscribers to avoid
comma after doc "For this doc, I "
To surmise:

- **Shared Channel with single subscriber**: There is a single buffered channel per topic (topics being broker
actions such as provision, bind etc). All jobs of the given type write to
add commas "All jobs, of the given type, write to this channel"
this channel. This has the potential to cause a bottleneck, particularly
as there is currently only a single subscriber reading from this channel and
this subscriber does not read the next message until it has completed
all of its work. Currently there is not too much happening in these
comma missing after Currently
The stop channel here would be tied to os signals
[example](https://gist.github.com/reiki4040/be3705f307d3cd136e85).
In this case we would still have a single channel per topic, but as we
comma after "In this case"
The work engine will now be the one reading messages from the channel
and passing it on to the Observers. To do this I propose we add a public
``Start(stop <-chan struct{})`` method, that would be called in it's own
TYPO: it's -> its
#### 2) Create a channel per job

Remove the single channel per topic and replace it with a channel per
job. The work engine would be updated to create a channel per Job,
remove the comma after Job
job. The work engine would be updated to create a channel per Job,
when starting a new job. The work engine would be responsible for
the lifecycle of these channels. As above, the subscribers would be
registered in a map and called in a non blocking way when a message was received.
TYPO: non blocking -> nonblocking
## Considerations

- perhaps want a config option to limit the number of in progress jobs
TYPO: in progress -> in-progress
@jmrodri Thanks for the quick response! I will fix up the typos etc. Happy to do a picture, but I am mostly unavailable today and tomorrow due to training. In the first option, there would be a listener in the work engine that distributes to the observers for each topic. The image only shows provision. The difference for the second option is that there would be a channel and a listener created per job, which would disappear once the job was finished. If I get a chance today I will attempt to draw that up.
Updated based on comments.
nice work!
@jmrodri @eriknelson any more comments here?
On my review stack!
@maleck13 This is pedantic, but is there a semantic difference between the pattern language of
``Start(stop <-chan struct{})`` method, that would be called in its own
go routine during ASB start up. I also propose adding a signal channel
that will send a message to anything running in the background
(could also use context.Context). This would signal for the background
+1 to the use of context.
```go
StartNewJob(token string, work Work, topic WorkTopic){
```
Just for my own clarification: the var `a` in the pseudocode is the work engine itself. So from this, we see that `a` keeps track of all of the channels per token, and keeps track of all of the subscribers for a given topic. I just want to make sure there are no other concerns the engine is keeping track of.

Also, do we need to think about passing a context around here? I envision a use case where a subscriber says "hey, something really odd happened here" and needs to send a message to other subscribers. I believe that is the use case for context?
Yes, those are the two things that would need to be tracked by the engine, if we were to use a channel per job.

On context: it is normally used to signal timeouts and cancellations. I would not be against accepting a context as part of a Job's definition to enable cancellation or potentially timeouts. Context can also carry values across API lines, but I am not such a fan of this approach.
I think adding the context for timeout is something that we should do.
I think that canceling a running subscriber if another subscriber fails is something we should consider.
@eriknelson @jmrodri do you guys have thoughts on timeouts or cancellations?
> I think that canceling a running subscriber if another subscriber fails is something we should consider.

My gut feeling is that they're intended to take a simple data structure that contains the result of some work and do something useful with it, completely independent from other subscribers. I'm not sure I like the idea of them communicating with each other.
I think the concept of Jobs that can be cancelled is probably a proposal of its own and outside the remit of this work.
If we had jobs that could somehow be cancelled, then the Job itself would take the context; I don't think the subscribers would, as they only react to messages passed in. In the case of cancellation, the likelihood is a message would be passed in with a state of cancelled, rather than the subscriber watching the context itself.
@eriknelson Happy to leave them as subscribers; it was really just a suggestion, as I like the term observer in the context of the Job: they are observing / watching the job. I agree, though, I don't think there is really much difference, and I can see that the channel is essentially a message bus.
I believe the actions on me here are to
Remaining is a discussion about passing a context in to Job to allow for timeout and cancellation (should this perhaps be a separate proposal?). Am I missing anything?
A few questions.
My preferred approach is 2), but honestly 1) is attractive for its simplicity.
Rather than passing the channel into the subscriber, we would rename and
change this interface to accept a ```JobMsg```, value not a pointer to stop
unexpected mutation, and return an error.
The current subscribers would handle state persistence and would be
The current subscribers would handle state persistence and would be renamed to reflect their role.
+1, this is the important aspect to me. I would like to keep the jobs decoupled from the consumers that do something useful with the result. It's a deliberate aspect of the original design that I still think is important.
```go
select {
case msg := <-provisionChan:
	for _, s := range e.subscribers[ProvisionTopic] {
		go s(msg)
	}
}
```
There's a bit of an inconsistency here I think. (I'm going to use `Subscriber` here for the sake of clarity; assume it is synonymous with `Observer`.) If we are moving away from subscribers taking a channel as an argument, with `Notify(msg JobMsg) error`, this should be `s.Notify(msg)`. Additionally, this is fire-and-forget; the engine doesn't have an opportunity to deal with errors that may come back. That may be desirable for this solution? The questions I would ask are:

- Is it okay to just fire these off and ignore the outcome? If so, it doesn't make sense to return an error from `Notify`. How do we handle error scenarios in the subscribers then, if at all?
- If it's not okay, how do we expect the engine to handle errors that have been returned from subscribers?
👍 I should not have had the error return on the interface; I will change this. As each WorkSubscriber is an independent function, unaware of any of the other work that is happening, I think it makes sense for them to handle the error internally (similar to how the subscribers work now: marking the action as failed, cleaning up, etc.), as the only real option for the WorkEngine on receiving an error would be to log it.
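Handling errors inside the subscriber, as described, might look something like this sketch (the `persistState` and `markActionFailed` helpers are placeholders invented for illustration, not real broker functions):

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

type JobMsg struct{ State string }

// persistState and markActionFailed stand in for the real persistence
// and failure-handling work the current subscribers perform.
func persistState(msg JobMsg) error {
	if msg.State == "" {
		return errors.New("empty state")
	}
	return nil
}

func markActionFailed(msg JobMsg) {
	log.Printf("marking action failed: %+v", msg)
}

// provisionSubscriber handles its own errors (logging, marking the
// action failed) rather than returning them to the work engine.
func provisionSubscriber(msg JobMsg) {
	if err := persistState(msg); err != nil {
		log.Printf("provision subscriber: failed to persist state: %v", err)
		markActionFailed(msg)
		return
	}
	fmt.Println("state persisted:", msg.State)
}

func main() {
	provisionSubscriber(JobMsg{State: "succeeded"}) // happy path
	provisionSubscriber(JobMsg{})                   // error handled internally
}
```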
job. The work engine would be updated to create a channel per Job
when starting a new job. The work engine would be responsible for
the lifecycle of these channels. As above, the subscribers would be
registered in a map and called in a non-blocking way when a message was received.
Is the expectation that there will be unique instances of registered subscriber types, per job? I.e., if I have an `EtcdSub` and a `RabbitSub`, will the work engine instantiate unique ones for each job? Or each topic?
I think, as they are intended to be stateless functions, they would be registered at startup for each topic. I hadn't intended for them to be created per job, just called per job. I will make this clearer in the proposal.
```go
go func() {
	// will auto stop once the channel is closed
	for msg := range a.jobs[token] {
		for _, s := range a.subscribers[topic] {
			go s(msg)
		}
	}
}()
```
Can you confirm my understanding of this? It appears that there is a single instance of a given subscriber for each topic in this model, where there is only one topic per action. The subscribers listening on a given topic have their `Notify` methods triggered when a `JobMsg` comes in. It's spun off immediately into a goroutine? This begs the question: why are subscribers structs at all, if the same instance is triggered for all jobs? It seems to me it should just be a stateless handler function.
Your understanding here is correct and in line with what I was thinking. One clarification: in the current proposal the subscribers would implement the `WorkObserver` / `WorkSubscriber` interface, allowing us to have many different types be Subscribers. That said, the `WorkSubscriber` could be declared as a func type: `WorkSubscriber func(msg JobMsg)`.
```go
for _, s := range a.subscribers[topic] {
	// Note we could choose to run these sync but again if there was a slow
	// subscriber it would block receiving on the channel.
	go s(msg)
}
```
Same comment WRT `Notify` and what we should do about errors.
see above
We need a consensus on whether we'd like to pursue implementation 1 or 2. 2 feels closer to the pitch made on our call. I would personally prefer it because it feels more flexible, as @maleck13 points out. Is it fair to say most people prefer 2?
Force-pushed from 68f7868 to fe112c2.
Have updated with more details and removed the Observer name change.
Thanks @maleck13, looking good. I think we should go with 2) unless someone wants to argue otherwise.
@maleck13 I'm considering this accepted given the acks and discussion. Thanks again.
Great! Thanks guys.
Describe what this PR does and why we need it:
Proposal based on conversations with the team about the current subscriber pattern and some of its weaknesses.
This proposal is fairly complete in my opinion, but I am keen to get feedback on the approaches outlined and to ensure I am fulfilling everyone's expectations around this change.
Which issue this PR fixes (This will close that issue when PR gets merged)
fixes #638