DI-SAAS (ELT Flow)

For the purpose of account launch and best-practices configuration, the following data must be fully synchronized once:

  1. product

  2. order (online and/or offline)

  3. user (not a top priority)

More information about the Integration Strategies is available on Confluence.

Data Integration (DI) Generic Flow

The Generic Data Integration (DI) Flow expects the client to:

  1. create each JSONL document

  2. load to Boxalino GCS

  3. load the files to Boxalino BQ

  4. trigger the computation (SYNC)

For certain platforms (Shopware 6, Magento 2), step #1 can be done with a Boxalino Data Integration plugin.

For generic clients, the JSONL generation is managed by the client's team by following the documented Boxalino Data Structure requirements.

The Boxalino Data Structure and integration flows/traits are publicly available as a PHP repository: https://github.com/boxalino/data-integration-doc-php

As an integrator, you can perform steps #1-#4 by following our recommended Integration Flow.

Data Integration (DI) SAAS Flow

The di-saas service is designed to:

  1. be used for custom integrations by ingesting data (.csv, .jsonl) from SFTP, GCS, BQ or public links.

  2. integrate data from targeted platforms (ex: PlentyMarkets)

For both cases, a mapping is done between the Boxalino Data Structure and the input sources (either files or API endpoints) with the help of BQ & Dataform.

In the case of a custom integration, the di-saas flow is composed of the following scopes:

  1. Data connector & Data load

    1. downloads the files from the remote source to GCS

    2. loads the raw content to the transform dataset (<account>_T)

  2. Data transformation (with logic/SQL provided by the client)

    1. Steps #1-#4 from the Generic Flow

  3. DI automation

DI-SAAS Request (Overview)

The DI (data integration) request has all the required information in a JSON body.

The elements are the connector (+ file load options) and the di (data integration) request parameters (default configurations).

[
  {
    "connector": {
      "type": "sftp|gcs|plentymarket|boxalino",
      "options": {
          // specific for each connector type
      },
      "load": {
        "options": {
          // for loading the data in BQ (BQ parameters)
          "format": "CSV|NEWLINE_DELIMITED_JSON",
          "field_delimiter": ";",
          "autodetect": true,
          "schema": "",
          "skipRows": 1(CSV)|0(JSONL),
          "max_bad_records": 0,
          "quote": "",
          "write_disposition": "WRITE_TRUNCATE",
          "create_disposition": "",
          "encoding": ""
        },
        "doc_X": {
          "property_node_from_doc_data_structure": [
            {
              "source": "file___NODASHDATE__.jsonl",
              "name": "<used to create suffix for BQ table>",
              "schema": ""
            }
          ]
        }
      }
    },
    "di": {
      "configuration": {
        "languages": [
          "de",
          "fr"
        ],
        "currencies": [
          "CHF"
        ],
        "mapping": {
          "languagesCountryCode": {"de":"CH_de", "fr":"CH_fr"}
        },
        "default": {
          "language": "de",
          "currency": "CHF"
        }
      }
    }
  }
]

Data Connector & Data Load

Access to the data is managed by the client. A few guidelines on sources and naming patterns:

  1. The relevant data sources are available in .csv or JSONL (preferred) format

  2. The files have a timestamp in the name or in the access path (ex: product_20221206.jsonl)

    1. this helps automate the integration

  3. The files required to update a certain data type (ex: product, order, etc) are available in the same path

  4. The files are available on an endpoint (SFTP, GCS, public) to which Boxalino has access

    1. for GCS/GCP sources: access is shared with Boxalino's service account boxalino-di-api@rtux-data-integration.iam.gserviceaccount.com (see the sketch after this list)

    2. for AWS/SFTP: a Boxalino user & credentials are provided
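For GCS/GCP sources, sharing read access with the Boxalino service account can be done, for example, with gsutil. A minimal sketch (the bucket name is a placeholder; the exact role depends on your setup):

gsutil iam ch \
  serviceAccount:boxalino-di-api@rtux-data-integration.iam.gserviceaccount.com:roles/storage.objectViewer \
  gs://<GCS-bucket-name>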

 GCS connector properties (with JSONL files)
"connector": {
  "type": "gcs",
    "options": {
      "source": {
        "bucket": "<GCS-bucket-name>",
        "pattern": "path/to/data/__NODASHDATE__/"
      }
  },
  "load": {
    "options": {
      "format": "NEWLINE_DELIMITED_JSON",
      "autodetect": true,
      "schema": "",
      "skipRows": 0,
      "max_bad_records": 0,
      "write_disposition": "WRITE_TRUNCATE"
    },
    "doc_attribute": {
      "properties": [
        {
          "source": "file1___NODASHDATE__.jsonl"
        }
      ],
      ..
    },
    "doc_attribute_value": {
      "properties": [
        {
          "source": "file2___NODASHDATE__.jsonl"
        }
      ],
      ..
    },
    "doc_product": {
      "entity": [
        {
          "source": "file3___NODASHDATE__.jsonl",
          "autodetect": 0,
          "schema": "SCHEMA FOR LOAD TO BQ"
          "name": ""
        }
      ]
    }
  }
}
 SFTP connector properties (with CSV files)
"connector": {
      "type": "sftp",
      "options": {
        "hostname": "<server>",
        "port": 22,
        "username": "<boxalino user>",
        "password": "<boxalino password>",
        "source": {
          "pattern": "___NODASHDATE__",
          "files": [
          ],
          "path": "<path on server>"
        }
      },
      "load": {
        "options": {
          "format": "CSV",
          "field_delimiter": ";",
          "autodetect": true,
          "schema": "",
          "skipRows": 1,
          "max_bad_records": 0,
          "quote": "",
          "write_disposition": "WRITE_TRUNCATE",
          "create_disposition": "",
          "encoding": ""
        },
        "doc_attribute": {
          "properties": [
            {
              "source": "file1___NODASHDATE__.csv"
            }
          ],
          ..
        },
        "doc_attribute_value": {
          "properties": [
            {
              "source": "file2___NODASHDATE__.csv"
            }
          ],
          ..
        },
        "doc_product": {
          "entity": [
            {
              "source": "file3___NODASHDATE__.csv",
              "autodetect": 0,
              "schema": "SCHEMA FOR LOAD TO BQ"
              "name": "used to create suffix for BQ table"
            }
          ]
        }
  }
}

 PlentyMarket connector properties
"connector": {
    "type": "plentyMarket",
    "options": {
      "username": "<access-username>",
      "password": "<access-password>",
      "url": "<domain>",
      "paths": [
        {
          "value": "/rest/accounts/contacts",
          "itemsPerPage": 250,
          "name": "accounts",
          "filters": {
            "with": "addresses,accounts,options,primaryBillingAddress",
            "updatedAtAfter": "DATE-7",
            "lazyLoaded": false
          }
        }
      ]
    },
    "load": {
      "options": {
        "format": "NEWLINE_DELIMITED_JSON",
        "autodetect": true,
        "max_bad_records": 100,
        "write_disposition": "WRITE_TRUNCATE"
      },
      "doc_user": {
        "entity": [
          {
            "source": "accounts.json",
            "schema": "<LOAD SCHEMA>"
          }
        ]
      }
    }
  }

The load configuration defines the properties for loading content to BQ.

The requirements specified above (#1-#4) are necessary if the data is accessed from a remote (outside Boxalino) scope.

If your integration exports the data directly in Boxalino (as described https://boxalino-internal.atlassian.net/wiki/spaces/DOC/pages/2606792705/Boxalino+Data+Integration+DI-SAAS+-+ELT+Flow#Loading-content-to-Boxalino-GCS-(connector%3A-boxalino) ), please continue with the Data Transformation step.

Data Transformation

Once the data is loaded in GCS and BQ, it is transformed into the required data structure.

The transformation is done by preparing a BQ SQL statement for every node of the doc_X data structure (ex: title, stock, price, string_attributes, etc). The output of each SQL statement must match the format/structure required for that property.

As this step was done in-house by Boxalino for the POC of our ELT solution, further documentation will be provided.
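Although the transformation SQL is client-specific, the idea of "one SQL per doc_X node" can be sketched. The table and column names below are purely hypothetical; the exact output structure of each property is defined by the Boxalino Data Structure documentation:

# hypothetical sketch: build the "title" node of doc_product from the raw
# entity table previously loaded into the transform dataset (<account>_T)
bq query --use_legacy_sql=false \
  --destination_table "<account>_T.<tm>_doc_product_title" \
  'SELECT CAST(ProductId AS STRING) AS id, Name AS title
   FROM `rtux-data-integration.<account>_T.<tm>_doc_product_entity`'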

The DI-SAAS request

The DI request uses the same headers (client, tm, mode, type, authorization) and a JSON request body that provides the mapping details between the loaded .jsonl files and their data meaning.

  • it will load the files to BQ in a T (transform) dataset (ex: SELECT * FROM rtux-data-integration.<account>_T.<tm>_<doc_X>_<property>_<name>)

  • it will run the transform flow (steps #1-#3 from the Data Integration flow)

    • generating each doc_X JSONL content

    • loading the doc_X JSONL in your GCS bucket

    • loading the doc_X JSONL to BQ

  • it will run the SYNC request for the process

    • loads the content in core tables in BQ

    • loads the content in the respective data index (for products)

REQUEST DEFINITION

As an integrator, please send the request below to the provided endpoint.

There should be a process within your own project that triggers the data sync between a 3rd party source (connector) and Boxalino.

Endpoint (production)

https://boxalino-di-saas-krceabfwya-ew.a.run.app

Endpoint (test / stage)

https://boxalino-di-transformer-stage-krceabfwya-ew.a.run.app

Action

/di

Method

POST

Body

see DI-SAAS Request (Overview): https://boxalino.atlassian.net/wiki/spaces/BPKB/pages/928874497/DI-SAAS+ELT+Flow#DI-SAAS-Request-(Overview)

Headers

  • Authorization: Basic base64<<DATASYNC API key : DATASYNC API Secret>>

    note: use the API credentials from your Boxalino account that have the ADMIN role assigned

  • Content-Type: application/json

  • client: account name

  • mode: data sync mode: F for full, D for delta

  • type: product, user, content, user_content, order; if left empty, all tables with the given tm are checked

  • tm: (optional) time, in the format YmdHis; technical: used to identify the documents version

  • ts: (optional) timestamp, millisecond-based UNIX timestamp

  • dev: (optional) use this to mark the content for the dev data index
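A minimal sketch for assembling these header values in shell (the environment variables and the request-body.json file are placeholders of this example, not part of the service contract):

# assumption: your DATASYNC API key & secret are exported as environment variables
AUTH=$(echo -n "${BOXALINO_API_KEY}:${BOXALINO_API_SECRET}" | base64)
TM=$(date +%Y%m%d%H%M%S)   # YmdHis, ex: 20230311200000

curl "https://boxalino-di-saas-krceabfwya-ew.a.run.app/di" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "mode: F" \
  -H "type: product" \
  -H "tm: ${TM}" \
  -H "Authorization: Basic ${AUTH}" \
  -d @request-body.json   # the JSON body from the DI-SAAS Request (Overview) section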

 Sample

This is a sample of a triggered full product sync (minimal data):

curl "https://boxalino-di-transformer-krceabfwya-ew.a.run.app/di" \
  -X POST \
  -d "[{\"connector\":{\"type\":\"gcs\",\"options\":{\"source\":{\"bucket\":\"my-account-bucket\",\"pattern\":\"boxalino/product/__NODASHDATE__/\"}},\"load\":{\"options\":{\"format\":\"NEWLINE_DELIMITED_JSON\",\"autodetect\":true,\"schema\":\"\",\"skipRows\":0,\"max_bad_records\":0,\"write_disposition\":\"WRITE_TRUNCATE\"},\"doc_product\":{\"entity\":[{\"source\":\"product.jsonl\",\"autodetect\":0,\"schema\":\"ProductId:INT64,Name:STRING,ProductTypeId:INT64,Sku:STRING,GroupId:STRING,Price:FLOAT64,SalePrice:FLOAT64,created:DATETIME\"}],\"product_relations\":[{\"source\":\"crosssell.jsonl\",\"name\":\"crosssell\"}],\"link\":[{\"source\":\"urlrecord.jsonl\"}]}}},\"di\":{\"configuration\":{\"languages\":[\"de\",\"fr\"],\"currencies\":[\"CHF\"],\"mapping\":{\"languagesCountryCode\":{\"de\":\"CH_de\",\"fr\":\"CH_fr\"}},\"default\":{\"language\":\"de\",\"currency\":\"CHF\"}}}}]" \
  -H "Content-Type: application/json" \
  -H "account: <boxalino-account-name>" \
  -H "mode: F" \
  -H "tm: 202303112000" \
  -H "type: product" \
  -H "Authorization: <base64_encode(api_key:api_secret)>" 

The request above creates the following resources:

  1. GCS (raw data, as migrated from the connector)

    1. gs://prod_rtux-data-integration_<account>/product/202303112000/F_product.jsonl

    2. gs://prod_rtux-data-integration_<account>/product/202303112000/F_crosssell.jsonl

    3. gs://prod_rtux-data-integration_<account>/product/202303112000/F_urlrecord.jsonl

  2. BQ (the T dataset - raw data as loaded into BQ)

    1. rtux-data-integration.<account>_T.202303112000-doc_product-entity

    2. rtux-data-integration.<account>_T.202303112000-doc_product-product_relations-crosssell

    3. rtux-data-integration.<account>_T.202303112000-doc_product-link

  3. GCS (transformed doc_X JSONL)

    1. gs://prod_rtux-data-integration_<account>/doc_product_F_202303112000.jsonl

    2. gs://prod_rtux-data-integration_<account>/doc_language_F_202303112000.jsonl

  4. BQ (the F dataset - data transformed to the doc_X data structure)

    1. rtux-data-integration.<account>_F.doc_product_F_202303112000

    2. rtux-data-integration.<account>_F.doc_language_F_202303112000

 Sample request with Boxalino connector (ex: data is already available in the Boxalino GCP project, in the client's GCS bucket)
[
  {
    "connector": {
      "type": "boxalino",
       "load": {
        "options": {
          "format": "NEWLINE_DELIMITED_JSON",
          "autodetect": true,
          "schema": "",
          "skipRows": 0,
          "max_bad_records": 0,
          "write_disposition": "WRITE_TRUNCATE"
        },
        "doc_product": {
          "entity": [
            {
              "source": "product.jsonl",
              "autodetect": 0,
              "schema": "ProductId:INT64,Name:STRING,ProductTypeId:INT64,Sku:STRING,GroupId:STRING,Price:FLOAT64,SalePrice:FLOAT64,created:DATETIME"
            }
          ],
          "product_relations": [
            {
              "source": "crosssell.jsonl",
              "name": "crosssell"
            }
          ],
          "link": [
            {
              "source": "urlrecord.jsonl"
            }
          ]
        }
      }
    },
    "di": {
      "configuration": {
        "languages": [
          "de",
          "fr"
        ],
        "currencies": [
          "CHF"
        ],
        "mapping": {
          "languagesCountryCode": {"de":"CH_de", "fr":"CH_fr"}
        },
        "default": {
          "language": "de",
          "currency": "CHF"
        }
      }
    }
  }
]

Loading content to Boxalino GCS (connector: boxalino)

There are 2 available flows, based on the size of your data:

  1. The content is exported as the body of your POST request

  2. The content is exported with the help of a public GCS Signed URL (https://cloud.google.com/storage/docs/access-control/signed-urls )

Option #1 is recommended for data volume less than 32MB.

Option #2 is allowed for any data size.

The described steps can be followed if you want to export your data into the Boxalino GCP project, in your client's account.

The LOAD request will create a GCS file in your project's GCS bucket: gs://prod_rtux-data-integration_<account>/<type>/<tm>/<mode>_<doc>

  • there is no content validation at this step

A. Loading content less than 32 MB

Files under 32MB can be loaded directly as REQUEST BODY CONTENT in Boxalino's GCS.

The sample request below would create the languagefeed_20220317.csv in your GCS bucket gs://prod_rtux-data-integration_<account>/<type>/<tm>/<mode>_<filename>

curl "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "dev: true|false" \
  -H "tm: YYYYmmddHHiiss" \
  -H "type: product|content|order|user|communication_history|communication_planning|user_generated_content" \
  -H "mode: F|D|I" \
  -H "doc: <filename>" \
  -d "<JSONL>" \
  -H "Authorization: Basic <encode of the account>"

This request must be repeated for every file that has to be added to the given process (same tm, mode & type), as sketched below.
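A minimal sketch of this repetition, assuming the documents already exist as local JSONL files (the file names are hypothetical):

for DOC in doc_language.json doc_attribute.json; do
  curl "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load" \
    -X POST \
    -H "Content-Type: application/json" \
    -H "client: <account>" \
    -H "tm: 20230301161554" \
    -H "type: product" \
    -H "mode: F" \
    -H "doc: ${DOC}" \
    --data-binary "@${DOC}" \
    -H "Authorization: Basic <encode of the account>"
done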

 Sample

For example, the request below would create gs://prod_rtux-data-integration_<account>/product/20230301161554/F_doc_language.json in your client's GCS bucket. This data is necessary for the type:product integration.

curl --connect-timeout 30 --max-time 300 "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "tm: 20230301161554" \
  -H "type: product" \
  -H "mode: F" \
  -H "doc: doc_language.json" \
  -d "{\"language\":\"en\",\"country_code\":\"en-GB\",\"creation_tm\":\"2023-03-01 16:15:54\",\"client_id\":0,\"src_sys_id\":0}\n{\"language\":\"de\",\"country_code\":\"de-CH\",\"creation_tm\":\"2023-03-01 16:15:54\",\"client_id\":0,\"src_sys_id\":0}" \
  -H "Authorization: Basic <encode of the account>" 

The same tm value must be used across your other requests. This identifies the timestamp of your computation process.

If the service responds with an error like 413 Request Entity Too Large, please use the 2nd flow (B).

B. Loading data of any size

For content over 32MB, we provide an endpoint that returns a signed GCS URL to which you can stream your content as a file.

This is the generic POST request:

curl --connect-timeout 60 --max-time 300 "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load/url" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "dev: true|false" \
  -H "tm: YYYYmmddHHiiss" \
  -H "type: product|content|order|user|communication_history|communication_planning|user_generated_content" \
  -H "mode: F|D|I" \
  -H "chunk: <id>" \
  -H "doc: <filename>" \
  -H "Authorization: Basic <encode of the account>"

The response is an upload link that can only be used to create the file in the client's GCS bucket. The link is valid for 30 minutes.

 Sample

Let's say we need a public link to upload a document called product-entity.jsonl. The following request can be used to generate the link:

curl --connect-timeout 60 --max-time 300 "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load/url" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "tm: 20230301161554" \
  -H "type: product" \
  -H "mode: F" \
  -H "doc: product-entity.jsonl" \
  -H "Authorization: Basic <encode of the account>"
curl --connect-timeout 60 <GCS-signed-url> \
    -X PUT \
    -H "Content-Type: application/octet-stream" \
    --data-binary "<YOUR DOCUMENT JSONL CONTENT (STREAM)>"

Read more about Google Cloud Signed URL https://cloud.google.com/storage/docs/access-control/signed-urls (response samples, uses, etc)

The use of the chunk header is required if the same file/document is exported in batches/sections.
Repeat steps #1 (generate a signed URL) and #2 (upload the content) for every data batch loaded to GCS, as sketched below.
Make sure to increment the value of the chunk header for each /transformer/load/url request.
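A minimal sketch of this chunked flow, assuming the document was pre-split into local parts (the part file names are hypothetical) and that the response body of /transformer/load/url is the signed URL itself:

# upload one large document in batches, incrementing the chunk header per part
CHUNK=1
for PART in product-entity-part-*.jsonl; do
  SIGNED_URL=$(curl -s "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load/url" \
    -X POST \
    -H "Content-Type: application/json" \
    -H "client: <account>" \
    -H "tm: 20230301161554" \
    -H "type: product" \
    -H "mode: F" \
    -H "chunk: ${CHUNK}" \
    -H "doc: product-entity.jsonl" \
    -H "Authorization: Basic <encode of the account>")
  curl "${SIGNED_URL}" \
    -X PUT \
    -H "Content-Type: application/octet-stream" \
    --data-binary "@${PART}"
  CHUNK=$((CHUNK + 1))
done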

Steps #1-#2 must be repeated for every file that has to be added to the given process (same tm, mode & type).

Only after the full content is available in GCS can you move on to step #3.

After all required documents (doc) for the given data sync type (ex: product, order, etc) have been made available in GCS, the DI-SAAS request can be called: https://boxalino.atlassian.net/wiki/spaces/BPKB/pages/928874497/DI-SAAS+ELT+Flow#The-DI-request

Technical notes

  1. Every request to our nodes returns a response (200 or 400). This can be used internally for testing purposes as well.

  2. It is possible to review the STATUS of the requests at any given time: Status Review
