Verified Commit 209db6d1 authored by Camil Staps's avatar Camil Staps 🙂

Add wrapper around basic functionality in librdkafka

parent 89b6ae51
Pipeline #42251 passed with stage
in 2 minutes and 9 seconds
......@@ -2,7 +2,7 @@ test-nightly:
before_script:
- install_clean.sh bundle-complete
- apt-get update -qq
- apt-get install -y -qq build-essential git coreutils libmariadb-dev libsnappy-dev libsqlite3-dev
- apt-get install -y -qq build-essential git coreutils libmariadb-dev librdkafka-dev libsnappy-dev libsqlite3-dev
- make -C src/cdeps install
......
definition module Message.Kafka
/**
* Bindings for Apache Kafka. See:
*
* - https://kafka.apache.org/intro.html
* - https://github.com/edenhill/librdkafka
* - https://github.com/edenhill/librdkafka/wiki/Language-bindings-development
*
* Not all functionality has been implemented yet.
*
* A basic producer can be implemented as follows, based on
* https://github.com/edenhill/librdkafka/blob/master/examples/producer.c:
*
* ```clean
* //* Create a producer and write "payload" to the topic "topic".
* producer :: !*World -> *World
* producer w
* # c = newKafkaClient KafkaProducer [("bootstrap.servers", "localhost")]
* | c=:(Error _) = abort (fromError c+++"\n")
* # c = fromOk c
* # t = newKafkaTopic "topic" c
* | t=:(Error _) = abort (fromError t+++"\n")
* # t = fromOk t
* # (err,w) = produceKafkaMessage KafkaUnassignedPartition Nothing "payload" t w
* | err=:(Error _) = abort (fromError err+++"\n")
* # (err,w) = flushKafkaClient 1000 c w
* | err=:(Error _) = abort (fromError err+++"\n")
* | otherwise = w
* ```
*
* A basic consumer can be implemented as follows, based on
* https://github.com/edenhill/librdkafka/blob/master/examples/consumer.c:
*
* ```clean
* //* Print messages from the topic "topic" to stdout.
* consumer :: !*World -> *World
* consumer w
* # c = newKafkaClient KafkaConsumer
* [ ("bootstrap.servers", "localhost")
* , ("group.id", "group")
* , ("auto.offset.reset", "earliest")
* ]
* | c=:(Error _) = abort (fromError c+++"\n")
* # c = fromOk c
* # (err, w) = redirectKafkaPollQueue c w
* | err=:(Error _) = abort (fromError err+++"\n")
* # (err, w) = subscribeToKafkaTopics [("topic", KafkaUnassignedPartition)] c w
* | err=:(Error _) = abort (fromError err+++"\n")
* # w = loop c w
* # w = closeKafkaConsumer c w
* # w = destroyKafkaClient c w
* = w
* where
* loop c w
* # (mbMsg,w) = pollKafkaConsumer 100 c w
* | mbMsg=:(Error _)
* # err = fromError mbMsg
* | err == "Broker: No more messages" // ignore this error
* = loop c w
* # (_,w) = fclose (stderr <<< err <<< "\n") w
* = loop c w
* # mbMsg = fromOk mbMsg
* | mbMsg=:Nothing
* = loop c w
* # msg = fromJust mbMsg
* # (io,w) = stdio w
* # io = io <<< "Message: " <<< msg.payload <<< "\n"
* # (_,w) = fclose io w
* = loop c w
* ```
*/
from StdMaybe import :: Maybe
from Data.Error import :: MaybeError
from System._Pointer import :: Pointer
//* The kind of client that `newKafkaClient` should create.
:: KafkaClientType
= KafkaConsumer
| KafkaProducer
//* Handle to a client created with `newKafkaClient` (a `Pointer` internally).
:: KafkaClient (:== Pointer)
//* Handle to a topic created with `newKafkaTopic` (a `Pointer` internally).
:: KafkaTopic (:== Pointer)
//* A message as returned by `pollKafkaConsumer`.
:: KafkaMessage =
{ topic :: !KafkaTopic //* The topic handle stored in the message.
, partition :: !KafkaPartition //* The partition stored in the message.
, offset :: !Int //* NB: on 32-bit systems, the upper 32 bits of the offset are lost.
, payload :: !String //* The payload; the empty string when the message has no payload.
, key :: !String //* The key; the empty string when the message has no key.
}
/**
* Messages can be sent to a particular (non-negative) partition identifier, or
* to the unassigned partition `KafkaUnassignedPartition`.
*/
:: KafkaPartition :== Int
KafkaUnassignedPartition :== -1
/**
* Polling can either:
*
* - Not block at all (`KafkaNoBlock`)
* - Block until new events are available (`KafkaNoTimeout`)
* - Block for a certain time only (given in ms)
*/
:: KafkaBlockSetting :== Int
KafkaNoBlock :== 0
KafkaNoTimeout :== -1
/**
* Creates a new client given a list of settings.
*
* @param The kind of client.
* @param Settings, as a list of (name, value) pairs.
* @result A new `KafkaClient`, or an error on failure.
*/
newKafkaClient :: !KafkaClientType ![(String,String)] -> MaybeError String KafkaClient
/**
* Let a consumer leave the group gracefully before destroying it with
* `destroyKafkaClient`.
*/
closeKafkaConsumer :: !KafkaClient !*World -> *World
/**
* Destroy a `KafkaClient`.
*
* Before destroying a client, all remaining messages should be flushed with
* `flushKafkaClient` and a reasonably long timeout.
*
* A consumer should be closed with `closeKafkaConsumer` before destroying it.
*/
destroyKafkaClient :: !KafkaClient !*World -> *World
/**
* Poll for new events.
*
* This function should be called regularly.
*
* @param Whether to block and for how long.
* @param The client.
* @result The number of events served.
*/
pollKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!Int, !*World)
/**
* Attempt to flush all remaining events.
*
* This function should be called before destroying the client with
* `destroyKafkaClient`.
*
* @param Whether to block and for how long.
* @param The client.
* @result On failure, a human-readable error message is returned.
*/
flushKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (), !*World)
/**
* Create a new topic.
*
* Topics are refcounted internally. Creating a new topic with a name that has
* already been used previously will return the previous handle.
*
* NB: configurations (`rd_kafka_topic_conf_set`) are not supported yet.
*
* @param The topic name.
* @param The client.
* @result The new topic.
*/
newKafkaTopic :: !String !KafkaClient -> MaybeError String KafkaTopic
/**
* Destroy a topic.
*
* When a topic has been 'created' multiple times (i.e. `newKafkaTopic` was
* used multiple times with the same name), this does not actually free
* resources but only decreases the topic's refcount.
*/
destroyKafkaTopic :: !KafkaTopic !*World -> *World
//* Get the name of a topic.
kafkaTopicName :: !KafkaTopic -> String
/**
* Produce and queue a message.
*
* `pollKafkaClient` must be called after this function to ensure the message
* is sent as well.
*
* NB: for now it is not possible to check whether a message has been sent
* successfully.
*
* @param The partition to send to, or `KafkaUnassignedPartition`.
* @param An optional message key.
* @param The message payload.
* @param The topic to send to.
* @result On failure, a human-readable error message is returned.
*/
produceKafkaMessage :: !KafkaPartition !(Maybe String) !String !KafkaTopic !*World -> (!MaybeError String (), !*World)
/**
* Subscribe to a list of topics.
*
* @param The topics to subscribe to.
* @param The client (should be a `KafkaConsumer`).
* @result On failure, a human-readable error message is returned.
*/
subscribeToKafkaTopics :: ![(String,KafkaPartition)] !KafkaClient !*World -> (!MaybeError String (), !*World)
/**
* Redirect the main Kafka poll queue to the poll queue of a particular client
* (should be a `KafkaConsumer`). It is not permitted to use `pollKafkaClient`
* after `redirectKafkaPollQueue`.
*/
redirectKafkaPollQueue :: !KafkaClient !*World -> (!MaybeError String (), !*World)
/**
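* Poll a consumer for a new message.
*
* This function should be called regularly.
*
* @param Whether to block and for how long.
* @param The client.
* @result `Ok (Just message)` when a message was received, `Ok Nothing` when
*   the poll yielded no message, or a human-readable error message on failure.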
*/
pollKafkaConsumer :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (Maybe KafkaMessage), !*World)
implementation module Message.Kafka
import StdEnv
import StdMaybe
import Control.Monad
import Data.Error
import Message._Kafka
import System._Pointer
import qualified Text
from Text import class Text, instance Text String
// Both handle types are plain pointers; they are passed unchanged to the
// low-level bindings in `Message._Kafka`.
:: KafkaClient :== Pointer
:: KafkaTopic :== Pointer
// Byte offsets of the fields of the message struct whose pointer is returned
// by `rd_kafka_consumer_poll`. `IF_INT_64_OR_32` selects the first value on
// 64-bit systems and the second value on 32-bit systems.
// `KM_Err` and `KM_Private` are not referenced elsewhere in this module.
// NOTE(review): presumably these mirror the layout of librdkafka's
// `rd_kafka_message_t` — confirm against rdkafka.h when upgrading.
KM_Err :== 0
KM_Topic :== IF_INT_64_OR_32 8 4
KM_Partition :== IF_INT_64_OR_32 16 8
KM_Payload :== IF_INT_64_OR_32 24 12
KM_Len :== IF_INT_64_OR_32 32 16
KM_Key :== IF_INT_64_OR_32 40 20
KM_KeyLen :== IF_INT_64_OR_32 48 24
KM_Offset :== IF_INT_64_OR_32 56 28
KM_Private :== IF_INT_64_OR_32 64 36
// Apply all settings, in order, to a fresh configuration object and create a
// client from the result. The first failing step yields its `Error`.
newKafkaClient :: !KafkaClientType ![(String,String)] -> MaybeError String KafkaClient
newKafkaClient type settings = configure >>= rd_kafka_new kind
where
	// 0 selects a producer, 1 a consumer
	kind = if (type=:KafkaProducer) 0 1
	configure = foldM (\conf (key,val) -> rd_kafka_conf_set conf key val) rd_kafka_conf_new settings
// Delegates to the low-level `rd_kafka_consumer_close`, threading the world.
closeKafkaConsumer :: !KafkaClient !*World -> *World
closeKafkaConsumer consumer world = rd_kafka_consumer_close consumer world
// Delegates to the low-level `rd_kafka_destroy`, threading the world.
destroyKafkaClient :: !KafkaClient !*World -> *World
destroyKafkaClient handle world = rd_kafka_destroy handle world
// Polls the client with the given block setting and returns the result of
// `rd_kafka_poll` together with the world.
pollKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!Int, !*World)
pollKafkaClient timeout handle world = (rd_kafka_poll handle timeout, world)
// Flushes the client. A failure code reported by `rd_kafka_flush` is
// translated into a human-readable message with `rd_kafka_err2str`.
flushKafkaClient :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (), !*World)
flushKafkaClient block client w = case rd_kafka_flush client block of
	Error rc -> (Error (rd_kafka_err2str rc), w)
	_        -> (Ok (), w)
// Creates (or looks up) a topic handle for `name` on the given client.
newKafkaTopic :: !String !KafkaClient -> MaybeError String KafkaTopic
newKafkaTopic name handle = rd_kafka_topic_new handle name noTopicConf
where
	// a null pointer: no topic configuration is passed
	noTopicConf = 0
// Delegates to the low-level `rd_kafka_topic_destroy`, threading the world.
destroyKafkaTopic :: !KafkaTopic !*World -> *World
destroyKafkaTopic handle world = rd_kafka_topic_destroy handle world
// Returns the name of the topic as reported by `rd_kafka_topic_name`.
kafkaTopicName :: !KafkaTopic -> String
kafkaTopicName handle = rd_kafka_topic_name handle
// Queues `msg` (with an optional key) for the given topic and partition.
produceKafkaMessage :: !KafkaPartition !(Maybe String) !String !KafkaTopic !*World -> (!MaybeError String (), !*World)
produceKafkaMessage partition key msg topic w = (result, w)
where
	// F_COPY to copy payload
	copyPayload = 2
	result = rd_kafka_produce topic partition copyPayload msg key
// Subscribes the client to the given (topic, partition) pairs.
//
// A temporary topic-partition list is created, filled, passed to
// `rd_kafka_subscribe` and destroyed again. The order of these steps matters,
// hence the explicit forcing below.
subscribeToKafkaTopics :: ![(String,KafkaPartition)] !KafkaClient !*World -> (!MaybeError String (), !*World)
subscribeToKafkaTopics topics client w
// Add every pair to a list sized for `length topics` entries. The result `r`
// of each add is only compared with itself to make sure the add is executed;
// the list pointer `l` is what is carried through the fold.
# list = foldl
(\l (t,p)
# r = rd_kafka_topic_partition_list_add l t p
| r <> r // force evaluation
-> l
-> l)
(rd_kafka_topic_partition_list_new (length topics))
topics
# err = rd_kafka_subscribe client list
// Make sure the subscription has been evaluated before the list is destroyed.
# w = forceEval err w
# w = rd_kafka_topic_partition_list_destroy list w
= (err, w)
// Delegates to `rd_kafka_poll_set_consumer` and pairs its result with the world.
redirectKafkaPollQueue :: !KafkaClient !*World -> (!MaybeError String (), !*World)
redirectKafkaPollQueue consumer world
	# result = rd_kafka_poll_set_consumer consumer
	= (result, world)
// Polls the consumer. When a message is returned, its fields are copied into
// a `KafkaMessage` and the underlying message is released.
//
// - An `Error` from `rd_kafka_consumer_poll` is passed on unchanged.
// - `Ok Nothing` is returned when the poll yielded no message.
// - A null payload or key pointer is represented by the empty string.
pollKafkaConsumer :: !KafkaBlockSetting !KafkaClient !*World -> (!MaybeError String (Maybe KafkaMessage), !*World)
pollKafkaConsumer block client w
	# mbMsg = rd_kafka_consumer_poll client block
	| mbMsg=:(Error _)
		= (liftError mbMsg, w)
	# mbMsg = fromOk mbMsg
	| mbMsg=:Nothing
		= (Ok Nothing, w)
	# ptr = fromJust mbMsg
	# payload = readInt ptr KM_Payload
	# key = readInt ptr KM_Key
	// The partition is a signed 32-bit field of the message struct. Reading a
	// full machine word would, on 64-bit systems, also read the 4 bytes that
	// follow it, so it is read as a sign-extended 4-byte integer instead.
	# msg =
		{ topic     = readInt ptr KM_Topic
		, partition = readInt4S ptr KM_Partition
		, offset    = readInt ptr KM_Offset
		, payload   = if (payload == 0) "" (derefCharArray payload (readInt ptr KM_Len))
		, key       = if (key == 0) "" (derefCharArray key (readInt ptr KM_KeyLen))
		}
	// `rd_kafka_message_destroy` is strict in its second argument and all
	// fields of `msg` are strict, so everything has been copied out of the
	// message before it is destroyed.
	# msg = rd_kafka_message_destroy ptr msg
	= (Ok (Just msg), w)
definition module Message._Kafka
/**
* Low-level bindings for `Message.Kafka`.
*
* Each function wraps the librdkafka C function of the same name. Functions
* of type `!Pointer !.a -> .a` pass their second argument through, so that
* the call can be sequenced on that value (e.g. the `World`).
*/
from StdInt import IF_INT_64_OR_32
from StdMaybe import :: Maybe
from Data.Error import :: MaybeError
from System._Pointer import :: Pointer
//* Creates a configuration object.
rd_kafka_conf_new :: Pointer
//* Sets a property (name, value) on a configuration object. Returns the same
//* configuration pointer, or the error message on failure.
rd_kafka_conf_set :: !Pointer !String !String -> MaybeError String Pointer
//* Closes the consumer with the given client pointer.
rd_kafka_consumer_close :: !Pointer !.a -> .a
//* Polls the client with the given timeout. `Ok Nothing` when the poll returns
//* 0, `Error` when the message has `err` set, otherwise the message pointer.
rd_kafka_consumer_poll :: !Pointer !Int -> MaybeError String (Maybe Pointer)
//* Destroys the client with the given pointer.
rd_kafka_destroy :: !Pointer !.a -> .a
//* Converts an error code to the string provided by librdkafka.
rd_kafka_err2str :: !Int -> String
//* Flushes the client with the given timeout. `Error` holds the non-zero
//* return code.
rd_kafka_flush :: !Pointer !Int -> MaybeError Int ()
//* The result of the C function `rd_kafka_last_error`.
rd_kafka_last_error :: Int
//* Destroys the message with the given pointer.
rd_kafka_message_destroy :: !Pointer !.a -> .a
//* Creates a client. The first argument is 0 for a producer and 1 for a
//* consumer; the second is a configuration object.
rd_kafka_new :: !Int !Pointer -> MaybeError String Pointer
//* Polls the client with the given timeout.
rd_kafka_poll :: !Pointer !Int -> Int
//* Calls `rd_kafka_poll_set_consumer` on the client.
rd_kafka_poll_set_consumer :: !Pointer -> MaybeError String ()
//* Produces a message. Arguments: topic, partition, flags, payload and an
//* optional key.
rd_kafka_produce :: !Pointer !Int !Int !String !(Maybe String) -> MaybeError String ()
//* Direct binding. Arguments: topic, partition, flags, messages and count.
rd_kafka_produce_batch :: !Pointer !Int !Int !{#Int} !Int -> Int
//* Subscribes the client (first argument) to a topic-partition list (second).
rd_kafka_subscribe :: !Pointer !Pointer -> MaybeError String ()
//* Destroys the topic with the given pointer.
rd_kafka_topic_destroy :: !Pointer !.a -> .a
//* The name of the topic with the given pointer.
rd_kafka_topic_name :: !Pointer -> String
//* Creates a topic. Arguments: client, topic name and a topic configuration
//* pointer.
rd_kafka_topic_new :: !Pointer !String !Pointer -> MaybeError String Pointer
//* Adds a (topic name, partition) pair to a topic-partition list.
rd_kafka_topic_partition_list_add :: !Pointer !String !Int -> Pointer
//* Destroys a topic-partition list.
rd_kafka_topic_partition_list_destroy :: !Pointer !.a -> .a
//* Creates a topic-partition list; the argument is passed as the count.
rd_kafka_topic_partition_list_new :: !Int -> Pointer
implementation module Message._Kafka
import StdEnv
import Data.Error
import Data.Maybe
import System._Pointer
import code from library "-lrdkafka"
// Size of the error buffer that is passed to librdkafka.
ERRSTR_SIZE :== 512
// A single buffer of `ERRSTR_SIZE` NUL characters, shared by all calls that
// let librdkafka write an error message (`rd_kafka_conf_set`, `rd_kafka_new`).
errstr :: String
errstr =: createArray ERRSTR_SIZE '\0'
// Calls the C function `rd_kafka_conf_new` and returns the resulting pointer.
rd_kafka_conf_new :: Pointer
rd_kafka_conf_new = code {
ccall rd_kafka_conf_new ":p"
}
// Sets property `key` to `val` on `conf`. A zero result of the C call means
// success and yields `conf` itself; otherwise the contents of the shared
// error buffer are returned.
rd_kafka_conf_set :: !Pointer !String !String -> MaybeError String Pointer
rd_kafka_conf_set conf key val
	| setProperty conf (packString key) (packString val) errstr ERRSTR_SIZE == 0
		= Ok conf
	| otherwise
		= Error (unpackString errstr)
where
	setProperty :: !Pointer !String !String !String !Int -> Int
	setProperty _ _ _ _ _ = code {
		ccall rd_kafka_conf_set "psssI:I"
	}
// Calls the C function `rd_kafka_consumer_close` on `rk`; the second argument
// is passed through.
rd_kafka_consumer_close :: !Pointer !.a -> .a
rd_kafka_consumer_close rk _ = code {
ccall rd_kafka_consumer_close "p:V:A"
}
// - Returns `Ok Nothing` when the result of poll is 0
// - Returns `Error` when a message with `err` set is returned; that message is
//   destroyed here, since the caller never gets a pointer to it
// - Otherwise, returns the pointer to the message; the caller is responsible
//   for releasing it with `rd_kafka_message_destroy`
//
// In other words, for a `Ok (Just ..)` it can be assumed that `err = 0`.
rd_kafka_consumer_poll :: !Pointer !Int -> MaybeError String (Maybe Pointer)
rd_kafka_consumer_poll rk timeout
	# rkm = poll rk timeout
	| rkm == 0
		= Ok Nothing
	// `err` is the first field of the message and a 32-bit value; read exactly
	// 4 bytes (sign-extended) instead of a full machine word, which on 64-bit
	// systems would include the bytes that follow the field.
	# err = readInt4S rkm 0
	| err == 0
		= Ok (Just rkm)
	// `rd_kafka_message_destroy` is strict in its second argument, so the
	// error string is built before the message is released. The strict let
	// makes sure the message is released even if the caller ignores the text.
	#! msg = rd_kafka_message_destroy rkm (rd_kafka_err2str err)
	= Error msg
where
	poll :: !Pointer !Int -> Pointer
	poll _ _ = code {
		ccall rd_kafka_consumer_poll "pI:p"
	}
// Calls the C function `rd_kafka_destroy` on `rk`; the second argument is
// passed through.
rd_kafka_destroy :: !Pointer !.a -> .a
rd_kafka_destroy rk _ = code {
ccall rd_kafka_destroy "p:V:A"
}
// Looks up the C string for an error code and copies it into a Clean string.
rd_kafka_err2str :: !Int -> String
rd_kafka_err2str err
	# cstr = lookup err
	= derefString cstr
where
	lookup :: !Int -> Pointer
	lookup _ = code {
		ccall rd_kafka_err2str "I:p"
	}
// Flushes `rk` with the given timeout. A zero return code is success; any
// other return code is handed back in `Error`.
rd_kafka_flush :: !Pointer !Int -> MaybeError Int ()
rd_kafka_flush rk timeout = case flush rk timeout of
	0  -> Ok ()
	rc -> Error rc
where
	flush :: !Pointer !Int -> Int
	flush _ _ = code {
		ccall rd_kafka_flush "GpI:I"
	}
// Calls the C function `rd_kafka_last_error` and returns its integer result.
// Callers in this module force the result right after a failed call.
rd_kafka_last_error :: Int
rd_kafka_last_error = code {
ccall rd_kafka_last_error ":I"
}
// Calls the C function `rd_kafka_message_destroy` on `msg`; the second
// argument is passed through. Since that argument is strict, it is evaluated
// before the message is destroyed.
rd_kafka_message_destroy :: !Pointer !.a -> .a
rd_kafka_message_destroy msg _ = code {
ccall rd_kafka_message_destroy "p:V:A"
}
// Creates a client from a configuration object.
// type: 0 for producer, 1 for consumer
// A null result of the C call is a failure; the contents of the shared error
// buffer are then returned.
// NOTE(review): `conf` is not released here on failure — confirm who owns the
// configuration object when `rd_kafka_new` fails.
rd_kafka_new :: !Int !Pointer -> MaybeError String Pointer
rd_kafka_new type conf
	# handle = create type conf errstr ERRSTR_SIZE
	| handle == 0
		= Error (unpackString errstr)
	| otherwise
		= Ok handle
where
	create :: !Int !Pointer !String !Int -> Pointer
	create _ _ _ _ = code {
		ccall rd_kafka_new "IpsI:p"
	}
// Calls the C function `rd_kafka_poll` with the client and the timeout and
// returns its integer result.
rd_kafka_poll :: !Pointer !Int -> Int
rd_kafka_poll rk timeout = code {
ccall rd_kafka_poll "GpI:I"
}
// Calls `rd_kafka_poll_set_consumer` on the client. A zero result is success;
// any other result is translated with `rd_kafka_err2str`.
rd_kafka_poll_set_consumer :: !Pointer -> MaybeError String ()
rd_kafka_poll_set_consumer client = case redirect client of
	0  -> Ok ()
	rc -> Error (rd_kafka_err2str rc)
where
	redirect :: !Pointer -> Int
	redirect _ = code {
		ccall rd_kafka_poll_set_consumer "p:I"
	}
// Produces a message on topic `rkt`.
//
// Arguments: topic, partition, flags, payload and an optional key. A zero
// result of the C call is success; otherwise the error is obtained from
// `rd_kafka_last_error` and translated with `rd_kafka_err2str`.
rd_kafka_produce :: !Pointer !Int !Int !String !(Maybe String) -> MaybeError String ()
rd_kafka_produce rkt partition flags payload key
// A missing key is passed as the empty string with length 0.
# key = fromMaybe "" key
// The lengths are passed explicitly; the final pointer argument is 0.
# r = produce rkt partition flags payload (size payload) key (size key) 0
| r == 0
= Ok ()
// The self-comparison makes sure the last error is fetched here, directly
// after the failed call; the `abort` alternative is not expected to be reached.
# err = rd_kafka_last_error
| err == err // force evaluation
= Error (rd_kafka_err2str err)
= abort "internal error in rd_kafka_produce\n"
where
produce :: !Pointer !Int !Int !String !Int !String !Int !Pointer -> Int
produce _ _ _ _ _ _ _ _ = code {
ccall rd_kafka_produce "pIIsIsIp:I"
}
// Calls the C function `rd_kafka_produce_batch` with the topic, partition,
// flags, the `messages` array and `cnt`, and returns its integer result.
// NOTE(review): the expected layout of `messages` is not visible here —
// confirm against the librdkafka documentation before use.
rd_kafka_produce_batch :: !Pointer !Int !Int !{#Int} !Int -> Int
rd_kafka_produce_batch rkt partition flags messages cnt = code {
ccall rd_kafka_produce_batch "pIIAI:I"
}
// Subscribes client `rk` to the topic-partition list `list`. A zero result is
// success; any other result is translated with `rd_kafka_err2str`.
rd_kafka_subscribe :: !Pointer !Pointer -> MaybeError String ()
rd_kafka_subscribe rk list = case subscribe rk list of
	0  -> Ok ()
	rc -> Error (rd_kafka_err2str rc)
where
	subscribe :: !Pointer !Pointer -> Int
	subscribe _ _ = code {
		ccall rd_kafka_subscribe "pp:I"
	}
// Calls the C function `rd_kafka_topic_destroy` on `topic`; the second
// argument is passed through.
rd_kafka_topic_destroy :: !Pointer !.a -> .a
rd_kafka_topic_destroy topic _ = code {
ccall rd_kafka_topic_destroy "p:V:A"
}
// Fetches the C string holding the topic's name and copies it into a Clean
// string.
rd_kafka_topic_name :: !Pointer -> String
rd_kafka_topic_name topic
	# cstr = nameOf topic
	= derefString cstr
where
	nameOf :: !Pointer -> Pointer
	nameOf _ = code {
		ccall rd_kafka_topic_name "p:p"
	}
// Creates a topic handle on client `rk` for the name `topic`, passing `conf`
// as the topic configuration pointer.
//
// A non-null result is returned as `Ok`. Otherwise the error is obtained from
// `rd_kafka_last_error` and translated with `rd_kafka_err2str`.
rd_kafka_topic_new :: !Pointer !String !Pointer -> MaybeError String Pointer
rd_kafka_topic_new rk topic conf
// From here on, `topic` is the resulting pointer rather than the name.
# topic = new rk (packString topic) conf
| topic <> 0
= Ok topic
// The self-comparison makes sure the last error is fetched here, directly
// after the failed call; the `abort` alternative is not expected to be reached.
# err = rd_kafka_last_error
| err == err // force evaluation
= Error (rd_kafka_err2str err)
= abort "internal error in rd_kafka_topic_new\n"
where
new :: !Pointer !String !Pointer -> Pointer
new _ _ _ = code {
ccall rd_kafka_topic_new "psp:p"
}
// Adds the pair (`topic`, `partition`) to `list` and returns the pointer that
// the C call yields. The topic name is packed before it is passed on.
rd_kafka_topic_partition_list_add :: !Pointer !String !Int -> Pointer
rd_kafka_topic_partition_list_add list topic partition
	# packedName = packString topic
	= addEntry list packedName partition
where
	addEntry :: !Pointer !String !Int -> Pointer
	addEntry _ _ _ = code {
		ccall rd_kafka_topic_partition_list_add "psI:p"
	}
// Calls the C function `rd_kafka_topic_partition_list_destroy` on `list`; the
// second argument is passed through.
rd_kafka_topic_partition_list_destroy :: !Pointer !.a -> .a
rd_kafka_topic_partition_list_destroy list _ = code {
ccall rd_kafka_topic_partition_list_destroy "p:V:A"
}
// Calls the C function `rd_kafka_topic_partition_list_new` with `cnt` and
// returns the resulting pointer.
rd_kafka_topic_partition_list_new :: !Int -> Pointer
rd_kafka_topic_partition_list_new cnt = code {
ccall rd_kafka_topic_partition_list_new "I:p"
}
......@@ -167,6 +167,8 @@ import qualified Internet.HTTP.CGI
import qualified Internet.IRC
import qualified Math.Geometry
import qualified Math.Random
import qualified Message._Kafka
import qualified Message.Kafka
import qualified Network.IP
import qualified System.CommandLine
import qualified System.Directory
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment