From 749e35a6f8c3672b0d55292ab6fe4daf50797c05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20Sch=C3=A4rtl?= <andreas@schaertl.me> Date: Mon, 11 May 2020 13:34:58 +0200 Subject: [PATCH] ulocollect: update core s.t. it runs jobs We now run jobs w/o limits. This is probably bad in a hosted setup but we can worry about that later should it ever become a problem. As it stands the implementation is easier with a simple goroutine rather than a complicated pool of threads. --- ulocollect/core/base_job.go | 68 --------------- ulocollect/core/collector.go | 56 ++---------- ulocollect/core/concurrent_buffer.go | 30 +++++-- ulocollect/core/dummy_job.go | 11 +++ ulocollect/core/job.go | 124 +++++++++++++++++++-------- ulocollect/core/job_map.go | 34 ++++++++ ulocollect/go.mod | 2 +- ulocollect/main.go | 22 +++++ 8 files changed, 187 insertions(+), 160 deletions(-) delete mode 100644 ulocollect/core/base_job.go create mode 100644 ulocollect/core/dummy_job.go create mode 100644 ulocollect/core/job_map.go create mode 100644 ulocollect/main.go diff --git a/ulocollect/core/base_job.go b/ulocollect/core/base_job.go deleted file mode 100644 index 17a67e3..0000000 --- a/ulocollect/core/base_job.go +++ /dev/null @@ -1,68 +0,0 @@ -package core - -import ( - "fmt" - "github.com/pkg/errors" - "log" - "io" - "bytes" - "sync" -) - -type basejob struct { - // Function to call on Collect - collectfunc func(c *Collector) error - - // Global lock for all following members. - mu sync.Mutex - - // Unique identifier of this job. - id JobId - - // The state the job is currently in. - state JobState - - // When state is set to Failed, contains the reason - // for this failure. - failure error - - // Recorded logs for this job. - logged bytes.Buffer -} - -func (bj *basejob) Id() JobId { - bj.mu.Lock() - defer bj.mu.Unlock() - - return bj.id -} - -func (bj *basejob) State() JobState { - bj.mu.Lock() - defer bj.mu.Unlock() - - return bj.state -} - -func (bj *basejob) Error() error { - bj.mu.Lock() - defer bj.mu.Unlock() - - return bj.failure -} - -func (bj *basejob) Collect(c *Collector) error { - return errors.New("not implemented") -} - -func (bj *basejob) Log(v ...interface{}) { - log.Output(3, fmt.Sprint(v...)) -} - -func (bj *basejob) Logf(format string, v ...interface{}) { - log.Output(3, fmt.Sprintf(format, v...)) -} - -func (bj *basejob) Logs() io.Reader { - panic("not yet implemented") -} diff --git a/ulocollect/core/collector.go b/ulocollect/core/collector.go index 26f6b2f..48b9a39 100644 --- a/ulocollect/core/collector.go +++ b/ulocollect/core/collector.go @@ -2,7 +2,6 @@ package core import ( "fmt" - "sync" ) // A Collector schedules the execution of various Jobs. @@ -10,63 +9,18 @@ type Collector struct { // The Importer we forward the ULO/RDF bytes to. im Importer - // Global lock. Aquire it before changing any state - // in Collector. - mu sync.Mutex - - // Whether this Collector is currently processing request, - // i.e. whether Collector.loop is running. Aquire mu - // before accessing this member. - looping bool - // Collection of all ever sumbmitted jobs. Aquire // mu before accessing this collection. - jobs map[JobId]Job - - // Here we keep duplicate references to the submitted jobs. - // We need to keep some kind of order and also want to be - // able to report on the current state of all queues. As - // such we have to use lists and cannot use channels here. - - // List of submitted, but not currently running, jobs. - // Aquire mu before accessing this collection. - submitted []Job - - // List of active jobs. Aquire mu before accessing this - // collection. - active []Job - - // List of completed, i.e. successful or failed, jobs. - // Aquire mu before accessing this collection. - completed []Job + jobs JobMap } -func (c *Collector) Submit(j Job) error { - c.mu.Lock() - defer c.mu.Unlock() - - // check whether j was already submitted previously - +func (c *Collector) Submit(j *Job) error { id := j.Id() - if _, ok := c.jobs[id]; ok { - return fmt.Errorf("id=%v already submitted", id) - } - - // it's a new job, enqueue it - - c.jobs[id] = j - c.submitted = append(c.submitted, j) - - // if the collector isn't running, start it - - if !c.looping { - go c.loop() - c.looping = true + if _, loaded := c.jobs.LoadOrStore(id, j); loaded { + return fmt.Errorf("Id=%v already submitted", id) } + go j.Collect(c) return nil } - -func (c *Collector) loop() { -} diff --git a/ulocollect/core/concurrent_buffer.go b/ulocollect/core/concurrent_buffer.go index 1fb3b2f..4751aff 100644 --- a/ulocollect/core/concurrent_buffer.go +++ b/ulocollect/core/concurrent_buffer.go @@ -9,26 +9,46 @@ import ( // thread-safe methods. type concurrentbuffer struct { buf bytes.Buffer - mu sync.Mutex + mu sync.RWMutex } +// Implement io.Reader. func (b *concurrentbuffer) Read(p []byte) (n int, err error) { b.mu.Lock() - b.mu.Unlock() + defer b.mu.Unlock() return b.buf.Read(p) } +// Implement io.Writer. func (b *concurrentbuffer) Write(p []byte) (n int, err error) { b.mu.Lock() - b.mu.Unlock() + defer b.mu.Unlock() return b.buf.Write(p) } -func (b *concurrentbuffer) String() string { +// Truncate the buffer to 0 elements. +func (b *concurrentbuffer) Reset() { b.mu.Lock() - b.mu.Unlock() + defer b.mu.Unlock() + + b.buf.Reset() +} + +// Return a deep copy of the underlying bytes. +func (b *concurrentbuffer) Bytes() []byte { + b.mu.RLock() + defer b.mu.RUnlock() + + src := b.buf.Bytes() + return append([]byte(nil), src...) +} + +// Return a printable version of the buffer. +func (b *concurrentbuffer) String() string { + b.mu.RLock() + defer b.mu.RUnlock() return b.buf.String() } diff --git a/ulocollect/core/dummy_job.go b/ulocollect/core/dummy_job.go new file mode 100644 index 0000000..56bec64 --- /dev/null +++ b/ulocollect/core/dummy_job.go @@ -0,0 +1,11 @@ +package core + +// Return a new dummy job. Running it does nothing. +func NewDummyJob() *Job { + return &Job{ + collectfunc: func(self *Job, c *Collector) error { + self.Log("called!") + return nil + }, + } +} diff --git a/ulocollect/core/job.go b/ulocollect/core/job.go index d321433..73c9ac2 100644 --- a/ulocollect/core/job.go +++ b/ulocollect/core/job.go @@ -1,38 +1,92 @@ package core -import "io" - -// A single collection job. Defines as an interface as there are all -// kinds of different collection jobs as the respective source -// repository is different. -type Job interface { - // Return the unique identifier of this job. - Id() JobId - - // Return the current state of this job. - State() JobState - - // When State is Failed, return the error that caused - // the failure. When State is not failed, this method - // always returns nil. - Error() error - - // This is a synchronous operation that might take a long - // time. Instead of invoking Collect directly, you should - // create a Collector and Submit this job to it. - Collect(c *Collector) error - - // Write to the log associated with this Job. The log message - // will be output with the standard logger as well as recorded - // to the buffer accessible with Job.Logs(). - Log(v ...interface{}) - - // Write a formatted string to the log associated with this - // Job. The log message will be output with the standard - // logger as well as recorded to the buffer accessible with - // Job.Logs(). - Logf(format string, v ...interface{}) - - // Return the log stream of this job. - Logs() io.Reader +import ( + "fmt" + "github.com/pkg/errors" + "io" + "log" + "sync" +) + +type Job struct { + // Function to call on Collect. This is the function + // that will (1) update the state of this job, (2) + // do the actual importing, (3) write to the Job-internal + // log and eventually (4) finish either with success + // or in failure. + collectfunc func(self *Job, c *Collector) error + + // Global lock for all following members. + mu sync.Mutex + + // Unique identifier of this job. + id JobId + + // The state the job is currently in. + state JobState + + // When state is set to Failed, contains the reason + // for this failure. + failure error + + // Recorded logs for this job. + logged concurrentbuffer +} + +// Return the unique identifier of this job. +func (bj *Job) Id() JobId { + bj.mu.Lock() + defer bj.mu.Unlock() + + return bj.id +} + +// Return the current state of this job. +func (bj *Job) State() JobState { + bj.mu.Lock() + defer bj.mu.Unlock() + + return bj.state +} + +// When State is Failed, return the error that caused +// the failure. When State is not failed, this method +// always returns nil. +func (bj *Job) Error() error { + bj.mu.Lock() + defer bj.mu.Unlock() + + return bj.failure +} + +// This is a synchronous operation that might take a long +// time. Instead of invoking Collect directly, you should +// create a Collector and Submit this job to it. +func (bj *Job) Collect(c *Collector) error { + if bj.collectfunc == nil { + return errors.New("collectfunc not set on Job") + } + + bj.collectfunc(bj, c) + return nil +} + +// Write to the log associated with this Job. The log message +// will be output with the standard logger as well as recorded +// to the buffer accessible with Job.Logs(). +func (bj *Job) Log(v ...interface{}) { + log.Output(2, fmt.Sprint(v...)) +} + +// Write a formatted string to the log associated with this +// Job. The log message will be output with the standard +// logger as well as recorded to the buffer accessible with +// Job.Logs(). +func (bj *Job) Logf(format string, v ...interface{}) { + log.Output(2, fmt.Sprintf(format, v...)) +} + +// Return the log stream of this job. +func (bj *Job) Logs() io.Reader { + panic("not yet implemented") } diff --git a/ulocollect/core/job_map.go b/ulocollect/core/job_map.go new file mode 100644 index 0000000..732f8d1 --- /dev/null +++ b/ulocollect/core/job_map.go @@ -0,0 +1,34 @@ +package core + +import "sync" + +// A wrapper around sync.Map for *Job. +type JobMap struct { + wrapped sync.Map +} + +func (m *JobMap) Store(key JobId, value *Job) { + m.wrapped.Store(key, value) +} + +func (m *JobMap) Load(key JobId) (value *Job, ok bool) { + if value, ok := m.wrapped.Load(key); !ok { + return nil, false + } else { + return value.(*Job), true + } +} + +func (m *JobMap) LoadOrStore(key JobId, value *Job) (actual *Job, loaded bool) { + if iactual, iloaded := m.wrapped.LoadOrStore(key, value); iactual != nil { + return iactual.(*Job), iloaded + } else { + return nil, iloaded + } +} + +func (m *JobMap) Range(f func(key JobId, value *Job) bool) { + m.wrapped.Range(func(ikey, ivalue interface{}) bool { + return f(ikey.(JobId), ivalue.(*Job)) + }) +} diff --git a/ulocollect/go.mod b/ulocollect/go.mod index 0981a5f..c07cb77 100644 --- a/ulocollect/go.mod +++ b/ulocollect/go.mod @@ -1,4 +1,4 @@ -module gl.kwarc.info/supervision/schaertl_andreas/uoimport +module gl.kwarc.info/supervision/schaertl_andreas/ulocollect go 1.13 diff --git a/ulocollect/main.go b/ulocollect/main.go new file mode 100644 index 0000000..02cdaf0 --- /dev/null +++ b/ulocollect/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "fmt" + "gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core" + "log" + "time" +) + +func main() { + log.SetFlags(log.LstdFlags | log.Lshortfile) + + j := core.NewDummyJob() + c := core.Collector{} + + if err := c.Submit(j); err != nil { + fmt.Println(err) + } + + fmt.Println("/done") + time.Sleep(5 * time.Second) +} -- GitLab