Commit 29b7b829 authored by Sietse Ringers's avatar Sietse Ringers

refactor: switch server sent events dependency

parent ee9fe190
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
branch = "master"
digest = "1:24b7325ed858ab22648d01abb40f1b5f96af67ae20c0bee6807f49600688dc9e"
name = "github.com/alexandrevicenzi/go-sse"
packages = ["."]
pruneopts = "UT"
revision = "7b23d5ff7420ab4f1a3e45a584a56bfc9e4ba49e"
[[projects]]
branch = "master"
digest = "1:e730c8372514c662e9ed97228c2e2023f1ec99eb68f7bb56a44c1084733d85f5"
......@@ -552,14 +560,6 @@
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
branch = "v1"
digest = "1:08eeb29b91cc584a7227b52e8865638091ee6f399689d8e16e63eb9f91d9cda8"
name = "gopkg.in/antage/eventsource.v1"
packages = ["."]
pruneopts = "UT"
revision = "803f4c5af2259daa9225c72732b23e1e65ee794e"
[[projects]]
digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96"
name = "gopkg.in/yaml.v2"
......@@ -584,6 +584,7 @@
analyzer-name = "dep"
analyzer-version = 1
input-imports = [
"github.com/alexandrevicenzi/go-sse",
"github.com/bwesterb/go-atum",
"github.com/dgrijalva/jwt-go",
"github.com/fxamacker/cbor",
......@@ -615,7 +616,6 @@
"github.com/timshannon/bolthold",
"github.com/x-cray/logrus-prefixed-formatter",
"go.etcd.io/bbolt",
"gopkg.in/antage/eventsource.v1",
]
solver-name = "gps-cdcl"
solver-version = 1
......@@ -7,12 +7,14 @@ package servercore
import (
"encoding/hex"
"encoding/json"
"log"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"github.com/alexandrevicenzi/go-sse"
"github.com/go-errors/errors"
"github.com/jasonlvhit/gocron"
"github.com/privacybydesign/gabi/revocation"
......@@ -22,10 +24,11 @@ import (
)
type Server struct {
conf *server.Configuration
sessions sessionStore
scheduler *gocron.Scheduler
stopScheduler chan bool
ServerSentEvents *sse.Server
conf *server.Configuration
sessions sessionStore
scheduler *gocron.Scheduler
stopScheduler chan bool
}
func New(conf *server.Configuration) (*Server, error) {
......@@ -41,7 +44,23 @@ func New(conf *server.Configuration) (*Server, error) {
client: make(map[string]*session),
conf: conf,
},
ServerSentEvents: sse.NewServer(&sse.Options{
ChannelNameFunc: func(r *http.Request) string {
token, noun, _, err := ParsePath(r.URL.Path)
if err == nil && token != "" && noun == "statusevents" {
return "session/" + token
}
return ""
},
Headers: map[string]string{
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, OPTIONS",
"Access-Control-Allow-Headers": "Keep-Alive,X-Requested-With,Cache-Control,Content-Type,Last-Event-ID",
},
Logger: log.New(conf.Logger.WithField("type", "sse").WriterLevel(logrus.DebugLevel), "", 0),
}),
}
s.scheduler.Every(10).Seconds().Do(func() {
s.sessions.deleteExpired()
})
......@@ -180,21 +199,17 @@ func (s *Server) SubscribeServerSentEvents(w http.ResponseWriter, r *http.Reques
return server.LogError(errors.Errorf("can't subscribe to server sent events of finished session %s", token))
}
session.Lock()
defer session.Unlock()
// The EventSource.onopen Javascript callback is not consistently called across browsers (Chrome yes, Firefox+Safari no).
// However, when the SSE connection has been opened the webclient needs some signal so that it can early detect SSE failures.
// So we manually send an "open" event. Unfortunately:
// - we need to give the webclient that connected just now some time, otherwise it will miss the "open" event
// - the "open" event also goes to all other webclients currently listening, as we have no way to send this
// event to just the webclient currently listening. (Thus the handler of this "open" event must be idempotent.)
evtSource := session.eventSource()
go func() {
time.Sleep(200 * time.Millisecond)
evtSource.SendEventMessage("", "open", "")
s.ServerSentEvents.SendMessage("session/"+token, sse.NewMessage("", "", "open"))
}()
evtSource.ServeHTTP(w, r)
s.ServerSentEvents.ServeHTTP(w, r)
return nil
}
......
......@@ -4,10 +4,10 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
"net/http"
"reflect"
"time"
"github.com/alexandrevicenzi/go-sse"
"github.com/dgrijalva/jwt-go"
"github.com/go-errors/errors"
"github.com/privacybydesign/gabi"
......@@ -15,7 +15,6 @@ import (
"github.com/privacybydesign/irmago"
"github.com/privacybydesign/irmago/server"
"github.com/sirupsen/logrus"
"gopkg.in/antage/eventsource.v1"
)
// Session helpers
......@@ -34,12 +33,12 @@ func (session *session) setStatus(status server.Status) {
}
func (session *session) onUpdate() {
if session.evtSource != nil {
session.conf.Logger.WithFields(logrus.Fields{"session": session.token, "status": session.status}).
Debug("Sending status to SSE listeners")
// We send JSON like the other APIs, so quote
session.evtSource.SendEventMessage(fmt.Sprintf(`"%s"`, session.status), "", "")
}
session.sse.SendMessage("session/"+session.clientToken,
sse.SimpleMessage(fmt.Sprintf(`"%s"`, session.status)),
)
session.sse.SendMessage("session/"+session.token,
sse.SimpleMessage(fmt.Sprintf(`"%s"`, session.status)),
)
}
func (session *session) fail(err server.Error, message string) *irma.RemoteError {
......@@ -205,18 +204,6 @@ func (session *session) getProofP(commitments *irma.IssueCommitmentMessage, sche
return session.kssProofs[scheme], nil
}
var eventHeaders = [][]byte{[]byte("Access-Control-Allow-Origin: *")}
func (session *session) eventSource() eventsource.EventSource {
if session.evtSource != nil {
return session.evtSource
}
session.conf.Logger.WithFields(logrus.Fields{"session": session.token}).Debug("Making server sent event source")
session.evtSource = eventsource.New(nil, func(_ *http.Request) [][]byte { return eventHeaders })
return session.evtSource
}
// Other
func (session *session) chooseProtocolVersion(minClient, maxClient *irma.ProtocolVersion) (*irma.ProtocolVersion, error) {
......
......@@ -5,13 +5,14 @@ import (
"sync"
"time"
"github.com/alexandrevicenzi/go-sse"
"github.com/privacybydesign/gabi"
"github.com/privacybydesign/gabi/big"
"github.com/privacybydesign/irmago"
"github.com/privacybydesign/irmago/internal/fs"
"github.com/privacybydesign/irmago/server"
"github.com/sirupsen/logrus"
"gopkg.in/antage/eventsource.v1"
)
type session struct {
......@@ -27,7 +28,7 @@ type session struct {
status server.Status
prevStatus server.Status
evtSource eventsource.EventSource
sse *sse.Server
responseCache responseCache
lastActive time.Time
......@@ -100,9 +101,8 @@ func (s *memorySessionStore) stop() {
s.Lock()
defer s.Unlock()
for _, session := range s.requestor {
if session.evtSource != nil {
session.evtSource.Close()
}
session.sse.CloseChannel("session/" + session.token)
session.sse.CloseChannel("session/" + session.clientToken)
}
}
......@@ -137,9 +137,8 @@ func (s *memorySessionStore) deleteExpired() {
s.Lock()
for _, token := range expired {
session := s.requestor[token]
if session.evtSource != nil {
session.evtSource.Close()
}
session.sse.CloseChannel("session/" + session.token)
session.sse.CloseChannel("session/" + session.clientToken)
delete(s.client, session.clientToken)
delete(s.requestor, token)
}
......@@ -163,6 +162,7 @@ func (s *Server) newSession(action irma.Action, request irma.RequestorRequest) *
prevStatus: server.StatusInitialized,
conf: s.conf,
sessions: s.sessions,
sse: s.ServerSentEvents,
result: &server.SessionResult{
LegacySession: request.SessionRequest().Base().Legacy(),
Token: token,
......
......@@ -207,7 +207,7 @@ func (s *Server) Handler() http.Handler {
r.Post("/session", s.handleCreateSession)
r.Delete("/session/{token}", s.handleDelete)
r.Get("/session/{token}/status", s.handleStatus)
r.Get("/session/{token}/statusevents", s.handleStatusEvents)
r.HandleFunc("/session/{token}/statusevents", s.handleStatusEvents)
r.Get("/session/{token}/result", s.handleResult)
// Routes for getting signed JWTs containing the session result. Only work if configuration has a private key
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment