• 大小: 80KB
    文件类型: .zip
    金币: 2
    下载: 3 次
    发布日期: 2021-06-09
  • 语言: C#
  • 标签: C#  kafka  net  

资源简介

该demo是C#中怎样使用kafka的demo,将demo中的Program.cs中的配置server的IP地址改成本机,即可运行

资源截图

代码片段和文件信息

using System;
using System.Text;
using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace Kafka.Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            if (string.IsNullOrEmpty(args[0]))
            {
                Console.WriteLine(“consume>cmd [test-consume]“);
                Console.WriteLine(“produce>cmd [test-produce]“);
            }

            Console.WriteLine(args[0]);
            if (args[0].Equals(“test-consume“))
            {
                Consume(“testconn“);
            }
            else if (!string.IsNullOrEmpty(args[0]))
            {
                Consume(args[0]);
            }


            if (args[0].Equals(“test-produce“))
            {
                Produce(“testconn“);
            }
        }

        static void Produce(string myTopic)
        {
            var config = new Dictionaryject>
                {
                    { “bootstrap.servers“ “10.37.36.96:909210.37.36.97:909210.37.36.103:9092“ }
                };

            using (var producer = new Producer(config null new StringSerializer(Encoding.UTF8)))
            {
                var dr = producer.ProduceAsync(myTopic null “test message text“).Result;
                Console.WriteLine($“Delivered ‘{dr.Value}‘ to: {dr.TopicPartitionOffset}“);
            }
        }
        static void Consume(string myTopic)
        {
            var count = 0;
            bool canceled = false;
            var conf = new Dictionaryject>
            {
                { “group.id“ “test-consumer-group“ }
                { “bootstrap.servers“ “10.37.36.96:909210.37.36.97:909210.37.36.103:9092“ }
                { “auto.commit.interval.ms“ 5000 }
                { “auto.offset.reset“ “earliest“ }
            };

            using (var consumer = new Consumer(conf null new StringDeserializer(Encoding.UTF8)))
            {
                consumer.OnMessage += (_ msg)
                  =>
                {
                    count++;
                    Console.WriteLine($“{msg.Topic}.{msg.Partition}.{msg.Offset}“);
                    Console.WriteLine($“Value:{msg.Value}“);
                    //Console.WriteLine($“Read ‘{msg.Value}‘ from: {msg.TopicPartitionOffset}“);
                };

                consumer.onerror += (_ error)
                  =>
                {
                    canceled = true;
                    Console.WriteLine($“Error: {error}“);
                };

                consumer.OnConsumeError += (_ msg)
                  =>
                {
                    canceled = true;
                    Console.WriteLine($“Consume error ({msg.TopicPartitionOffset}): {msg.Error}“);
                };

                //consumer.Subscribe(“my-topic“);
                consumer.Subscribe(myTopic);

  

 属性            大小     日期    时间   名称
----------- ---------  ---------- -----  ----
     目录           0  2018-08-24 13:57  Kafka.Demo\
     目录           0  2018-08-24 13:57  Kafka.Demo\.vscode\
     文件        1206  2018-08-24 13:57  Kafka.Demo\.vscode\launch.json
     文件         322  2018-08-24 13:57  Kafka.Demo\.vscode\tasks.json
     目录           0  2018-08-24 13:56  Kafka.Demo\bin\
     目录           0  2018-08-24 13:56  Kafka.Demo\bin\Debug\
     目录           0  2018-08-24 14:02  Kafka.Demo\bin\Debug\netcoreapp2.0\
     文件       12900  2018-08-24 14:02  Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.deps.json
     文件        7680  2018-08-24 15:24  Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.dll
     文件        1132  2018-08-24 15:24  Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.pdb
     文件         244  2018-08-24 14:02  Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.runtimeconfig.dev.json
     文件         154  2018-08-24 14:02  Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.runtimeconfig.json
     文件         272  2018-08-24 13:58  Kafka.Demo\Kafka.Demo.csproj
     目录           0  2018-08-24 14:02  Kafka.Demo\obj\
     目录           0  2018-08-24 13:56  Kafka.Demo\obj\Debug\
     目录           0  2018-08-24 14:02  Kafka.Demo\obj\Debug\netcoreapp2.0\
     文件        1121  2018-08-24 13:57  Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.AssemblyInfo.cs
     文件          42  2018-08-24 13:57  Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.AssemblyInfoInputs.cache
     文件          42  2018-08-24 14:02  Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.csproj.CoreCompileInputs.cache
     文件        1145  2018-08-24 14:02  Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.csproj.FileListAbsolute.txt
     文件      493934  2018-08-24 13:59  Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.csprojResolveAssemblyReference.cache
     文件        7680  2018-08-24 15:24  Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.dll
     文件        1132  2018-08-24 15:24  Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.pdb
     文件         149  2018-08-24 14:02  Kafka.Demo\obj\Kafka.Demo.csproj.nuget.cache
     文件        1588  2018-08-24 14:02  Kafka.Demo\obj\Kafka.Demo.csproj.nuget.g.props
     文件         981  2018-08-24 13:56  Kafka.Demo\obj\Kafka.Demo.csproj.nuget.g.targets
     文件      101477  2018-08-24 14:02  Kafka.Demo\obj\project.assets.json
     文件        3180  2018-08-24 15:53  Kafka.Demo\Program.cs

评论

共有 条评论