Newer
Older
package core
import (
"fmt"
)
// A Collector schedules the execution of various Jobs.
type Collector struct {
// The Importer we forward the ULO/RDF bytes to.
// Collection of all ever sumbmitted jobs. Aquire
// mu before accessing this collection.
func (c *Collector) Submit(j *Job) error {
if _, loaded := c.jobs.LoadOrStore(id, j); loaded {
return fmt.Errorf("Id=%v already submitted", id)
j.mu.Lock()
defer j.mu.Unlock()
go c.execute(j)
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (c *Collector) execute(j *Job) {
// Aquire lock as we need to set the status initially.
j.mu.Lock()
// Set state to submitted. Later we'll want to do some kind
// of scheduling at which point a job can spend a lot of time
// in this state. But in the current version we don't do any
// clever scheduling and as such the job enters the Active
// state right away.
j.state = Submitted
// Yep, we are starting already!
j.state = Active
// Now we give up the lock as we run the actual payload.
j.mu.Unlock()
// Run the actual payload.
err := j.Collect(c)
// Set completion state.
j.mu.Lock()
if err != nil {
j.state = Failed
j.failure = err
} else {
j.state = Successful
}
j.wr.Release()
// We are done mutating the Job.
j.mu.Unlock()
}