Skip to content
Snippets Groups Projects
Commit 8be03b53 authored by Andreas Schärtl's avatar Andreas Schärtl
Browse files

ulocollect: Sketching up core interfaces

- Not perfect yet. I think Job should be a struct so
  Collector can modify its members easily.
parent 79fb0cf3
No related branches found
No related tags found
No related merge requests found
package core
import (
"fmt"
"github.com/pkg/errors"
"log"
"io"
"bytes"
"sync"
)
type basejob struct {
// Function to call on Collect
collectfunc func(c *Collector) error
// Global lock for all following members.
mu sync.Mutex
// Unique identifier of this job.
id JobId
// The state the job is currently in.
state JobState
// When state is set to Failed, contains the reason
// for this failure.
failure error
// Recorded logs for this job.
logged bytes.Buffer
}
func (bj *basejob) Id() JobId {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.id
}
func (bj *basejob) State() JobState {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.state
}
func (bj *basejob) Error() error {
bj.mu.Lock()
defer bj.mu.Unlock()
return bj.failure
}
func (bj *basejob) Collect(c *Collector) error {
return errors.New("not implemented")
}
func (bj *basejob) Log(v ...interface{}) {
log.Output(3, fmt.Sprint(v...))
}
func (bj *basejob) Logf(format string, v ...interface{}) {
log.Output(3, fmt.Sprintf(format, v...))
}
func (bj *basejob) Logs() io.Reader {
panic("not yet implemented")
}
package core
import (
"fmt"
"sync"
)
// A Collector schedules the execution of various Jobs.
type Collector struct {
// The Importer we forward the ULO/RDF bytes to.
im Importer
// Global lock. Aquire it before changing any state
// in Collector.
mu sync.Mutex
// Whether this Collector is currently processing request,
// i.e. whether Collector.loop is running. Aquire mu
// before accessing this member.
looping bool
// Collection of all ever sumbmitted jobs. Aquire
// mu before accessing this collection.
jobs map[JobId]Job
// Here we keep duplicate references to the submitted jobs.
// We need to keep some kind of order and also want to be
// able to report on the current state of all queues. As
// such we have to use lists and cannot use channels here.
// List of submitted, but not currently running, jobs.
// Aquire mu before accessing this collection.
submitted []Job
// List of active jobs. Aquire mu before accessing this
// collection.
active []Job
// List of completed, i.e. successful or failed, jobs.
// Aquire mu before accessing this collection.
completed []Job
}
func (c *Collector) Submit(j Job) error {
c.mu.Lock()
defer c.mu.Unlock()
// check whether j was already submitted previously
id := j.Id()
if _, ok := c.jobs[id]; ok {
return fmt.Errorf("id=%v already submitted", id)
}
// it's a new job, enqueue it
c.jobs[id] = j
c.submitted = append(c.submitted, j)
// if the collector isn't running, start it
if !c.looping {
go c.loop()
c.looping = true
}
return nil
}
func (c *Collector) loop() {
}
package core
import (
"bytes"
"sync"
)
// Implement something very much like bytes.Buffer, but with
// thread-safe methods.
type concurrentbuffer struct {
buf bytes.Buffer
mu sync.Mutex
}
func (b *concurrentbuffer) Read(p []byte) (n int, err error) {
b.mu.Lock()
b.mu.Unlock()
return b.buf.Read(p)
}
func (b *concurrentbuffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
b.mu.Unlock()
return b.buf.Write(p)
}
func (b *concurrentbuffer) String() string {
b.mu.Lock()
b.mu.Unlock()
return b.buf.String()
}
package core
import (
"io"
)
// Represents an Importer either locally or somewhere on the
// network.
type Importer interface {
// Import the given ULO/RDF byte stream.
Import(rdf io.Reader) error
}
package core
import "io"
// A single collection job. Defines as an interface as there are all
// kinds of different collection jobs as the respective source
// repository is different.
type Job interface {
// Return the unique identifier of this job.
Id() JobId
// Return the current state of this job.
State() JobState
// When State is Failed, return the error that caused
// the failure. When State is not failed, this method
// always returns nil.
Error() error
// This is a synchronous operation that might take a long
// time. Instead of invoking Collect directly, you should
// create a Collector and Submit this job to it.
Collect(c *Collector) error
// Write to the log associated with this Job. The log message
// will be output with the standard logger as well as recorded
// to the buffer accessible with Job.Logs().
Log(v ...interface{})
// Write a formatted string to the log associated with this
// Job. The log message will be output with the standard
// logger as well as recorded to the buffer accessible with
// Job.Logs().
Logf(format string, v ...interface{})
// Return the log stream of this job.
Logs() io.Reader
}
package core
import "github.com/google/uuid"
// Unique identifier for a Job.
//
// Hint: It's just a UUID.
type JobId string
// Implement the Stringer interface.
func (id JobId) String() string {
return string(id)
}
// Generate a new unique job id.
func generateJobId() JobId {
id := uuid.New().String()
return JobId(id)
}
package core
// Enum for the different kinds of states a job can be in.
type JobState string
const (
// The job has been created but was not yet submitted
// to a Collector.
Created JobState = "created"
// The job is currently queued for processing but was
// not yet started.
Submitted = "submitted"
// The job is currently running and should be making
// progress.
Active = "active"
// The job has successfully finished. No error occured
// during processing.
Successful = "successful"
// The job finished, but during execution errors occured.
// A failed job should not have made any committed changes
// byte the Importer.
Failed = "failed"
)
// This package contains the core of the ULO/RDF Collector
// as a Go library.
package core
module gl.kwarc.info/supervision/schaertl_andreas/uoimport
go 1.13
require (
github.com/google/uuid v1.1.1
github.com/kissen/stringset v1.0.0
github.com/pkg/errors v0.9.1
)
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kissen/stringset v1.0.0 h1:HyLlCU/U+XHSJpmVKjhLz4PWdFALhvJfFbRSlwZsJcA=
github.com/kissen/stringset v1.0.0/go.mod h1:Xsqah6oXc+ZO4GZgFblCNzHn6Pt6Z/wYUCnZfwXxeA0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment