Skip to end of banner
Go to start of banner

DI-SAAS (ELT Flow)

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

At Boxalino we are strong advisors for ELT flows (vs ETL). The major benefit is speed, transparency and maintenence: data is loaded directly into a destination system (BQ), and transformed in-parallel (Dataform).

The ELT flow is recommended when wanting to load your data as is (no transformation) in BQ and then perform the transformation logic (SQL) in Dataform. Integration-effort is minimal (push all data to GCS - Boxalino-owned or private client-owned, load data to BQ, write SQL in Dataform).

More information about the Integration Strategies is available on Confluence.
Integration Strategies

Data Integration (DI) Generic Flow

The Generic Data Integration (DI) Flow expects for the client, themselves to execute the steps described in Data Integration .

For certain platforms (Shopware6, Magento2), the step #1 can be done with a Boxalino Data Integration plugin.

For generic clients, the JSONL generation is managed by the clients` team by following the Boxalino Data Structure requirements Data Structure (as documented).

The Boxalino Data Structure and integration flows/traits are publicly available as a PHP repository https://github.com/boxalino/data-integration-doc-php

As a integrator, you can do the steps #1-#4 Data Integration by following our recommended integration flowIntegration Flow

Data Integration (DI) SAAS Flow

The di-saas service is designed to:

  1. be used for custom integrations by ingesting data (.csv, .jsonl) from SFTP, GCS, BQ or public links.

  2. integrate data from targetted platforms (ex: PlentyMarkets)

For both cases, a mapping is done between the Boxalino Data Structure Data Structure and the input sources (either files or API endpoints) with the help of BQ & Dataform.

In the case of a custom integration, the di-saas flow is composed of the following scopes:

  1. Data connector & Data load

    1. downloads the files from remote source to GCS

    2. loads the raw content to the transform dataset (<account>_T)

  2. Data transformation (with logic/SQL provided by the client)

    1. Steps #1-#4 from the Generic Flow

  3. DI automation

DI-SAAS Request (Overview)

The DI (data integration) request has all the required information in a JSON body.

The elements are for connector (+ files load options) and the di (data integration) request parameters (default configurations).

[
  {
    "connector": {
      "type": "sftp|gcs|plentymarket|plytix|boxalino",
      "options": {
          // specific for each connector type
      },
      "load": {
        "options": {
          // for loading the data in BQ (BQ parameters)
          "format": "CSV|NEWLINE_DELIMITED_JSON",
          "field_delimiter": ";",
          "autodetect": true,
          "schema": "",
          "skipRows": 1(CSV)|0(JSONL),
          "max_bad_records": 0,
          "quote": "",
          "write_disposition": "WRITE_TRUNCATE",
          "create_disposition": "",
          "encoding": ""
        },
        "doc_X": {
          "property_node_from_doc_data_structure": [
            {
              "source": "file___NODASHDATE__.jsonl",
              "name": "<used to create suffix for BQ table>",
              "schema": ""
            }
          ]
        }
      }
    },
    "di": {
      "configuration": {
        "languages": [
          "de",
          "fr"
        ],
        "currencies": [
          "CHF"
        ],
        "mapping": {
          "languagesCountryCode": {"de":"CH_de", "fr":"CH_fr"}
        },
        "default": {
          "language": "de",
          "currency": "CHF"
        }
      }
    }
  }
]

1. Data Connector & Data Load

The access to the data is managed by the client. These are a few guidelines for naming patterns:

  1. The relevant data sources are available in .csv or JSONL (prefered) format

  2. The files have a timestamp in the naming or in the access path (ex: product_20221206.jsonl)

    1. this will help automating the integration

  3. The files required to update a certain data type (ex: product, order, etc) are available in the same path

  4. The files are available on an endpoint (SFTP, GCS, 3rd party API, public URL) to which Boxalino has access

    1. for GCS/ GCP sources: access being shared to Boxalino`s Service Account boxalino-di-api@rtux-data-integration.iam.gserviceaccount.com

    2. for AWS / SFTP : the client`s own AWS/SFTP environment with a Boxalino user & credentials

 GCS connector properties (sample for JSONL files load definition)
"connector": {
  "type": "gcs",
    "options": {
      "source": {
        "bucket": "<GCS-bucket-name>",
        "pattern": "path/to/data/__NODASHDATE__/"
      }
  },
  "load": {
    "options": {
      "format": "NEWLINE_DELIMITED_JSON",
      "autodetect": true,
      "schema": "",
      "skipRows": 0,
      "max_bad_records": 0,
      "write_disposition": "WRITE_TRUNCATE"
    },
    "doc_attribute": {
      "properties": [
        {
          "source": "file1___NODASHDATE__.jsonl"
        }
      ],
      ..
    },
    "doc_attribute_value": {
      "properties": [
        {
          "source": "file2___NODASHDATE__.jsonl"
        }
      ],
      ..
    },
    "doc_product": {
      "entity": [
        {
          "source": "file3___NODASHDATE__.jsonl",
          "autodetect": 0,
          "schema": "SCHEMA FOR LOAD TO BQ"
          "name": ""
        }
      ]
    }
}
 (client-owned environment) SFTP connector properties (sample for CSV files load definition)
"connector": {
      "type": "sftp",
      "options": {
        "hostname": "<server>",
        "port": 22,
        "username": "<user for Boxalino access>",
        "password": "<password for Boxalino access>",
        "source": {
          "pattern": "___NODASHDATE__",
          "files": [
          ],
          "path": "<path on server>"
        }
      },
      "load": {
        "options": {
          "format": "CSV",
          "field_delimiter": ";",
          "autodetect": true,
          "schema": "",
          "skipRows": 1,
          "max_bad_records": 0,
          "quote": "",
          "write_disposition": "WRITE_TRUNCATE",
          "create_disposition": "",
          "encoding": ""
        },
        "doc_attribute": {
          "properties": [
            {
              "source": "file1___NODASHDATE__.csv"
            }
          ],
          ..
        },
        "doc_attribute_value": {
          "properties": [
            {
              "source": "file2___NODASHDATE__.csv"
            }
          ],
          ..
        },
        "doc_product": {
          "entity": [
            {
              "source": "file3___NODASHDATE__.csv",
              "autodetect": 0,
              "schema": "field1:STRING,field2:INT64,field3:FLOAT64,field4:BOOL"
              "name": "used to create suffix for BQ table"
            }
          ]
        }
    },

 PlentyMarket connector properties
"connector": {
    "type": "plentyMarket",
    "options": {
      "username": "<access-username>",
      "password": "<access-password>",
      "url": "<domain>",
      "paths": [
        {
          "value": "/rest/accounts/contacts",
          "itemsPerPage": 250,
          "name": "accounts",
          "filters": {
            "with": "addresses,accounts,options,primaryBillingAddress",
            "updatedAtAfter": "DATE-7",
            "lazyLoaded": false
          }
        }
      ]
    },
    "load": {
      "options": {
        "format": "NEWLINE_DELIMITED_JSON",
        "autodetect": true,
        "max_bad_records": 100,
        "write_disposition": "WRITE_TRUNCATE"
      },
      "doc_user": {
        "entity": [
          {
            "source": "accounts.json",
            "schema": "<LOAD SCHEMA>"
          }
        ]
      }
    }
  }

The load configuration defines the GCP properties for loading content to BQ.

The requirements specified above (#1-#4) are necessary if the data is accessed from a remote (outside Boxalino) scope.

If your integration exports the data directly in Boxalino (as described https://boxalino-internal.atlassian.net/wiki/spaces/DOC/pages/2606792705/Boxalino+Data+Integration+DI-SAAS+-+ELT+Flow#Loading-content-to-Boxalino-GCS-(connector%3A-boxalino) ), please continue with the Data Transformation step.

It is possible to configure dynamic source names (to identify the files loaded in GCS) (ex: if the content to be loaded in BQ is exported in batches or has a dynamic suffix).

The supported variations are:
__DATE__ (Y-m-d), __NODASHDATE__ (Ymd), __TM__ (Y-m-dTH:i:s), __NODASHTM__ (YmdHis), __DATE_PATH__ (Y/m/d), __JN_DATE_PATH__ (Y/n/j), __DATE_YESTERDAY__ (Y-m-d -1 day), __DATE_MONTH__ (Y-m-01), __NODASHDATE_MONTH__ (Ym01), __NODASHDATE_YESTERDAY__ (Ymd -1 day).

For example, if there are 3 chunks for order-<tm>-1.jsonl, the configuration for load can be:
"load":{"options":{<add the default BQ-load options}, "doc_order":{"entity":[{"source":"doc_order___NODASHTM__*.jsonl}]}}

2. Data Transformation

Once the data is loaded in GCS and BQ, it is time to transform it in the necessary data structure.

The transformation happens by preparing a BQ SQL for every node part of the doc_X data structure (ex: title, stock, price, string_attributes, etc). The output of the SQL will be of a certain format/structure for each property.

As this step was done in-house by Boxalino, for the POC of our ELT solution, further definition will be provided.

Dataform

The transformation happens with the help of Google Dataform https://cloud.google.com/dataform .

This implies the following:

  1. The client has access to a GCP project

  2. The client will create a Dataform repository https://cloud.google.com/dataform/docs/repositories

  3. The client has access to a GitHub or GitLab repository (to connect it to the Dataform repository) https://cloud.google.com/dataform/docs/connect-repository

  4. The client has given “Dataform Admin” permission to Boxalino Service Account boxalino-dataform@rtux-data-integration.iam.gserviceaccount.com

The DI-SAAS SYNC request

The DI request will use the same headers (client, tm, mode, type, authorization) and a JSON request body that would provide mapping details between the loaded .jsonl files and data meaning.

  • it will load the files to BQ in a T (transform) dataset (ex: SELECT * FROM rtux-data-integration.<account>_T.<tm>_<doc_X>_<property>_<name>)

  • it will run the transform flow (step #1-#3 from Data Integration )

    • generating each doc_X JSONL content

    • loading the doc_X JSONL in your GCS bucket

    • loading the doc_X JSONL to BQ

  • it will run the SYNC request Sync Request for the process

    • loads the content in core tables in BQ

    • loads the content in the respective data index (for products)

REQUEST DEFINITION

As an integrator, please create the bellow request to the provided endpoint.

There should be a process within your own project that triggers the data sync between a 3rd party source (connector) and Boxalino.

Endpoint

production

https://boxalino-di-saas-krceabfwya-ew.a.run.app

1

Action

/sync

2

Method

POST

3

Body

https://boxalino.atlassian.net/wiki/spaces/BPKB/pages/928874497/DI-SAAS+ELT+Flow#DI-SAAS-Request-(Overview)

4

Headers

Authorization

Basic base64<<DATASYNC API key : DATASYNC API Secret>>

note: use the API credentials from your Boxalino account that have the ADMIN role assigned

5

 

Content-Type

application/json

6

 

client

account name

7

 

mode

data sync mode: F for full, D for delta, E for enrichments

8

 

type

product, user, content, user_content, order.

if left empty - it will check for all tables with the given tm

9

 

tm

(optional) time , in format: YmdHis;

technical: used to identify the documents version

10

 

ts

(optional) timestamp, must be millisecond based in UNIX timestamp

11

 

dev

(optional) use this to mark the content for the dev data index

 Sample

This is a sample of a triggered full product sync (minimal data):

curl "https://boxalino-di-saas-krceabfwya-ew.a.run.app/sync" \
  -X POST \
  -d "[{\"connector\":{\"type\":\"gcs\",\"options\":{\"source\":{\"bucket\":\"my-account-bucket\",\"pattern\":\"boxalino/product/__NODASHDATE__/\"}},\"load\":{\"options\":{\"format\":\"NEWLINE_DELIMITED_JSON\",\"autodetect\":true,\"schema\":\"\",\"skipRows\":0,\"max_bad_records\":0,\"write_disposition\":\"WRITE_TRUNCATE\"},\"doc_product\":{\"entity\":[{\"source\":\"product.jsonl\",\"autodetect\":0,\"schema\":\"ProductId:INT64,Name:STRING,ProductTypeId:INT64,Sku:STRING,GroupId:STRING,Price:FLOAT64,SalePrice:FLOAT64,created:DATETIME\"}],\"product_relations\":[{\"source\":\"crosssell.jsonl\",\"name\":\"crosssell\"}],\"link\":[{\"source\":\"urlrecord.jsonl\"}]}}},\"di\":{\"configuration\":{\"languages\":[\"de\",\"fr\"],\"currencies\":[\"CHF\"],\"mapping\":{\"languagesCountryCode\":{\"de\":\"CH_de\",\"fr\":\"CH_fr\"}},\"default\":{\"language\":\"de\",\"currency\":\"CHF\"}}}}]" \
  -H "Content-Type: application/json" \
  -H "account: <boxalino-account-name>" \
  -H "mode: F" \
  -H "tm: 202303112000" \
  -H "type: product" \
  -H "Authorization: <base64_encode(api_key:api_secret)>" 

The request above created the following resources:

  1. GCS (raw data, as migrated from the connector)

    1. gs://prod_rtux-data-integration_<account>/product/202303112000/F_product.jsonl

    2. gs://prod_rtux-data-integration_<account>/product/202303112000/F_crossell.jsonl

    3. gs://prod_rtux-data-integration_<account>/product/202303112000/F_urlrecord.jsonl

  2. BQ (the T dataset - raw data as loaded from BQ)

    1. rtux-data-integration.<account>_T.202303112000-doc_product-entity

    2. rtux-data-integration.<account>_T.202303112000-doc_product-product_relations-crossell

    3. rtux-data-integration.<account>_T.202303112000-doc_product-link

  3. GCS (transformed doc_X JSONL)

    1. gs://prod_rtux-data-integration_<account>/doc_product_F_202303112000.jsonl

    2. gs://prod_rtux-data-integration_<account>/doc_language_F_202303112000.jsonl

  4. BQ (the F dataset - transformed data to doc_X data structure)

    1. rtux-data-integration.<account>_F.doc_product_F_202303112000

    2. rtux-data-integration.<account>_F.doc_language_F_202303112000

 Sample request with Boxalino connector (ex: data is already available in Boxalino GCP project, in client`s GCS bucket)
