Skip to content
Snippets Groups Projects
Commit a9bb6868 authored by Andreas Schärtl's avatar Andreas Schärtl
Browse files

ulocollect: wait for completion

- This now works just fine.

- Next up is (1) refactoring, (2) cleanup of the received ULO/RDF
  files and (3) better state information (for the web interface)
parent f682bd30
No related branches found
No related tags found
No related merge requests found
......@@ -7,13 +7,14 @@ import (
// A Collector schedules the execution of various Jobs.
type Collector struct {
// The Importer we forward the ULO/RDF bytes to.
im Importer
Destination Importer
// Collection of all ever sumbmitted jobs. Aquire
// mu before accessing this collection.
jobs JobMap
}
// Submit j for execution.
func (c *Collector) Submit(j *Job) error {
id := j.Id()
......@@ -21,6 +22,52 @@ func (c *Collector) Submit(j *Job) error {
return fmt.Errorf("Id=%v already submitted", id)
}
go j.Collect(c)
j.mu.Lock()
defer j.mu.Unlock()
go c.execute(j)
return nil
}
func (c *Collector) execute(j *Job) {
// Aquire lock as we need to set the status initially.
j.mu.Lock()
// Set state to submitted. Later we'll want to do some kind
// of scheduling at which point a job can spend a lot of time
// in this state. But in the current version we don't do any
// clever scheduling and as such the job enters the Active
// state right away.
j.state = Submitted
// Yep, we are starting already!
j.state = Active
// Now we give up the lock as we run the actual payload.
j.mu.Unlock()
// Run the actual payload.
err := j.Collect(c)
// Set completion state.
j.mu.Lock()
if err != nil {
j.state = Failed
j.failure = err
} else {
j.state = Successful
}
j.wr.Release()
// We are done mutating the Job.
j.mu.Unlock()
}
......@@ -7,5 +7,7 @@ func NewDummyJob() *Job {
self.Log("called!")
return nil
},
id: generateJobId(),
state: Created,
}
}
......@@ -14,6 +14,7 @@ func NewFileSystemJob(path string) *Job {
return &Job{
collectfunc: makeFsCollectFunc(path),
id: generateJobId(),
state: Created,
}
}
......@@ -28,7 +29,7 @@ func makeFsCollectFunc(path string) func(*Job, *Collector) error {
}
log.Println(path)
return importLocalFile(c.im, path)
return importLocalFile(c.Destination, path)
})
}
}
......
......@@ -16,6 +16,10 @@ type Job struct {
// or in failure.
collectfunc func(self *Job, c *Collector) error
// Waiting room for people to wait on completion of this
// job.
wr waitingroom
// Global lock for all following members.
mu sync.Mutex
......@@ -90,3 +94,9 @@ func (bj *Job) Logf(format string, v ...interface{}) {
func (bj *Job) Logs() io.Reader {
panic("not yet implemented")
}
// Wait for this job to complete. If this job was never submitted
// before a call to Wait, this call to Wait will never return.
func (bj *Job) Wait() {
bj.wr.Wait()
}
package core
import "sync"
type waitingroom struct {
initer sync.Once
cond *sync.Cond
released bool
}
// Wait for anyone to call Release.
func (wr *waitingroom) Wait() {
wr.initialize()
wr.cond.L.Lock()
defer wr.cond.L.Unlock()
for !wr.released {
wr.cond.Wait()
}
}
// Release all goroutines currently stuck in Wait. Also
// results in all later calls to Wait to simply not wait
// at all.
func (wr *waitingroom) Release() {
wr.initialize()
wr.cond.L.Lock()
defer wr.cond.L.Unlock()
wr.released = true
wr.cond.Broadcast()
}
// Initialize if necessary.
func (wr *waitingroom) initialize() {
wr.initer.Do(func() {
wr.cond = sync.NewCond(&sync.Mutex{})
wr.released = false
})
}
......@@ -4,21 +4,21 @@ import (
"fmt"
"gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core"
"log"
"time"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
j := core.NewFileSystemJob("/etc")
c := core.Collector{}
i := core.DummyImporter{}
j := core.NewFileSystemJob("/mnt/ramdisk")
c := core.Collector{
Destination: i,
}
if err := c.Submit(j); err != nil {
fmt.Println(err)
}
fmt.Println("/done")
// TODO: make jobs awaitable
time.Sleep(5 * time.Second)
j.Wait()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment