Processing and analyzing large streams of data records with AWS Kinesis and AWS Athena

2022/01/09 12:48
#Video Log #AWS Kinesis #Streaming Data


Our service provides learning management for users by logging their learning time. These logs are sent every 30 seconds, and they make up most of the requests our service handles. That traffic volume makes incidents likely, and unfortunately one happened last week. We scaled up the instance class and changed the uWSGI configuration, but to fix the root cause we need to migrate these requests somewhere else.
uwsgi[13273]: [pid: 13292|app: 0|req: 2826/352183] () {44 vars in 778 bytes} [Mon Jan 10 11:30:50 2022] POST /v1/video/player/kollus/notify => generated 0 bytes in 224 msecs (HTTP/1.1 200) 6 headers in 0 bytes (0 switches on core 0)
uwsgi[13273]: Mon Jan 10 11:30:50 2022 - SIGPIPE: writing to a closed pipe/socket/fd (probably the client disconnected) on request /v1/video/player/kollus/notify (ip !!!
Jan 13 06:15:24 ip-172-31-46-126 uwsgi[26570]: OSError: write error
uwsgi_response_writev_headers_and_body_do(): Broken pipe [core/writer.c line 296] during GET / (
We suspect that the log-callback clients disconnect before the server finishes responding. The broken connections exhaust the available sockets, leaving too few to accept new requests.

AWS Kinesis

Definition: Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. Amazon Kinesis offers key capabilities to cost-effectively process streaming data at any scale, along with the flexibility to choose the tools that best suit the requirements of your application. With Amazon Kinesis, you can ingest real-time data such as video, audio, application logs, website clickstreams, and IoT telemetry data for machine learning, analytics, and other applications. Amazon Kinesis enables you to process and analyze data as it arrives and respond instantly instead of having to wait until all your data is collected before the processing can begin.
AWS Kinesis helps us process streaming data. But what is streaming data? Streaming data is data generated continuously and sequentially by many different sources. Such data should be processed incrementally, using stream-processing techniques, without access to the complete dataset. By this definition, the video-learning events our service logs are also streaming data.
AWS Kinesis consists of several components; I used Kinesis Data Streams and Kinesis Data Firehose.

Kinesis Data Streams

Kinesis Data Streams helps us collect and process large streams of data records.
Producer applications such as mobile apps and web frontends push data into Kinesis Data Streams. Because the stream durably buffers the data, log records are not lost even if the frontend or application server fails, so we can ingest huge amounts of data without loss. The stream can then deliver records to consumer applications such as AWS Lambda, EC2 instances implementing the proper consumer interface, or Kinesis Data Firehose; multiple consumer applications can read the same stream, and the stream itself provides durability and elasticity.

Data is stored in units called data records. A data record is composed of a sequence number, a partition key, and a data blob, which is an immutable sequence of bytes; Kinesis Data Streams does not inspect, interpret, or change the data in the blob in any way. A data blob can be up to 1 MB.

A stream is composed of one or more shards, each of which provides a fixed unit of capacity. Shards determine the stream's request capacity, so we should size and manage them properly.
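The relationship between partition keys and shards can be sketched offline: Kinesis routes each record by taking the MD5 hash of its partition key (a 128-bit integer) and finding the shard whose hash-key range contains it. The sketch below assumes the ranges split the key space evenly, which is how a stream is laid out by default; the shard count and key are hypothetical.

```python
import hashlib

def shard_for_partition_key(partition_key: str, num_shards: int) -> int:
    """Return the index of the shard a record with this key would land on,
    assuming evenly divided hash-key ranges."""
    key_space = 2 ** 128
    # MD5 of the UTF-8 partition key, interpreted as a 128-bit integer.
    hashed = int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")
    return hashed * num_shards // key_space

# Records with the same partition key always map to the same shard,
# which is what preserves per-key ordering.
print(shard_for_partition_key("user-1234", 4))
```

In the real service the record would be sent with `boto3.client("kinesis").put_record(StreamName=..., Data=..., PartitionKey=...)`; this sketch only shows why hot partition keys can overload a single shard.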

Kinesis Data Firehose

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers, including Datadog, LogicMonitor, MongoDB, etc.
A Firehose is also called a delivery stream, and you configure a source, an optional data transformation, and a destination. The source can be a Kinesis data stream, and the destination can be one of the services listed above, such as S3 or MongoDB. The data transformation step transforms the delivered records, for example with an AWS Lambda function.
The transformation step can also convert the record format; in my case, I changed the output format to Apache Parquet.
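A Firehose transformation Lambda follows a fixed contract: it receives a batch of base64-encoded records and must return each one with its `recordId`, a `result` status, and the re-encoded data. Below is a minimal sketch of such a handler; the `processed` field it adds is a hypothetical enrichment, not something from our actual pipeline.

```python
import base64
import json

def lambda_handler(event, context):
    """Minimal Firehose data-transformation Lambda sketch."""
    output = []
    for record in event["records"]:
        # Firehose delivers each payload base64-encoded.
        payload = json.loads(base64.b64decode(record["data"]))
        payload["processed"] = True  # hypothetical enrichment step
        output.append({
            "recordId": record["recordId"],   # must echo the incoming id
            "result": "Ok",                   # or "Dropped" / "ProcessingFailed"
            "data": base64.b64encode(json.dumps(payload).encode()).decode(),
        })
    return {"records": output}
```

Returning `"Dropped"` instead of `"Ok"` is how a transform filters out records it does not want delivered.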
Apache Parquet is a free and open-source, column-oriented data storage format from the Apache Hadoop ecosystem. It has three advantages: the compression ratio is better, I/O usage is reduced because a query reads only the columns it needs, and an appropriate encoding can be chosen per column.

AWS Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.
Athena is easy to use. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL. Most results are delivered within seconds. With Athena, there’s no need for complex ETL jobs to prepare your data for analysis. This makes it easy for anyone with SQL skills to quickly analyze large-scale datasets.
Athena is out-of-the-box integrated with AWS Glue Data Catalog, allowing you to create a unified metadata repository across various services, crawl data sources to discover schemas and populate your Catalog with new and modified table and partition definitions, and maintain schema versioning.
You can run SQL queries from the Athena dashboard. Select the data source, database, and partitioned table to query the data you want; these data sources are managed in AWS Glue.
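As a sketch of what such a query looks like, the helper below builds an Athena SQL statement that filters on the date partition columns so Athena scans only one day's data. The table name `video_logs` and the columns `user_id` / `watch_seconds` are hypothetical, and the partition columns are assumed to be strings (the type the Glue crawler typically infers for Hive-style key=value directories).

```python
def daily_watch_time_query(year: int, month: int, day: int) -> str:
    """Build an Athena SQL query restricted to one day's partition."""
    return (
        "SELECT user_id, SUM(watch_seconds) AS total_seconds\n"
        "FROM video_logs\n"
        f"WHERE year = '{year}' AND month = '{month:02d}' AND day = '{day:02d}'\n"
        "GROUP BY user_id"
    )

print(daily_watch_time_query(2022, 1, 9))
```

The same string could be submitted programmatically with `boto3.client("athena").start_query_execution(...)`; the key point is the WHERE clause on partition columns, which limits both scan time and per-query cost.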

AWS Glue

AWS Glue is a serverless data-integration (ETL) service that makes it easy to discover, prepare, and combine data for analytics. Data integration is the process of preparing and combining data for analytics, machine learning, and application development; it involves multiple tasks, such as discovering and extracting data from various sources; enriching, cleaning, normalizing, and combining data; and loading and organizing data in databases, data warehouses, and data lakes.
The Data Catalog is a central repository that stores the metadata of all of your operational data assets. You create a database and a table, then define the properties of each table column, such as its name, type, and other configuration. A table can be divided into partitions.
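For Parquet data like ours, a Glue table definition might look like the sketch below, shaped as the `TableInput` structure the Glue `create_table` API accepts. The table name, columns, and bucket path are hypothetical; the input/output formats and serde are the standard Hive Parquet classes.

```python
# Sketch of a Glue TableInput for a Parquet table partitioned by date.
table_input = {
    "Name": "video_logs",  # hypothetical table name
    "PartitionKeys": [
        {"Name": "year", "Type": "string"},
        {"Name": "month", "Type": "string"},
        {"Name": "day", "Type": "string"},
    ],
    "StorageDescriptor": {
        "Columns": [
            {"Name": "user_id", "Type": "string"},
            {"Name": "watch_seconds", "Type": "int"},
        ],
        "Location": "s3://my_bucket/logs/",
        "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
        "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
        "SerdeInfo": {
            "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
        },
    },
}
# With boto3, this would be registered via:
# boto3.client("glue").create_table(DatabaseName="learning_logs", TableInput=table_input)
```

In practice the Glue crawler can build this definition for you by inspecting the S3 prefix; writing it by hand is only needed when you want full control over types.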
Partitioning is an important technique for organizing a data set for efficient querying in big-data systems. The data is laid out in a hierarchical directory structure based on the unique values of one or more columns. For example, application logs can be partitioned by date (year, month, day) in Amazon S3; a day's log data is then saved under a prefix like s3://my_bucket/logs/year=2018/month=01/day=23/.
Because a query can then read only the partitions it needs, we save a lot of time and cost. These partitions are created by the AWS Glue crawler, and data stored this way can be queried in AWS Athena!
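Building those Hive-style `key=value` prefixes is mechanical enough to sketch; the helper below formats a day's prefix from a date (the bucket name follows the example above and is otherwise hypothetical). The Glue crawler recognizes the `key=value` directory names as partition columns.

```python
from datetime import date

def partition_prefix(bucket: str, d: date) -> str:
    """Build a Hive-style partitioned S3 prefix for one day's logs."""
    return f"s3://{bucket}/logs/year={d.year}/month={d.month:02d}/day={d.day:02d}/"

print(partition_prefix("my_bucket", date(2018, 1, 23)))
# → s3://my_bucket/logs/year=2018/month=01/day=23/
```

Zero-padding the month and day keeps the directories in lexicographic date order, which also keeps S3 listings tidy.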


We will not lose the learning-log data anymore. Also, since a large share of requests has migrated from the backend server to AWS Kinesis, the backend server has become stable. However, we now carry the new burden of one more independent piece of infrastructure to manage, so monitoring tools such as CloudWatch metrics and alarms are necessary for it.
Errors can still occur while producing the logs, so the alarms really are a must!