Asos.ServiceBus.MessageSiphon 1.1.17

Prefix Reserved
There is a newer version of this package available.
See the version list below for details.
dotnet tool install --global Asos.ServiceBus.MessageSiphon --version 1.1.17                
This package contains a .NET tool you can call from the shell/command line.
dotnet new tool-manifest # if you are setting up this repo
dotnet tool install --local Asos.ServiceBus.MessageSiphon --version 1.1.17                
This package contains a .NET tool you can call from the shell/command line.
#tool dotnet:?package=Asos.ServiceBus.MessageSiphon&version=1.1.17                
nuke :add-package Asos.ServiceBus.MessageSiphon --version 1.1.17                

azure-servicebus-message-siphon

A dotnet tool for message siphon scenarios when working with Azure Service Bus, allowing you to purge and replay messages, within a single namespace or across namespaces

What's it for?

You may have cases such as

  • Messages have dead lettered and you want to replay them back onto the main queue\topic
  • You want to clone messages from one entity to another in the same namespace
  • You want to clone and remove messages from one namespace to another
  • You want to select messages by a header value from one entity and move them to another
  • You want to purge messages, deleting from a queue, topic or dead-letter sub entity.
  • You want to use connection strings, RBAC or both for the source and targets. (i.e. connection string for source, RBAC for target)

How does it work?

A configuration file is used that determines the set of work that will be performed. The configuration file is made up of service bus connection details, and a list of siphon work that needs to be performed. The siphon work then references the connections by Name

As the consumer of this package, you can define some standard work you'd like to perform as a set of configuration files, then use them to execute the work against one or many service bus namespaces - just provide the appropriate ConnectionString or FullyQualifiedNamespace at runtime

Configuration details

ReplayMessagesJob - A replay messages just has the following

  • JobType - Required. the type of work to perform. Can be one of :
    • DeadLettersSameEntity - replays messages from the sub-queue back onto the parent entity
    • DeadLettersToDifferentEntity - replays messages from the entity sub-queue, to a different Target entity
    • SourceToTarget - siphons messages from any source to any target entity
    • PurgeSource - purges messages from the source entity, a delete operation
    • PurgeDeadLetters - as per PurgeSource but for dead letter entities
  • JobName - Required. Give the work a useful name, used in log messages
  • NumberOfConcurrentProcesses - A value between 1 and 10. When performing a siphon job, copying messages from source to target,
  • messages are received in batches as per the SourceBatchReceiveSize setting. Once messages are received, they need to be sent to the target and NumberOfConcurrentProcesses controls how many concurrent operations attempt to work through the batch. E.g. if
  • your batch size is 100 and you set NumberOfConcurrentProcesses to 10, then 10 operations will each attempt to process
  • 10 messages in parallel. Experiment with these settings to find the best combination to achieve high throughput.

ServiceBusDetails - a list of service bus connections. A ServiceBusDetail instance has the following

  • Name - give the item a useful name, you'll use this to refer to it from any siphon work you define
  • ConnectionMode - how to connect to the service bus, can be one of ConnectionString or Rbac
  • ConnectionString - if ConnectionMode is ConnectionString, then define the SAS connection string to use
  • FullyQualifiedNamespace = if ConnectionMode is Rbac, then define the fully qualified namespace, e.g. namespace.servicebus.windows.net

SiphonWork - a list of work to perform, Siphon work requires the following

  • SiphonMode - determines the type of operation to perform, which impacts the receive behaviour of the source messages

    • When Clone, the worker will use PeekMessagesAsync so not to increment the delivery count of messages, and the message will not be received or completed. A copy of the message is created
    • When CloneAndDelete, the worker will use ReceiveMessagesAsync, and will attempt to complete the message once the send operation to target has completed OK
    • When Delete, the worker will use ReceiveMessagesAsync, and will attempt to complete the message once the it's received and no copy of a message will be created.
  • SourceConnectionName - the Name of the ServiceBusDetails to use as the source

  • SourceEntity - the name of the entity, can be either a queue or a topic

  • SourceSubscriptions - if SourceEntity is a topic, then define a list of subscription names for the Topic

  • TargetConnectionName - the Name of the ServiceBusDetails to use as the target

  • TargetEntity - the name of the entity to send to, can be either a queue or a topic

  • SourceBatchReceiveSize - the size of the batch to receive messages from the source. Defaults to 20.

  • MessagesOlderThan - An ISO8601 date span, as per the duration specification. E.g. PT72H (72 hours), P30D (30 days). When performing a purge, only messages that have an older enqueue time than this value are considered, allowing you to perform work such as purge messages older than 72 hours

  • HeaderSelector - A filter to use on the message headers to select only matching messages. This has the following limitations

    • Only a single header filter is support - You can filter by one field only, e.g. HeaderName = Value
    • Only =, > and < operators are supported
    • If you use > or < operators, then the field selector must be numeric, e.g. HeaderName > 12345

Example configurations

Replaying service bus messages from dead letters on the subscriptions

In this configuration, we only need to define a single service bus connection, since we're replaying onto the same namespace/entity. Dead letters on the test and test1 subscriptions are received and put back onto the snapshot entity, both subscriptions would receive copies of the messages again

{
    "Logging": {
        "LogLevel": {
            "Default": "Information"
        }
    },    
    "ReplayMessagesJob": {
        "JobType": "DeadLettersSameEntity",
        "JobName": "Replay-Messages-From-DeadLetter-Subqueue",
        "NumberOfConcurrentProcesses": 1,
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "ConnectionString",
                "ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-sas-key"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "CloneAndDelete",
                "SourceConnectionName": "Source",
                "SourceEntity": "snapshot",
                "SourceSubscriptions": [ "test", "test1" ]
            }
        ]
    }
}

Cloning messages from a subscription in one namespace using a connection string, to another namespace using RBAC.

In this example, messages will be received in batches of 40, and 5 concurrent tasks will be used to to send them to the target in parallel (i.e. each task processes 8 messages each)

{
    "Logging": {
        "LogLevel": {
        }
    },    
    "ReplayMessagesJob": {
        "JobType": "SourceToTarget",
        "JobName": "Local Debug",
        "NumberOfConcurrentProcesses": 5,
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "ConnectionString",
                "ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=access-key"
            },
            {
                "Name": "Target",
                "ConnectionMode": "Rbac",
                "FullyQualifiedNamespace": "namespace1.servicebus.windows.net"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "Clone",
                "SourceConnectionName": "Source",
                "SourceEntity": "topic-entity",
                "SourceSubscriptions": [ "test-subscription" ], 
                "SourceBatchReceiveSize" : 40,
                "TargetConnectionName": "Target",
                "TargetEntity": "copy-of-topic"
            }
        ]
    }
}

Cloning messages from a subscription in one namespace to another topic in the same namespace, using a message selector and age to filter .

In this example, only messages that have header name Payload.MessageType with a value of ProductUpdated which are older than 1 hours will be selected and will be copied from topic-entity to copy-of-topic

{
    "Logging": {
        "LogLevel": {
        }
    },    
    "ReplayMessagesJob": {
        "JobType": "SourceToTargetByMessageSelector",
        "JobName": "Local Debug",
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "ConnectionString",
                "ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=access-key"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "Clone",
                "SourceConnectionName": "Source",
                "SourceEntity": "topic-entity",
                "SourceSubscriptions": [ "test-subscription" ],
                "HeaderSelector": "[Payload.MessageType] = 'ProductUpdated'", 
                "SourceBatchReceiveSize" : 40,
                "TargetConnectionName": "Source",
                "TargetEntity": "copy-of-topic",
                "MessagesOlderThan" : "PT1H"
            }
        ]
    }
}

Using RBAC, receive messages from a dead letter subscription in batches of 100 and purge messages that are older than 1 hour

{
    "Logging": {
        "LogLevel": {
            "Default": "Information"
        }
    },   
    "ReplayMessagesJob": {
        "JobType": "PurgeDeadLetters",
        "JobName": "Purge Debug",
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "Rbac",
                "FullyQualifiedNamespace": "namespace.servicebus.windows.net"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "Delete",
                "SourceConnectionName": "Source",
                "SourceEntity": "demo",
                "SourceSubscriptions": [ "test" ],
                "SourceBatchReceiveSize" : 100,
                "MessagesOlderThan" : "PT1H"
            }
        ]
    }
}

Move messages from a multiple topic subscriptions in one namespace using RBAC, to another namespace using RBAC.

In this example, messages will be received in batches of 40, and 5 concurrent tasks will be used to to send them to the target in parallel (i.e. each task processes 8 messages each)

{
    "Logging": {
        "LogLevel": {
        }
    },    
    "ReplayMessagesJob": {
        "JobType": "SourceToTarget",
        "JobName": "Local Debug",
        "NumberOfConcurrentProcesses": 5,
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "Rbac",
                "ConnectionString": "namespace1.servicebus.windows.net"
            },
            {
                "Name": "Target",
                "ConnectionMode": "Rbac",
                "FullyQualifiedNamespace": "namespace2.servicebus.windows.net"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "CloneAndDelete",
                "SourceConnectionName": "Source",
                "SourceEntity": "topic-1-entity",
                "SourceSubscriptions": [ "subscription-A" ],
                "SourceBatchReceiveSize" : 10,
                "TargetConnectionName": "Target",
                "TargetEntity": "copy-of-topic-1"
            },
            {
              "SiphonMode": "CloneAndDelete",
              "SourceConnectionName": "Source",
              "SourceEntity": "topic-2-entity",
              "SourceSubscriptions": [ "subscription-B" ],
              "SourceBatchReceiveSize" : 10,
              "TargetConnectionName": "Target",
              "TargetEntity": "copy-of-topic-2"
            }          
        ]
    }
}

