Skip to main content

Genel topoloji

Tüm servisler tek bir topic exchange (integration_event_bus) paylaşır. Publisher routing key ile yayınlar; her servis kendi shared queue’sunu ilgili routing key’lere bind eder.

Exchange ve routing key

MassTransitRabbitMqOptions (BuildingBlocks.EventBus.MassTransit.RabbitMq):
AyarDefaultAnlamı
ExchangeNameintegration_event_busTüm servislerin paylaştığı topic exchange. Cross-service için aynı isimle hizalanmalı.
ExchangeTypetopicRouting key pattern eşleştirmesi.
RoutingKeyPrefixintegration.eventRouting key ön eki.
Routing key formatı: integration.event.{EventName}. Örn. UserCreatedIntegrationEventintegration.event.UserCreatedIntegrationEvent (event IBusEvent ise GetEventKey(), değilse tip adı).
cfg.Message<IntegrationEvent>(m => m.SetEntityName(rabbitOptions.ExchangeName));
cfg.Publish<IntegrationEvent>(p => p.ExchangeType = rabbitOptions.ExchangeType);

Queue ve binding

Queue adı EventBusOptions.SubscriptionClientName’den gelir (bu projede DiyanetCleanArchitecture). Tüm servis tek shared queue kullanır; abone olunan her event tipi için ayrı binding kurulur:
cfg.ReceiveEndpoint(eventBusOptions.SubscriptionClientName, e =>
{
    e.ConcurrentMessageLimit = eventBusOptions.ConcurrentMessageLimit;   // paralel işleme limiti
    e.PrefetchCount          = eventBusOptions.ConcurrentMessageLimit;   // aynı değer: dağıtım dengeli

    // ... retry / redelivery ...

    var uniqueEventTypes = subscriptionInfo.EventTypes.Values.Distinct();
    foreach (var eventEntry in uniqueEventTypes)
    {
        e.Bind(rabbitOptions.ExchangeName, s =>
        {
            s.RoutingKey   = $"{rabbitOptions.RoutingKeyPrefix}.{eventEntry}";
            s.ExchangeType = rabbitOptions.ExchangeType;
        });

        var consumerType = typeof(IntegrationEventConsumer<>).MakeGenericType(eventEntry);
        e.Consumer(consumerType, type => context.CreateScope().ServiceProvider.GetRequiredService(type));
    }
});
ConcurrentMessageLimit ile PrefetchCount bilerek eşit tutulur: multi-instance senaryoda tek node tüm mesajları prefetch edip diğerlerini aç bırakmasın, kuyruk dağıtımı dengeli kalsın. Default ConcurrentMessageLimit = 10 (EventBusOptions).

İki katmanlı retry + redelivery

Mesaj fail olduğunda iki katman devreye girer. Sıra kritik: UseDelayedRedelivery ÖNCE eklenir, UseMessageRetry SONRA — MassTransit zinciri “outer → inner” değerlendirir, yani önce in-process retry tüketilir, sonra delayed redelivery devreye girer.
// Dış katman: mesaj kuyruktan ayrılır, delayed exchange üzerinden gecikmeli geri konur.
if (rabbitOptions.RedeliveryIntervalsMinutes is { Length: > 0 } redeliveryMinutes)
{
    e.UseDelayedRedelivery(r => r.Intervals(
        redeliveryMinutes.Select(min => TimeSpan.FromMinutes(min)).ToArray()));
}

// İç katman: her redelivery'de in-process kısa retry.
e.UseMessageRetry(r => r.Interval(
    rabbitOptions.RetryCount,                               // 3
    TimeSpan.FromSeconds(rabbitOptions.RetryIntervalSeconds))); // 2s
KatmanAyarDefaultDavranış
In-process retryRetryCount × RetryIntervalSeconds3 × 2sMesaj kuyruktan ayrılmaz, aynı consumer’da bekler.
Delayed redeliveryRedeliveryIntervalsMinutes[0.16, 1, 5] dk (≈10s, 1dk, 5dk)Mesaj delayed exchange üzerinden gecikmeli ana kuyruğa geri konur.
Toplam max deneme = (RetryCount + 1) × (RedeliveryIntervalsMinutes.Length + 1) = 4 × 4 = 16. Tüm denemeler tükenince mesaj kalıcı olarak {queue}_error (DLX) kuyruğuna düşer ve orada durur — sonsuz döngü mümkün değildir (listeler sınırlı).
Delayed redelivery RabbitMQ rabbitmq_delayed_message_exchange plugin’ini gerektirir (docker/rabbitmq/enabled_plugins üzerinden enable edilir). cfg.UseDelayedMessageScheduler() bunu altyapı olarak kullanır; plugin yoksa redelivery beklenen şekilde çalışmaz.

Dirençli başlatma

Bus, broker’a bağlanamasa bile uygulamayı crash etmez:
services.AddOptions<MassTransitHostOptions>().Configure(o =>
{
    o.WaitUntilStarted = false;          // RabbitMQ down ise crash etme, arka planda reconnect dene
    o.StartTimeout     = TimeSpan.Zero;  // bağlantı denemesi beklenmeden geçilir
    o.StopTimeout      = TimeSpan.FromSeconds(30);
});
Health check rabbitmq adıyla kaydedilir; broker down olduğunda status Degraded (Unhealthy değil), çünkü outbox sayesinde mesajlar kuyrukta birikir ve API çalışmaya devam eder.

Konfigürasyon

{
  "MassTransitRabbitMq": {
    "Host": "rabbitmq",
    "VirtualHost": "/",
    "Username": "guest",
    "Password": "guest",
    "ExchangeName": "integration_event_bus",
    "ExchangeType": "topic",
    "RoutingKeyPrefix": "integration.event",
    "RetryCount": 3,
    "RetryIntervalSeconds": 2,
    "RedeliveryIntervalsMinutes": [0.16, 1, 5]
  },
  "EventBus": {
    "SubscriptionClientName": "DiyanetCleanArchitecture",
    "ConcurrentMessageLimit": 10
  }
}

İlgili

Outbox Pattern

Mesajın exchange’e gelmeden önceki yolu.

Integration Events

Event tipi, routing key ve consumer.

Event Akışı

Uçtan uca event akışı.

Event Bus

EventBus building block.