Skip to content
Snippets Groups Projects
Commit 749e35a6 authored by Andreas Schärtl's avatar Andreas Schärtl
Browse files

ulocollect: update core s.t. it runs jobs

We now run jobs w/o limits. This is probably bad in a hosted setup but
we can worry about that later should it ever become a problem. As it
stands the implementation is easier with a simple goroutine rather than
a complicated pool of threads.
parent 8be03b53
No related branches found
No related tags found
No related merge requests found
package core
import (
"fmt"
"github.com/pkg/errors"
"log"
"io"
"bytes"
"sync"
)
type basejob struct {
// Function to call on Collect
collectfunc func(c *Collector) error
// Global lock for all following members.
mu sync.Mutex
// Unique identifier of this job.
id JobId
// The state the job is currently in.
state JobState
// When state is set to Failed, contains the reason
// for this failure.
failure error
// Recorded logs for this job.
logged bytes.Buffer
}
func (bj *basejob) Id() JobId {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.id
}
func (bj *basejob) State() JobState {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.state
}
func (bj *basejob) Error() error {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.failure
}
func (bj *basejob) Collect(c *Collector) error {
return errors.New("not implemented")
}
func (bj *basejob) Log(v ...interface{}) {
log.Output(3, fmt.Sprint(v...))
}
func (bj *basejob) Logf(format string, v ...interface{}) {
log.Output(3, fmt.Sprintf(format, v...))
}
func (bj *basejob) Logs() io.Reader {
panic("not yet implemented")
}
......@@ -2,7 +2,6 @@ package core
import (
"fmt"
"sync"
)
// A Collector schedules the execution of various Jobs.
......@@ -10,63 +9,18 @@ type Collector struct {
// The Importer we forward the ULO/RDF bytes to.
im Importer
// Global lock. Aquire it before changing any state
// in Collector.
mu sync.Mutex
// Whether this Collector is currently processing request,
// i.e. whether Collector.loop is running. Aquire mu
// before accessing this member.
looping bool
// Collection of all ever sumbmitted jobs. Aquire
// mu before accessing this collection.
jobs map[JobId]Job
// Here we keep duplicate references to the submitted jobs.
// We need to keep some kind of order and also want to be
// able to report on the current state of all queues. As
// such we have to use lists and cannot use channels here.
// List of submitted, but not currently running, jobs.
// Aquire mu before accessing this collection.
submitted []Job
// List of active jobs. Aquire mu before accessing this
// collection.
active []Job
// List of completed, i.e. successful or failed, jobs.
// Aquire mu before accessing this collection.
completed []Job
jobs JobMap
}
func (c *Collector) Submit(j Job) error {
c.mu.Lock()
defer c.mu.Unlock()
// check whether j was already submitted previously
func (c *Collector) Submit(j *Job) error {
id := j.Id()
if _, ok := c.jobs[id]; ok {
return fmt.Errorf("id=%v already submitted", id)
}
// it's a new job, enqueue it
c.jobs[id] = j
c.submitted = append(c.submitted, j)
// if the collector isn't running, start it
if !c.looping {
go c.loop()
c.looping = true
if _, loaded := c.jobs.LoadOrStore(id, j); loaded {
return fmt.Errorf("Id=%v already submitted", id)
}
go j.Collect(c)
return nil
}
func (c *Collector) loop() {
}
......@@ -9,26 +9,46 @@ import (
// thread-safe methods.
type concurrentbuffer struct {
buf bytes.Buffer
mu sync.Mutex
mu sync.RWMutex
}
// Implement io.Reader.
func (b *concurrentbuffer) Read(p []byte) (n int, err error) {
b.mu.Lock()
b.mu.Unlock()
defer b.mu.Unlock()
return b.buf.Read(p)
}
// Implement io.Writer.
func (b *concurrentbuffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
b.mu.Unlock()
defer b.mu.Unlock()
return b.buf.Write(p)
}
func (b *concurrentbuffer) String() string {
// Truncate the buffer to 0 elements.
func (b *concurrentbuffer) Reset() {
b.mu.Lock()
b.mu.Unlock()
defer b.mu.Unlock()
b.buf.Reset()
}
// Return a deep copy of the underlying bytes.
func (b *concurrentbuffer) Bytes() []byte {
b.mu.RLock()
defer b.mu.RUnlock()
src := b.buf.Bytes()
return append([]byte(nil), src...)
}
// Return a printable version of the buffer.
func (b *concurrentbuffer) String() string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.buf.String()
}
package core
// Return a new dummy job. Running it does nothing.
func NewDummyJob() *Job {
return &Job{
collectfunc: func(self *Job, c *Collector) error {
self.Log("called!")
return nil
},
}
}
package core
import "io"
// A single collection job. Defines as an interface as there are all
// kinds of different collection jobs as the respective source
// repository is different.
type Job interface {
// Return the unique identifier of this job.
Id() JobId
// Return the current state of this job.
State() JobState
// When State is Failed, return the error that caused
// the failure. When State is not failed, this method
// always returns nil.
Error() error
// This is a synchronous operation that might take a long
// time. Instead of invoking Collect directly, you should
// create a Collector and Submit this job to it.
Collect(c *Collector) error
// Write to the log associated with this Job. The log message
// will be output with the standard logger as well as recorded
// to the buffer accessible with Job.Logs().
Log(v ...interface{})
// Write a formatted string to the log associated with this
// Job. The log message will be output with the standard
// logger as well as recorded to the buffer accessible with
// Job.Logs().
Logf(format string, v ...interface{})
// Return the log stream of this job.
Logs() io.Reader
import (
"fmt"
"github.com/pkg/errors"
"io"
"log"
"sync"
)
type Job struct {
// Function to call on Collect. This is the function
// that will (1) update the state of this job, (2)
// do the actual importing, (3) write to the Job-internal
// log and eventually (4) finish either with success
// or in failure.
collectfunc func(self *Job, c *Collector) error
// Global lock for all following members.
mu sync.Mutex
// Unique identifier of this job.
id JobId
// The state the job is currently in.
state JobState
// When state is set to Failed, contains the reason
// for this failure.
failure error
// Recorded logs for this job.
logged concurrentbuffer
}
// Return the unique identifier of this job.
func (bj *Job) Id() JobId {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.id
}
// Return the current state of this job.
func (bj *Job) State() JobState {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.state
}
// When State is Failed, return the error that caused
// the failure. When State is not failed, this method
// always returns nil.
func (bj *Job) Error() error {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.failure
}
// This is a synchronous operation that might take a long
// time. Instead of invoking Collect directly, you should
// create a Collector and Submit this job to it.
func (bj *Job) Collect(c *Collector) error {
if bj.collectfunc == nil {
return errors.New("collectfunc not set on Job")
}
bj.collectfunc(bj, c)
return nil
}
// Write to the log associated with this Job. The log message
// will be output with the standard logger as well as recorded
// to the buffer accessible with Job.Logs().
func (bj *Job) Log(v ...interface{}) {
log.Output(2, fmt.Sprint(v...))
}
// Write a formatted string to the log associated with this
// Job. The log message will be output with the standard
// logger as well as recorded to the buffer accessible with
// Job.Logs().
func (bj *Job) Logf(format string, v ...interface{}) {
log.Output(2, fmt.Sprintf(format, v...))
}
// Return the log stream of this job.
func (bj *Job) Logs() io.Reader {
panic("not yet implemented")
}
package core
import "sync"
// A wrapper around sync.Map for *Job.
type JobMap struct {
wrapped sync.Map
}
func (m *JobMap) Store(key JobId, value *Job) {
m.wrapped.Store(key, value)
}
func (m *JobMap) Load(key JobId) (value *Job, ok bool) {
if value, ok := m.wrapped.Load(key); !ok {
return nil, false
} else {
return value.(*Job), true
}
}
func (m *JobMap) LoadOrStore(key JobId, value *Job) (actual *Job, loaded bool) {
if iactual, iloaded := m.wrapped.LoadOrStore(key, value); iactual != nil {
return iactual.(*Job), iloaded
} else {
return nil, iloaded
}
}
func (m *JobMap) Range(f func(key JobId, value *Job) bool) {
m.wrapped.Range(func(ikey, ivalue interface{}) bool {
return f(ikey.(JobId), ivalue.(*Job))
})
}
module gl.kwarc.info/supervision/schaertl_andreas/uoimport
module gl.kwarc.info/supervision/schaertl_andreas/ulocollect
go 1.13
......
package main
import (
"fmt"
"gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core"
"log"
"time"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
j := core.NewDummyJob()
c := core.Collector{}
if err := c.Submit(j); err != nil {
fmt.Println(err)
}
fmt.Println("/done")
time.Sleep(5 * time.Second)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment