From 8be03b530da3ba60e488cc8e3a9aebf587de15ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andreas=20Sch=C3=A4rtl?= <andreas@schaertl.me>
Date: Mon, 11 May 2020 12:10:42 +0200
Subject: [PATCH] ulocollect: Sketching up core interfaces

- Not perfect yet. I think Job should be a struct so
  Collector can modify its members easily.
---
 ulocollect/core/base_job.go          | 68 ++++++++++++++++++++++++++
 ulocollect/core/collector.go         | 72 ++++++++++++++++++++++++++++
 ulocollect/core/concurrent_buffer.go | 34 +++++++++++++
 ulocollect/core/importer.go          | 12 +++++
 ulocollect/core/job.go               | 38 +++++++++++++++
 ulocollect/core/job_id.go            | 19 ++++++++
 ulocollect/core/job_state.go         | 27 +++++++++++
 ulocollect/core/package.go           |  3 ++
 ulocollect/go.mod                    |  6 +++
 ulocollect/go.sum                    |  6 +++
 10 files changed, 285 insertions(+)
 create mode 100644 ulocollect/core/base_job.go
 create mode 100644 ulocollect/core/collector.go
 create mode 100644 ulocollect/core/concurrent_buffer.go
 create mode 100644 ulocollect/core/importer.go
 create mode 100644 ulocollect/core/job.go
 create mode 100644 ulocollect/core/job_id.go
 create mode 100644 ulocollect/core/job_state.go
 create mode 100644 ulocollect/core/package.go
 create mode 100644 ulocollect/go.sum