[
  {
    "connector": {
      "type": "boxalino",
       "load": {
        "options": {
          "format": "NEWLINE_DELIMITED_JSON",
          "autodetect": true,
          "schema": "",
          "skipRows": 0,
          "max_bad_records": 0,
          "write_disposition": "WRITE_TRUNCATE"
        },
        "doc_product": {
          "entity": [
            {
              "source": "product.jsonl",
              "autodetect": 0,
              "schema": "ProductId:INT64,Name:STRING,ProductTypeId:INT64,Sku:STRING,GroupId:STRING,Price:FLOAT64,SalePrice:FLOAT64,created:DATETIME"
            }
          ],
          "product_relations": [
            {
              "source": "crosssell.jsonl",
              "name": "crosssell"
            }
          ],
          "link": [
            {
              "source": "urlrecord.jsonl"
            }
          ]
        }
      }
    },
    "di": {
      "configuration": {
        "languages": [
          "de",
          "fr"
        ],
        "currencies": [
          "CHF"
        ],
        "mapping": {
          "languagesCountryCode": {"de":"CH_de", "fr":"CH_fr"}
        },
        "default": {
          "language": "de",
          "currency": "CHF"
        }
      }
    }
  }
]

Loading content to Boxalino GCS (connector: boxalino)

By following these steps, you can push your data (JSONL or CSV) directly in your client`s GCS bucket in the Boxalino scope. After all data has been loaded in GCS, the DI-SAAS request can be called https://boxalino.atlassian.net/wiki/spaces/BPKB/pages/928874497/DI-SAAS+ELT+Flow#The-DI-request , assigning connector → type : boxalino.

There are 2 available flows, based on the size of your data:

  1. The content is exported as the body of your POST request

  2. The content is exported with the help of a public GCS Signed URL (https://cloud.google.com/storage/docs/access-control/signed-urls )

Option #1 is recommended for data volume less than 32MB.

Option #2 is allowed for any data size.

The described steps can be followed if you want to export your data in Boxalino GCP project, in your clients` account.

