diff --git a/ulocollect/core/collector.go b/ulocollect/core/collector.go index 48b9a397716d84013b0bf949fad4eacba4a06a2c..19bd6c1d827755ba852eb52e9112b703bbd3636c 100644 --- a/ulocollect/core/collector.go +++ b/ulocollect/core/collector.go @@ -7,13 +7,14 @@ import ( // A Collector schedules the execution of various Jobs. type Collector struct { // The Importer we forward the ULO/RDF bytes to. - im Importer + Destination Importer // Collection of all ever sumbmitted jobs. Aquire // mu before accessing this collection. jobs JobMap } +// Submit j for execution. func (c *Collector) Submit(j *Job) error { id := j.Id() @@ -21,6 +22,52 @@ func (c *Collector) Submit(j *Job) error { return fmt.Errorf("Id=%v already submitted", id) } - go j.Collect(c) + j.mu.Lock() + defer j.mu.Unlock() + + go c.execute(j) return nil } + +func (c *Collector) execute(j *Job) { + // Aquire lock as we need to set the status initially. + + j.mu.Lock() + + // Set state to submitted. Later we'll want to do some kind + // of scheduling at which point a job can spend a lot of time + // in this state. But in the current version we don't do any + // clever scheduling and as such the job enters the Active + // state right away. + + j.state = Submitted + + // Yep, we are starting already! + + j.state = Active + + // Now we give up the lock as we run the actual payload. + + j.mu.Unlock() + + // Run the actual payload. + + err := j.Collect(c) + + // Set completion state. + + j.mu.Lock() + + if err != nil { + j.state = Failed + j.failure = err + } else { + j.state = Successful + } + + j.wr.Release() + + // We are done mutating the Job. + + j.mu.Unlock() +} diff --git a/ulocollect/core/dummy_job.go b/ulocollect/core/dummy_job.go index 56bec64f84f71abe004f26b987099578adf72b1d..6da7aa73f99d8e30527719dcb2dcb04eee5b4ab4 100644 --- a/ulocollect/core/dummy_job.go +++ b/ulocollect/core/dummy_job.go @@ -7,5 +7,7 @@ func NewDummyJob() *Job { self.Log("called!") return nil }, + id: generateJobId(), + state: Created, } } diff --git a/ulocollect/core/fs_job.go b/ulocollect/core/fs_job.go index 8aa8d77cd23cf5ae88c91b2d7afc2b2fee9320b8..f29cc378ebb3f20e8a854966b87abf48c64d4da0 100644 --- a/ulocollect/core/fs_job.go +++ b/ulocollect/core/fs_job.go @@ -14,6 +14,7 @@ func NewFileSystemJob(path string) *Job { return &Job{ collectfunc: makeFsCollectFunc(path), id: generateJobId(), + state: Created, } } @@ -28,7 +29,7 @@ func makeFsCollectFunc(path string) func(*Job, *Collector) error { } log.Println(path) - return importLocalFile(c.im, path) + return importLocalFile(c.Destination, path) }) } } diff --git a/ulocollect/core/job.go b/ulocollect/core/job.go index 73c9ac2af3e17a2772add4d266fd2a328f801ded..bfa0824e1518620740fb64fbfcfa6bb20cfe732d 100644 --- a/ulocollect/core/job.go +++ b/ulocollect/core/job.go @@ -16,6 +16,10 @@ type Job struct { // or in failure. collectfunc func(self *Job, c *Collector) error + // Waiting room for people to wait on completion of this + // job. + wr waitingroom + // Global lock for all following members. mu sync.Mutex @@ -90,3 +94,9 @@ func (bj *Job) Logf(format string, v ...interface{}) { func (bj *Job) Logs() io.Reader { panic("not yet implemented") } + +// Wait for this job to complete. If this job was never submitted +// before a call to Wait, this call to Wait will never return. +func (bj *Job) Wait() { + bj.wr.Wait() +} diff --git a/ulocollect/core/waiting_room.go b/ulocollect/core/waiting_room.go new file mode 100644 index 0000000000000000000000000000000000000000..678525c890fe243fb411e2813183883117cd3dcc --- /dev/null +++ b/ulocollect/core/waiting_room.go @@ -0,0 +1,43 @@ +package core + +import "sync" + +type waitingroom struct { + initer sync.Once + cond *sync.Cond + released bool +} + +// Wait for anyone to call Release. +func (wr *waitingroom) Wait() { + wr.initialize() + + wr.cond.L.Lock() + defer wr.cond.L.Unlock() + + for !wr.released { + wr.cond.Wait() + } +} + +// Release all goroutines currently stuck in Wait. Also +// results in all later calls to Wait to simply not wait +// at all. +func (wr *waitingroom) Release() { + wr.initialize() + + wr.cond.L.Lock() + defer wr.cond.L.Unlock() + + wr.released = true + + wr.cond.Broadcast() +} + +// Initialize if necessary. +func (wr *waitingroom) initialize() { + wr.initer.Do(func() { + wr.cond = sync.NewCond(&sync.Mutex{}) + wr.released = false + }) +} diff --git a/ulocollect/main.go b/ulocollect/main.go index 61a6a595486b25257ce50aee97d397ed7dc14bbe..6dc1fa37167bfbbaeb194ea562f0b96cc183d322 100644 --- a/ulocollect/main.go +++ b/ulocollect/main.go @@ -4,21 +4,21 @@ import ( "fmt" "gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core" "log" - "time" ) func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) - j := core.NewFileSystemJob("/etc") - c := core.Collector{} + i := core.DummyImporter{} + j := core.NewFileSystemJob("/mnt/ramdisk") + c := core.Collector{ + Destination: i, + } if err := c.Submit(j); err != nil { fmt.Println(err) } fmt.Println("/done") - - // TODO: make jobs awaitable - time.Sleep(5 * time.Second) + j.Wait() }