From a9bb6868c350c84233c0d83e2a532e5ea97d54b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20Sch=C3=A4rtl?= <andreas@schaertl.me> Date: Mon, 11 May 2020 14:34:49 +0200 Subject: [PATCH] ulocollect: wait for completion - This now works just fine. - Next up is (1) refactoring, (2) cleanup of the received ULO/RDF files and (3) better state information (for the web interface) --- ulocollect/core/collector.go | 51 +++++++++++++++++++++++++++++++-- ulocollect/core/dummy_job.go | 2 ++ ulocollect/core/fs_job.go | 3 +- ulocollect/core/job.go | 10 +++++++ ulocollect/core/waiting_room.go | 43 +++++++++++++++++++++++++++ ulocollect/main.go | 12 ++++---- 6 files changed, 112 insertions(+), 9 deletions(-) create mode 100644 ulocollect/core/waiting_room.go diff --git a/ulocollect/core/collector.go b/ulocollect/core/collector.go index 48b9a39..19bd6c1 100644 --- a/ulocollect/core/collector.go +++ b/ulocollect/core/collector.go @@ -7,13 +7,14 @@ import ( // A Collector schedules the execution of various Jobs. type Collector struct { // The Importer we forward the ULO/RDF bytes to. - im Importer + Destination Importer // Collection of all ever sumbmitted jobs. Aquire // mu before accessing this collection. jobs JobMap } +// Submit j for execution. func (c *Collector) Submit(j *Job) error { id := j.Id() @@ -21,6 +22,52 @@ func (c *Collector) Submit(j *Job) error { return fmt.Errorf("Id=%v already submitted", id) } - go j.Collect(c) + j.mu.Lock() + defer j.mu.Unlock() + + go c.execute(j) return nil } + +func (c *Collector) execute(j *Job) { + // Aquire lock as we need to set the status initially. + + j.mu.Lock() + + // Set state to submitted. Later we'll want to do some kind + // of scheduling at which point a job can spend a lot of time + // in this state. But in the current version we don't do any + // clever scheduling and as such the job enters the Active + // state right away. + + j.state = Submitted + + // Yep, we are starting already! + + j.state = Active + + // Now we give up the lock as we run the actual payload. + + j.mu.Unlock() + + // Run the actual payload. + + err := j.Collect(c) + + // Set completion state. + + j.mu.Lock() + + if err != nil { + j.state = Failed + j.failure = err + } else { + j.state = Successful + } + + j.wr.Release() + + // We are done mutating the Job. + + j.mu.Unlock() +} diff --git a/ulocollect/core/dummy_job.go b/ulocollect/core/dummy_job.go index 56bec64..6da7aa7 100644 --- a/ulocollect/core/dummy_job.go +++ b/ulocollect/core/dummy_job.go @@ -7,5 +7,7 @@ func NewDummyJob() *Job { self.Log("called!") return nil }, + id: generateJobId(), + state: Created, } } diff --git a/ulocollect/core/fs_job.go b/ulocollect/core/fs_job.go index 8aa8d77..f29cc37 100644 --- a/ulocollect/core/fs_job.go +++ b/ulocollect/core/fs_job.go @@ -14,6 +14,7 @@ func NewFileSystemJob(path string) *Job { return &Job{ collectfunc: makeFsCollectFunc(path), id: generateJobId(), + state: Created, } } @@ -28,7 +29,7 @@ func makeFsCollectFunc(path string) func(*Job, *Collector) error { } log.Println(path) - return importLocalFile(c.im, path) + return importLocalFile(c.Destination, path) }) } } diff --git a/ulocollect/core/job.go b/ulocollect/core/job.go index 73c9ac2..bfa0824 100644 --- a/ulocollect/core/job.go +++ b/ulocollect/core/job.go @@ -16,6 +16,10 @@ type Job struct { // or in failure. collectfunc func(self *Job, c *Collector) error + // Waiting room for people to wait on completion of this + // job. + wr waitingroom + // Global lock for all following members. mu sync.Mutex @@ -90,3 +94,9 @@ func (bj *Job) Logf(format string, v ...interface{}) { func (bj *Job) Logs() io.Reader { panic("not yet implemented") } + +// Wait for this job to complete. If this job was never submitted +// before a call to Wait, this call to Wait will never return. +func (bj *Job) Wait() { + bj.wr.Wait() +} diff --git a/ulocollect/core/waiting_room.go b/ulocollect/core/waiting_room.go new file mode 100644 index 0000000..678525c --- /dev/null +++ b/ulocollect/core/waiting_room.go @@ -0,0 +1,43 @@ +package core + +import "sync" + +type waitingroom struct { + initer sync.Once + cond *sync.Cond + released bool +} + +// Wait for anyone to call Release. +func (wr *waitingroom) Wait() { + wr.initialize() + + wr.cond.L.Lock() + defer wr.cond.L.Unlock() + + for !wr.released { + wr.cond.Wait() + } +} + +// Release all goroutines currently stuck in Wait. Also +// results in all later calls to Wait to simply not wait +// at all. +func (wr *waitingroom) Release() { + wr.initialize() + + wr.cond.L.Lock() + defer wr.cond.L.Unlock() + + wr.released = true + + wr.cond.Broadcast() +} + +// Initialize if necessary. +func (wr *waitingroom) initialize() { + wr.initer.Do(func() { + wr.cond = sync.NewCond(&sync.Mutex{}) + wr.released = false + }) +} diff --git a/ulocollect/main.go b/ulocollect/main.go index 61a6a59..6dc1fa3 100644 --- a/ulocollect/main.go +++ b/ulocollect/main.go @@ -4,21 +4,21 @@ import ( "fmt" "gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core" "log" - "time" ) func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) - j := core.NewFileSystemJob("/etc") - c := core.Collector{} + i := core.DummyImporter{} + j := core.NewFileSystemJob("/mnt/ramdisk") + c := core.Collector{ + Destination: i, + } if err := c.Submit(j); err != nil { fmt.Println(err) } fmt.Println("/done") - - // TODO: make jobs awaitable - time.Sleep(5 * time.Second) + j.Wait() } -- GitLab