AWS Kinesis is a fully managed, cloud-based event-streaming service for real-time processing of large, distributed data streams. Data from Amazon Kinesis can also be emitted to other AWS services such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elastic MapReduce (Amazon EMR), and AWS Lambda.
Today I'll demonstrate how to use the async Kinesis client to post data to Kinesis, and how to consume that data using the Kinesis Client Library (KCL). For the purposes of this demo, the data volume is assumed to be small enough that a single shard is sufficient.
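Before running any of the code below, the stream itself has to exist. Here is a minimal sketch of creating the single-shard demo stream; the class name StreamSetup and the stream name demo-stream are placeholders I'm assuming for illustration.

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;

public class StreamSetup {
    public static void main(String[] args) {
        AmazonKinesisClient kinesis = new AmazonKinesisClient(new ProfileCredentialsProvider());
        // One shard is enough for the small data volume in this demo.
        kinesis.createStream(new CreateStreamRequest()
                .withStreamName("demo-stream") // placeholder name
                .withShardCount(1));
        // The stream takes a short while to become ACTIVE; poll DescribeStream before using it.
    }
}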
Step 1: Adding the required dependencies
We are developing this as a traditional Maven-based webapp, so we add the Kinesis Java SDK dependency to our pom.xml:
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.10.11</version>
</dependency>
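Since the subscriber side uses the KCL, we also need the KCL artifact; aws-java-sdk-kinesis alone does not include it. The version below is an assumption (a 1.6.x release was current around SDK 1.10.11); pick whatever matches your SDK.

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.6.1</version>
</dependency>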
Step 2: Writing up a Publisher class that sends events in an async manner
Next up we write our Publisher implementation that does the following:
- validates that the stream is active
- converts the data to a byte array
- publishes it asynchronously.
import java.nio.ByteBuffer;
import java.util.UUID;

import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClient;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Publisher {

    final AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(
            new AWSCredentialsProviderChain(new ProfileCredentialsProvider()));

    public void sendDataToStream(String body, String streamName) {
        try {
            validateStream(streamName);
        } catch (ResourceNotFoundException e) {
            log.error(e.getMessage());
            return; // don't attempt to publish to a stream that isn't active
        }
        if (body == null) {
            log.info("Can't convert incoming data to a byte array for transmission");
            return;
        }
        ByteBuffer bytes = ByteBuffer.wrap(body.getBytes());

        PutRecordRequest putRecord = new PutRecordRequest();
        putRecord.setStreamName(streamName);
        // A random partition key is fine here since the demo stream has a single shard.
        putRecord.setPartitionKey(UUID.randomUUID().toString().replace("-", ""));
        putRecord.setData(bytes);

        kinesisAsyncClient.putRecordAsync(putRecord, new AsyncHandler<PutRecordRequest, PutRecordResult>() {
            @Override
            public void onError(Exception e) {
                log.error("Can't publish the record", e);
            }

            @Override
            public void onSuccess(PutRecordRequest putRecordRequest, PutRecordResult putRecordResult) {
                log.info("published successfully");
            }
        });
    }

    private void validateStream(String streamName) {
        final DescribeStreamResult describeStreamResult = kinesisAsyncClient.describeStream(streamName);
        if (!describeStreamResult
                .getStreamDescription()
                .getStreamStatus().equalsIgnoreCase("ACTIVE")) {
            throw new ResourceNotFoundException("stream not found in an 'active' stage");
        }
    }
}
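To try the publisher out, here is a minimal sketch of a driver class. The stream name demo-stream is a placeholder, and the short sleep is just an assumption to keep the JVM alive long enough for the async callbacks to fire.

public class PublisherDemo {
    public static void main(String[] args) throws InterruptedException {
        Publisher publisher = new Publisher();
        // Send a few records; the outcome of each is reported via the AsyncHandler callbacks.
        for (int i = 0; i < 5; i++) {
            publisher.sendDataToStream("hello kinesis #" + i, "demo-stream");
        }
        // Give the async client's background threads a moment to finish before the JVM exits.
        Thread.sleep(2000);
    }
}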
With the publishing side done, we turn to the subscriber, for which I'm going to use the Kinesis Client Library (KCL).
Step 3: Implement a subscriber that consumes the data from the stream
The Subscriber class implements the KCL's IRecordProcessor interface, which declares the initialize, processRecords, and shutdown methods that we implement to suit our needs. The following is one way of implementing it.
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Subscriber implements IRecordProcessor {

    private static final long REPORTING_INTERVAL_MILLIS = 1000L;
    private static final long CHECKPOINT_INTERVAL_MILLIS = 1000L;

    private String kinesisShardId;
    private long nextReportingTimeInMillis;
    private long nextCheckpointTimeInMillis;

    @Override
    public void initialize(String shardId) {
        log.info("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
        nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record r : records) {
            processRecord(r);
        }
        // If the reporting interval has elapsed, report stats
        if (System.currentTimeMillis() > nextReportingTimeInMillis) {
            nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
            log.info("time to report stats. next report at {}", nextReportingTimeInMillis);
        }
        // Checkpoint once every checkpoint interval
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        log.info("Checkpointing shard " + kinesisShardId);
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            // Ignore checkpoint if the processor instance has been shut down (failover).
            log.info("Caught shutdown exception, skipping checkpoint.", se);
        } catch (ThrottlingException e) {
            // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
            log.error("Caught throttling exception, skipping checkpoint.", e);
        } catch (InvalidStateException e) {
            // This indicates an issue with the DynamoDB table (check the table and its provisioned IOPS).
            log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
        }
    }

    private void processRecord(Record r) {
        // Decode the record payload as UTF-8 (the publisher wrote plain string bytes).
        byte[] payload = new byte[r.getData().remaining()];
        r.getData().get(payload);
        String dataFromBytes = new String(payload, StandardCharsets.UTF_8);
        if (dataFromBytes.isEmpty()) {
            log.error("Skipping record. Unable to process record, partition key {}", r.getPartitionKey());
            return;
        }
        log.info("Received record with partition key {}: {}", r.getPartitionKey(), dataFromBytes);
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason shutdownReason) {
        log.info("Shutting down record processor for shard: " + kinesisShardId);
        // Important to checkpoint after reaching the end of a shard, so we can start processing data from its child shards.
        if (shutdownReason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }
}
Step 4: Implement a subscriber record factory which returns an instance of the subscriber we just implemented
This class implements the IRecordProcessorFactory interface, which has only a single method to implement. As the name suggests, it follows the factory pattern and returns an implementation of a record processor, which in our case is the Subscriber above.
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SubscriberFactory implements IRecordProcessorFactory {

    public SubscriberFactory() {
        super();
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new Subscriber();
    }
}
Step 5: Writing up a main class that can coordinate the record processing
import java.util.UUID;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SubscriberWorker {

    public static void main(String[] args) {
        String applicationName = args[0];
        String streamName = args[1];
        Region region = RegionUtils.getRegion(args[2]);
        if (region == null) {
            System.err.println(args[2] + " is not a valid AWS region.");
            System.exit(1);
        }

        String workerId = String.valueOf(UUID.randomUUID());
        // ConfigurationUtils is a small helper (borrowed from the AWS samples) that
        // returns a ClientConfiguration with a custom user agent set.
        KinesisClientLibConfiguration kclConfig =
                new KinesisClientLibConfiguration(applicationName, streamName,
                        new DefaultAWSCredentialsProviderChain(), workerId)
                        .withRegionName(region.getName())
                        .withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());

        IRecordProcessorFactory recordProcessorFactory = new SubscriberFactory();

        // Create the KCL worker with our subscriber record processor factory
        Worker worker = new Worker(recordProcessorFactory, kclConfig);

        int exitCode = 0;
        try {
            worker.run();
        } catch (Throwable t) {
            log.error("Caught throwable while processing data.", t);
            exitCode = 1;
        }
        System.exit(exitCode);
    }
}
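With everything wired up, the worker can be started from the command line; the jar name, application name, stream name, and region below are placeholders:

java -cp kinesis-demo.jar SubscriberWorker kinesis-demo-app demo-stream us-east-1

Note that the KCL uses the application name as the name of the DynamoDB table in which it tracks leases and checkpoints, so it should be unique per consumer application.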
That's it, folks! I've demonstrated how to implement both a publisher and a subscriber for AWS Kinesis. Thanks for taking the time to read this post.