Commit ea7c2d88 authored by Sietse Ringers's avatar Sietse Ringers
Browse files

Add support for server sent events

parent fdb77118
......@@ -405,6 +405,14 @@
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
branch = "v1"
digest = "1:08eeb29b91cc584a7227b52e8865638091ee6f399689d8e16e63eb9f91d9cda8"
name = "gopkg.in/antage/eventsource.v1"
packages = ["."]
pruneopts = "UT"
revision = "803f4c5af2259daa9225c72732b23e1e65ee794e"
[[projects]]
digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96"
name = "gopkg.in/yaml.v2"
......@@ -445,6 +453,7 @@
"github.com/spf13/cobra",
"github.com/spf13/viper",
"github.com/stretchr/testify/require",
"gopkg.in/antage/eventsource.v1",
]
solver-name = "gps-cdcl"
solver-version = 1
......@@ -65,6 +65,10 @@ const (
StatusTimeout Status = "TIMEOUT" // Session timed out
)
func (status Status) Finished() bool {
return status == StatusDone || status == StatusCancelled || status == StatusTimeout
}
// RemoteError converts an error and an explaining message to an *irma.RemoteError.
func RemoteError(err Error, message string) *irma.RemoteError {
stack := string(debug.Stack())
......
......@@ -162,6 +162,30 @@ func CancelSession(token string) error {
return nil
}
func ParsePath(path string) (string, string, error) {
pattern := regexp.MustCompile("(\\w+)/?(|commitments|proofs|status|statusevents)$")
matches := pattern.FindStringSubmatch(path)
if len(matches) != 3 {
return "", "", server.LogWarning(errors.Errorf("Invalid URL: %s", path))
}
return matches[1], matches[2], nil
}
func SubscribeServerSentEvents(w http.ResponseWriter, r *http.Request, token string) error {
session := sessions.get(token)
if session == nil {
return server.LogError(errors.Errorf("can't subscribe to server sent events of unknown session %s", token))
}
if session.status.Finished() {
return server.LogError(errors.Errorf("can't subscribe to server sent events of finished session %s", token))
}
session.Lock()
defer session.Unlock()
session.eventSource().ServeHTTP(w, r)
return nil
}
func HandleProtocolMessage(
path string,
method string,
......@@ -183,17 +207,13 @@ func HandleProtocolMessage(
conf.Logger.Trace("POST body: ", string(message))
}
conf.Logger.Trace("HTTP headers: ", server.ToJson(headers))
pattern := regexp.MustCompile("(\\w+)/?(|commitments|proofs|status)$")
matches := pattern.FindStringSubmatch(path)
if len(matches) != 3 {
conf.Logger.Warnf("Invalid URL: %s", path)
status, output = server.JsonResponse(nil, server.RemoteError(server.ErrorInvalidRequest, ""))
token, noun, err := ParsePath(path)
if err != nil {
status, output = server.JsonResponse(nil, server.RemoteError(server.ErrorUnsupported, ""))
return
}
// Fetch the session
token := matches[1]
noun := matches[2]
session := sessions.get(token)
if session == nil {
conf.Logger.Warnf("Session not found: %s", token)
......@@ -207,12 +227,11 @@ func HandleProtocolMessage(
// then we should inform the user by returning a SessionResult - but only if we have not
// already done this in the past, e.g. by a previous HTTP call handled by this function
defer func() {
if session.finished() && !session.returned {
if session.status.Finished() && !session.returned {
session.returned = true
result = session.result
conf.Logger.Infof("Session %s done, status %s", session.token, session.result.Status)
}
sessions.update(token, session)
}()
// Route to handler
......@@ -241,6 +260,12 @@ func HandleProtocolMessage(
status, output = server.JsonResponse(nil, session.fail(server.ErrorInvalidRequest, ""))
return
default:
if noun == "statusevents" {
err := server.RemoteError(server.ErrorInvalidRequest, "server sent events not supported by this server")
status, output = server.JsonResponse(nil, err)
return
}
if method == http.MethodGet && noun == "status" {
status, output = server.JsonResponse(session.handleGetStatus())
return
......
......@@ -14,7 +14,7 @@ import (
var conf *server.Configuration
func (session *session) handleDelete() {
if session.finished() {
if session.status.Finished() {
return
}
session.markAlive()
......
......@@ -2,6 +2,7 @@ package core
import (
"encoding/json"
"net/http"
"reflect"
"time"
......@@ -10,16 +11,11 @@ import (
"github.com/privacybydesign/gabi"
"github.com/privacybydesign/irmago"
"github.com/privacybydesign/irmago/server"
"gopkg.in/antage/eventsource.v1"
)
// Session helpers
func (session *session) finished() bool {
return session.status == server.StatusDone ||
session.status == server.StatusCancelled ||
session.status == server.StatusTimeout
}
func (session *session) markAlive() {
session.lastActive = time.Now()
conf.Logger.Debugf("session %s marked active at %s", session.token, session.lastActive.String())
......@@ -29,6 +25,14 @@ func (session *session) setStatus(status server.Status) {
conf.Logger.Debugf("Status of session %s updated to %s", session.token, status)
session.status = status
session.result.Status = status
sessions.update(session)
}
func (session *session) onUpdate() {
if session.evtSource != nil {
conf.Logger.Tracef("Sending %s to SSE listeners of session %s", session.status, session.token)
session.evtSource.SendEventMessage(string(session.status), "", "")
}
}
func (session *session) fail(err server.Error, message string) *irma.RemoteError {
......@@ -116,6 +120,18 @@ func (session *session) getProofP(commitments *irma.IssueCommitmentMessage, sche
return session.kssProofs[scheme], nil
}
var eventHeaders = [][]byte{[]byte("Access-Control-Allow-Origin: *")}
func (session *session) eventSource() eventsource.EventSource {
if session.evtSource != nil {
return session.evtSource
}
conf.Logger.Trace("Making server sent event source for session ", session.token)
session.evtSource = eventsource.New(nil, func(_ *http.Request) [][]byte { return eventHeaders })
return session.evtSource
}
// Other
func chooseProtocolVersion(min, max *irma.ProtocolVersion) (*irma.ProtocolVersion, error) {
......
......@@ -10,6 +10,7 @@ import (
"github.com/privacybydesign/gabi/big"
"github.com/privacybydesign/irmago"
"github.com/privacybydesign/irmago/server"
"gopkg.in/antage/eventsource.v1"
)
type session struct {
......@@ -21,7 +22,9 @@ type session struct {
rrequest irma.RequestorRequest
request irma.SessionRequest
status server.Status
status server.Status
evtSource eventsource.EventSource
lastActive time.Time
returned bool
result *server.SessionResult
......@@ -32,7 +35,7 @@ type session struct {
type sessionStore interface {
get(token string) *session
add(token string, session *session)
update(token string, session *session)
update(session *session)
deleteExpired()
}
......@@ -42,12 +45,10 @@ type memorySessionStore struct {
}
const (
maxSessionLifetime = 5 * time.Minute // After this a session is cancelled
expiryTicker = 10 * time.Second // Every so often we check if any session has expired
maxSessionLifetime = 5 * time.Minute // After this a session is cancelled
sessionChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
)
const sessionChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
var (
minProtocolVersion = irma.NewVersion(2, 4)
maxProtocolVersion = irma.NewVersion(2, 4)
......@@ -78,13 +79,11 @@ func (s *memorySessionStore) add(token string, session *session) {
s.m[token] = session
}
func (s *memorySessionStore) update(token string, session *session) {
// nop
func (s *memorySessionStore) update(session *session) {
session.onUpdate()
}
func (s memorySessionStore) deleteExpired() {
conf.Logger.Trace("Deleting expired sessions")
// First check which sessions have expired
// We don't need a write lock for this yet, so postpone that for actual deleting
s.RLock()
......@@ -92,13 +91,13 @@ func (s memorySessionStore) deleteExpired() {
for token, session := range s.m {
session.Lock()
timeout := 5 * time.Minute
timeout := maxSessionLifetime
if session.status == server.StatusInitialized && session.rrequest.Base().ClientTimeout != 0 {
timeout = time.Duration(session.rrequest.Base().ClientTimeout) * time.Second
}
if session.lastActive.Add(timeout).Before(time.Now()) {
if !session.finished() {
if !session.status.Finished() {
conf.Logger.Infof("Session %s expired", token)
session.markAlive()
session.setStatus(server.StatusTimeout)
......@@ -114,6 +113,10 @@ func (s memorySessionStore) deleteExpired() {
// Using a write lock, delete the expired sessions
s.Lock()
for _, token := range expired {
session := s.m[token]
if session.evtSource != nil {
session.evtSource.Close()
}
delete(s.m, token)
}
s.Unlock()
......@@ -129,16 +132,20 @@ func newSession(action irma.Action, request irma.RequestorRequest) *session {
request: request.SessionRequest(),
lastActive: time.Now(),
token: token,
status: server.StatusInitialized,
result: &server.SessionResult{
Token: token,
Type: action,
Token: token,
Type: action,
Status: server.StatusInitialized,
},
}
s.setStatus(server.StatusInitialized)
conf.Logger.Debug("New session started: ", token)
nonce, _ := gabi.RandomBigInt(gabi.DefaultSystemParameters[2048].Lstatzk)
s.request.SetNonce(nonce)
s.request.SetContext(one)
sessions.add(token, s)
return s
}
......
......@@ -30,6 +30,7 @@ var (
ErrorMalformedInput Error = Error{Type: "MALFORMED_INPUT", Status: 400, Description: "Input could not be parsed"}
ErrorUnknown Error = Error{Type: "EXCEPTION", Status: 500, Description: "Encountered unexpected problem"}
ErrorUnsupported Error = Error{Type: "UNSUPPORTED", Status: 501, Description: "Unsupported by this server"}
ErrorInvalidRequest Error = Error{Type: "INVALID_REQUEST", Status: 400, Description: "Invalid HTTP request"}
ErrorProtocolVersion Error = Error{Type: "PROTOCOL_VERSION", Status: 400, Description: "Protocol version negotiation failed"}
)
......@@ -9,6 +9,7 @@ import (
"io/ioutil"
"net/http"
"github.com/go-errors/errors"
"github.com/privacybydesign/irmago"
"github.com/privacybydesign/irmago/server"
"github.com/privacybydesign/irmago/server/core"
......@@ -53,6 +54,10 @@ func CancelSession(token string) error {
return core.CancelSession(token)
}
func SubscribeServerSentEvents(w http.ResponseWriter, r *http.Request, token string) error {
return core.SubscribeServerSentEvents(w, r, token)
}
// HttpHandlerFunc returns a http.HandlerFunc that handles the IRMA protocol
// with IRMA apps. Initialize() must be called before this.
//
......@@ -63,14 +68,28 @@ func CancelSession(token string) error {
func HttpHandlerFunc() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var message []byte
message, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
var err error
if r.Method == http.MethodPost {
if message, err = ioutil.ReadAll(r.Body); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
token, noun, err := core.ParsePath(r.URL.Path)
if err == nil && noun == "statusevents" { // if err != nil we let it be handled by HandleProtocolMessage below
if err = SubscribeServerSentEvents(w, r, token); err != nil {
server.WriteError(w, server.ErrorUnexpectedRequest, err.Error())
}
return
}
status, response, result := core.HandleProtocolMessage(r.URL.Path, r.Method, r.Header, message)
w.WriteHeader(status)
w.Write(response)
_, err = w.Write(response)
if err != nil {
_ = server.LogError(errors.WrapPrefix(err, "http.ResponseWriter.Write() returned error", 0))
}
if result != nil {
if handler, ok := handlers[result.Token]; ok {
go handler(result)
......
......@@ -87,13 +87,15 @@ func Initialize(config *Configuration) error {
return nil
}
var corsOptions = cors.Options{
AllowedOrigins: []string{"*"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "Cache-Control"},
AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete},
}
func ClientHandler() http.Handler {
router := chi.NewRouter()
router.Use(cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete},
}).Handler)
router.Use(cors.New(corsOptions).Handler)
router.Mount("/irma/", irmarequestor.HttpHandlerFunc())
return router
......@@ -103,11 +105,7 @@ func ClientHandler() http.Handler {
// and IRMA client messages.
func Handler() http.Handler {
router := chi.NewRouter()
router.Use(cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete},
}).Handler)
router.Use(cors.New(corsOptions).Handler)
if !conf.separateClientServer() {
// Mount server for irmaclient
......@@ -118,6 +116,7 @@ func Handler() http.Handler {
router.Post("/session", handleCreate)
router.Delete("/session/{token}", handleDelete)
router.Get("/session/{token}/status", handleStatus)
router.Get("/session/{token}/statusevents", handleStatusEvents)
router.Get("/session/{token}/result", handleResult)
// Routes for getting signed JWTs containing the session result. Only work if configuration has a private key
......@@ -208,6 +207,14 @@ func handleStatus(w http.ResponseWriter, r *http.Request) {
server.WriteJson(w, res.Status)
}
func handleStatusEvents(w http.ResponseWriter, r *http.Request) {
token := chi.URLParam(r, "token")
conf.Logger.Debug("new client subscribed to server sent events of session " + token)
if err := irmarequestor.SubscribeServerSentEvents(w, r, token); err != nil {
server.WriteError(w, server.ErrorUnexpectedRequest, err.Error())
}
}
func handleDelete(w http.ResponseWriter, r *http.Request) {
err := irmarequestor.CancelSession(chi.URLParam(r, "token"))
if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment