Send messages from Aurora MySQL to SQS

I have two Lambdas and an SQS queue in between. The first Lambda's purpose is to pick product IDs from Aurora MySQL and send them to SQS. There are over 7 million product IDs. When the first Lambda sends these product IDs to SQS, a trigger I have enabled invokes my second Lambda.
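
For context, the second Lambda is a standard SQS-triggered consumer, roughly like this (simplified; processProductId is a stand-in for the real per-product work):

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class ProductConsumer implements RequestHandler<SQSEvent, Void> {

    @Override
    public Void handleRequest(SQSEvent event, Context context) {
        // The SQS trigger delivers messages in batches; each record body is
        // the message the first Lambda sent.
        for (SQSEvent.SQSMessage message : event.getRecords()) {
            processProductId(message.getBody());
        }
        return null;
    }

    private void processProductId(String body) {
        // stand-in for the real processing of one product ID
        System.out.println("processing: " + body);
    }
}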

The issue I am facing is that my first Lambda cannot send all product IDs to the queue in one invocation because of Lambda's execution time limit. I tested it, and in one invocation it was only able to send about 100k records to SQS. If I simply run it again, it will obviously pick the same product IDs. Even if I put a limit and offset in my Lambda, after the first invocation I would have to change the offset manually to pick the next 100k records, which is tedious. How can I automate this process?

Edit: I tried the suggested solution. I upload a CSV file to S3, which triggers the Lambda; the Lambda reads the initial limit, offset and count from that file. After sending the messages to SQS, the limit and offset are updated and the object is uploaded to S3 again, so the Lambda is triggered again with the updated limit and offset values. What I have observed is that sometimes it runs perfectly and sometimes it just does not update the limit and offset. Any suggestions on where I am going wrong?

Lambda Handler

public String handleRequest(S3Event s3event, Context context)
    {   
        try 
        {
        Gson gson = new GsonBuilder().setPrettyPrinting().create();
        System.out.println("welcome to lambda");
        S3EventNotificationRecord record = s3event.getRecords().get(0);

         String bucket = record.getS3().getBucket().getName();
         // Object key may have spaces or unicode non-ASCII characters.
         String key = record.getS3().getObject().getUrlDecodedKey();
         //logger
         LambdaLogger logger = context.getLogger();
         logger.log("EVENT: " + gson.toJson(s3event));
         
         // Read the source file as text
         AmazonS3 s3Client =  AmazonS3ClientBuilder
                                .standard()
                                .build();
         
         String body = s3Client.getObjectAsString(bucket, key);
         System.out.println("Body: " + body);
         String[] element = body.split(",");
         int limit = Integer.parseInt(element[0].trim());
         int offset = Integer.parseInt(element[1].trim());
         int count = Integer.parseInt(element[2].trim());
         
        System.out.println("incoming:  limit = "+limit+" offset= " + offset + " count= " + count);         
        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + dbName + "?user=" + userName + "&password=" + password;
        String product_id;
        int maxCount = 0;
        AmazonSQS sqs =  AmazonSQSClientBuilder.defaultClient();
        
        Connection conn = DriverManager.getConnection(url, userName, password);
        
            if (conn != null) 
                {
                    System.out.println("Connected to the database");
                }   

            Statement stmt1 = conn.createStatement();  
            
            //for max count
            ResultSet rs1=stmt1.executeQuery("select count(*) from products"); 
            while(rs1.next())
                {
                    maxCount = rs1.getInt(1);
                }
            
            System.out.println("maxcount is: " + maxCount);
            
            Statement stmt2 = conn.createStatement();

            // ORDER BY keeps the LIMIT/OFFSET paging deterministic across invocations
            ResultSet rs2 = stmt2.executeQuery("select productId from products order by productId limit " + limit + " offset " + offset);
            while (rs2.next())
                {
                    product_id = rs2.getString(1);
                    String type = "product_id";
                    Test msgbody = new Test(product_id, type,count);
                    System.out.println("msgbody: " + msgbody.toString());
                    //send sqs message
            
                    SendMessageRequest sendMessageRequest = new SendMessageRequest()
                                                    .withQueueUrl("myQueueUrl")
                                                    .withMessageBody(msgbody.toString())
                                                    .withDelaySeconds(1);
            
                    sqs.sendMessage(sendMessageRequest);
                    count++;
            
                }   
            conn.close();
        
         String pString = "test";
         S3Writer s3Writer = new S3Writer();
         
         if(limit + offset <= maxCount)
         {
             
             if(limit + offset == maxCount)
             {
                 System.out.println("break lambda : limit+offset == maxcount");
                 return null;
                 
             }
             
             else {
                 //process messages -->send updated offset to csv
                offset = offset + limit;
                System.out.println("new offset: " + offset);
                State state = new State(String.valueOf(limit), String.valueOf(offset), String.valueOf(count));     
                System.out.println("state obj: "+ state.toString());
                s3Writer.writeCsvToS3(pString, "limit", state.toString());
            }
         }
         
         else if (maxCount < offset + limit)
         {
            int newLimit = maxCount - offset;
            System.out.println("new limit: "+ newLimit);
            
                if(newLimit == 0)
                    {
                    System.out.println("break lambda : new limit is zero");
                        return null;
                    }
                else 
                    {
                        
                        limit = newLimit;
                        State state = new State(String.valueOf(limit), String.valueOf(offset), String.valueOf(count));  
                        System.out.println("state obj: "+ state.toString());
                        s3Writer.writeCsvToS3(pString, "limit", state.toString());
                    }
        }
         
         else {
            System.out.println("break lambda: cause unknown");
            return null;
         }

         return null;
        }
        catch (Exception e)
        {
            e.printStackTrace();
            return null;
        }
    }

WriteCsvToS3 method

 public void writeCsvToS3( String path, String name, String csv ) {
            Date currentDate = new Date();
            String date = FORMAT_DATE.format(currentDate);
            //file name
            String s3FileName = name + ".csv";
            //folder
            String s3FilePath = path + "/" + date + "/" + s3FileName;

            InputStream is = new ByteArrayInputStream(csv.getBytes());
            ObjectMetadata metadata = new ObjectMetadata();
            metadata.setContentType("text/plain");
            metadata.addUserMetadata("title", "someTitle");
            PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, s3FilePath, is, metadata);
            s3.putObject(putObjectRequest);
        }
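
For completeness, the handler and writeCsvToS3 refer to a few pieces that aren't shown above: the State class, which just holds limit/offset/count and whose toString() produces the same comma-separated line the handler parses, and some class-level fields used by writeCsvToS3 (S3 client, bucket name, date format). Rough sketches of what they look like (bucket name and date pattern are placeholders):

import java.text.SimpleDateFormat;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// class-level fields referenced by writeCsvToS3
private static final String BUCKET_NAME = "my-state-bucket";
private static final SimpleDateFormat FORMAT_DATE = new SimpleDateFormat("yyyy-MM-dd");
private final AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();

// holder for the checkpoint values; toString() must emit "limit,offset,count"
// in the same order the handler splits the CSV body
public class State {
    private final String limit;
    private final String offset;
    private final String count;

    public State(String limit, String offset, String count) {
        this.limit = limit;
        this.offset = offset;
        this.count = count;
    }

    @Override
    public String toString() {
        return limit + "," + offset + "," + count;
    }
}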


Answer

Have you tried writing a CSV file to S3 that stores the latest index/product ID you have sent to SQS, and reading it back at the start of the next invocation of your Lambda?

Here's a rough outline of the steps (a minimal sketch follows the list):

  1. Load the latest index/product ID from S3
  2. [Any other processing that you do]
  3. Rewrite the CSV file on S3 with the latest index/product ID
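
A minimal sketch of that pattern, assuming a fixed bucket/key for the state file, with processBatch standing in for your DB query and SQS sends:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class CheckpointedProducer {

    private static final String STATE_BUCKET = "my-state-bucket"; // placeholder
    private static final String STATE_KEY = "state/offset.txt";   // placeholder

    private final AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();

    public String handleRequest(Object input, Context context) {
        // 1. Load the latest offset (start from 0 if no state file exists yet)
        int offset = 0;
        if (s3.doesObjectExist(STATE_BUCKET, STATE_KEY)) {
            offset = Integer.parseInt(s3.getObjectAsString(STATE_BUCKET, STATE_KEY).trim());
        }

        // 2. Query the next batch of product IDs and push them to SQS
        int sent = processBatch(offset);

        // 3. Write the new offset back so the next invocation resumes where this one stopped
        s3.putObject(STATE_BUCKET, STATE_KEY, String.valueOf(offset + sent));
        return "sent " + sent + " messages starting at offset " + offset;
    }

    private int processBatch(int offset) {
        // stand-in for: SELECT productId FROM products ORDER BY productId LIMIT ... OFFSET ...
        // followed by sqs.sendMessage(...) for each row; returns how many rows were sent
        return 0;
    }
}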
