Export and Delivery

Introduction

The export and delivery workflow in Grepsr handles the data produced by running crawlers. Crawl data is continuously sent to AWS Kinesis for further processing. However, instead of sending the data directly to Kinesis, Grepsr uses Fluentd as a buffer: data is first sent to Fluentd, which then pushes it to a Kinesis stream. This extra step works around the throughput limits of Kinesis and preserves data integrity during high-traffic periods.

After the data is streamed from Kinesis, it is processed by a Flink pipeline, which stores it in MongoDB, the primary storage mechanism. A second Flink data pipeline, known as the Archival pipeline, stores the data in Avro format in an S3 bucket. This step is necessary because the data collections in MongoDB are deleted after a specific number of days, while the archived Avro data is kept permanently. If needed, the data can be unarchived back into MongoDB as business requirements dictate.

When all the data for a particular crawler is complete, a flag is set within the data, and the Flink pipeline emits an event called "ReportExportInitiatedEvent." This event serves as the trigger for the export and delivery workflow: it is emitted to AWS EventBridge, which then invokes a Lambda function responsible for initiating the temporal event processor.

The temporal workflow, named export-files-sch, is then initiated to verify that the metadata count matches the actual data in MongoDB. This workflow also handles the export formats and delivery methods based on predefined configurations. In addition to the "ReportExportInitiatedEvent," the Flink pipeline regularly emits another event called "HistoryStatsAddedEvent." This event serves as metadata and provides visibility into crawler data statistics, allowing users to monitor the quantity and types of data being crawled. The "HistoryStatsAddedEvent" data is stored in a MySQL database table named "vt_extractor_history."

This documentation aims to provide an overview of the export and delivery workflow in Grepsr, covering the various components, data flow, and event-driven processes involved.

Export and Delivery Components:

1. Data Collection and Storage

The data collection and storage component of the export and delivery workflow involves the following steps:

Crawlers gather data during the crawling process and transmit it to Grepsr. To manage the data flow and prevent data loss, a buffering mechanism is employed: the data is sent to Fluentd, which acts as a buffer, receiving the data and forwarding it to Kinesis shards.
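The buffering idea described above can be sketched as a small batching layer. This is an illustrative, self-contained model, not Grepsr's actual Fluentd configuration: the `sink` callable stands in for a real call such as Kinesis `PutRecords`, whose documented limit of 500 records per request motivates the default batch size here.

```python
from typing import Callable, List

# Minimal sketch of buffering before a stream: records accumulate and are
# flushed in batches, smoothing out traffic spikes. Kinesis PutRecords
# accepts at most 500 records per call, hence the default batch size.
class StreamBuffer:
    def __init__(self, sink: Callable[[List[dict]], None], batch_size: int = 500):
        self.sink = sink          # stand-in for e.g. kinesis.put_records
        self.batch_size = batch_size
        self.pending: List[dict] = []

    def add(self, record: dict) -> None:
        self.pending.append(record)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        while self.pending:
            batch = self.pending[:self.batch_size]
            self.pending = self.pending[self.batch_size:]
            self.sink(batch)

batches = []
buf = StreamBuffer(batches.append, batch_size=2)
for i in range(5):
    buf.add({"item": i})
buf.flush()
print([len(b) for b in batches])  # → [2, 2, 1]
```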

Once the data is streamed from Kinesis, it undergoes further processing and storage. The data passes through a Flink pipeline, which stores it in MongoDB. MongoDB serves as the main repository for retrieval and querying, providing persistent storage with easy access to the crawled data. Each run is stored as a collection with the naming format data_<reportid>_<historyid>.
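The per-run collection naming convention can be expressed as a small helper. The function name is illustrative; only the `data_<reportid>_<historyid>` pattern comes from the documentation above.

```python
# Hypothetical helper reflecting the documented per-run collection naming:
# each crawler run gets its own MongoDB collection.
def run_collection_name(report_id: int, history_id: int) -> str:
    """Build the per-run MongoDB collection name, e.g. data_123_456."""
    return f"data_{report_id}_{history_id}"

print(run_collection_name(123, 456))  # → data_123_456
```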

Simultaneously, the Flink data pipeline named the archival pipeline converts the data into the Avro format and stores it in an S3 bucket. The archival pipeline ensures the long-term preservation of the data, since the data stored in MongoDB is periodically deleted after a specified number of days.

By employing this combined data collection and storage approach, Grepsr ensures data integrity, scalability, and reliable long-term storage of the crawled data.

  2. Event Emission and Triggering

    When all the data for a particular crawler is complete, a flag is set within the data to signal completion. The Flink pipeline emits a specific event named "ReportExportInitiatedEvent." This event acts as a trigger for the subsequent steps in the export and delivery workflow.

    The "ReportExportInitiatedEvent" is emitted to AWS EventBridge, a fully managed event bus service. AWS EventBridge, in turn, triggers a Lambda function designed to initiate the temporal event processor.

    In addition, the Flink pipeline also emits a "HistoryStatsAddedEvent" regularly. The "HistoryStatsAddedEvent" serves as metadata and provides visibility into crawler data statistics. It contains metadata information such as

    • reportId
    • historyId
    • bandwidthUpload
    • bandwidthDownload
    • requestCount
    • itemCount
    • proxyType
    • proxyTypes
    • accumulatedUsageMetric
    • timestamp
    • batchId

    The event is handled by the Crawler Events pipeline, which calls the stats service to store the data in a MySQL database table named "vt_extractor_history".
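The payload of "HistoryStatsAddedEvent" can be modeled from the field list above. The field names come from the documentation; the types and the sample values are assumptions for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, List

# Hypothetical shape of the HistoryStatsAddedEvent payload, based on the
# fields listed above; actual types in the real pipeline may differ.
@dataclass
class HistoryStatsAddedEvent:
    reportId: int
    historyId: int
    bandwidthUpload: int
    bandwidthDownload: int
    requestCount: int
    itemCount: int
    proxyType: str
    proxyTypes: List[str]
    accumulatedUsageMetric: Dict[str, float]
    timestamp: int
    batchId: str

event = HistoryStatsAddedEvent(
    reportId=42, historyId=7, bandwidthUpload=1024, bandwidthDownload=8192,
    requestCount=150, itemCount=120, proxyType="datacenter",
    proxyTypes=["datacenter"], accumulatedUsageMetric={"cpuSeconds": 3.5},
    timestamp=1700000000, batchId="batch-001",
)
print(event.itemCount)  # → 120
```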

  3. Temporal Event Processor

    The temporal event processor encompasses the "export-files-sch" workflow. It coordinates and executes the tasks required for exporting and delivering the data. It ensures that the metadata count and the actual data in MongoDB match accurately, and it determines the export formats and delivery methods based on predefined configurations. It orchestrates the steps needed to package and prepare the data for export, such as generating the appropriate file formats and organizing the data for efficient delivery. Finally, this workflow is also responsible for syncing the data schema into the archived Avro data stored in the S3 bucket.

    🚨 Important Note It is crucial to be aware of a specific behavior when temporal activities make gRPC calls to microservices. These calls must traverse the AWS gateway, but AWS automatically closes connections for gRPC calls lasting more than 10 minutes. The temporal framework is unaware of this behavior and continues to wait for a response. Since no reply is received from the gRPC call, temporal assumes the connection has timed out and retries the activity, which can result in duplicate processing as temporal attempts to rerun it endlessly. This issue particularly affects the export file activity in ExportFilesWorkflow, especially when dealing with large data volumes where the export process can exceed the 10-minute threshold.

    To address this problem, a temporary workaround has been implemented in our workflow by introducing a threshold configuration. If the volume or size of the export data surpasses this threshold, the workflow bypasses the AWS gateway and calls the microservice directly. Although this solution is not ideal, it serves as a temporary fix until a permanent resolution is found. A similar scenario can occur with the "Sync Data Columns" activity, but in that case, the activity is retried only once. If the schema is excessively large and the activity takes too long to complete, the process will skip this step.
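The threshold workaround above amounts to a simple routing decision. This is a sketch only: the threshold value, route names, and function name are all illustrative, not the actual workflow configuration.

```python
# Sketch of the workaround described above: if the export exceeds a
# configured size threshold, call the microservice directly instead of
# going through the AWS gateway (which drops gRPC calls after ~10 minutes).
# The 512 MiB threshold here is an assumed, illustrative value.
GATEWAY_BYPASS_THRESHOLD_BYTES = 512 * 1024 * 1024

def pick_export_route(export_size_bytes: int,
                      threshold: int = GATEWAY_BYPASS_THRESHOLD_BYTES) -> str:
    """Return 'direct' for large exports, 'gateway' otherwise."""
    return "direct" if export_size_bytes > threshold else "gateway"

print(pick_export_route(1024))           # small export, stays on the gateway
print(pick_export_route(2 * 1024 ** 3))  # 2 GiB export, bypasses the gateway
```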

Match Count

  • The match_count activity is responsible for checking whether the data in MongoDB and the metadata stored in MySQL are matched.
  • If the data is not matched, the activity sleeps for 30 seconds and repeats the check.
  • This process continues for a maximum of 100 attempts before proceeding.
  • The purpose of this activity is to ensure that all data has been successfully stored, accounting for any delays due to high traffic or other reasons.
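The polling behavior of match_count can be sketched as a retry loop. The 30-second delay and 100-attempt limit come from the description above; the function name and the injectable `check`/`sleep` parameters are illustrative so the loop can be exercised without a real database.

```python
import time
from typing import Callable

# Sketch of the match_count retry loop: poll until the MongoDB document
# count matches the metadata count, sleeping between attempts, and give
# up after max_attempts (the workflow then proceeds anyway).
def wait_for_match(check: Callable[[], bool],
                   max_attempts: int = 100,
                   delay_seconds: float = 30.0,
                   sleep: Callable[[float], None] = time.sleep) -> bool:
    for _ in range(max_attempts):
        if check():
            return True
        sleep(delay_seconds)
    return False

# Simulate counts that only match on the third check, with no real sleeping.
counts = iter([False, False, True])
matched = wait_for_match(lambda: next(counts), sleep=lambda _: None)
print(matched)  # → True
```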

Emit Data Ingestion Event

  • Once all the data has been ingested by our system, a CrawldataIngestionCompletedEvent is emitted.
  • The purpose of this event is to let other parties know that all the crawler data has been received and any additional processing can be initiated.
  • Currently we have the CrawldataIngestionCompletedWorkflow, which is responsible for calculating data quality and sending alerts based on configurations.

Get Export Formats

  • The get_export_formats activity utilizes the export service to fetch the desired format configured for a particular report.
  • Grepsr supports multiple export options, including XLSX, CSV, XML, JSON, YAML, and Parquet.
  • This activity retrieves the export format information needed for further processing.

Should Deliver

  • The should_deliver activity checks whether the deliver flag is set to true or false.
  • Since the same workflow is used for manual export as well, this activity ensures that the delivery process is executed only when the deliver flag is set to true.

Get Delivery Methods

  • The get_delivery_methods activity uses the delivery service to determine the delivery method configured for a particular schedule.
  • The Grepsr platform supports various delivery options, including email, FTP, S3, Dropbox, webhooks, Slack, Google Cloud, Azure Cloud, Box, and file feeds.
  • This activity retrieves the delivery method information required for further processing.
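Routing a configured delivery method to its integration can be pictured as a dispatch table. This is an illustrative sketch: the handler functions are stand-ins for the real delivery service integrations, and only the method names mirror the options listed above.

```python
from typing import Callable, Dict

# Illustrative dispatch from a configured delivery method to a handler.
# The handlers below are placeholders, not the real delivery service.
def deliver_via_email(url: str) -> str:
    return f"emailed {url}"

def deliver_via_s3(url: str) -> str:
    return f"copied {url} to s3"

HANDLERS: Dict[str, Callable[[str], str]] = {
    "email": deliver_via_email,
    "s3": deliver_via_s3,
    # ... ftp, dropbox, webhook, slack, gcloud, azure, box, feed
}

def deliver(method: str, file_url: str) -> str:
    if method not in HANDLERS:
        raise ValueError(f"unsupported delivery method: {method}")
    return HANDLERS[method](file_url)

print(deliver("email", "https://example.com/export.csv"))
```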

Export Files

  • The export_files activity is responsible for converting the data to the desired format as specified in the export configuration.
  • It formats the data and stores the formatted file in an S3 bucket.
  • Additionally, it generates a presigned URL that is valid for a specified number of days.
  • The presigned URL is sent to the client for file delivery.
  • The export_files activity utilizes the delivery service to accomplish these tasks.
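With boto3, the presigned URL mentioned above would typically come from `s3.generate_presigned_url('get_object', Params={...}, ExpiresIn=seconds)`, where `ExpiresIn` is given in seconds. The sketch below only shows converting the configured validity in days into that value; the 7-day figure is illustrative, not the actual configuration.

```python
from datetime import timedelta

# Helper converting a configured validity period in days into the
# seconds value expected by boto3's generate_presigned_url (ExpiresIn).
def presign_expiry_seconds(valid_days: int) -> int:
    return int(timedelta(days=valid_days).total_seconds())

print(presign_expiry_seconds(7))  # → 604800
```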

Deliver Files

  • The deliver_files activity is responsible for delivering the formatted files to the client based on the chosen delivery method.
  • It uses the delivery service to perform the file delivery operation.
  • If the delivery operation is successful, a FileDeliveryProcessedEvent is emitted.
  • In case of a failed delivery attempt, a FileDeliveryProcessedEvent is emitted to indicate the failure.

Sync Avro Schema to S3

  • The sync_avro_schema_to_s3 activity is responsible for synchronizing the data schema to the archived data stored in the Avro format in the S3 bucket.
  • This activity ensures that the schema remains consistent and up-to-date throughout the export and delivery workflow.

Together, these activities form the temporal event processor workflow, enabling seamless export and delivery of the data to clients in the desired format and via the chosen delivery method.

Sync Data Columns

  • This activity is responsible for syncing the column schema into the MySQL database.
  • This is particularly important for quality alerts, since alerts can be added based on accuracy and fill rate for specific columns. The get dataset meta activity is called on the export service to fetch the data schema from the current run.

Manual Export

In the manual export workflow, when a user clicks the "Export Report" option in the Grepsr platform, they can choose the file format for the export and decide whether to proceed with delivery of the file. These options are sent as attributes on the ReportExportInitiatedEvent. The manual export workflow closely resembles the scheduled export and delivery workflow, with a few key differences.

  1. Event Initiation

    • When the user initiates the manual export, the "ReportExportInitiatedEvent" event is triggered and sent to AWS EventBridge.
    • AWS EventBridge triggers a Lambda function, which in turn initiates the temporal workflow specifically designed for manual exports.
  2. Temporal Workflow Execution

    • The temporal workflow for manual exports executes similar activities as the scheduled export workflow, with some variations.
    • The match_count activity is skipped since the data is already present in MongoDB at this point, eliminating the need to check for matching data between MongoDB and metadata stored in MySQL.
    • One major difference in manual export is that the workflow checks whether the dataset is already archived.
    • If the dataset is archived, the export service is called to unarchive the dataset before further processing.
    • The workflow checks if the user has selected the "process_notification" option, indicating their intent to proceed with the delivery of the file.
    • If the "process_notification" option is not selected, the workflow will conclude without performing the file delivery step.
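The branching described above can be sketched as a planning function. The step names are stand-ins for the real temporal activities; only the decisions (skip match_count, unarchive when archived, deliver only when process_notification is set) come from the documentation.

```python
from typing import List

# Illustrative decision logic for the manual export workflow: match_count
# is skipped, archived datasets are unarchived first, and delivery only
# runs when the process_notification option is selected.
def manual_export_steps(is_archived: bool, process_notification: bool) -> List[str]:
    steps: List[str] = []
    if is_archived:
        steps.append("unarchive_dataset")
    steps += ["get_export_formats", "export_files"]
    if process_notification:
        steps += ["get_delivery_methods", "deliver_files"]
    return steps

print(manual_export_steps(is_archived=True, process_notification=False))
# → ['unarchive_dataset', 'get_export_formats', 'export_files']
```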

Manual Delivery

The manual delivery workflow in Grepsr allows users to deliver files to their desired destinations, such as email, Dropbox, FTP, webhook, Slack, Amazon S3, Google Cloud, Azure Cloud, feed, or Box. When a user clicks on the "Delivery File" option in the Grepsr platform, they can select the delivery destination and initiate the file delivery process. This documentation outlines the steps involved in the manual delivery workflow, from event triggering to file delivery.

  1. Event Initialization When a user selects the "Delivery File" option and chooses the desired delivery destination, the Grepsr platform emits a "DeliveryInitiatedEvent" through AWS EventBridge. This event contains the necessary information about the file to be delivered and the chosen delivery destination. The "DeliveryInitiatedEvent" triggers a Lambda function responsible for executing the "deliver-file-man" workflow in Temporal.

  2. Temporal Workflow Execution The "deliver-file-man" workflow, executed by Temporal, performs the necessary tasks to deliver the file to the user's specified destination. The workflow follows these steps:

    • Get File Details

      • The workflow validates the selected delivery destination provided by the user.
      • It ensures that the chosen destination is supported and available for file delivery.
    • Deliver File

      • Using the Delivery service, the workflow delivers the file to the specified destination.
      • The Delivery service handles the necessary protocols and integrations required for each delivery destination.
      • The file is securely transferred to the destination according to the delivery method's specifications.
      • After the file is successfully delivered, the workflow generates a delivery confirmation.
      • If the file delivery operation was successful, a "FileDeliveryProcessedEvent" is emitted to indicate the successful delivery.
      • In case of any errors or failures during the delivery process, an appropriate event is emitted to reflect the failure.

The manual delivery workflow ensures that users have the flexibility to choose their desired delivery destination for the files. By emitting the "DeliveryInitiatedEvent" and executing the "deliver-file-man" workflow in Temporal, Grepsr streamlines the file delivery process and leverages the Delivery service to handle the intricacies of each delivery destination. This documentation provides an overview of the manual delivery workflow and highlights the key steps involved in delivering files to the chosen destination.