Unverified Commit 1b048093 authored by Cristian Maglie's avatar Cristian Maglie Committed by GitHub

Fixed start/stop handling of Discovery (#1346)

parent 91c296c8
......@@ -95,7 +95,6 @@ func New(id string, args ...string) (*PluggableDiscovery, error) {
process: proc,
incomingMessagesChan: messageChan,
outgoingCommandsPipe: stdin,
alive: true,
}
go disc.jsonDecodeLoop(stdout, messageChan)
return disc, nil
......@@ -118,6 +117,7 @@ func (disc *PluggableDiscovery) jsonDecodeLoop(in io.Reader, outChan chan<- *dis
disc.incomingMessagesError = err
disc.statusMutex.Unlock()
close(outChan)
// TODO: Try restarting process some times before closing it completely
}
for {
......@@ -202,6 +202,9 @@ func (disc *PluggableDiscovery) runProcess() error {
if err := disc.process.Start(); err != nil {
return err
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
disc.alive = true
return nil
}
......@@ -257,6 +260,13 @@ func (disc *PluggableDiscovery) Stop() error {
} else if msg.Message != "OK" || msg.Error {
return errors.Errorf("command failed: %s", msg.Message)
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
if disc.eventChan != nil {
close(disc.eventChan)
disc.eventChan = nil
}
disc.eventsMode = false
return nil
}
......@@ -272,6 +282,13 @@ func (disc *PluggableDiscovery) Quit() error {
} else if msg.Message != "OK" || msg.Error {
return errors.Errorf("command failed: %s", msg.Message)
}
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
if disc.eventChan != nil {
close(disc.eventChan)
disc.eventChan = nil
}
disc.alive = false
return nil
}
......@@ -296,9 +313,14 @@ func (disc *PluggableDiscovery) List() ([]*Port, error) {
// The event channel must be consumed as quickly as possible since it may block the
// discovery if it becomes full. The channel size is configurable.
func (disc *PluggableDiscovery) EventChannel(size int) <-chan *Event {
c := make(chan *Event, size)
disc.statusMutex.Lock()
defer disc.statusMutex.Unlock()
if disc.eventChan != nil {
// In case there is already an existing event channel in use we close it
// before creating a new one.
close(disc.eventChan)
}
c := make(chan *Event, size)
disc.eventChan = c
return c
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment