Commit 1ed30466 authored by Sietse Ringers's avatar Sietse Ringers

feat: subscribe for updates at revocation server using server sent events

parent a677c822
......@@ -44,12 +44,23 @@ func New(conf *server.Configuration) (*Server, error) {
client: make(map[string]*session),
conf: conf,
},
ServerSentEvents: sse.NewServer(&sse.Options{
handlers: make(map[string]server.SessionHandler),
}
if conf.EnableSSE {
s.ServerSentEvents = sse.NewServer(&sse.Options{
ChannelNameFunc: func(r *http.Request) string {
component, token, noun, _, err := Route(r.URL.Path, r.Method)
if err == nil && component == server.ComponentSession && noun == "statusevents" {
component, token, noun, args, err := Route(r.URL.Path, r.Method)
if err != nil {
_ = server.LogWarning(err)
return ""
}
if component == server.ComponentSession && noun == "statusevents" {
return "session/" + token
}
if component == server.ComponentRevocation && noun == "updateevents" {
return "revocation/" + args[0]
}
return ""
},
Headers: map[string]string{
......@@ -58,8 +69,8 @@ func New(conf *server.Configuration) (*Server, error) {
"Access-Control-Allow-Headers": "Keep-Alive,X-Requested-With,Cache-Control,Content-Type,Last-Event-ID",
},
Logger: log.New(conf.Logger.WithField("type", "sse").WriterLevel(logrus.DebugLevel), "", 0),
}),
handlers: make(map[string]server.SessionHandler),
})
s.Conf.IrmaConfiguration.Revocation.ServerSentEvents = s.ServerSentEvents
}
s.scheduler.Every(10).Seconds().Do(func() {
......@@ -169,7 +180,7 @@ func (s *Server) Revoke(credid irma.CredentialTypeIdentifier, key string, issued
}
func Route(path, method string) (component, token, noun string, arg []string, err error) {
rev := regexp.MustCompile(server.ComponentRevocation + "/(events|update|issuancerecord)/?(.*)$")
rev := regexp.MustCompile(server.ComponentRevocation + "/(events|updateevents|update|issuancerecord)/?(.*)$")
matches := rev.FindStringSubmatch(path)
if len(matches) == 3 {
args := strings.Split(matches[2], "/")
......
......@@ -174,18 +174,13 @@ func TestRevocationAll(t *testing.T) {
})
t.Run("POSTUpdates", func(t *testing.T) {
revocationConfiguration = revocationConf(t)
revocationConfiguration.RevocationSettings[revocationTestCred].PostURLs = []string{
"http://localhost:48680",
}
StartIrmaServer(t, false)
defer func() {
StopIrmaServer()
revocationConfiguration = nil
}()
startRevocationServer(t, true)
defer stopRevocationServer()
StartIrmaServer(t, false)
defer StopIrmaServer()
require.NoError(t, irmaServerConfiguration.IrmaConfiguration.Revocation.SyncDB(revocationTestCred))
sacc1, err := revocationConfiguration.IrmaConfiguration.Revocation.Accumulator(revocationTestCred, revocationPkCounter)
require.NoError(t, err)
acctime := sacc1.Accumulator.Time
......@@ -695,10 +690,11 @@ func fakeMultipleRevocations(t *testing.T, count uint64, conf *irma.RevocationSt
require.NoError(t, conf.AddUpdate(revocationTestCred, update))
}
func revocationConf(t *testing.T) *server.Configuration {
func revocationConf(_ *testing.T) *server.Configuration {
return &server.Configuration{
URL: "http://localhost:48683",
Logger: logger,
EnableSSE: true,
DisableSchemesUpdate: true,
SchemesPath: filepath.Join(testdata, "irma_configuration"),
RevocationSettings: map[irma.CredentialTypeIdentifier]*irma.RevocationSetting{
......
......@@ -27,7 +27,7 @@ var (
)
func init() {
logger.Level = logrus.FatalLevel
logger.Level = logrus.TraceLevel
logger.Formatter = &prefixed.TextFormatter{
ForceFormatting: true,
ForceColors: true,
......@@ -67,7 +67,7 @@ func StartIrmaServer(t *testing.T, updatedIrmaConf bool) {
DisableSchemesUpdate: true,
SchemesPath: filepath.Join(testdata, irmaconf),
RevocationSettings: map[irma.CredentialTypeIdentifier]*irma.RevocationSetting{
revocationTestCred: {RevocationServerURL: "http://localhost:48683/"},
revocationTestCred: {RevocationServerURL: "http://localhost:48683/", SSE: true},
revKeyshareTestCred: {RevocationServerURL: "http://localhost:48683/"},
},
}
......
package irma
import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"math/bits"
"sort"
"strings"
"sync"
"time"
sseclient "astuart.co/go-sse"
"github.com/alexandrevicenzi/go-sse"
"github.com/fxamacker/cbor"
"github.com/getsentry/raven-go"
"github.com/go-errors/errors"
......@@ -35,6 +40,11 @@ type (
Keys RevocationKeys
client RevocationClient
ServerSentEvents *sse.Server
close chan struct{}
events chan *sseclient.Event
}
// RevocationClient offers an HTTP client to the revocation server endpoints.
......@@ -54,6 +64,7 @@ type (
ServerMode bool `json:"server,omitempty" mapstructure:"server"`
RevocationServerURL string `json:"revocation_server_url,omitempty" mapstructure:"revocation_server_url"`
Tolerance uint64 `json:"tolerance,omitempty" mapstructure:"tolerance"` // in seconds, min 30
SSE bool `json:"sse,omitempty" mapstructure:"sse"`
// set to now whenever a new update is received, or when the RA indicates
// there are no new updates. Thus it specifies up to what time our nonrevocation
......@@ -292,6 +303,11 @@ func (*RevocationStorage) newUpdates(records []*AccumulatorRecord, events []*Eve
}
func (rs *RevocationStorage) AddUpdate(id CredentialTypeIdentifier, record *revocation.Update) error {
if rs.sqlMode {
return rs.sqldb.Transaction(func(tx sqlRevStorage) error {
return rs.addUpdate(tx, id, record, false)
})
}
return rs.addUpdate(rs.sqldb, id, record, false)
}
......@@ -325,6 +341,8 @@ func (rs *RevocationStorage) addUpdate(tx sqlRevStorage, id CredentialTypeIdenti
s := rs.getSettings(id)
s.updated = time.Now()
// POST record to listeners, if any, asynchroniously
rs.PostUpdate(id, update)
return nil
}
......@@ -545,6 +563,8 @@ func (rs *RevocationStorage) updateAccumulatorTimes(types []CredentialTypeIdenti
s := rs.getSettings(r.CredType)
s.updated = time.Now()
// POST record to listeners, if any, asynchroniously
rs.PostUpdate(r.CredType, &revocation.Update{SignedAccumulator: sacc})
}
return nil
})
......@@ -615,6 +635,49 @@ func (rs *RevocationStorage) SaveIssuanceRecord(id CredentialTypeIdentifier, rec
// Misscelaneous methods
func (rs *RevocationStorage) receiveUpdates() {
for {
select {
case event := <-rs.events:
var (
segments = strings.Split(event.URI, "/")
id = NewCredentialTypeIdentifier(segments[len(segments)-1])
update revocation.Update
err error
)
if err = json.Unmarshal(event.Data, &update); err != nil {
Logger.Warn("failed to unmarshal pushed update: ", err)
} else {
Logger.WithField("credtype", id).Trace("received SSE update event")
if err = rs.AddUpdate(id, &update); err != nil {
Logger.Warn("failed to add pushed update: ", err)
}
}
case <-rs.close:
return
}
}
}
func (rs *RevocationStorage) listenUpdates(id CredentialTypeIdentifier, url string) {
logger := Logger.WithField("credtype", id)
logger.Trace("listening for SSE update events")
// make a context that closes when rs.close closes
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-rs.close:
cancel()
case <-ctx.Done():
}
}()
err := sseclient.Notify(ctx, url, true, rs.events)
if err != nil {
logger.Warn("SSE connection closed: ", err)
}
}
func (rs *RevocationStorage) Load(debug bool, dbtype, connstr string, settings map[CredentialTypeIdentifier]*RevocationSetting) error {
var t *CredentialTypeIdentifier
var ourtypes []CredentialTypeIdentifier
......@@ -626,6 +689,26 @@ func (rs *RevocationStorage) Load(debug bool, dbtype, connstr string, settings m
ourtypes = append(ourtypes, id)
t = &id
}
if s.SSE {
url := s.RevocationServerURL
if url == "" {
if credtype := rs.conf.CredentialTypes[id]; credtype != nil {
if len(credtype.RevocationServers) > 0 {
url = credtype.RevocationServers[0]
}
}
}
if url == "" {
return errors.Errorf("revocation server of %s unknown", id.String())
}
if rs.close == nil {
rs.close = make(chan struct{})
rs.events = make(chan *sseclient.Event)
go rs.receiveUpdates()
}
url = fmt.Sprintf("%srevocation/updateevents/%s", url, id.String())
go rs.listenUpdates(id, url)
}
}
if t != nil && connstr == "" {
return errors.Errorf("revocation mode for %s requires SQL database but no connection string given", *t)
......@@ -679,6 +762,9 @@ func (rs *RevocationStorage) Load(debug bool, dbtype, connstr string, settings m
}
func (rs *RevocationStorage) Close() error {
if rs.close != nil {
close(rs.close)
}
return rs.sqldb.Close()
}
......@@ -736,6 +822,15 @@ func (rs *RevocationStorage) getSettings(id CredentialTypeIdentifier) *Revocatio
return s
}
func (rs *RevocationStorage) PostUpdate(id CredentialTypeIdentifier, update *revocation.Update) {
if rs.ServerSentEvents == nil || !rs.getSettings(id).ServerMode {
return
}
Logger.WithField("credtype", id).Tracef("sending SSE update event")
bts, _ := json.Marshal(update)
rs.ServerSentEvents.SendMessage("revocation/"+id.String(), sse.SimpleMessage(string(bts)))
}
func (client RevocationClient) PostIssuanceRecord(id CredentialTypeIdentifier, sk *revocation.PrivateKey, rec *IssuanceRecord, url string) error {
message, err := signed.MarshalSign(sk.ECDSA, rec)
if err != nil {
......
......@@ -103,7 +103,7 @@ func (s *Server) HandlerFunc() http.HandlerFunc {
}
}
component, token, noun, _, err := servercore.Route(r.URL.Path, r.Method)
component, token, noun, args, err := servercore.Route(r.URL.Path, r.Method)
if err == nil && component == server.ComponentSession && noun == "statusevents" { // if err != nil we let it be handled by HandleProtocolMessage below
if err = s.SubscribeServerSentEvents(w, r, token, false); err != nil {
server.WriteResponse(w, nil, &irma.RemoteError{
......@@ -114,6 +114,17 @@ func (s *Server) HandlerFunc() http.HandlerFunc {
}
return
}
if err == nil && component == server.ComponentRevocation && noun == "updateevents" {
id := irma.NewCredentialTypeIdentifier(args[0])
if settings := s.Conf.RevocationSettings[id]; settings != nil &&
settings.ServerMode &&
s.ServerSentEvents != nil {
s.ServerSentEvents.ServeHTTP(w, r)
} else {
server.WriteError(w, server.ErrorUnsupported, "")
}
return
}
status, response, headers, _ := s.HandleProtocolMessage(r.URL.Path, r.Method, r.Header, message)
for key, h := range headers {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment