Wednesday, November 8, 2023

Enable cost-efficient operational analytics with Amazon OpenSearch Ingestion


As the scale and complexity of microservices and distributed applications continues to grow, customers are seeking guidance for building cost-efficient infrastructure supporting operational analytics use cases. Operational analytics is a popular use case with Amazon OpenSearch Service. A few of the defining characteristics of these use cases are ingesting a high volume of time series data and a relatively low volume of querying, alerting, and running analytics on ingested data for real-time insights. Although OpenSearch Service is capable of ingesting petabytes of data across storage tiers, you still have to provision capacity to migrate between hot and warm tiers. This adds to the cost of provisioned OpenSearch Service domains.

The time series data often contains logs or telemetry data from various sources with different values and needs. That is, logs from some sources need to be available in a hot storage tier longer, whereas logs from other sources can tolerate a delay in querying and other requirements. Until now, customers were building external ingestion systems with the Amazon Kinesis family of services, Amazon Simple Queue Service (Amazon SQS), AWS Lambda, custom code, and other similar solutions. Although these solutions enable ingestion of operational data with various requirements, they add to the cost of ingestion.

In general, operational analytics workloads use anomaly detection to aid domain operations. This assumes that the data is already present in OpenSearch Service and the cost of ingestion is already borne.

With the addition of a few recent features of Amazon OpenSearch Ingestion, a fully managed serverless pipeline for OpenSearch Service, you can effectively address each of these cost points and build a cost-effective solution. In this post, we outline a solution that does the following:

  • Uses conditional routing of Amazon OpenSearch Ingestion to separate logs with specific attributes and store those, for example, in Amazon OpenSearch Service and archive all events in Amazon S3 to query with Amazon Athena
  • Uses in-stream anomaly detection with OpenSearch Ingestion, thereby removing the cost associated with compute needed for anomaly detection after ingestion

In this post, we use a VPC flow logs use case to demonstrate the solution. The solution and pattern presented in this post are equally applicable to larger operational analytics and observability use cases.

Solution overview

We use VPC flow logs to capture IP traffic and trigger processing notifications to the OpenSearch Ingestion pipeline. The pipeline filters the data, routes the data, and detects anomalies. The raw data is stored in Amazon S3 for archival purposes; the pipeline then detects anomalies in the data in near-real time using the Random Cut Forest (RCF) algorithm and sends those data records to OpenSearch Service. The raw data stored in Amazon S3 can be inexpensively retained for an extended period of time using tiered storage and queried using the Athena query engine, and also visualized using Amazon QuickSight or other data visualization services. Although this walkthrough uses VPC flow log data, the same pattern applies for use with AWS CloudTrail, Amazon CloudWatch, any log files as well as any OpenTelemetry events, and custom producers.

The following is a diagram of the solution that we configure in this post.

In the following sections, we provide a walkthrough for configuring this solution.

The patterns and procedures presented in this post have been validated with the current version of OpenSearch Ingestion and the Data Prepper open-source project version 2.4.

Prerequisites

Complete the following prerequisite steps:

  1. We use a VPC to generate data for this demonstration. Set up the VPC flow logs to publish logs to an S3 bucket in text format. To optimize S3 storage costs, create a lifecycle configuration on the S3 bucket to transition the VPC flow logs to different tiers or expire processed logs. Make a note of the S3 bucket name you configured to use in later steps.
  2. Set up an OpenSearch Service domain. Make a note of the domain URL. The domain can be either public or VPC based, which is the preferred configuration.
  3. Create an S3 bucket for storing archived events, and make a note of the S3 bucket name. Configure a resource-based policy allowing OpenSearch Ingestion to archive logs and Athena to read the logs.
  4. Configure an AWS Identity and Access Management (IAM) role or separate IAM roles allowing OpenSearch Ingestion to interact with Amazon SQS and Amazon S3. For instructions, refer to Configure the pipeline role.
  5. Configure Athena or validate that Athena is configured in your account. For instructions, refer to Getting started.

Configure an SQS notification

VPC flow logs write data to Amazon S3. After each file is written, Amazon S3 sends an SQS notification to notify the OpenSearch Ingestion pipeline that the file is ready for processing.

If the data is already stored in Amazon S3, you can use the S3 scan capability for a one-time or scheduled loading of data through the OpenSearch Ingestion pipeline.

Use AWS CloudShell to issue the following commands to create the SQS queues VpcFlowLogsNotifications and VpcFlowLogsNotifications-DLQ that we use for this walkthrough.

Create a dead-letter queue with the following code:

export SQS_DLQ_URL=$(aws sqs create-queue --queue-name VpcFlowLogsNotifications-DLQ | jq -r '.QueueUrl')

echo $SQS_DLQ_URL 

export SQS_DLQ_ARN=$(aws sqs get-queue-attributes --queue-url $SQS_DLQ_URL --attribute-names QueueArn | jq -r '.Attributes.QueueArn') 

echo $SQS_DLQ_ARN

Create an SQS queue to receive events from Amazon S3 with the following code:

export SQS_URL=$(aws sqs create-queue --queue-name VpcFlowLogsNotifications --attributes '{
"RedrivePolicy": 
"{\"deadLetterTargetArn\":\"'$SQS_DLQ_ARN'\",\"maxReceiveCount\":\"2\"}", 
"Policy": 
  "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"}, \"Action\":\"SQS:SendMessage\",\"Resource\":\"*\"}]}" 
}' | jq -r '.QueueUrl')

echo $SQS_URL

To configure the S3 bucket to send events to the SQS queue, use the following code (provide the name of your S3 bucket used for storing VPC flow logs):

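# The notification configuration expects the queue ARN, not the queue URL.
export SQS_ARN=$(aws sqs get-queue-attributes --queue-url $SQS_URL --attribute-names QueueArn | jq -r '.Attributes.QueueArn')

aws s3api put-bucket-notification-configuration --bucket __BUCKET_NAME__ --notification-configuration '{
     "QueueConfigurations": [
         {
             "QueueArn": "'$SQS_ARN'",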
             "Events": [
                 "s3:ObjectCreated:*"
             ]
         }
     ]
}'

Create the OpenSearch Ingestion pipeline

Now that you have configured Amazon SQS and the S3 bucket notifications, you can configure the OpenSearch Ingestion pipeline.

  1. On the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
  2. Choose Create pipeline.
  3. For Pipeline name, enter a name (for this post, we use stream-analytics-pipeline).
  4. For Pipeline configuration, enter the following code:
version: "2"
entry-pipeline:
  source:
    s3:
      notification_type: sqs
      compression: gzip
      codec:
        newline:
      sqs:
        queue_url: "__SQS_QUEUE_URL__"
        visibility_timeout: 180s
      aws:
        region: "__REGION__"
        sts_role_arn: "__STS_ROLE_ARN__"

  processor:
  sink:
    - pipeline:
        name: "archive-pipeline"
    - pipeline:
        name: "data-processing-pipeline"

data-processing-pipeline:
    source:
        pipeline:
            name: "entry-pipeline"
    processor:
    - grok:
        tags_on_match_failure: [ "grok_match_failure" ]
        match:
          message: [ "%{VPC_FLOW_LOG}" ]
    route:
        - icmp_traffic: '/protocol == 1'
    sink:
        - pipeline:
            name: "icmp-pipeline"
            routes:
                - "icmp_traffic"
        - pipeline:
            name: "analytics-pipeline"
    

archive-pipeline:
  source:
    pipeline:
      name: entry-pipeline
  processor:
  sink:
    - s3:
        aws:
          region: "__REGION__"
          sts_role_arn: "__STS_ROLE_ARN__"
        max_retries: 16
        bucket: "__AWS_S3_BUCKET_ARCHIVE__"
        object_key:
          path_prefix: "vpc-flow-logs-archive/year=%{yyyy}/month=%{MM}/day=%{dd}/"
        threshold:
          maximum_size: 50mb
          event_collect_timeout: 300s
        codec:
          parquet:
            auto_schema: true
      
analytics-pipeline:
  source:
    pipeline:
      name: "data-processing-pipeline"
  processor:
    - drop_events:
        drop_when: 'hasTags("grok_match_failure") or "/log-status" == "NODATA"'
    - date:
        from_time_received: true
        destination: "@timestamp"
    - aggregate:
        identification_keys: ["srcaddr", "dstaddr"]
        action:
          tail_sampler:
            percent: 20.0
            wait_period: "60s"
            condition: '/action != "ACCEPT"'
    - anomaly_detector:
        identification_keys: ["srcaddr","dstaddr"]
        keys: ["bytes"]
        verbose: true
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        hosts: [ "__AMAZON_OPENSEARCH_DOMAIN_URL__" ]
        index: "flow-logs-anomalies"
        aws:
          sts_role_arn: "__STS_ROLE_ARN__"
          region: "__REGION__"
          
icmp-pipeline:
  source:
    pipeline:
      name: "data-processing-pipeline"
  processor:
  sink:
    - opensearch:
        hosts: [ "__AMAZON_OPENSEARCH_DOMAIN_URL__" ]
        index: "sensitive-icmp-traffic"
        aws:
          sts_role_arn: "__STS_ROLE_ARN__"
          region: "__REGION__"

Replace the variables in the preceding code with resources in your account:

    • __SQS_QUEUE_URL__ – URL of Amazon SQS for Amazon S3 events
    • __STS_ROLE_ARN__ – AWS Security Token Service (AWS STS) roles for resources to assume
    • __AWS_S3_BUCKET_ARCHIVE__ – S3 bucket for archiving processed events
    • __AMAZON_OPENSEARCH_DOMAIN_URL__ – URL of OpenSearch Service domain
    • __REGION__ – Region (for example, us-east-1)
  5. In the Network settings section, specify your network access. For this walkthrough, we’re using VPC access. We provided the VPC and private subnet locations that have connectivity with the OpenSearch Service domain and security groups.
  6. Leave the other settings with default values, and choose Next.
  7. Review the configuration changes and choose Create pipeline.

It will take a few minutes for OpenSearch Service to provision the environment. While the environment is being provisioned, we’ll walk you through the pipeline configuration. Entry-pipeline listens for SQS notifications about newly arrived files and triggers the reading of VPC flow log compressed files:

…
entry-pipeline:
  source:
     s3:
…

The pipeline branches into two sub-pipelines. The first stores original messages for archival purposes in Amazon S3 in read-optimized Parquet format; the other applies analytics and routes events to the OpenSearch Service domain for fast querying and alerting:

…
  sink:
    - pipeline:
        name: "archive-pipeline"
    - pipeline:
        name: "data-processing-pipeline"
… 

The pipeline archive-pipeline aggregates messages in 50 MB chunks or every 300 seconds (the maximum_size and event_collect_timeout thresholds in the configuration) and writes a Parquet file to Amazon S3 with the schema inferred from the message. Also, a prefix is added to help with partitioning and query optimization when reading a collection of files using Athena.

…
sink:
    - s3:
…
        object_key:
          path_prefix: "vpc-flow-logs-archive/year=%{yyyy}/month=%{MM}/day=%{dd}/"
        threshold:
          maximum_size: 50mb
          event_collect_timeout: 300s
        codec:
          parquet:
            auto_schema: true
…
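To make the effect of the prefix concrete, the following minimal Python sketch builds the Hive-style key prefix that the pattern above would produce for a given date. The function name archive_prefix is hypothetical, and the sketch assumes the date patterns resolve against a UTC timestamp; it only illustrates the resulting layout and is not how OpenSearch Ingestion computes the key.

```python
from datetime import datetime, timezone


def archive_prefix(ts):
    """Return the partition-style S3 key prefix for a timestamp.

    Mirrors the year=/month=/day= layout of the path_prefix setting
    (an illustration only, not the service's implementation).
    """
    return f"vpc-flow-logs-archive/year={ts:%Y}/month={ts:%m}/day={ts:%d}/"


if __name__ == "__main__":
    print(archive_prefix(datetime(2023, 11, 8, tzinfo=timezone.utc)))
```

Because each key=value segment maps to a partition column, a query engine can skip whole days or months of objects when a query filters on those columns.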

Now that we have reviewed the basics, we focus on the pipeline that detects anomalies and sends only high-value messages that deviate from the norm to OpenSearch Service. It also stores Internet Control Message Protocol (ICMP) messages in OpenSearch Service.

We applied a grok processor to parse the message using a predefined regex for parsing VPC flow logs, and also tagged all unparsable messages with the grok_match_failure tag, which we use to remove headers and other records that can’t be parsed:

…
    processor:
    - grok:
        tags_on_match_failure: [ "grok_match_failure" ]
        match:
          message: [ "%{VPC_FLOW_LOG}" ]
…
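The parse-or-tag behavior can be approximated outside the pipeline with a short Python sketch. It assumes the default (version 2) flow log field order and splits on whitespace rather than using the actual VPC_FLOW_LOG grok pattern; parse_flow_log and FLOW_LOG_FIELDS are hypothetical names, and all values stay strings here.

```python
# Field order of the default (version 2) VPC flow log format.
FLOW_LOG_FIELDS = [
    "version", "account-id", "interface-id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log-status",
]


def parse_flow_log(message):
    """Return (event, tags) for one raw line.

    Lines that do not look like a data record (for example the header
    line) are returned unparsed and tagged, similar in spirit to
    tags_on_match_failure in the grok processor.
    """
    parts = message.split()
    # A data line has exactly 14 fields and starts with a numeric version.
    if len(parts) != len(FLOW_LOG_FIELDS) or not parts[0].isdigit():
        return {"message": message}, ["grok_match_failure"]
    event = dict(zip(FLOW_LOG_FIELDS, parts))
    event["message"] = message
    return event, []


if __name__ == "__main__":
    line = ("2 123456789010 eni-1235b8ca123456789 172.31.16.139 "
            "172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK")
    print(parse_flow_log(line))
```

The header line of a flow log file has the same number of columns as a data line, so the sketch also checks that the first field is numeric before accepting a record.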

We then routed all messages with the protocol identifier 1 (ICMP) to icmp-pipeline and all messages to analytics-pipeline for anomaly detection:

…
    route:
        - icmp_traffic: '/protocol == 1'
    sink:
        - pipeline:
            name: "icmp-pipeline"
            routes:
                - "icmp_traffic"
        - pipeline:
            name: "analytics-pipeline"
…
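The fan-out described above can be pictured with a small Python sketch. It assumes events are dictionaries with an integer protocol field (consistent with the route expression /protocol == 1); route_events is a hypothetical helper, not part of OpenSearch Ingestion.

```python
def route_events(events):
    """Split events the way the route and sink pair is described above.

    Returns (icmp_pipeline, analytics_pipeline) as two lists.
    """
    icmp_pipeline, analytics_pipeline = [], []
    for event in events:
        # The sink that lists the icmp_traffic route only gets matching events.
        if event.get("protocol") == 1:
            icmp_pipeline.append(event)
        # The sink without a routes list gets every event.
        analytics_pipeline.append(event)
    return icmp_pipeline, analytics_pipeline


if __name__ == "__main__":
    sample = [
        {"protocol": 1, "srcaddr": "10.0.0.1"},
        {"protocol": 6, "srcaddr": "10.0.0.2"},
    ]
    icmp, analytics = route_events(sample)
    print(len(icmp), len(analytics))
```

Note that an ICMP event lands in both sub-pipelines: routing here duplicates matching events rather than diverting them.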

In the analytics pipeline, we dropped all records that can’t be parsed using the hasTags method based on the tag that we assigned at the time of parsing. We also removed all records that don’t contain useful data for anomaly detection.

…
  - drop_events:
        drop_when: 'hasTags("grok_match_failure") or "/log-status" == "NODATA"'
…

Then we applied probabilistic sampling using the tail_sampler action of the aggregate processor for all accepted messages grouped by source and destination addresses, and sent those to the sink together with all messages that were not accepted. This helps reduce the volume of messages within the selected cardinality keys, with a focus on all messages that weren’t accepted, and keeps a sample representation of messages that were accepted.

…
    - aggregate:
        identification_keys: ["srcaddr", "dstaddr"]
        action:
          tail_sampler:
            percent: 20.0
            wait_period: "60s"
            condition: '/action != "ACCEPT"'
…
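As a rough illustration of the sampling idea, the following Python sketch keeps every message whose action is not ACCEPT and keeps roughly percent of the accepted ones. It is a simplified per-event model based on the description above: it ignores the grouping by identification_keys and the wait_period, so it should not be read as the tail_sampler implementation. The name tail_sample and the seed parameter are hypothetical.

```python
import random


def tail_sample(events, percent=20.0, seed=None):
    """Keep all non-accepted events and a random share of accepted ones."""
    rng = random.Random(seed)  # a seed makes the sample reproducible
    kept = []
    for event in events:
        if event["action"] != "ACCEPT":
            # Mirrors the condition '/action != "ACCEPT"': always keep.
            kept.append(event)
        elif rng.random() * 100 < percent:
            # Accepted traffic is kept with probability percent / 100.
            kept.append(event)
    return kept


if __name__ == "__main__":
    sample = [{"action": "ACCEPT"}] * 50 + [{"action": "REJECT"}] * 5
    print(len(tail_sample(sample, percent=20.0, seed=1)))
```

With percent set to 20.0, about one in five accepted messages survives on average, while rejected traffic is never thinned out.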

Then we used the anomaly detector processor to identify anomalies within the cardinality key pairs, or source and destination addresses in our example. The anomaly detector processor creates and trains RCF models for a hashed value of keys, then uses those models to determine whether newly arriving messages have an anomaly based on the trained data. In our demonstration, we use bytes data to detect anomalies:

…
anomaly_detector:
        identification_keys: ["srcaddr","dstaddr"]
        keys: ["bytes"]
        verbose: true
        mode:
          random_cut_forest:
…

We set verbose:true to instruct the detector to emit the message each time an anomaly is detected. Also, for this walkthrough, we used a non-default sample_size for training the model.

When anomalies are detected, the anomaly detector returns a complete record and adds "deviation_from_expected":value,"grade":value attributes that signify the deviation value and severity of the anomaly. These values can be used to determine routing of such messages to OpenSearch Service, and use per-document monitoring capabilities in OpenSearch Service to alert on specific conditions.

Currently, OpenSearch Ingestion creates up to 5,000 distinct models based on cardinality key values per compute unit. This limit is observed using the anomaly_detector.RCFInstances.value CloudWatch metric. It’s important to select a cardinality key-value pair to avoid exceeding this constraint. As development of the Data Prepper open-source project and OpenSearch Ingestion continues, more configuration options will be added to offer greater flexibility around model training and memory management.

The OpenSearch Ingestion pipeline exposes the anomaly_detector.cardinalityOverflow.count metric through CloudWatch. This metric indicates the number of key-value pairs that weren’t run by the anomaly detection processor during a period of time because the maximum number of RCFInstances per compute unit was reached. To avoid this constraint, the number of compute units can be scaled out to provide additional capacity for hosting additional RCFInstances.
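For capacity planning, the per-unit model limit suggests a simple back-of-the-envelope estimate. The Python sketch below assumes that distinct key pairs spread evenly across compute units, which is an assumption and not documented behavior; min_compute_units is a hypothetical helper, and the 5,000 figure is the limit quoted in this post.

```python
import math

MODELS_PER_COMPUTE_UNIT = 5000  # limit quoted in this post


def min_compute_units(distinct_key_pairs, per_unit=MODELS_PER_COMPUTE_UNIT):
    """Smallest number of compute units whose combined limit covers the keys."""
    # Always provision at least one unit, even with no keys yet.
    return max(1, math.ceil(distinct_key_pairs / per_unit))


if __name__ == "__main__":
    # 12,000 distinct source/destination pairs need at least 3 units.
    print(min_compute_units(12_000))
```

Treat the result as a lower bound and watch the cardinalityOverflow metric to confirm that no key pairs are being skipped in practice.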

In the last sink, the pipeline writes records with detected anomalies along with deviation_from_expected and grade attributes to the OpenSearch Service domain:

…
sink:
    - opensearch:
        hosts: [ "__AMAZON_OPENSEARCH_DOMAIN_URL__" ]
        index: "flow-logs-anomalies"
…

Because only anomaly records are being routed and written to the OpenSearch Service domain, we are able to significantly reduce the size of our domain and optimize the cost of our sample observability infrastructure.

Another sink was used for storing all ICMP records in a separate index in the OpenSearch Service domain:

…
sink:
    - opensearch:
        hosts: [ "__AMAZON_OPENSEARCH_DOMAIN_URL__" ]
        index: "sensitive-icmp-traffic"
…

Query archived data from Amazon S3 using Athena

In this section, we review the configuration of Athena for querying archived events data stored in Amazon S3. Complete the following steps:

  1. Navigate to the Athena query editor and create a new database called vpc-flow-logs-archive using the following command:
CREATE DATABASE `vpc-flow-logs-archive`

  2. On the Database menu, choose vpc-flow-logs-archive.
  3. In the query editor, enter the following command to create a table (provide the S3 bucket used for archiving processed events). For simplicity, for this walkthrough, we create a table without partitions.
CREATE EXTERNAL TABLE `vpc-flow-logs-data`(
  `message` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://__AWS_S3_BUCKET_ARCHIVE__'
TBLPROPERTIES (
  'classification'='parquet', 
  'compressionType'='none'
)

  4. Run the following query to validate that you can query the archived VPC flow log data:
SELECT * FROM "vpc-flow-logs-archive"."vpc-flow-logs-data" LIMIT 10;

Because archived data is stored in its original format, it helps avoid issues related to format conversion. Athena will query and display records in the original format. However, it’s often preferable to interact only with a subset of columns or parts of the messages. You can use the regexp_split function in Athena to split the message into columns and retrieve certain columns. Run the following query to see the source and destination address groupings from the VPC flow log data:

SELECT srcaddr, dstaddr FROM (
   SELECT regexp_split(message, ' ')[4] AS srcaddr, 
          regexp_split(message, ' ')[5] AS dstaddr, 
          regexp_split(message, ' ')[14] AS status  FROM "vpc-flow-logs-archive"."vpc-flow-logs-data" 
) WHERE status = 'OK' 
GROUP BY srcaddr, dstaddr 
ORDER BY srcaddr, dstaddr LIMIT 10;

This demonstrated that you can query all events using Athena, where archived data in its original raw format is used for the analysis. Athena is priced per data scanned. Because the data is stored in a read-optimized format and partitioned, it enables further cost-optimization around on-demand querying of archived streaming and observability data.

Clean up

To avoid incurring future charges, delete the following resources created as part of this post:

  • OpenSearch Service domain
  • OpenSearch Ingestion pipeline
  • SQS queues
  • VPC flow logs configuration
  • All data stored in Amazon S3

Conclusion

In this post, we demonstrated how to use OpenSearch Ingestion pipelines to build a cost-optimized infrastructure for log analytics and observability events. We used routing, filtering, aggregation, and anomaly detection in an OpenSearch Ingestion pipeline, enabling you to downsize your OpenSearch Service domain and create a cost-optimized observability infrastructure. For our example, we used a data sample with 1.5 million events, which the pipeline distilled to 1,300 events with predicted anomalies based on source and destination IP pairs. This metric demonstrates that the pipeline identified that less than 0.1% of events were of high importance, and routed those to OpenSearch Service for visualization and alerting needs. This translates to lower resource utilization in OpenSearch Service domains and can lead to provisioning of smaller OpenSearch Service environments.
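The percentage quoted above follows directly from the two event counts. A short Python check, using only the figures stated in this post:

```python
total_events = 1_500_000   # events in the data sample
anomaly_events = 1_300     # events flagged and sent to OpenSearch Service

# Share of events that reached the domain, as a percentage.
share = anomaly_events / total_events * 100
# How many ingested events correspond to one indexed event.
reduction = total_events / anomaly_events

print(f"{share:.3f}% of events were indexed")
print(f"about 1 in {reduction:.0f} events reached the domain")
```

The share works out to a little under 0.09%, which is consistent with the "less than 0.1%" figure.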

We encourage you to use OpenSearch Ingestion pipelines to create your purpose-built and cost-optimized observability infrastructure that uses OpenSearch Service for storing and alerting on high-value events. If you have comments or feedback, please leave them in the comments section.


About the Authors

Mikhail Vaynshteyn is a Solutions Architect with Amazon Web Services. Mikhail works with healthcare and life sciences customers to build solutions that help improve patients’ outcomes. Mikhail specializes in data analytics services.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.


