Saturday, August 15, 2015

Kinesis Async Publisher and Subscriber

Amazon Kinesis is a fully managed, cloud-based event-streaming service for real-time data processing over large, distributed data streams. We can also emit data from Amazon Kinesis to other AWS services such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elastic MapReduce (Amazon EMR), and AWS Lambda.

Today I'll demonstrate how to use the async Kinesis client to post data to Kinesis, as well as how to consume it using the Kinesis Client Library (KCL). For the purposes of this demo, we assume the data volume is small enough that a single shard is sufficient.
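If you don't already have a stream, you can create a single-shard one up front. Here's a minimal sketch, assuming the same SDK as below and a hypothetical stream name demo-stream:

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;

public class CreateStreamDemo {
 public static void main(String[] args) {
  // "demo-stream" is a hypothetical name; 1 shard is enough for this demo.
  AmazonKinesisClient kinesis = new AmazonKinesisClient(new ProfileCredentialsProvider());
  try {
   kinesis.describeStream("demo-stream");
  } catch (ResourceNotFoundException e) {
   kinesis.createStream("demo-stream", 1);
   // Stream creation is asynchronous; wait until the stream is ACTIVE before publishing.
  }
 }
}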

Step 1: Adding the required dependencies

We are developing this for a traditional Maven-based webapp, so we add the Kinesis Java SDK dependency to our POM file.

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.10.11</version>
</dependency>
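
Since the subscriber in Step 3 uses the Kinesis Client Library, we also need the KCL artifact in the POM. The version below is what was current around the time of writing; use whichever 1.x release works with your SDK version:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.5.1</version>
</dependency>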



Step 2: Writing a Publisher class that sends events in an async manner


Next up, we write our Publisher implementation, which does the following:

  • validates that the stream is active
  • converts the data to a byte array
  • publishes it asynchronously


import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClient;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import lombok.extern.slf4j.Slf4j;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Slf4j
public class Publisher {

 final AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(
   new AWSCredentialsProviderChain(
     new ProfileCredentialsProvider()
   )
 );

 public void sendDataToStream(String body, String streamName) {
  try {
   validateStream(streamName);
  } catch (ResourceNotFoundException e) {
   log.error(e.getMessage());
   return; // don't attempt to publish to a stream that isn't ACTIVE
  }
  if (body == null || body.isEmpty()) {
   log.info("Can't convert incoming data to byte array for transmission");
   return;
  }
  ByteBuffer bytes = ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8));

  PutRecordRequest putRecord = new PutRecordRequest();
  putRecord.setStreamName(streamName);
  // A random partition key is fine for a single-shard stream.
  putRecord.setPartitionKey(UUID.randomUUID().toString().replace("-", ""));
  putRecord.setData(bytes);

  kinesisAsyncClient.putRecordAsync(putRecord,
    new AsyncHandler<PutRecordRequest, PutRecordResult>() {
   @Override
   public void onError(Exception e) {
    log.error("Can't publish the record", e);
   }

   @Override
   public void onSuccess(PutRecordRequest putRecordRequest,
          PutRecordResult putRecordResult) {
    log.info("published successfully to shard {}", putRecordResult.getShardId());
   }
  });
 }

 private void validateStream(String streamName) {
  final DescribeStreamResult describeStreamResult
    = kinesisAsyncClient.describeStream(streamName);
  if (!describeStreamResult
    .getStreamDescription()
    .getStreamStatus().equalsIgnoreCase("ACTIVE")) {
   throw new ResourceNotFoundException("stream not found in an 'ACTIVE' state");
  }
 }
}
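
Using the publisher is then a one-liner. A hypothetical caller (the stream name and payload are just examples) could look like this:

Publisher publisher = new Publisher();
// "demo-stream" must be an existing, ACTIVE stream in your account.
publisher.sendDataToStream("{\"event\":\"signup\",\"user\":42}", "demo-stream");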

Next we write our Subscriber, for which I'm going to use the Kinesis Client Library (KCL).

Step 3: Implement a subscriber that consumes the data from the stream


The subscriber class implements the IRecordProcessor interface of the KCL, which defines the initialize, processRecords, and shutdown methods that we implement to suit our needs. Here is one way to implement it.



import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

@Slf4j
public class Subscriber implements IRecordProcessor {

 private static final long REPORTING_INTERVAL_MILLIS = 1000L;
 private static final long CHECKPOINT_INTERVAL_MILLIS = 1000L;
 private String kinesisShardId;
 private long nextReportingTimeInMillis;
 private long nextCheckpointTimeInMillis;

 @Override
 public void initialize(String shardId) {
  log.info("Initializing record processor for shard: " + shardId);
  this.kinesisShardId = shardId;
  nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
  nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
 }

 @Override
 public void processRecords(List<Record> records,
          IRecordProcessorCheckpointer checkpointer) {
  for (Record r : records) {
   processRecord(r);
  }
  // If it is time to report stats as per the reporting interval, report stats
  if (System.currentTimeMillis() > nextReportingTimeInMillis) {
   nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
   log.info("time to report stats. next report at {}", nextReportingTimeInMillis);
  }

  // Checkpoint once every checkpoint interval
  if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
   checkpoint(checkpointer);
   nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
  }
 }

 private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
  log.info("Checkpointing shard " + kinesisShardId);
  try {
   checkpointer.checkpoint();
  } catch (ShutdownException se) {
   // Ignore checkpoint if the processor instance has been shut down (fail over).
   log.info("Caught shutdown exception, skipping checkpoint.", se);
  } catch (ThrottlingException e) {
   // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
   log.error("Caught throttling exception, skipping checkpoint.", e);
  } catch (InvalidStateException e) {
   // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
   log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
  }
 }

 private void processRecord(Record r) {
  try {
   // Decode the record payload back into the UTF-8 string we published.
   String data = StandardCharsets.UTF_8.newDecoder().decode(r.getData()).toString();
   log.info("Received record with partition key {}: {}", r.getPartitionKey(), data);
  } catch (CharacterCodingException e) {
   log.error("Skipping record. Unable to process record, partition key {}", r.getPartitionKey(), e);
  }
 }

 @Override
 public void shutdown(IRecordProcessorCheckpointer checkpointer,
       ShutdownReason shutdownReason) {
  log.info("Shutting down record processor for shard: " + kinesisShardId);
  // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
  if (shutdownReason == ShutdownReason.TERMINATE) {
   checkpoint(checkpointer);
  }
 }
}

Step 4: Implement a subscriber record factory that returns an instance of the subscriber we just implemented

This class implements the IRecordProcessorFactory interface, which has only a single method to implement. As the name suggests, it is a factory that returns an implementation of a record processor, which in our case is the Subscriber above.




import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;

public class SubscriberFactory implements IRecordProcessorFactory {

 @Override
 public IRecordProcessor createProcessor() {
  return new Subscriber();
 }
}


Step 5: Writing a main class that coordinates the record processing




import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;

@Slf4j
public class SubscriberWorker {

 public static void main(String[] args) {
  if (args.length != 3) {
   System.err.println("Usage: SubscriberWorker <applicationName> <streamName> <region>");
   System.exit(1);
  }
  String applicationName = args[0];
  String streamName = args[1];
  Region region = RegionUtils.getRegion(args[2]);
  if (region == null) {
   System.err.println(args[2] + " is not a valid AWS region.");
   System.exit(1);
  }

  String workerId = String.valueOf(UUID.randomUUID());
  KinesisClientLibConfiguration kclConfig =
    new KinesisClientLibConfiguration(applicationName, streamName,
      new DefaultAWSCredentialsProviderChain(), workerId)
      .withRegionName(region.getName());

  IRecordProcessorFactory recordProcessorFactory = new SubscriberFactory();

  // Create the KCL worker with our subscriber factory
  Worker worker = new Worker(recordProcessorFactory, kclConfig);

  int exitCode = 0;
  try {
   worker.run();
  } catch (Throwable t) {
   log.error("Caught throwable while processing data.", t);
   exitCode = 1;
  }
  System.exit(exitCode);
 }
}
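
The worker takes three arguments: an application name (the KCL also uses this to name the DynamoDB lease table it creates for checkpointing), the stream name, and the AWS region. A hypothetical invocation, with placeholder names, would be java -cp target/app.jar SubscriberWorker kinesis-demo demo-stream us-east-1.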

That's it, folks! I've demonstrated how to implement both a publisher and a subscriber for AWS Kinesis. Thanks for taking the time to read this post.
