Commit d219537f authored by Andreas Schärtl

ulocollect: refactor job and scheduler classes

- Name interfaces in the canonical Golang way.

- Skip the complicated logging madness for now. I do want
  dedicated logging for each job, but not right now...
parent a9bb6868
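The per-job logging that the commit message defers could later be wired up by pointing each Job at a log.Logger that writes both to stderr and to the job's own buffer. The following is only a sketch of that idea, not part of this commit; the helper name newJobLogger is hypothetical.

package core

import (
	"io"
	"log"
	"os"
)

// newJobLogger is a hypothetical helper that builds a logger which
// duplicates every message to stderr and to a job-local buffer, so
// the recorded output could later be served via Job.Logs().
func newJobLogger(buf *concurrentbuffer) *log.Logger {
	// io.MultiWriter forwards each write to all listed writers.
	w := io.MultiWriter(os.Stderr, buf)
	return log.New(w, "job: ", log.LstdFlags)
}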
package core
// A Collecter is something that can Collect ULO/RDF
// data from some source and pass it to an Importer.
type Collecter interface {
// Find ULO/RDF data and pass it to dest.
Collect(dest Importer) error
}
package core
import (
"fmt"
)
// A Collector schedules the execution of various Jobs.
type Collector struct {
// The Importer we forward the ULO/RDF bytes to.
Destination Importer
// Collection of all jobs ever submitted. JobMap is safe
// for concurrent access.
jobs JobMap
}
// Submit j for execution.
func (c *Collector) Submit(j *Job) error {
id := j.Id()
if _, loaded := c.jobs.LoadOrStore(id, j); loaded {
return fmt.Errorf("Id=%v already submitted", id)
}
j.mu.Lock()
defer j.mu.Unlock()
go c.execute(j)
return nil
}
func (c *Collector) execute(j *Job) {
// Acquire lock as we need to set the status initially.
j.mu.Lock()
// Set state to submitted. Later we'll want to do some kind
// of scheduling at which point a job can spend a lot of time
// in this state. But in the current version we don't do any
// clever scheduling and as such the job enters the Active
// state right away.
j.state = Submitted
// Yep, we are starting already!
j.state = Active
// Now we give up the lock as we run the actual payload.
j.mu.Unlock()
// Run the actual payload.
err := j.Collect(c)
// Set completion state.
j.mu.Lock()
if err != nil {
j.state = Failed
j.failure = err
} else {
j.state = Successful
}
j.wr.Release()
// We are done mutating the Job.
j.mu.Unlock()
}
package core
import (
"bytes"
"sync"
)
// Implement something very much like bytes.Buffer, but with
// thread-safe methods.
type concurrentbuffer struct {
buf bytes.Buffer
mu sync.RWMutex
}
// Implement io.Reader.
func (b *concurrentbuffer) Read(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Read(p)
}
// Implement io.Writer.
func (b *concurrentbuffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Write(p)
}
// Truncate the buffer to 0 elements.
func (b *concurrentbuffer) Reset() {
b.mu.Lock()
defer b.mu.Unlock()
b.buf.Reset()
}
// Return a deep copy of the underlying bytes.
func (b *concurrentbuffer) Bytes() []byte {
b.mu.RLock()
defer b.mu.RUnlock()
src := b.buf.Bytes()
return append([]byte(nil), src...)
}
// Return a printable version of the buffer.
func (b *concurrentbuffer) String() string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.buf.String()
}
package core
// Return a new dummy job. Running it does nothing but log that it was called.
func NewDummyJob() *Job {
return &Job{
collectfunc: func(self *Job, c *Collector) error {
self.Log("called!")
return nil
},
id: generateJobId(),
state: Created,
}
}
package core
import (
"bufio"
"github.com/pkg/errors"
"log"
"os"
"path/filepath"
"strings"
)
// A Collecter that crawls the local file system at Path
// for ULO/RDF files.
type FileSystemCollecter struct {
Path string
}
func (fsc *FileSystemCollecter) Collect(dest Importer) error {
return filepath.Walk(fsc.Path, func(path string, f os.FileInfo, err error) error {
return fsc.visit(dest, path, f, err)
})
}
// This method is called on every file found by Collect.
func (fsc *FileSystemCollecter) visit(dest Importer, path string, f os.FileInfo, err error) error {
if err != nil {
log.Print(err)
return nil
}
if f.IsDir() {
return nil
}
if strings.HasSuffix(path, ".rdf") {
return fsc.handleRdfFile(dest, path, f)
}
log.Printf("ignoring path=%v", path)
return nil
}
// Import ULO/RDF file at path with im.
func (fsc *FileSystemCollecter) handleRdfFile(im Importer, path string, f os.FileInfo) error {
fd, err := os.Open(path)
if err != nil {
return err
}
defer fd.Close()
reader := bufio.NewReader(fd)
if err := im.Import(reader); err != nil {
return errors.Wrapf(err, "importing path=%v failed", path)
}
return nil
}
package core
import (
"bufio"
"log"
"os"
"path/filepath"
"strings"
)
// Return a new Job that will scan path recursively for ULO/RDF files
// and pass them to the executing Importer.
func NewFileSystemJob(path string) *Job {
return &Job{
collectfunc: makeFsCollectFunc(path),
id: generateJobId(),
state: Created,
}
}
// Return the function that is called when collecting ULO/RDF from
// the local file system.
func makeFsCollectFunc(path string) func(*Job, *Collector) error {
return func(j *Job, c *Collector) error {
return filepath.Walk(path, func(path string, f os.FileInfo, err error) error {
if err != nil {
j.Logf("error walking path=%v: %v", path, err)
return nil
}
log.Println(path)
return importLocalFile(c.Destination, path)
})
}
}
// Import file at path with im. Returns no error if the file at
// path is not a supported type.
func importLocalFile(im Importer, path string) error {
if strings.HasSuffix(path, ".rdf") {
return importLocalRdfFile(im, path)
}
return nil
}
// Import ULO/RDF file at path with im.
func importLocalRdfFile(im Importer, path string) error {
fd, err := os.Open(path)
if err != nil {
return err
}
defer fd.Close()
r := bufio.NewReader(fd)
return im.Import(r)
}
@@ -2,9 +2,8 @@ package core
import "io"
// Represents an Importer either locally or somewhere on the
// network.
// An Importer is something that can import ULO/RDF data
// into some kind of storage, database or processing step.
type Importer interface {
// Import the given ULO/RDF byte stream.
Import(rdf io.Reader) error
}
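For illustration, a minimal Importer might do nothing but count the bytes it receives. This sketch is hypothetical and not part of the repository; in particular, countingImporter is an invented name and unrelated to the DummyImporter used in main.go.

package core

import (
	"io"
	"io/ioutil"
)

// countingImporter is a hypothetical Importer that discards the
// ULO/RDF stream after measuring its length.
type countingImporter struct {
	total int64
}

func (ci *countingImporter) Import(rdf io.Reader) error {
	// io.Copy drains the reader; the data itself is thrown away.
	n, err := io.Copy(ioutil.Discard, rdf)
	ci.total += n
	return err
}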
package core
import (
"fmt"
"github.com/pkg/errors"
"io"
"log"
"sync"
)
type Job struct {
// Function to call on Collect. This is the function
// that will (1) update the state of this job, (2)
// do the actual importing, (3) write to the Job-internal
// log and eventually (4) finish either with success
// or in failure.
collectfunc func(self *Job, c *Collector) error
// Waiting room for people to wait on completion of this
// job.
wr waitingroom
// Global lock for all following members.
mu sync.Mutex
// Unique identifier of this job.
id JobId
// The state the job is currently in.
state JobState
// When state is set to Failed, contains the reason
// for this failure.
failure error
// Recorded logs for this job.
logged concurrentbuffer
}
// Return the unique identifier of this job.
func (bj *Job) Id() JobId {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.id
}
// Return the current state of this job.
func (bj *Job) State() JobState {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.state
}
// When State is Failed, return the error that caused
// the failure. When State is not failed, this method
// always returns nil.
func (bj *Job) Error() error {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.failure
}
// Run this job's collectfunc. This is a synchronous operation that
// might take a long time. Instead of invoking Collect directly, you
// should create a Collector and Submit this job to it.
func (bj *Job) Collect(c *Collector) error {
if bj.collectfunc == nil {
return errors.New("collectfunc not set on Job")
}
return bj.collectfunc(bj, c)
}
// Write to the log associated with this Job. For now the message
// is only forwarded to the standard logger; recording it to a
// per-job buffer readable with Job.Logs() is not yet implemented.
func (bj *Job) Log(v ...interface{}) {
log.Output(2, fmt.Sprint(v...))
}
// Write a formatted string to the log associated with this Job.
// As with Log, the message is currently only forwarded to the
// standard logger.
func (bj *Job) Logf(format string, v ...interface{}) {
log.Output(2, fmt.Sprintf(format, v...))
}
// Return the log stream of this job.
func (bj *Job) Logs() io.Reader {
panic("not yet implemented")
}
// Wait for this job to complete. If the job is never submitted to
// a Collector, a call to Wait blocks forever.
func (bj *Job) Wait() {
bj.wr.Wait()
}
package core
// Contains information about a submitted job. Each JobInfo
// is only a snapshot of the job's state at a given point in time.
type JobInfo struct {
// The unique identifier of this job.
Id JobId
// The state the job was in when this JobInfo was
// created.
State JobState
// In case of State == Failed, contains the cause of
// the failure.
Error error
}
package core
import "sync"
// A wrapper around sync.Map for *Job.
type JobMap struct {
wrapped sync.Map
}
func (m *JobMap) Store(key JobId, value *Job) {
m.wrapped.Store(key, value)
}
func (m *JobMap) Load(key JobId) (value *Job, ok bool) {
if value, ok := m.wrapped.Load(key); !ok {
return nil, false
} else {
return value.(*Job), true
}
}
func (m *JobMap) LoadOrStore(key JobId, value *Job) (actual *Job, loaded bool) {
if iactual, iloaded := m.wrapped.LoadOrStore(key, value); iactual != nil {
return iactual.(*Job), iloaded
} else {
return nil, iloaded
}
}
func (m *JobMap) Range(f func(key JobId, value *Job) bool) {
m.wrapped.Range(func(ikey, ivalue interface{}) bool {
return f(ikey.(JobId), ivalue.(*Job))
})
}
package core
import (
"fmt"
"sync"
)
// LocalScheduler implements Scheduler. All actions are run
// locally and jobs are stored in memory only. Only one submitted
// job is handled at any given time.
type LocalScheduler struct {
// Destination for all collection jobs.
Dest Importer
// Once for initialization.
on sync.Once
// History of all jobs ever submitted to this
// scheduler.
jobs sync.Map // JobId -> *localjob
// Queue of jobs to execute. LocalScheduler only runs
// one task at a given time.
todo chan *localjob
}
type localjob struct {
// The actual payload of the job.
fun Collecter
// Synchronization aid for WaitFor.
wr waitingroom
// Mutex that protects all following members of the struct.
mu sync.Mutex
// Unique id of the job as assigned by the LocalScheduler.
id JobId
// State of this job.
state JobState
// Error that caused the failure when state is Failed. Otherwise nil.
failure error
}
func (ls *LocalScheduler) Submit(fun Collecter) (JobId, error) {
ls.ensureInitialized()
job := &localjob{
fun: fun,
id: generateJobId(),
state: Created,
}
if _, loaded := ls.jobs.LoadOrStore(job.id, job); loaded {
// this should never happen, well, stochastically speaking
return "", fmt.Errorf("generated id=%v not unique", job.id)
}
ls.todo <- job
return job.id, nil
}
func (ls *LocalScheduler) Info(id JobId) (JobInfo, error) {
ls.ensureInitialized()
if value, ok := ls.jobs.Load(id); !ok {
return JobInfo{}, fmt.Errorf("no entry for id=%v", id)
} else {
lj := value.(*localjob)
lj.mu.Lock()
defer lj.mu.Unlock()
return JobInfo{Id: lj.id, State: lj.state, Error: lj.failure}, nil
}
}
func (ls *LocalScheduler) WaitFor(id JobId) error {
ls.ensureInitialized()
if value, ok := ls.jobs.Load(id); !ok {
return fmt.Errorf("no entry for id=%v", id)
} else {
value.(*localjob).wr.Wait()
return nil
}
}
func (ls *LocalScheduler) loop() {
for {
j, ok := <-ls.todo
if !ok {
return // done handling jobs
}
ls.execute(j)
}
}
func (ls *LocalScheduler) execute(j *localjob) {
// set state to Active
j.mu.Lock()
j.state = Active
j.mu.Unlock()
// run the payload
err := j.fun.Collect(ls.Dest)
// update state according to return
j.mu.Lock()
if err == nil {
j.state = Successful
} else {
j.state = Failed
j.failure = err
}
j.mu.Unlock()
// release all routines waiting for this job to finish
j.wr.Release()
}
func (ls *LocalScheduler) ensureInitialized() {
ls.on.Do(func() {
// initialize complicated struct members
ls.todo = make(chan *localjob)
// run concurrent tasks
go ls.loop()
})
}
package core
type Scheduler interface {
// Submit job for execution with this scheduler. Only returns
// an error when enqueuing failed. On success, returns a unique
// id for use with other methods of this scheduler.
Submit(job Collecter) (JobId, error)
// Return a JobInfo for the job with given id.
Info(id JobId) (JobInfo, error)
// Wait for completion of job with given id. Returns an error
// if the id is unknown.
WaitFor(id JobId) error
}
@@ -2,6 +2,8 @@ package core
import "sync"
// Synchronization aid that lets goroutines wait for some
// job to finish. Similar to sync.Once.
type waitingroom struct {
initer sync.Once
cond *sync.Cond
@@ -30,7 +32,6 @@ func (wr *waitingroom) Release() {
defer wr.cond.L.Unlock()
wr.released = true
wr.cond.Broadcast()
}
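The Wait counterpart of Release is collapsed in this diff. Given the fields used above (initer, cond and the released flag), it presumably looks roughly like the sketch below; this is an assumption about the hidden code, and the ensureInitialized helper name is hypothetical.

// Wait blocks the calling goroutine until Release has been called.
func (wr *waitingroom) Wait() {
	wr.ensureInitialized()
	wr.cond.L.Lock()
	defer wr.cond.L.Unlock()
	// Loop to guard against spurious wakeups; released stays true
	// once Release has run, so late callers return immediately.
	for !wr.released {
		wr.cond.Wait()
	}
}

// ensureInitialized lazily sets up the condition variable exactly once.
func (wr *waitingroom) ensureInitialized() {
	wr.initer.Do(func() {
		wr.cond = sync.NewCond(&sync.Mutex{})
	})
}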
package main
import (
"fmt"
"gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core"
"log"
)
@@ -9,16 +8,22 @@ import (
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
i := core.DummyImporter{}
j := core.NewFileSystemJob("/mnt/ramdisk")
c := core.Collector{
Destination: i,
s := core.LocalScheduler{
Dest: core.DummyImporter{},
}
if err := c.Submit(j); err != nil {
fmt.Println(err)
c := core.FileSystemCollecter{
Path: "/home/ats/Synchronized/Dokumente",
}
fmt.Println("/done")
j.Wait()
id, err := s.Submit(&c)
if err != nil {
log.Fatal(err)
}
log.Printf("submitted with id=%v", id)
if err := s.WaitFor(id); err != nil {
log.Fatal(err)
}
}