diff --git a/ulocollect/core/base_job.go b/ulocollect/core/base_job.go
new file mode 100644
index 0000000..17a67e3
--- /dev/null
+++ b/ulocollect/core/base_job.go
@@ -0,0 +1,68 @@
+package core
+
+import (
+	"fmt"
+	"github.com/pkg/errors"
+	"log"
+	"io"
+	"bytes"
+	"sync"
+)
+
+// basejob bundles the per-job state read by its accessor methods.
+type basejob struct {
+	// Function to call on Collect (Collect does not invoke it yet).
+	collectfunc func(c *Collector) error
+
+	// Global lock for all following members.
+	mu sync.Mutex
+
+	// Unique identifier of this job.
+	id JobId
+
+	// The state the job is currently in.
+	state JobState
+
+	// When state is Failed, the reason for that failure.
+	failure error
+
+	// Buffer meant to hold the logs recorded for this job.
+	logged bytes.Buffer
+}
+
+// Id returns the unique identifier of this job, read under bj.mu.
+func (bj *basejob) Id() JobId {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+	return bj.id
+}
+
+// State returns the state currently stored in bj.state, read under bj.mu.
+func (bj *basejob) State() JobState {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+	return bj.state
+}
+
+// Error returns whatever is stored in bj.failure, read under bj.mu.
+func (bj *basejob) Error() error {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+	return bj.failure
+}
+
+func (bj *basejob) Collect(c *Collector) error {
+	return errors.New("not implemented") // stub: c and bj.collectfunc are unused
+}
+
+func (bj *basejob) Log(v ...interface{}) {
+	log.Output(3, fmt.Sprint(v...)) // standard logger only; nothing is appended to bj.logged
+}
+
+func (bj *basejob) Logf(format string, v ...interface{}) {
+	log.Output(3, fmt.Sprintf(format, v...)) // standard logger only; nothing is appended to bj.logged
+}
+
+func (bj *basejob) Logs() io.Reader {
+	panic("not yet implemented") // stub: always panics; bj.logged is not read here
+}
diff --git a/ulocollect/core/collector.go b/ulocollect/core/collector.go
new file mode 100644
index 0000000..26f6b2f
--- /dev/null
+++ b/ulocollect/core/collector.go
@@ -0,0 +1,72 @@
+package core
+
+import (
+	"fmt"
+	"sync"
+)
+
+// A Collector schedules the execution of various Jobs.
+type Collector struct {
+	// The Importer we forward the ULO/RDF bytes to.
+	im Importer
+
+	// Global lock. Acquire it before changing any state
+	// in Collector.
+	mu sync.Mutex
+
+	// Whether this Collector is currently processing requests,
+	// i.e. whether Collector.loop is running. Acquire mu
+	// before accessing this member.
+	looping bool
+
+	// Collection of all ever submitted jobs. Acquire
+	// mu before accessing this collection.
+	jobs map[JobId]Job
+
+	// Here we keep duplicate references to the submitted jobs.
+	// We need to keep some kind of order and also want to be
+	// able to report on the current state of all queues. As
+	// such we have to use lists and cannot use channels here.
+
+	// List of submitted, but not currently running, jobs.
+	// Acquire mu before accessing this collection.
+	submitted []Job
+
+	// List of active jobs. Acquire mu before accessing this
+	// collection.
+	active []Job
+
+	// List of completed, i.e. successful or failed, jobs.
+	// Acquire mu before accessing this collection.
+	completed []Job
+}
+
+// Submit queues j and starts the loop if needed. A job whose id was
+// already submitted is rejected with an error.
+func (c *Collector) Submit(j Job) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Reject ids seen before; looking up a nil map is safe.
+	id := j.Id()
+	if _, ok := c.jobs[id]; ok {
+		return fmt.Errorf("id=%v already submitted", id)
+	}
+
+	// A zero-value Collector has a nil map; writing to it would panic.
+	if c.jobs == nil {
+		c.jobs = make(map[JobId]Job)
+	}
+	c.jobs[id] = j
+	c.submitted = append(c.submitted, j)
+
+	// mu is held, so no other Submit can start a second loop.
+	if !c.looping {
+		c.looping = true
+		go c.loop()
+	}
+	return nil
+}
+
+func (c *Collector) loop() { // started by Submit as a goroutine; the body is still empty
+}
diff --git a/ulocollect/core/concurrent_buffer.go b/ulocollect/core/concurrent_buffer.go
new file mode 100644
index 0000000..1fb3b2f
--- /dev/null
+++ b/ulocollect/core/concurrent_buffer.go
@@ -0,0 +1,34 @@
+package core
+
+import (
+	"bytes"
+	"sync"
+)
+
+// concurrentbuffer wraps a bytes.Buffer so that Read, Write and
+// String are serialized by a mutex.
+type concurrentbuffer struct {
+	mu  sync.Mutex // guards buf
+	buf bytes.Buffer
+}
+
+// Read forwards to the wrapped buffer's Read while holding mu.
+func (b *concurrentbuffer) Read(p []byte) (n int, err error) {
+	b.mu.Lock()
+	defer b.mu.Unlock() // deferred so the lock spans the buffer access
+	return b.buf.Read(p)
+}
+
+// Write forwards to the wrapped buffer's Write while holding mu.
+func (b *concurrentbuffer) Write(p []byte) (n int, err error) {
+	b.mu.Lock()
+	defer b.mu.Unlock() // deferred so the lock spans the buffer access
+	return b.buf.Write(p)
+}
+
+// String forwards to the wrapped buffer's String while holding mu.
+func (b *concurrentbuffer) String() string {
+	b.mu.Lock()
+	defer b.mu.Unlock() // deferred so the lock spans the buffer access
+	return b.buf.String()
+}
diff --git a/ulocollect/core/importer.go b/ulocollect/core/importer.go
new file mode 100644
index 0000000..ab13724
--- /dev/null
+++ b/ulocollect/core/importer.go
@@ -0,0 +1,12 @@
+package core
+
+import (
+	"io"
+)
+
+// Importer represents an importer of ULO/RDF data, either locally
+// or somewhere on the network.
+type Importer interface {
+	// Import the given ULO/RDF byte stream.
+	Import(rdf io.Reader) error
+}
diff --git a/ulocollect/core/job.go b/ulocollect/core/job.go
new file mode 100644
index 0000000..d321433
--- /dev/null
+++ b/ulocollect/core/job.go
@@ -0,0 +1,38 @@
+package core
+
+import "io"
+
+// A Job is a single collection job. It is defined as an interface
+// because there are all kinds of different collection jobs, as the
+// respective source repository is different.
+type Job interface {
+	// Return the unique identifier of this job.
+	Id() JobId
+
+	// Return the current state of this job.
+	State() JobState
+
+	// When State is Failed, return the error that caused
+	// the failure. When State is not Failed, this method
+	// always returns nil.
+	Error() error
+
+	// This is a synchronous operation that might take a long
+	// time. Instead of invoking Collect directly, you should
+	// create a Collector and Submit this job to it.
+	Collect(c *Collector) error
+
+	// Write to the log associated with this Job. The log message
+	// will be output with the standard logger as well as recorded
+	// to the buffer accessible with Job.Logs().
+	Log(v ...interface{})
+
+	// Write a formatted string to the log associated with this
+	// Job. The log message will be output with the standard
+	// logger as well as recorded to the buffer accessible with
+	// Job.Logs().
+	Logf(format string, v ...interface{})
+
+	// Return the log stream of this job.
+	Logs() io.Reader
+}
diff --git a/ulocollect/core/job_id.go b/ulocollect/core/job_id.go
new file mode 100644
index 0000000..05cfac5
--- /dev/null
+++ b/ulocollect/core/job_id.go
@@ -0,0 +1,19 @@
+package core
+
+import "github.com/google/uuid"
+
+// JobId is the unique identifier of a Job.
+//
+// Hint: It's just a UUID in string form.
+type JobId string
+
+// String returns the id as a plain string; it implements fmt.Stringer.
+func (id JobId) String() string {
+	return string(id)
+}
+
+// generateJobId returns a JobId built from the string form of a
+// UUID freshly obtained from uuid.New.
+func generateJobId() JobId {
+	return JobId(uuid.New().String())
+}
diff --git a/ulocollect/core/job_state.go b/ulocollect/core/job_state.go
new file mode 100644
index 0000000..e22a8bf
--- /dev/null
+++ b/ulocollect/core/job_state.go
@@ -0,0 +1,27 @@
+package core
+
+// JobState enumerates the states a job can be in. Each constant
+// carries the type explicitly; otherwise it would be a plain string.
+type JobState string
+
+const (
+	// The job has been created but was not yet submitted
+	// to a Collector.
+	Created JobState = "created"
+
+	// The job is currently queued for processing but was
+	// not yet started.
+	Submitted JobState = "submitted"
+
+	// The job is currently running and should be making progress.
+	Active JobState = "active"
+
+	// The job has successfully finished. No error occurred
+	// during processing.
+	Successful JobState = "successful"
+
+	// The job finished, but errors occurred during execution.
+	// A failed job should not have left any committed changes
+	// in the Importer.
+	Failed JobState = "failed"
+)
diff --git a/ulocollect/core/package.go b/ulocollect/core/package.go
new file mode 100644
index 0000000..9b3ebdf
--- /dev/null
+++ b/ulocollect/core/package.go
@@ -0,0 +1,3 @@
+// Package core contains the core of the ULO/RDF Collector
+// as a Go library.
+package core
diff --git a/ulocollect/go.mod b/ulocollect/go.mod
index 58a3793..0981a5f 100644
--- a/ulocollect/go.mod
+++ b/ulocollect/go.mod
@@ -1,3 +1,9 @@
 module gl.kwarc.info/supervision/schaertl_andreas/uoimport
 
 go 1.13
+
+require (
+	github.com/google/uuid v1.1.1
+	github.com/kissen/stringset v1.0.0
+	github.com/pkg/errors v0.9.1
+)
diff --git a/ulocollect/go.sum b/ulocollect/go.sum
new file mode 100644
index 0000000..ab7de5c
--- /dev/null
+++ b/ulocollect/go.sum
@@ -0,0 +1,6 @@
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/kissen/stringset v1.0.0 h1:HyLlCU/U+XHSJpmVKjhLz4PWdFALhvJfFbRSlwZsJcA=
+github.com/kissen/stringset v1.0.0/go.mod h1:Xsqah6oXc+ZO4GZgFblCNzHn6Pt6Z/wYUCnZfwXxeA0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-- 
GitLab