From 209db6d1fc7c06953cc16bf5f179ddf23e4fd781 Mon Sep 17 00:00:00 2001 From: Camil Staps Date: Sun, 10 May 2020 20:25:11 +0200 Subject: [PATCH] Add wrapper around basic functionality in librdkafka --- .gitlab-ci.yml | 2 +- .../OS-Independent/Message/Kafka.dcl | 228 ++++++++++++++++++ .../OS-Independent/Message/Kafka.icl | 105 ++++++++ .../OS-Independent/Message/_Kafka.dcl | 32 +++ .../OS-Independent/Message/_Kafka.icl | 200 +++++++++++++++ tests/linux64/test.icl | 2 + 6 files changed, 568 insertions(+), 1 deletion(-) create mode 100644 src/libraries/OS-Independent/Message/Kafka.dcl create mode 100644 src/libraries/OS-Independent/Message/Kafka.icl create mode 100644 src/libraries/OS-Independent/Message/_Kafka.dcl create mode 100644 src/libraries/OS-Independent/Message/_Kafka.icl diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a7297099..0efb5f34 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -2,7 +2,7 @@ test-nightly: before_script: - install_clean.sh bundle-complete - apt-get update -qq - - apt-get install -y -qq build-essential git coreutils libmariadb-dev libsnappy-dev libsqlite3-dev + - apt-get install -y -qq build-essential git coreutils libmariadb-dev librdkafka-dev libsnappy-dev libsqlite3-dev - make -C src/cdeps install diff --git a/src/libraries/OS-Independent/Message/Kafka.dcl b/src/libraries/OS-Independent/Message/Kafka.dcl new file mode 100644 index 00000000..24d6c407 --- /dev/null +++ b/src/libraries/OS-Independent/Message/Kafka.dcl @@ -0,0 +1,228 @@ +definition module Message.Kafka + +/** + * Bindings for Apache Kafka. See: + * + * - https://kafka.apache.org/intro.html + * - https://github.com/edenhill/librdkafka + * - https://github.com/edenhill/librdkafka/wiki/Language-bindings-development + * + * Not all functionality has been implemented yet. + * + * A basic producer can be implemented as follows, based on + * https://github.com/edenhill/librdkafka/blob/master/examples/producer.c: + * + * ```clean + * //* Create a producer and write "payload" to the topic "topic". + * producer :: !*World -> *World + * producer w + * # c = newKafkaClient KafkaProducer [("bootstrap.servers", "localhost")] + * | c=:(Error _) = abort (fromError c+++"\n") + * # c = fromOk c + * # t = newKafkaTopic "topic" c + * | t=:(Error _) = abort (fromError t+++"\n") + * # t = fromOk t + * # (err,w) = produceKafkaMessage KafkaUnassignedPartition Nothing "payload" t w + * | err=:(Error _) = abort (fromError err+++"\n") + * # (err,w) = flushKafkaClient 1000 c w + * | err=:(Error _) = abort (fromError err+++"\n") + * | otherwise = w + * ``` + * + * A basic producer can be implemented as follows, based on + * https://github.com/edenhill/librdkafka/blob/master/examples/consumer.c: + * + * ```clean + * //* Print messages from the topic "topic" to stdout. + * consumer :: !*World -> *World + * consumer w + * # c = newKafkaClient KafkaConsumer + * [ ("bootstrap.servers", "localhost") + * , ("group.id", "group") + * , ("auto.offset.reset", "earliest") + * ] + * | c=:(Error _) = abort (fromError c+++"\n") + * # c = fromOk c + * # (err, w) = redirectKafkaPollQueue c w + * | err=:(Error _) = abort (fromError err+++"\n") + * # (err, w) = subscribeToKafkaTopics [("topic", KafkaUnassignedPartition)] c w + * | err=:(Error _) = abort (fromError err+++"\n") + * # w = loop c w + * # w = closeKafkaConsumer c w + * # w = destroyKafkaClient c w + * = w + * where + * loop c w + * # (mbMsg,w) = pollKafkaConsumer 100 c w + * | mbMsg=:(Error _) + * # err = fromError mbMsg + * | err == "Broker: No more messages" // ignore this error + * = loop c w + * # (_,w) = fclose (stderr <<< err <<< "\n") w + * = loop c w + * # mbMsg = fromOk mbMsg + * | mbMsg=:Nothing + * = loop c w + * # msg = fromJust mbMsg + * # (io,w) = stdio w + * # io = io <<< "Message: " <<< msg.payload <<< "\n" + * # (_,w) = fclose io w + * = loop c w + * ``` + */ + +from StdMaybe import :: Maybe +from Data.Error import :: MaybeError +from System._Pointer import :: Pointer + +:: KafkaClientType + = KafkaConsumer + | KafkaProducer + +:: KafkaClient (:== Pointer) + +:: KafkaTopic (:== Pointer) + +:: KafkaMessage = + { topic :: !KafkaTopic + , partition :: !KafkaPartition + , offset :: !Int //* NB: on 32-bit systems, the upper 32 bits of the offset are lost. + , payload :: !String + , key :: !String + } + +/** + * Messages can be sent to a particular (non-negative) partition identifier, or + * to the unassigned partition `KafkaUnassignedPartition`. + */ +:: KafkaPartition :== Int +KafkaUnassignedPartition :== -1 + +/** + * Polling can either: + * + * - Not block at all (`KafkaNoBlock`) + * - Block until new events are available (`KafkaNoTimeout`) + * - Block for a certain time only (given in ms) + */ +:: KafkaBlockSetting :== Int +KafkaNoBlock :== 0 +KafkaNoTimeout :== -1 + +/** + * Creates a new client given a list of settings. + * + * @param The kind of client. + * @param Setings. + * @result A new `KafkaClient`, or an error on failure. + */ +newKafkaClient :: !KafkaClientType ![(String,String)] -> MaybeError String KafkaClient + +/** + * Let a consumer leave the group gracefully before destroying it with + * `destroyKafkaClient`. + */ +closeKafkaConsumer :: !KafkaClient !*World -> *World + +/** + * Destroy a `KafkaClient`. + * + * Before destroying a client, all remaining messages should be flushed with + * `flushKafkaClient` and a reasonably long timeout. + * + * A consumer should be closed with `closeKafkaConsumer` before destroying it. + */ +destroyKafkaClient :: !KafkaClient !*World -> *World + +/** + * Poll for new events. + * + * This function should be called regularly. + * + * @param Whether to block and for how long. + * @param The client. + * @result The number of events served. + */ +pollKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!Int, !*World) + +/** + * Attempt to flush all remaning events. + * + * This function should be called before destroying the client with + * `destroyKafkaClient`. + * + * @param Whether to block and for how long. + * @param The client. + * @result The number of events served. + */ +flushKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (), !*World) + +/** + * Create a new topic. + * + * Topics are refcounted internally. Creating a new topic with a name that has + * already been used previously will return the previous handle. + * + * NB: configurations (`rd_kafka_topic_conf_set`) are not supported yet. + * + * @param The topic name. + * @param The client. + * @result The new topic. + */ +newKafkaTopic :: !String !KafkaClient -> MaybeError String KafkaTopic + +/** + * Destroy a topic. + * + * When a topic has been 'created' multiple times (i.e. `newKafkaTopic` was + * used multiple times with the same name), this does not actually free + * resources but only decreases the topic's refcount. + */ +destroyKafkaTopic :: !KafkaTopic !*World -> *World + +//* Get the name of a topic. +kafkaTopicName :: !KafkaTopic -> String + +/** + * Produce and queue a message. + * + * `pollKafkaClient` must be called after this function to ensure the message + * is sent as well. + * + * NB: for now it is not possible to check whether a message has been sent + * successfully. + * + * @param The partition to send to, or `KafkaUnassignedPartition`. + * @param An optional message key. + * @param The message payload. + * @param The topic to send to. + * @result On failure, a human-readable error message is returned. + */ +produceKafkaMessage :: !KafkaPartition !(Maybe String) !String !KafkaTopic !*World -> (!MaybeError String (), !*World) + +/** + * Subscribe to a list of topics. + * + * @param The topics to subscribe to. + * @param The client (should be a `KafkaConsumer`). + * @result On failure, a human-readable error message is returned. + */ +subscribeToKafkaTopics :: ![(String,KafkaPartition)] !KafkaClient !*World -> (!MaybeError String (), !*World) + +/** + * Redirect the main Kafka poll queue to the poll queue of a particular client + * (should be a `KafkaConsumer`). It is not permitted to use `pollKafkaClient` + * after `redirectKafkaPollQueue`. + */ +redirectKafkaPollQueue :: !KafkaClient !*World -> (!MaybeError String (), !*World) + +/** + * Poll for new events. + * + * This function should be called regularly. + * + * @param Whether to block and for how long. + * @param The client. + * @result The number of events served. + */ +pollKafkaConsumer :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (Maybe KafkaMessage), !*World) diff --git a/src/libraries/OS-Independent/Message/Kafka.icl b/src/libraries/OS-Independent/Message/Kafka.icl new file mode 100644 index 00000000..882f4e20 --- /dev/null +++ b/src/libraries/OS-Independent/Message/Kafka.icl @@ -0,0 +1,105 @@ +implementation module Message.Kafka + +import StdEnv +import StdMaybe + +import Control.Monad +import Data.Error +import Message._Kafka +import System._Pointer +import qualified Text +from Text import class Text, instance Text String + +:: KafkaClient :== Pointer + +:: KafkaTopic :== Pointer + +// Offsets to the message struct +KM_Err :== 0 +KM_Topic :== IF_INT_64_OR_32 8 4 +KM_Partition :== IF_INT_64_OR_32 16 8 +KM_Payload :== IF_INT_64_OR_32 24 12 +KM_Len :== IF_INT_64_OR_32 32 16 +KM_Key :== IF_INT_64_OR_32 40 20 +KM_KeyLen :== IF_INT_64_OR_32 48 24 +KM_Offset :== IF_INT_64_OR_32 56 28 +KM_Private :== IF_INT_64_OR_32 64 36 + +newKafkaClient :: !KafkaClientType ![(String,String)] -> MaybeError String KafkaClient +newKafkaClient type settings = + foldM + (\c (k,v) -> rd_kafka_conf_set c k v) + rd_kafka_conf_new + settings >>= + rd_kafka_new (if type=:KafkaProducer 0 1) + +closeKafkaConsumer :: !KafkaClient !*World -> *World +closeKafkaConsumer client w = rd_kafka_consumer_close client w + +destroyKafkaClient :: !KafkaClient !*World -> *World +destroyKafkaClient client w = rd_kafka_destroy client w + +pollKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!Int, !*World) +pollKafkaClient block client w + # nevents = rd_kafka_poll client block + = (nevents, w) + +flushKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (), !*World) +flushKafkaClient block client w + # ok = rd_kafka_flush client block + | ok=:(Error _) + = (Error (rd_kafka_err2str (fromError ok)), w) + = (Ok (), w) + +newKafkaTopic :: !String !KafkaClient -> MaybeError String KafkaTopic +newKafkaTopic topic client = rd_kafka_topic_new client topic 0 + +destroyKafkaTopic :: !KafkaTopic !*World -> *World +destroyKafkaTopic topic w = rd_kafka_topic_destroy topic w + +kafkaTopicName :: !KafkaTopic -> String +kafkaTopicName topic = rd_kafka_topic_name topic + +produceKafkaMessage :: !KafkaPartition !(Maybe String) !String !KafkaTopic !*World -> (!MaybeError String (), !*World) +produceKafkaMessage partition key msg topic w + # flags = 2 /* F_COPY to copy payload */ + = (rd_kafka_produce topic partition flags msg key, w) + +subscribeToKafkaTopics :: ![(String,KafkaPartition)] !KafkaClient !*World -> (!MaybeError String (), !*World) +subscribeToKafkaTopics topics client w + # list = foldl + (\l (t,p) + # r = rd_kafka_topic_partition_list_add l t p + | r <> r // force evaluation + -> l + -> l) + (rd_kafka_topic_partition_list_new (length topics)) + topics + # err = rd_kafka_subscribe client list + # w = forceEval err w + # w = rd_kafka_topic_partition_list_destroy list w + = (err, w) + +redirectKafkaPollQueue :: !KafkaClient !*World -> (!MaybeError String (), !*World) +redirectKafkaPollQueue client w = (rd_kafka_poll_set_consumer client, w) + +pollKafkaConsumer :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (Maybe KafkaMessage), !*World) +pollKafkaConsumer block client w + # mbMsg = rd_kafka_consumer_poll client block + | mbMsg=:(Error _) + = (liftError mbMsg, w) + # mbMsg = fromOk mbMsg + | mbMsg=:Nothing + = (Ok Nothing, w) + # ptr = fromJust mbMsg + payload = readInt ptr KM_Payload + key = readInt ptr KM_Key + # msg = + { topic = readInt ptr KM_Topic + , partition = readInt ptr KM_Partition + , offset = readInt ptr KM_Offset + , payload = if (payload == 0) "" (derefCharArray payload (readInt ptr KM_Len)) + , key = if (key == 0) "" (derefCharArray key (readInt ptr KM_KeyLen)) + } + # msg = rd_kafka_message_destroy ptr msg + = (Ok (Just msg), w) diff --git a/src/libraries/OS-Independent/Message/_Kafka.dcl b/src/libraries/OS-Independent/Message/_Kafka.dcl new file mode 100644 index 00000000..b039e387 --- /dev/null +++ b/src/libraries/OS-Independent/Message/_Kafka.dcl @@ -0,0 +1,32 @@ +definition module Message._Kafka + +/** + * Low-level bindings for `Message.Kafka`. + */ + +from StdInt import IF_INT_64_OR_32 +from StdMaybe import :: Maybe +from Data.Error import :: MaybeError +from System._Pointer import :: Pointer + +rd_kafka_conf_new :: Pointer +rd_kafka_conf_set :: !Pointer !String !String -> MaybeError String Pointer +rd_kafka_consumer_close :: !Pointer !.a -> .a +rd_kafka_consumer_poll :: !Pointer !Int -> MaybeError String (Maybe Pointer) +rd_kafka_destroy :: !Pointer !.a -> .a +rd_kafka_err2str :: !Int -> String +rd_kafka_flush :: !Pointer !Int -> MaybeError Int () +rd_kafka_last_error :: Int +rd_kafka_message_destroy :: !Pointer !.a -> .a +rd_kafka_new :: !Int !Pointer -> MaybeError String Pointer +rd_kafka_poll :: !Pointer !Int -> Int +rd_kafka_poll_set_consumer :: !Pointer -> MaybeError String () +rd_kafka_produce :: !Pointer !Int !Int !String !(Maybe String) -> MaybeError String () +rd_kafka_produce_batch :: !Pointer !Int !Int !{#Int} !Int -> Int +rd_kafka_subscribe :: !Pointer !Pointer -> MaybeError String () +rd_kafka_topic_destroy :: !Pointer !.a -> .a +rd_kafka_topic_name :: !Pointer -> String +rd_kafka_topic_new :: !Pointer !String !Pointer -> MaybeError String Pointer +rd_kafka_topic_partition_list_add :: !Pointer !String !Int -> Pointer +rd_kafka_topic_partition_list_destroy :: !Pointer !.a -> .a +rd_kafka_topic_partition_list_new :: !Int -> Pointer diff --git a/src/libraries/OS-Independent/Message/_Kafka.icl b/src/libraries/OS-Independent/Message/_Kafka.icl new file mode 100644 index 00000000..43d33d33 --- /dev/null +++ b/src/libraries/OS-Independent/Message/_Kafka.icl @@ -0,0 +1,200 @@ +implementation module Message._Kafka + +import StdEnv + +import Data.Error +import Data.Maybe +import System._Pointer + +import code from library "-lrdkafka" + +ERRSTR_SIZE :== 512 + +errstr :: String +errstr =: createArray ERRSTR_SIZE '\0' + +rd_kafka_conf_new :: Pointer +rd_kafka_conf_new = code { + ccall rd_kafka_conf_new ":p" +} + +rd_kafka_conf_set :: !Pointer !String !String -> MaybeError String Pointer +rd_kafka_conf_set conf key val + # r = set conf (packString key) (packString val) errstr ERRSTR_SIZE + | r == 0 + = Ok conf + = Error (unpackString errstr) +where + set :: !Pointer !String !String !String !Int -> Int + set _ _ _ _ _ = code { + ccall rd_kafka_conf_set "psssI:I" + } + +rd_kafka_consumer_close :: !Pointer !.a -> .a +rd_kafka_consumer_close rk _ = code { + ccall rd_kafka_consumer_close "p:V:A" +} + +// - Returns `Ok Nothing` when the result of poll is 0 +// - Returns `Error` when a message with `err` set is returned +// - Otherwise, returns the pointer to the message +// +// In other words, for a `Ok (Just ..)` it can be assumed that `err = 0`. +rd_kafka_consumer_poll :: !Pointer !Int -> MaybeError String (Maybe Pointer) +rd_kafka_consumer_poll rk timeout + # rkw = poll rk timeout + | rkw == 0 + = Ok Nothing + # err = derefInt rkw + | err == 0 + = Ok (Just rkw) + = Error (rd_kafka_err2str err) +where + poll :: !Pointer !Int -> Pointer + poll _ _ = code { + ccall rd_kafka_consumer_poll "pI:p" + } + +rd_kafka_destroy :: !Pointer !.a -> .a +rd_kafka_destroy rk _ = code { + ccall rd_kafka_destroy "p:V:A" +} + +rd_kafka_err2str :: !Int -> String +rd_kafka_err2str err = derefString (get err) +where + get :: !Int -> Pointer + get _ = code { + ccall rd_kafka_err2str "I:p" + } + +rd_kafka_flush :: !Pointer !Int -> MaybeError Int () +rd_kafka_flush rk timeout + # r = flush rk timeout + | r == 0 + = Ok () + = Error r +where + flush :: !Pointer !Int -> Int + flush _ _ = code { + ccall rd_kafka_flush "GpI:I" + } + +rd_kafka_last_error :: Int +rd_kafka_last_error = code { + ccall rd_kafka_last_error ":I" +} + +rd_kafka_message_destroy :: !Pointer !.a -> .a +rd_kafka_message_destroy msg _ = code { + ccall rd_kafka_message_destroy "p:V:A" +} + +// type: 0 for producer, 1 for consumer +rd_kafka_new :: !Int !Pointer -> MaybeError String Pointer +rd_kafka_new type conf + # rk = new type conf errstr ERRSTR_SIZE + | rk <> 0 + = Ok rk + = Error (unpackString errstr) +where + new :: !Int !Pointer !String !Int -> Pointer + new _ _ _ _ = code { + ccall rd_kafka_new "IpsI:p" + } + +rd_kafka_poll :: !Pointer !Int -> Int +rd_kafka_poll rk timeout = code { + ccall rd_kafka_poll "GpI:I" +} + +rd_kafka_poll_set_consumer :: !Pointer -> MaybeError String () +rd_kafka_poll_set_consumer client + # err = set client + | err == 0 + = Ok () + = Error (rd_kafka_err2str err) +where + set :: !Pointer -> Int + set _ = code { + ccall rd_kafka_poll_set_consumer "p:I" + } + +rd_kafka_produce :: !Pointer !Int !Int !String !(Maybe String) -> MaybeError String () +rd_kafka_produce rkt partition flags payload key + # key = fromMaybe "" key + # r = produce rkt partition flags payload (size payload) key (size key) 0 + | r == 0 + = Ok () + # err = rd_kafka_last_error + | err == err // force evaluation + = Error (rd_kafka_err2str err) + = abort "internal error in rd_kafka_produce\n" +where + produce :: !Pointer !Int !Int !String !Int !String !Int !Pointer -> Int + produce _ _ _ _ _ _ _ _ = code { + ccall rd_kafka_produce "pIIsIsIp:I" + } + +rd_kafka_produce_batch :: !Pointer !Int !Int !{#Int} !Int -> Int +rd_kafka_produce_batch rkt partition flags messages cnt = code { + ccall rd_kafka_produce_batch "pIIAI:I" +} + +rd_kafka_subscribe :: !Pointer !Pointer -> MaybeError String () +rd_kafka_subscribe rk list + # err = subscribe rk list + | err == 0 + = Ok () + = Error (rd_kafka_err2str err) +where + subscribe :: !Pointer !Pointer -> Int + subscribe _ _ = code { + ccall rd_kafka_subscribe "pp:I" + } + +rd_kafka_topic_destroy :: !Pointer !.a -> .a +rd_kafka_topic_destroy topic _ = code { + ccall rd_kafka_topic_destroy "p:V:A" +} + +rd_kafka_topic_name :: !Pointer -> String +rd_kafka_topic_name topic = derefString (get topic) +where + get :: !Pointer -> Pointer + get _ = code { + ccall rd_kafka_topic_name "p:p" + } + +rd_kafka_topic_new :: !Pointer !String !Pointer -> MaybeError String Pointer +rd_kafka_topic_new rk topic conf + # topic = new rk (packString topic) conf + | topic <> 0 + = Ok topic + # err = rd_kafka_last_error + | err == err // force evaluation + = Error (rd_kafka_err2str err) + = abort "internal error in rd_kafka_topic_new\n" +where + new :: !Pointer !String !Pointer -> Pointer + new _ _ _ = code { + ccall rd_kafka_topic_new "psp:p" + } + +rd_kafka_topic_partition_list_add :: !Pointer !String !Int -> Pointer +rd_kafka_topic_partition_list_add list topic partition = add list (packString topic) partition +where + add :: !Pointer !String !Int -> Pointer + add _ _ _ = code { + ccall rd_kafka_topic_partition_list_add "psI:p" + } + +rd_kafka_topic_partition_list_destroy :: !Pointer !.a -> .a +rd_kafka_topic_partition_list_destroy list _ = code { + ccall rd_kafka_topic_partition_list_destroy "p:V:A" +} + +rd_kafka_topic_partition_list_new :: !Int -> Pointer +rd_kafka_topic_partition_list_new cnt = code { + ccall rd_kafka_topic_partition_list_new "I:p" +} diff --git a/tests/linux64/test.icl b/tests/linux64/test.icl index 859dc273..2bf0fd4b 100644 --- a/tests/linux64/test.icl +++ b/tests/linux64/test.icl @@ -167,6 +167,8 @@ import qualified Internet.HTTP.CGI import qualified Internet.IRC import qualified Math.Geometry import qualified Math.Random +import qualified Message._Kafka +import qualified Message.Kafka import qualified Network.IP import qualified System.CommandLine import qualified System.Directory -- GitLab