Commit 0785c72f authored by Cristian Maglie's avatar Cristian Maglie Committed by Cristian Maglie

Fixed race condition at the edge of builds. Using queue channel.

The last loop collecting the remaining objectFiles may be run before the
last jobs completes.
This commit replaces the two channels used to fill objectFiles and to
signal error with direct variable access guarded by mutex, this avoids
race conditions at the end and streamlines the whole process.

Also added a 'queue' channel to feed the goroutines, this is not strictly
part of the fix, but helps to fairly distribute the workload.
parent 764fa09f
...@@ -169,57 +169,57 @@ func compileFilesWithRecipe(ctx *types.Context, sourcePath *paths.Path, sources ...@@ -169,57 +169,57 @@ func compileFilesWithRecipe(ctx *types.Context, sourcePath *paths.Path, sources
if len(sources) == 0 { if len(sources) == 0 {
return objectFiles, nil return objectFiles, nil
} }
objectFilesChan := make(chan *paths.Path) var objectFilesMux sync.Mutex
errorsChan := make(chan error) var errors []error
doneChan := make(chan struct{}) var errorsMux sync.Mutex
ctx.Progress.Steps = ctx.Progress.Steps / float64(len(sources)) ctx.Progress.Steps = ctx.Progress.Steps / float64(len(sources))
var wg sync.WaitGroup
// Split jobs into batches of N jobs each; wait for the completion of a batch to start the next
par := ctx.Jobs
go func() { queue := make(chan *paths.Path)
for total := 0; total < len(sources); total += par { job := func(source *paths.Path) {
for i := total; i < total+par && i < len(sources); i++ {
wg.Add(1)
go func(source *paths.Path) {
defer wg.Done()
PrintProgressIfProgressEnabledAndMachineLogger(ctx) PrintProgressIfProgressEnabledAndMachineLogger(ctx)
objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe) objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe)
if err != nil { if err != nil {
errorsChan <- err errorsMux.Lock()
errors = append(errors, err)
errorsMux.Unlock()
} else { } else {
objectFilesChan <- objectFile objectFilesMux.Lock()
} objectFiles.Add(objectFile)
}(sources[i]) objectFilesMux.Unlock()
} }
wg.Wait()
} }
doneChan <- struct{}{} // Spawn jobs runners
}() var wg sync.WaitGroup
for i := 0; i < ctx.Jobs; i++ {
wg.Add(1)
go func() { go func() {
wg.Wait() for source := range queue {
doneChan <- struct{}{} job(source)
}
wg.Done()
}() }()
}
for { // Feed jobs until error or done
select { for _, source := range sources {
case objectFile := <-objectFilesChan: errorsMux.Lock()
objectFiles.Add(objectFile) gotError := len(errors) > 0
case err := <-errorsChan: errorsMux.Unlock()
return nil, i18n.WrapError(err) if gotError {
case <-doneChan: break
close(objectFilesChan)
for objectFile := range objectFilesChan {
objectFiles.Add(objectFile)
} }
objectFiles.Sort() queue <- source
return objectFiles, nil
} }
close(queue)
wg.Wait()
if len(errors) > 0 {
// output the first error
return nil, i18n.WrapError(errors[0])
} }
objectFiles.Sort()
return objectFiles, nil
} }
func compileFileWithRecipe(ctx *types.Context, sourcePath *paths.Path, source *paths.Path, buildPath *paths.Path, buildProperties *properties.Map, includes []string, recipe string) (*paths.Path, error) { func compileFileWithRecipe(ctx *types.Context, sourcePath *paths.Path, source *paths.Path, buildPath *paths.Path, buildProperties *properties.Map, includes []string, recipe string) (*paths.Path, error) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment