Skip to content
Snippets Groups Projects
local_scheduler.go 2.55 KiB
Newer Older
package core

import (
	"fmt"
	"sync"
)

// This struct implements Scheduler. All actions are run
// locally and jobs only stored in memory. Only one submitted
// job is handled at any given time.
type LocalScheduler struct {
	// Destination for all collection jobs.
	Dest Importer

	// Once for initialization.
	on sync.Once

	// History of all jobs ever submitted to this
	// scheduler.
	jobs sync.Map // JobId -> *localjob

	// Queue of jobs to execute. LocalScheduler only runs
	// one task at a given time.
	todo chan *localjob
}

type localjob struct {
	// The actual payload of the job.
	fun Collecter

	// Synchronization aid for WaitFor.
	wr waitingroom

	// Mutex that protects all following members of the struct.
	mu sync.Mutex

	// Unique id of the job as assigned by the LocalScheduler.
	id JobId

	// State of this job.
	state JobState

	// Error message when state = Failure. Otherwise nil.
	failure error
}

func (ls *LocalScheduler) Submit(fun Collecter) (JobId, error) {
	ls.ensureInitialized()

	job := &localjob{
		fun:   fun,
		id:    generateJobId(),
		state: Created,
	}

	if _, loaded := ls.jobs.LoadOrStore(job.id, job); loaded {
		// this should never happen, well, stochastically speaking
		return "", fmt.Errorf("generated id=%v not unique", job.id)
	}

	ls.todo <- job
	return job.id, nil
}

func (ls *LocalScheduler) Info(id JobId) (JobInfo, error) {
	ls.ensureInitialized()

	if value, ok := ls.jobs.Load(id); !ok {
		return JobInfo{}, fmt.Errorf("not entry for id=%v", id)
	} else {
		lj := value.(*localjob)

		lj.mu.Lock()
		defer lj.mu.Unlock()

		return JobInfo{Id: lj.id, State: lj.state, Error: lj.failure}, nil
	}
}

func (ls *LocalScheduler) WaitFor(id JobId) error {
	ls.ensureInitialized()

	if value, ok := ls.jobs.Load(id); !ok {
		return fmt.Errorf("not entry for id=%v", id)
	} else {
		value.(*localjob).wr.Wait()
		return nil
	}
}

func (ls *LocalScheduler) loop() {
	for {
		j, ok := <-ls.todo

		if !ok {
			return // done handling jobs
		}

		ls.execute(j)
	}
}

func (ls *LocalScheduler) execute(j *localjob) {
	// set state to Active

	j.mu.Lock()
	j.state = Active
	j.mu.Unlock()

	// run the payload

	err := j.fun.Collect(ls.Dest)

	// update state according to return

	j.mu.Lock()

	if err == nil {
		j.state = Successful
	} else {
		j.state = Failed
		j.failure = err
	}

	j.mu.Unlock()

	// release all routines waiting for this job to finish

	j.wr.Release()
}

func (ls *LocalScheduler) ensureInitialized() {
	ls.on.Do(func() {
		// initialize complicated struct members
		ls.todo = make(chan *localjob)

		// run concurrent tasks
		go ls.loop()
	})
}