package core import ( "fmt" "sync" ) // This struct implements Scheduler. All actions are run // locally and jobs only stored in memory. Only one submitted // job is handled at any given time. type LocalScheduler struct { // Destination for all collection jobs. Dest Importer // Once for initialization. on sync.Once // History of all jobs ever submitted to this // scheduler. jobs sync.Map // JobId -> *localjob // Queue of jobs to execute. LocalScheduler only runs // one task at a given time. todo chan *localjob } type localjob struct { // The actual payload of the job. fun Collecter // Synchronization aid for WaitFor. wr waitingroom // Mutex that protects all following members of the struct. mu sync.Mutex // Unique id of the job as assigned by the LocalScheduler. id JobId // State of this job. state JobState // Error message when state = Failure. Otherwise nil. failure error } func (ls *LocalScheduler) Submit(fun Collecter) (JobId, error) { ls.ensureInitialized() job := &localjob{ fun: fun, id: generateJobId(), state: Created, } if _, loaded := ls.jobs.LoadOrStore(job.id, job); loaded { // this should never happen, well, stochastically speaking return "", fmt.Errorf("generated id=%v not unique", job.id) } ls.todo <- job return job.id, nil } func (ls *LocalScheduler) Info(id JobId) (JobInfo, error) { ls.ensureInitialized() if value, ok := ls.jobs.Load(id); !ok { return JobInfo{}, fmt.Errorf("not entry for id=%v", id) } else { lj := value.(*localjob) lj.mu.Lock() defer lj.mu.Unlock() return JobInfo{Id: lj.id, State: lj.state, Error: lj.failure}, nil } } func (ls *LocalScheduler) WaitFor(id JobId) error { ls.ensureInitialized() if value, ok := ls.jobs.Load(id); !ok { return fmt.Errorf("not entry for id=%v", id) } else { value.(*localjob).wr.Wait() return nil } } func (ls *LocalScheduler) loop() { for { j, ok := <-ls.todo if !ok { return // done handling jobs } ls.execute(j) } } func (ls *LocalScheduler) execute(j *localjob) { // set state to Active j.mu.Lock() j.state = Active j.mu.Unlock() // run the payload err := j.fun.Collect(ls.Dest) // update state according to return j.mu.Lock() if err == nil { j.state = Successful } else { j.state = Failed j.failure = err } j.mu.Unlock() // release all routines waiting for this job to finish j.wr.Release() } func (ls *LocalScheduler) ensureInitialized() { ls.on.Do(func() { // initialize complicated struct members ls.todo = make(chan *localjob) // run concurrent tasks go ls.loop() }) }