Skip to content
Snippets Groups Projects
collector.go 1.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • package core
    
    import (
    	"fmt"
    )
    
    // A Collector schedules the execution of various Jobs.
    type Collector struct {
    	// The Importer we forward the ULO/RDF bytes to.
    
    	Destination Importer
    
    
    	// Collection of all jobs ever submitted. Acquire
    	// mu before accessing this collection.
    
    // Submit j for execution.
    
    func (c *Collector) Submit(j *Job) error {
    
    	if _, loaded := c.jobs.LoadOrStore(id, j); loaded {
    		return fmt.Errorf("Id=%v already submitted", id)
    
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	go c.execute(j)
    
    
    // execute runs the payload of j and records the outcome on the job.
    //
    // The job's mutex is held only while its fields are being mutated; it is
    // released for the duration of j.Collect so the lock is not held while the
    // payload runs.
    func (c *Collector) execute(j *Job) {
    	// Phase 1: record the initial state transitions under the lock.
    	//
    	// There is no scheduling yet, so a job passes through Submitted and
    	// becomes Active immediately. Once real scheduling exists, a job may
    	// stay in Submitted for a while before being promoted.
    	j.mu.Lock()
    	j.state = Submitted
    	j.state = Active
    	j.mu.Unlock()

    	// Phase 2: run the payload with the lock released.
    	outcome := j.Collect(c)

    	// Phase 3: re-acquire the lock and store the completion state.
    	j.mu.Lock()

    	if outcome == nil {
    		j.state = Successful
    	} else {
    		j.state = Failed
    		j.failure = outcome
    	}

    	// Release j.wr while the lock is still held, then finish mutating j.
    	j.wr.Release()

    	j.mu.Unlock()
    }