
How to Publish a Kafka Message

From a programmer's point of view, a Kafka message is just a topic, a key, a value, and headers.

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

Publish a message to a topic.

Parameters:
  • topic (str) – topic where the message will be published
  • value (optional) – message value. Must be type bytes, or be serializable to bytes via configured value_serializer. If value is None, key is required and message acts as a ‘delete’. See kafka compaction documentation for more details: https://kafka.apache.org/documentation.html#compaction (compaction requires kafka >= 0.8.1)
  • partition (int, optional) – optionally specify a partition. If not set, the partition will be selected using the configured ‘partitioner’.
  • key (optional) – a key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer.
  • headers (optional) – a list of header key value pairs. List items are tuples of str key and bytes value.
  • timestamp_ms (int, optional) – epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.
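Putting these parameters together, a single send() call might look like the sketch below. This is a minimal sketch, not the library's own example: the topic name, payload, key, and header values are made-up placeholders, and the broker is assumed to be a local default at localhost:9092.

import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send(
    'example-topic',                       # topic (placeholder name)
    value=b'raw-bytes-payload',            # bytes, unless a value_serializer is configured
    key=b'partitioning-key',               # same key -> same partition with the default partitioner
    headers=[('origin', b'doc-example')],  # list of (str key, bytes value) tuples
    timestamp_ms=int(time.time() * 1000))  # defaults to the current time if omitted
record_metadata = future.get(timeout=10)   # send() is asynchronous; get() blocks for the result
producer.flush()

Leaving partition unset, as here, lets the configured partitioner pick the partition from the key.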

 

 

Python code examples for easier understanding

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)
>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)

 

At the high/Kafka level there is just key, value, and headers. To send more structured or complicated data, we specify a serializer, which converts the pre-defined structured message into the bytes that Kafka needs at its API level. By the same token, the consumer side needs the matching deserializer to convert those bytes back into the well-defined structure.
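A minimal sketch of that symmetry, assuming a local broker at localhost:9092 and JSON as the structured format (the 'orders' topic and the payload are made-up placeholders):

import json
from kafka import KafkaProducer, KafkaConsumer

# Producer side: structured dict -> bytes via value_serializer
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('orders', {'order_id': 42, 'status': 'created'})
producer.flush()

# Consumer side: bytes -> structured dict via the matching value_deserializer
consumer = KafkaConsumer('orders',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for record in consumer:
    print(record.value)   # a Python dict again, e.g. {'order_id': 42, 'status': 'created'}
    break

The threaded sample further below shows the same serializer/deserializer pairing in a longer-running setup.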

 

How to Receive a Kafka Message and Deserialize It

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

class kafka.KafkaConsumer(*topics, **configs)

Consume records from a Kafka cluster.

The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers. It also interacts with the assigned kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).

The consumer is not thread safe and should not be shared across threads.

Parameters:

*topics (str) – optional list of topics to subscribe to. If not set, call subscribe() or assign() before consuming records.

Keyword Arguments:
  • bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
  • client_id (str) – A name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
  • group_id (str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
  • key_deserializer (callable) – Any callable that takes a raw message key and returns a deserialized key.
  • value_deserializer (callable) – Any callable that takes a raw message value and returns a deserialized value.
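For instance, a consumer wired up with these keyword arguments might be created as sketched below; the broker address, client id, and group name are made-up placeholders, and JSON values are assumed:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                                  # topics can also be set later via subscribe()
    bootstrap_servers='localhost:9092',          # at least one reachable broker
    client_id='docs-example-client',             # shows up in broker-side log entries
    group_id='docs-example-group',               # enables group coordination and offset commits
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')))

for record in consumer:
    print(record.key, record.value)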

 

Sample code: producer and consumer in separate threads

import threading
import logging
import time
import json

from kafka import KafkaConsumer, KafkaProducer

class Producer(threading.Thread):
    daemon = True

    def run(self):
        # Publish two JSON-serialized messages per second to 'my-topic'
        producer = KafkaProducer(bootstrap_servers='victoria.com:6667',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            time.sleep(1)

class Consumer(threading.Thread):
    daemon = True

    def run(self):
        # Read 'my-topic' from the beginning and print each record
        consumer = KafkaConsumer(bootstrap_servers='victoria.com:6667',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))
        consumer.subscribe(['my-topic'])
        for message in consumer:
            print(message)

# Start both daemon threads and let the demo run for a short while
if __name__ == "__main__":
    Producer().start()
    Consumer().start()
    time.sleep(10)

 

 

Headers

Kafka custom header properties enable you to add metadata to the Kafka message, which can then be used during message processing; for example, the header properties can carry information about the format of the data, such as a schema name.
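A minimal sketch of attaching a header on the producer side and reading it back on the consumer side, assuming a local broker at localhost:9092; the 'schema' header name and its value are made-up examples of carrying a schema name:

from kafka import KafkaProducer, KafkaConsumer

# Producer side: headers are a list of (str key, bytes value) tuples
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my-topic',
              value=b'payload-bytes',
              headers=[('schema', b'customer-record-v1')])  # e.g. carry the schema name
producer.flush()

# Consumer side: each record carries its headers along with key and value
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for record in consumer:
    print(record.headers)   # e.g. [('schema', b'customer-record-v1')]
    break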