Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package core
import (
"fmt"
"sync"
)
// This struct implements Scheduler. All actions are run
// locally and jobs only stored in memory. Only one submitted
// job is handled at any given time.
type LocalScheduler struct {
// Destination for all collection jobs.
Dest Importer
// Once for initialization.
on sync.Once
// History of all jobs ever submitted to this
// scheduler.
jobs sync.Map // JobId -> *localjob
// Queue of jobs to execute. LocalScheduler only runs
// one task at a given time.
todo chan *localjob
}
type localjob struct {
// The actual payload of the job.
fun Collecter
// Synchronization aid for WaitFor.
wr waitingroom
// Mutex that protects all following members of the struct.
mu sync.Mutex
// Unique id of the job as assigned by the LocalScheduler.
id JobId
// State of this job.
state JobState
// Error message when state = Failure. Otherwise nil.
failure error
}
func (ls *LocalScheduler) Submit(fun Collecter) (JobId, error) {
ls.ensureInitialized()
job := &localjob{
fun: fun,
id: generateJobId(),
state: Created,
}
if _, loaded := ls.jobs.LoadOrStore(job.id, job); loaded {
// this should never happen, well, stochastically speaking
return "", fmt.Errorf("generated id=%v not unique", job.id)
}
ls.todo <- job
return job.id, nil
}
func (ls *LocalScheduler) Info(id JobId) (JobInfo, error) {
ls.ensureInitialized()
if value, ok := ls.jobs.Load(id); !ok {
return JobInfo{}, fmt.Errorf("not entry for id=%v", id)
} else {
lj := value.(*localjob)
lj.mu.Lock()
defer lj.mu.Unlock()
return JobInfo{Id: lj.id, State: lj.state, Error: lj.failure}, nil
}
}
func (ls *LocalScheduler) WaitFor(id JobId) error {
ls.ensureInitialized()
if value, ok := ls.jobs.Load(id); !ok {
return fmt.Errorf("not entry for id=%v", id)
} else {
value.(*localjob).wr.Wait()
return nil
}
}
func (ls *LocalScheduler) loop() {
for {
j, ok := <-ls.todo
if !ok {
return // done handling jobs
}
ls.execute(j)
}
}
func (ls *LocalScheduler) execute(j *localjob) {
// set state to Active
j.mu.Lock()
j.state = Active
j.mu.Unlock()
// run the payload
err := j.fun.Collect(ls.Dest)
// update state according to return
j.mu.Lock()
if err == nil {
j.state = Successful
} else {
j.state = Failed
j.failure = err
}
j.mu.Unlock()
// release all routines waiting for this job to finish
j.wr.Release()
}
func (ls *LocalScheduler) ensureInitialized() {
ls.on.Do(func() {
// initialize complicated struct members
ls.todo = make(chan *localjob)
// run concurrent tasks
go ls.loop()
})
}