Add as much work as you think is sensible and each piece of work is executed concurrently.

How to use?

The package is available as a dotnet tool, so you can install from nuget.org by running the following command

dotnet tool update --global Asos.ServiceBus.MessageSiphon ---version x.x.x

Once installed, you have a command available to you siphon-asb-messages. This requires a single parameter, the path to the configuration file, e.g.

siphon-asb-messages -n D:\temp\replay-config.json
siphon-asb-messages -n usr/tmp/replay-config.json

You might define a set of files that represent common scenarios for you, which can be easily referenced.

Azure Pipelines

Since this is a dotnet tool, you can run it from your Azure Pipeline on any platform. This lets you take advantage of Managed Identity from Service Connections, and interact with namespaces that may be IP Restricted by running the work from a build pool that is allow-listed on the namespace.

This gives you a way to allow supporting team members to run jobs, replaying or purging messages, in a secure and repeatable manner & without requiring any particular tools or permissions.

For example, another team may have a dependency on a service bus that you own and maintain. The namespace is IP restricted and access is via RBAC - they need to replay some dead letters on a topic-subscription. You can define an Azure pipeline to do this work and define a particular build pool with a known IP, then grant their team members permissions to run that pipeline only. In this way, the consuming team can self-serve without sharing connection strings or having to use any other Bastion type services

To do this, you'd need to define a configuration file in your source repository. Note, we use a variable placeholder here for $ServiceBusConnectionString

{
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  },  
  "ReplayMessagesJob": {
    "JobType": "DeadLettersSameEntity",
    "JobName": "Dead Letters Replay",
    "ServiceBusDetails": [
      {
        "Name": "Source",
        "ConnectionMode": "ConnectionString",
        "FullyQualifiedNamespace": "$(ServiceBusConnectionString)"
      }
    ],
    "SiphonWork": [
      {
        "SiphonMode": "CloneAndDelete",
        "SourceConnectionName": "Source",
        "SourceEntity": "some-topic-orders",
        "SourceSubscriptions": [
          "the-subscription-name"
        ]
      }
    ]
  }
}

An example pipeline might look like so. In this example, we support either RBAC using a fully qualified namespace, or connection strings by retrieving a secret from a vault. Replace Tokens will update the config file with the appropriate values

parameters:
  - name: StageName
    displayName: Service Bus Connection String Secret
    type: string
    default: "ReplayPurgeMessages"

  - name: AzureSubscription
    displayName: The name of the service connection that's used to connect to Azure
    type: string

  - name: ConfigPath
    displayName: Path to config
    type: string

  - name: ConfigFileName
    displayName: Message replay config
    type: string

  - name: Environment
    displayName: Environment
    type: string

  - name: ServiceBusFullyQualifiedName
    displayName: ServiceBusFullyQualifiedName
    type: string
    default: ""
  
  - name: KeyVaultSubscriptionName
    displayName: Name of the subscription that contains the key vault, if using it to retrieve connection strings
    type: string
    default: ""
  
  - name: KeyVaultName
    displayName: Key Vault Name
    type: string
    default: ""
  
  - name: SecretName
    displayName: Service Bus Connection String Secret
    type: string
    default: ""

stages:
  - stage: ${{parameters.StageName}}
    displayName: ${{parameters.StageName}}

    pool:
      vmImage: ubuntu-latest

    jobs: 
      - job:
        steps:
          - checkout: self

          - task: AzureCLI@2
            displayName: Get Service Bus Connection String From Vault
            condition: and(succeeded(), ne('${{ parameters.KeyVaultName }}', ''))
            inputs:
              azureSubscription: ${{parameters.AzureSubscription}}
              scriptType: pscore
              scriptLocation: inlineScript
              inlineScript: |
                $subscription = ${{parameters.KeyVaultSubscriptionName}}
                $connectionString = az keyvault secret show --name ${{parameters.SecretName}} --subscription $subscription --vault-name ${{parameters.KeyVaultName}} --query value -o tsv                
                echo "##vso[task.setvariable variable=ServiceBusConnectionString]$connectionString"

          - task: AzureCLI@2
            displayName: Set variable to fully qualified namespace parameter
            condition: and(succeeded(), eq('${{ parameters.KeyVaultName }}', ''))
            inputs:
              azureSubscription: ${{parameters.AzureSubscription}}
              scriptType: pscore
              scriptLocation: inlineScript
              inlineScript: |
                  $namespaceName = ${{parameters.ServiceBusFullyQualifiedName}}
                  echo "Connecting to namespace " + $namespaceName
                  echo "##vso[task.setvariable variable=ServiceBusFullyQualifiedName]$namespaceName"

          - task: replacetokens@4
            displayName: Replace Tokens
            inputs:
              targetFiles: '${{parameters.ConfigPath}}/*'
              encoding: 'auto'
              tokenPattern: 'azpipelines'
              writeBOM: true
              actionOnMissing: 'warn'
              keepToken: false
              actionOnNoFiles: 'continue'
              enableTransforms: false
              useLegacyPattern: false
              enableTelemetry: true

          - task: AzureCLI@2
            displayName: Perform Message Work
            inputs:
              azureSubscription: ${{parameters.AzureSubscription}}
              scriptType: pscore
              scriptLocation: inlineScript
              inlineScript: |
                dotnet tool update Asos.ServiceBus.MessageSiphon --tool-path . --version 1.20.0
                & "./siphon-asb-messages" -n ${{parameters.ConfigPath}}/${{parameters.ConfigFileName}}

AKS Cronjob

Another option is to run the Message Siphon as an AKS Cronjob. This will start the tool at an interval defined in a Cron expression.

To achieve this you need to:

  1. Create a DevOps repository in your area, or use an existing one.
  2. Copy the charts/values.yaml from this repo and change the commented values to be correct for your platform
  3. Create a configuration file for the message siphon tool using the guidance above - an example is present in the .azdo/configuration folder.
  4. Create a DevOps Pipeline to deploy the application to the AKS cluster. An example pipeline has been provided in the .azdo folder which reads the configuration file into a DevOps variable, substitutes the values and then uses the Asos Core helm charts with the values.yaml to deploy the component to the cluster.

Limitations of the Cronjob

Using RBAC access to Service Bus connections is not supported at present. This will be resolved once Workload Identity becomes available. Access must therefore be achieved using Connection Strings. The example pipeline provided assumes that the connection string is stored in KeyVault and accessed at deployment time via a DevOps library variable.

Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

This package has no dependencies.

Version Downloads Last updated
1.2.32 134 7/8/2024
1.2.31 115 7/1/2024
1.2.30 107 6/24/2024
1.2.29 91 6/24/2024
1.2.28 122 6/17/2024
1.2.27 94 6/12/2024
1.2.26 97 6/10/2024
1.2.25 84 6/4/2024
1.2.24 106 6/3/2024
1.2.23 102 5/28/2024
1.2.22 106 5/20/2024
1.2.21 123 5/20/2024
1.2.20 74 5/13/2024
1.2.19 92 5/13/2024
1.2.18 140 5/6/2024
1.2.17 124 4/29/2024
1.2.16 119 4/22/2024
1.2.15 94 4/22/2024
1.2.14 153 4/17/2024
1.2.12 202 4/12/2024
1.1.35 233 1/30/2024
1.1.34 213 1/26/2024
1.1.33 194 1/8/2024
1.1.26 235 12/6/2023
1.1.24 143 12/4/2023
1.1.23 146 12/4/2023
1.1.22 171 12/4/2023
1.1.21 180 12/4/2023
1.1.20 164 11/27/2023
1.1.19 186 11/20/2023
1.1.18 199 11/13/2023
1.1.17 251 11/6/2023
1.1.16 217 10/23/2023
1.1.15 264 10/17/2023
1.1.14 249 10/17/2023
1.1.13 249 10/16/2023
1.1.12 228 10/16/2023
1.1.11 215 10/2/2023
1.1.10 258 9/25/2023
1.1.9 243 9/14/2023
1.1.8 180 6/28/2023
1.1.7 299 2/16/2023
1.1.6 248 2/13/2023
1.1.5 238 2/7/2023
1.1.4 280 2/2/2023
1.1.2 316 12/13/2022
1.1.1 295 12/13/2022
1.1.0 320 12/13/2022
1.0.24 308 12/13/2022
1.0.23 303 12/12/2022
1.0.22 306 12/9/2022
1.0.21 433 8/5/2022
1.0.21-pr0026-0011 276 8/4/2022
1.0.20 437 8/3/2022
1.0.20-pr0024-0004 265 8/2/2022
1.0.20-ci-message-selec0003 304 8/2/2022
1.0.19 504 1/24/2022