ADF (фабрика данных azure) копирует данные из SQL Server в Cosmos DB с полем, содержащим объект json

У меня есть база данных SQL, содержащая от одного до многих отношений между таблицами. Я написал запрос, чтобы он содержал строки с полями, содержащими json (для связанных строк таблицы). Вот запрос -

select msg.MessageId as id
      ,msg.CreatedDate as [CreatedDate]
      ,
      (select [RecipientTypeId] as [RecipientTypeId]
      ,[RecipientId] as [RecipientId]
      ,[mr.CreatedDate] as [CreatedDate]
      ,[IsRead] as [IsRead]
      ,[ReadDate] as [ReadDate]
       from [dbo].[MsgRecipients] mr
       where msg.messageid = mr.messageid  FOR JSON PATH, INCLUDE_NULL_VALUES) as Recipients
       ,
       (select 
       [Data] as [Data]
      ,[Value] as [Value]
      ,[mc.SomeId] as [SomeId]
       from [dbo].[MessageContent] mc
       where msg.messageid = mc.messageid  FOR JSON PATH, WITHOUT_ARRAY_WRAPPER, INCLUDE_NULL_VALUES) as MessageContent
       from [dbo].[Messages] msg

Ниже приведены результаты запроса -

введите здесь описание изображения

Здесь я получаю два поля: получатели содержат массив объектов, а сообщения содержат набор объектов.

В ADF я написал это -

{
    "name": "CopyPipeline_SQL_to_Cosmos",
    "properties": {
        "description": "CopyPipeline_SQL_to_Cosmos",
        "activities": [
            {
                "name": "Copy_lbp",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [
                    {
                        "name": "Destination",
                        "value": "Messages1"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": {
                            "value": "select msg.MessageId as id, msg.CreatedDate as [CreatedDate], (select [RecipientTypeId] as [RecipientTypeId], [RecipientId] as [RecipientId], [mr.CreatedDate] as [CreatedDate], [ReadDate] as [ReadDate] from [dbo].[MsgRecipients] mr where msg.messageid = mr.messageid  FOR JSON PATH, INCLUDE_NULL_VALUES) as Recipients, (select  [Data] as [Data], [Value] as [Value], [mc.SomeId] as [SomeId] from [dbo].[MessageContent] mc where msg.messageid = mc.messageid  FOR JSON PATH, WITHOUT_ARRAY_WRAPPER, INCLUDE_NULL_VALUES) as MessageContent from [dbo].[Messages] msg  where CreatedDate >= '@{formatDateTime(pipeline().parameters.windowStart, 'yyyy-MM-dd HH:mm' )}' AND CreatedDate < '@{formatDateTime(pipeline().parameters.windowEnd, 'yyyy-MM-dd HH:mm' )}'",
                            "type": "Expression"
                        }
                    },
                    "sink": {
                        "type": "DocumentDbCollectionSink",
                        "nestingSeparator": "",
                        "writeBatchSize": 10000,
                        "writeBehavior": "upsert"
                    },
                    "enableStaging": false
                },
                "inputs": [
                    {
                        "referenceName": "SourceDataset_lbp",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "DestinationDataset_lbp",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "Custom1",
                "type": "Custom",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                }
            }
        ],
        "parameters": {
            "windowStart": {
                "type": "String"
            },
            "windowEnd": {
                "type": "String"
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Я получаю такие записи ->

введите здесь описание изображения

На изображении выше вы можете заметить, что в Recipients и MessageContent добавлен весь json.

А я этого жду -

введите здесь описание изображения




Ответы (1)


У меня нет данных, поэтому я не могу протестировать, но я думаю, что вам придется использовать «Отображение схемы».

Прочтите о структуре здесь https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-schema-and-type-mapping#alternative-schema-mapping

person HimanshuSinha-msft    schedule 04.06.2019