The LOAD request will create a GCS file in your project`s GCS buckte: gs://prod_rtux-data-integration_<account>/<type>/<tm>/<mode>_<doc>

  • there is no content validation at this step

A. Loading content less than 32 MB

Files under 32MB can be loaded directly as REQUEST BODY CONTENT in Boxalino`s GCS.

The sample request bellow would create the languagefeed_20220317.csv in your GCS bucket gs://prod_rtux-data-integration_<account>/<type>/<tm>/<mode>_<filename>

curl "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "dev: true|false" \
  -H "tm: YYYYmmddHHiiss" \
  -H "type: product|content|order|user|communication_history|communication_planning|user_generated_content" \
  -H "mode: F|D|I" \
  -H "doc: <filename>" \
  -d "<JSONL>" \
  -H "Authorization: Basic <encode of the account>"

Step #1 must be repeated for every file that is required to be added for the given process (same tm, mode & type)

 Sample

For example, the request bellow would create a gs://prod_rtux-data-integration_<account>/product/20230301161554/T_doc_language.json in your clients` GCS bucket. This is a necessary data for the type:product integration.

curl --connect-timeout 30 --max-time 300 "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "tm: 20230301161554" \
  -H "type: product" \
  -H "mode: F" \
  -H "doc: doc_language.json" \
  -d "{\"language\":\"en\",\"country_code\":\"en-GB\",\"creation_tm\":\"2023-03-01 16:15:54\",\"client_id\":0,\"src_sys_id\":0}\n{\"language\":\"de\",\"country_code\":\"de-CH\",\"creation_tm\":\"2023-03-01 16:15:54\",\"client_id\":0,\"src_sys_id\":0}" \
  -H "Authorization: Basic <encode of the account>" 

The same tm value must be used across your other requests. This identifies the timestamp of your computation process.

If the service response is an error like: 413 Request Entity Too Large - please use the 2nd flow.

B. Loading undefined data size

For content over 32MB, we provide an endpoint to access a Signed GCS Url that would put all your streamed content into a file (currently there is no defined file size limit in GCS)

Read more about Google Cloud Signed URL https://cloud.google.com/storage/docs/access-control/signed-urls (response samples, uses, etc)

This is the generic POST request:

curl --connect-timeout 60 --max-time 300 "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load/url" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "dev: true|false" \
  -H "tm: YYYYmmddHHiiss" \
  -H "type: product|content|order|user|communication_history|communication_planning|user_generated_content" \
  -H "mode: F|D|I|E" \
  -H "chunk: <id>" \
  -H "doc: <filename>" \
  -H "Authorization: Basic <encode of the account>"

The response will be an upload link that can only be used in order to create the file in the clients` GCS bucket. The link is valid for 30 minutes.

 Sample

Lets say, we need a public link to upload a document called product-entity.jsonl. The following request can be used to generate the link:

curl --connect-timeout 60 --max-time 300 "https://boxalino-di-stage-krceabfwya-ew.a.run.app/transformer/load/url" \
  -X POST \
  -H "Content-Type: application/json" \
  -H "client: <account>" \
  -H "tm: 20230301161554" \
  -H "type: product" \
  -H "mode: F" \
  -H "doc: product-entity.jsonl" \
  -H "Authorization: Basic <encode of the account>"
curl --connect-timeout 60 --timeout 0 <GCS-signed-url> \
    -X PUT \
    -H "Content-Type: application/octet-stream" \
    -d "<YOUR DOCUMENT JSONL CONTENT (STREAM)>"

The use of the header chunk is required if the same file/document is exported in batches/sections.
Repeat steps 1+2 for every data batch loaded in GCS.
Make sure to increment the value of the chunk property for each /transformer/load/url request.

Step #1 - #2 must be repeated for every file that is required to be added for the given process (same tm, mode & type)

Only after all the files are available in GCS, you can move on to step#3.

After all required documents (doc) for the given type data sync (ex: product, order, etc) have been made available in GCS, the DI-SAAS request can be called https://boxalino.atlassian.net/wiki/spaces/BPKB/pages/928874497/DI-SAAS+ELT+Flow#The-DI-request , assigning connector → type : boxalino.

Technical notes

  1. Every request to our nodes returns a response (200 or 400). This can be used internally for testing purposes as well.

  2. It is possible to review the STATUS of the requests at any given time Status Review

  • No labels