We planned to upgrade GitLab and Mattermost to the latest version this Friday morning (early). You may experience some downtime!

Commit cbc8df21 authored by Steffen Michels's avatar Steffen Michels

add timeout parameters to 'tcpconnect' & timeout option to 'sendEmail'

parent 11719419
Pipeline #30391 passed with stage
in 5 minutes and 46 seconds
......@@ -212,7 +212,7 @@ where
syncNetworkChannel :: String Int String (String -> m) (m -> String) (Shared sds ([m],Bool,[m],Bool)) -> Task () | iTask m & RWShared sds
syncNetworkChannel server port msgSeparator decodeFun encodeFun channel
= tcpconnect server port channel {ConnectionHandlers|onConnect=onConnect,onData=onData,onShareChange=onShareChange,onDisconnect=onDisconnect,onDestroy= \s->(Ok s, [])} @! ()
= tcpconnect server port Nothing channel {ConnectionHandlers|onConnect=onConnect,onData=onData,onShareChange=onShareChange,onDisconnect=onDisconnect,onDestroy= \s->(Ok s, [])} @! ()
where
onConnect _ _ (received,receiveStopped,send,sendStopped)
= (Ok "",if (not (isEmpty send)) (Just (received,False,[],sendStopped)) Nothing, map encodeFun send,False)
......
......@@ -44,14 +44,14 @@ hostTaskPoolServer
connectToTaskPoolServer :: Task ()
connectToTaskPoolServer
= Hint "Connect to task pool" @>> enterInformation []
>>= \{ConnectToTaskPool|domain=(Domain host),port} -> (instanceClient host port (Domain host)) -|| (instanceFilter (const True) (Domain host))
>>= \{ConnectToTaskPool|domain=(Domain host),port} -> (instanceClient host port Nothing (Domain host)) -|| (instanceFilter (const True) (Domain host))
intermediateTaskPoolServer :: Task ()
intermediateTaskPoolServer
= Hint "Enter YOUR subdomain" @>> enterInformation []
>>= \subdomain -> Hint "Enter a port for YOUR task pool server" @>> enterInformation []
>>= \serverPort -> Hint "Connect to (master) task pool" @>> enterInformation []
>>= \{ConnectToTaskPool|domain=(Domain host),port} -> ((instanceClient host port (Domain host)) -|| (instanceClameFilter (const True) (Domain host))) -|| instanceServer serverPort subdomain
>>= \{ConnectToTaskPool|domain=(Domain host),port} -> ((instanceClient host port Nothing (Domain host)) -|| (instanceClameFilter (const True) (Domain host))) -|| instanceServer serverPort subdomain
askQuestion :: Task String
askQuestion
......
......@@ -9,7 +9,7 @@ derive class iTask DeviceRequestState
deviceRequest :: String (String -> Bool) -> Task String
deviceRequest request close
= tcpconnect "127.0.0.1" 20097 (constShare ())
= tcpconnect "127.0.0.1" 20097 Nothing (constShare ())
{ ConnectionHandlers
| onConnect = onConnect
, onData = onData
......
......@@ -129,7 +129,7 @@ request host port request
where
client :: Task (Maybe a) | iTask a
client
= ((tcpconnect host port (constShare ())
= ((tcpconnect host port Nothing (constShare ())
{ ConnectionHandlers
| onConnect = onConnect
, onData = onData
......
......@@ -7,7 +7,7 @@ import StdTuple
import StdFile
import StdOrdList
from StdFunc import const, o
from TCPIP import :: Timeout
import System.OS
from iTasks.WF.Definition import class iTask
......
......@@ -3,6 +3,7 @@ definition module iTasks.Extensions.Email
* This module provides basic SMTP email support
*/
from Text.HTML import :: HtmlTag
from TCPIP import :: Timeout
import iTasks
/**
......@@ -32,3 +33,4 @@ sendHtmlEmail :: ![EmailOpt] !String ![String] !String !HtmlTag -> Task ()
= EmailOptSMTPServer !String //SMTP server to use. Default: localhost
| EmailOptSMTPServerPort !Int //TCP port of the SMTP server to use. Default: 25
| EmailOptExtraHeaders ![(String,String)] //Additional headers to add before the body
| EmailOptTimeout !Timeout // TCP timeout
......@@ -6,11 +6,12 @@ import Text, Text.HTML
sendEmail :: ![EmailOpt] !String ![String] !String !String -> Task ()
sendEmail opts sender recipients subject body
= tcpconnect server port (constShare ()) {ConnectionHandlers|onConnect=onConnect,onData=onData,onDisconnect=onDisconnect,onShareChange = \l _ = (Ok l, Nothing, [], False), onDestroy= \s->(Ok s, [])}
= tcpconnect server port timeout (constShare ()) {ConnectionHandlers|onConnect=onConnect,onData=onData,onDisconnect=onDisconnect,onShareChange = \l _ = (Ok l, Nothing, [], False), onDestroy= \s->(Ok s, [])}
@! ()
where
server = getServerOpt opts
port = getPortOpt opts
timeout = getTimeoutOpt opts
headers = getHeadersOpt opts
//Sending the message with SMTP is essentially one-way communication
//but we send it in parts. After each part we get a response with a status code.
......@@ -94,3 +95,7 @@ getPortOpt [x:xs] = getPortOpt xs
getHeadersOpt [] = []
getHeadersOpt [EmailOptExtraHeaders s:xs] = s ++ getHeadersOpt xs
getHeadersOpt [x:xs] = getHeadersOpt xs
getTimeoutOpt [] = Nothing
getTimeoutOpt [EmailOptTimeout t:xs] = Just t
getTimeoutOpt [x:xs] = getTimeoutOpt xs
......@@ -185,7 +185,7 @@ where
callHTTP :: !HTTPMethod !URI !String !(HTTPResponse -> (MaybeErrorString a)) -> Task a | iTask a
callHTTP method url=:{URI|uriScheme,uriRegName=Just uriRegName,uriPort,uriPath,uriQuery,uriFragment} data parseFun
= tcpconnect uriRegName port (constShare ()) {ConnectionHandlers|onConnect=onConnect,onData=onData,onShareChange=onShareChange,onDisconnect=onDisconnect,onDestroy= \s->(Ok s, [])}
= tcpconnect uriRegName port Nothing (constShare ()) {ConnectionHandlers|onConnect=onConnect,onData=onData,onShareChange=onShareChange,onDisconnect=onDisconnect,onDestroy= \s->(Ok s, [])}
@? taskResult
where
port = fromMaybe 80 uriPort
......
......@@ -42,7 +42,7 @@ onDestroy s = (Ok s, [])
queueSDSRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r
queueSDSRequest req host port taskId symbols iworld
= case addConnection taskId host port connectionTask iworld of
= case addConnection taskId host port Nothing connectionTask iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (id, _), iworld) = (Ok id, iworld)
where
......@@ -61,7 +61,7 @@ where
= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)
queueModifyRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r & TC w
queueModifyRequest req=:(SDSModifyRequest p r w) host port taskId symbols iworld = case addConnection taskId host port connectionTask iworld of
queueModifyRequest req=:(SDSModifyRequest p r w) host port taskId symbols iworld = case addConnection taskId host port Nothing connectionTask iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (id, _), iworld) = (Ok id, iworld)
where
......@@ -80,7 +80,7 @@ where
= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)
queueWriteRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r & TC w
queueWriteRequest req=:(SDSWriteRequest sds p w) host port taskId symbols iworld = case addConnection taskId host port connectionTask iworld of
queueWriteRequest req=:(SDSWriteRequest sds p w) host port taskId symbols iworld = case addConnection taskId host port Nothing connectionTask iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (id, _), iworld) = (Ok id, iworld)
where
......@@ -101,7 +101,7 @@ where
queueServiceRequest :: !(SDSRemoteService p r w) p !TaskId !Bool !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r
queueServiceRequest (SDSRemoteService (Just _) _) _ _ _ iworld = (Error (exception "SDSRemoteService queing request while still a connection id"), iworld)
queueServiceRequest service=:(SDSRemoteService _ (HTTPShareOptions {host, port, createRequest, fromResponse})) p taskId _ iworld
= case addConnection taskId host port connectionTask iworld of
= case addConnection taskId host port Nothing connectionTask iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (id, _), iworld) = (Ok id, iworld)
where
......@@ -132,7 +132,7 @@ where
(Ok a) = (Ok (Right a), Nothing)
queueServiceRequest service=:(SDSRemoteService _ (TCPShareOptions {host, port, createMessage, fromTextResponse})) p taskId register iworld
= case addConnection taskId host port connectionTask iworld of
= case addConnection taskId host port Nothing connectionTask iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (id, _), iworld) = (Ok id, iworld)
where
......@@ -164,7 +164,7 @@ queueServiceWriteRequest :: !(SDSRemoteService p r w) !p !w !TaskId !*IWorld ->
queueServiceWriteRequest service=:(SDSRemoteService (Just _) _) _ _ _ iworld = (Error (exception "SDSRemoteService queing write request while still containing a connection id"), iworld)
queueServiceWriteRequest service=:(SDSRemoteService _ (HTTPShareOptions {host, port, writeHandlers})) p w taskId iworld
| isNothing writeHandlers = (Ok Nothing, iworld) // Writing not supported for this share.
= case addConnection taskId host port connectionTask iworld of
= case addConnection taskId host port Nothing connectionTask iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (id, _), iworld) = (Ok (Just id), iworld)
where
......@@ -196,7 +196,7 @@ where
queueServiceWriteRequest service=:(SDSRemoteService _ (TCPShareOptions {host, port, writeMessageHandlers})) p w taskId iworld
| isNothing writeMessageHandlers = (Ok Nothing, iworld)
= case addConnection taskId host port connectionTask iworld of
= case addConnection taskId host port Nothing connectionTask iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (id, _), iworld) = (Ok (Just id), iworld)
where
......
definition module iTasks.Internal.Distributed.Instance
from TCPIP import :: Timeout
from iTasks.WF.Definition import :: InstanceNo
from iTasks.UI.Editor import :: Editor
from iTasks.Internal.Generic.Visualization import :: TextFormat
......@@ -19,7 +20,7 @@ from iTasks.Extensions.Distributed.Task import :: Domain
instanceServer :: Int Domain -> Task ()
instanceClient :: String Int Domain -> Task ()
instanceClient :: String Int (Maybe Timeout) Domain -> Task ()
instanceFilter :: (TaskAttributes -> Bool) Domain -> Task ()
......
......@@ -89,7 +89,7 @@ instanceServer port domain = tcplisten port True instanceServerShared {Connectio
, onShareChange = onShareChange
, onDisconnect = onDisconnect
, onDestroy= \s->(Ok s, [])
} -|| (instanceClient` "127.0.0.1" port domain True) -|| (process instanceServerShared) @! ()
} -|| (instanceClient` "127.0.0.1" port Nothing domain True) -|| (process instanceServerShared) @! ()
where
onConnect :: ConnectionId String InstanceServerShare -> (MaybeErrorString InstanceServerState, Maybe InstanceServerShare, [String], Bool)
onConnect connId host share=:{InstanceServerShare|lastId,clients}
......@@ -457,18 +457,18 @@ getClientIdByDomain (Domain domain)
[x] -> return (Just x)
_ -> return Nothing
instanceClient :: String Int Domain -> Task ()
instanceClient host port domain = instanceClient` host port domain False
instanceClient :: String Int (Maybe Timeout) Domain -> Task ()
instanceClient host port timeout domain = instanceClient` host port timeout domain False
instanceClient` :: String Int Domain Bool -> Task ()
instanceClient` host port domain local
instanceClient` :: String Int (Maybe Timeout) Domain Bool -> Task ()
instanceClient` host port timeout domain local
= getClientId domain
>>- \clientId -> (repeatClient (client clientId) @! ()) -|| (process (instanceClientShare clientId) clientId)
where
client :: Int -> Task (Maybe ())
client clientId
= getClientServerId clientId
>>- \serverId -> (tcpconnect host port (instanceClientShare clientId) { ConnectionHandlers
>>- \serverId -> (tcpconnect host port timeout (instanceClientShare clientId) { ConnectionHandlers
| onConnect = (onConnect (maybe "connect" (\id -> ("reconnect " +++ (toString id))) serverId))
, onData = onData
, onShareChange = onShareChange
......
......@@ -26,7 +26,7 @@ serve :: ![StartupTask] ![(Int,ConnectionTask)] (*IWorld -> (Maybe Timeout,*IWor
addListener :: !TaskId !Int !Bool !(ConnectionTask) !*IWorld -> (!MaybeError TaskException (),!*IWorld)
//Dynamically add a connection
addConnection :: !TaskId !String !Int !ConnectionTask !*IWorld -> (!MaybeError TaskException (ConnectionId, Dynamic),!*IWorld)
addConnection :: !TaskId !String !Int !(Maybe Timeout) !ConnectionTask !*IWorld -> (!MaybeError TaskException (ConnectionId, Dynamic),!*IWorld)
ioStateString :: !IOStates -> String
......
......@@ -484,8 +484,8 @@ addListener taskId port removeOnClose connectionTask iworld=:{ioTasks={todo,done
# ioStates = put taskId (IOActive newMap) ioStates
= (Ok (),{iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world})
addConnection :: !TaskId !String !Int !ConnectionTask !*IWorld -> (!MaybeError TaskException (ConnectionId, Dynamic),!*IWorld)
addConnection taskId host port connectionTask=:(ConnectionTask handlers sds) iworld
addConnection :: !TaskId !String !Int !(Maybe Timeout) !ConnectionTask !*IWorld -> (!MaybeError TaskException (ConnectionId, Dynamic),!*IWorld)
addConnection taskId host port timeout connectionTask=:(ConnectionTask handlers sds) iworld
= addIOTask taskId sds init tcpConnectionIOOps onInitHandler mkIOTaskInstance iworld
where
init :: !*IWorld -> (!MaybeErrorString (!IPAddress, !*TCP_DuplexChannel), !*IWorld)
......@@ -494,7 +494,7 @@ where
= case mbIP of
Nothing = (Error ("Failed to connect to host " +++ host +++ ": lookup failed"), {iworld & world = world})
Just ip
# (tReport, mbConn, world) = connectTCP_MT Nothing (fromJust mbIP,port) world
# (tReport, mbConn, world) = connectTCP_MT timeout (fromJust mbIP,port) world
= case mbConn of
Nothing = (Error ("Failed to connect to host " +++ host +++ ":" +++ toString port), {iworld & world = world})
Just channel = (Ok (ip, channel), {iworld & world = world})
......
......@@ -3,6 +3,7 @@ definition module iTasks.WF.Tasks.IO
* This modules provides tasks that support interaction with other systems.
* Either by running external programs, creating network clients and servers, or exchanging files
*/
from TCPIP import :: Timeout
import iTasks.WF.Definition
import iTasks.SDS.Definition
from iTasks.Internal.IWorld import :: ConnectionId
......@@ -36,10 +37,11 @@ externalProcess :: !Timespec !FilePath ![String] !(Maybe FilePath) !(Maybe Proce
* Connect to an external system using TCP. This task's value becomes stable when the connection is closed
* @param Hostname
* @param Port
* @param The timeout (in ms) for opening the connection
* @param A reference to shared data the task has access to
* @param The event handler functions
*/
tcpconnect :: !String !Int !(sds () r w) (ConnectionHandlers l r w) -> Task l | iTask l & iTask r & iTask w & RWShared sds
tcpconnect :: !String !Int !(Maybe Timeout) !(sds () r w) (ConnectionHandlers l r w) -> Task l | iTask l & iTask r & iTask w & RWShared sds
/**
* Listen for connections from external systems using TCP.
* @param Port
......
......@@ -105,14 +105,14 @@ where
rep port = stringDisplay ("Listening for connections on port "<+++ port)
tcpconnect :: !String !Int !(sds () r w) (ConnectionHandlers l r w) -> Task l | iTask l & iTask r & iTask w & RWShared sds
tcpconnect host port sds handlers = Task evalinit
tcpconnect :: !String !Int !(Maybe Timeout) !(sds () r w) (ConnectionHandlers l r w) -> Task l | iTask l & iTask r & iTask w & RWShared sds
tcpconnect host port timeout sds handlers = Task evalinit
where
//We cannot make ioStates local since the engine uses it
evalinit DestroyEvent _ iworld
= (DestroyedResult, iworld)
evalinit event eo=:{TaskEvalOpts|taskId} iworld
= case addConnection taskId host port (wrapConnectionTask handlers sds) iworld of
= case addConnection taskId host port timeout (wrapConnectionTask handlers sds) iworld of
(Error e,iworld) = (ExceptionResult e, iworld)
(Ok _,iworld) = eval event eo iworld
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment