Amazon ElastiCache is a fully managed, in-memory caching service from AWS that supports the Redis and Memcached engines. It improves application performance by reducing database load and serving data with microsecond latency. It offers high availability, clustering, and automatic failover, and integrates with other AWS services.

Example

To create the AWSElasticCacheHandler, provide your AWS credentials (access key, secret key) and region. The handler allows you to retrieve details of ElastiCache clusters, replication groups, subnet groups, security groups, VPCs, and serverless cache configurations.
# elasticache_handler_test.py
import os
from superagentx_handlers.aws.elasticache import AWSElasticCacheHandler

elasticache_handler = AWSElasticCacheHandler(
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name="us-east-1"
)
Get ElastiCache Details:
Aggregates clusters, replication groups, subnet groups, security groups, and VPCs into one JSON summary.
details = await elasticache_handler.get_elastic_cache_details()
print(details)
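The snippets on this page use `await`, which is only valid inside a coroutine (or an asyncio-aware REPL). A minimal sketch of driving such a call from a plain script is shown here. `StubHandler` is a hypothetical stand-in so the example runs without AWS credentials, and its return shape is an assumption, not the real handler's output.

```python
import asyncio


class StubHandler:
    """Hypothetical stand-in for AWSElasticCacheHandler (makes no AWS calls)."""

    async def get_elastic_cache_details(self) -> dict:
        # Assumed shape, for illustration only.
        return {"clusters": [], "replication_groups": []}


async def main(handler) -> dict:
    # Awaiting is only legal inside an async function.
    return await handler.get_elastic_cache_details()


if __name__ == "__main__":
    # asyncio.run creates an event loop, runs main() to completion, then closes the loop.
    print(asyncio.run(main(StubHandler())))
```

With the real handler, the same `main` coroutine could presumably be called with `elasticache_handler` in place of the stub.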
Get All ElastiCache Clusters:
Fetches metadata for all clusters in the account/region.
clusters = await elasticache_handler.get_clusters()
print(clusters)
Get All ElastiCache Replication Groups:
Retrieves replication group information including member clusters and status.
replication_groups = await elasticache_handler.get_replication_groups()
print(replication_groups)
Get All Cache Subnet Groups:
Retrieves subnet groups and maps them to VPCs.
subnet_groups = await elasticache_handler.get_cache_subnet_groups()
print(subnet_groups)
Update Clusters & Replication Groups with VPC Info:
Enhances cluster and replication group objects with VPC IDs from subnet groups.
await elasticache_handler.update_vpc_info(clusters, replication_groups, subnet_groups)
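One way this kind of enrichment could work is a lookup from subnet group name to VPC ID. The sketch below is not the handler's implementation. It assumes inputs shaped like boto3 `describe_cache_clusters` and `describe_cache_subnet_groups` entries (the `CacheSubnetGroupName` and `VpcId` keys), and `attach_vpc_ids` is a hypothetical helper name.

```python
from typing import Any


def attach_vpc_ids(
    clusters: list[dict[str, Any]],
    subnet_groups: list[dict[str, Any]],
) -> None:
    """Add a "VpcId" key to each cluster, in place, based on its subnet group."""
    vpc_by_subnet_group = {
        group["CacheSubnetGroupName"]: group.get("VpcId")
        for group in subnet_groups
    }
    for cluster in clusters:
        name = cluster.get("CacheSubnetGroupName")
        # Clusters without a matching subnet group get None.
        cluster["VpcId"] = vpc_by_subnet_group.get(name)


# Hypothetical sample data, not real AWS output.
sample_clusters = [
    {"CacheClusterId": "cache-a", "CacheSubnetGroupName": "private-subnets"},
    {"CacheClusterId": "cache-b"},
]
sample_subnet_groups = [
    {"CacheSubnetGroupName": "private-subnets", "VpcId": "vpc-0abc"},
]
attach_vpc_ids(sample_clusters, sample_subnet_groups)
print(sample_clusters)
```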
Get Unique Security Group IDs:
Extracts a unique list of security group IDs from clusters and replication groups.
sg_ids = await elasticache_handler.get_unique_security_group_ids(clusters, replication_groups)
print(sg_ids)
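For intuition, deduplicating security group IDs can be done with an insertion-ordered dictionary. This is a sketch under assumptions, not the handler's code: it expects each resource dict to carry a `SecurityGroups` list shaped like boto3 `describe_cache_clusters` output, and `unique_security_group_ids` is a hypothetical helper name.

```python
from typing import Any


def unique_security_group_ids(resources: list[dict[str, Any]]) -> list[str]:
    """Return security group IDs in first-seen order, without duplicates."""
    seen: dict[str, None] = {}
    for resource in resources:
        for group in resource.get("SecurityGroups", []):
            group_id = group.get("SecurityGroupId")
            if group_id:
                # dict keys keep insertion order, so this acts as an ordered set.
                seen.setdefault(group_id, None)
    return list(seen)


# Hypothetical sample data, not real AWS output.
sample_resources = [
    {"SecurityGroups": [{"SecurityGroupId": "sg-1", "Status": "active"}]},
    {"SecurityGroups": [
        {"SecurityGroupId": "sg-2", "Status": "active"},
        {"SecurityGroupId": "sg-1", "Status": "active"},
    ]},
    {},  # resource with no security groups attached
]
print(unique_security_group_ids(sample_resources))
```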
Get Security Group Details:
Retrieves details for each security group (name, rules, VPC association).
sg_details = await elasticache_handler.get_security_groups(security_group_ids=["sg-0123456789abcdef0"])
print(sg_details)
Process Inbound Rules:
Parses inbound (ingress) security group rules into a normalized JSON format.
# sg is assumed to be one raw security group dict, as returned by EC2 describe_security_groups
inbound_rules = elasticache_handler.process_inbound_rules(sg["IpPermissions"])
print(inbound_rules)
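To make "normalized" concrete, here is one possible flattening of an EC2 `IpPermissions` list into one record per source. The output field names (`protocol`, `from_port`, `to_port`, `source`) and the helper name `normalize_permissions` are assumptions for illustration; the handler's actual output format may differ.

```python
from typing import Any


def normalize_permissions(ip_permissions: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Flatten EC2-style permissions into one rule per source."""
    rules = []
    for perm in ip_permissions:
        base = {
            "protocol": perm.get("IpProtocol"),
            # FromPort/ToPort can be absent, e.g. when the protocol is "-1" (all traffic).
            "from_port": perm.get("FromPort"),
            "to_port": perm.get("ToPort"),
        }
        for ip_range in perm.get("IpRanges", []):
            rules.append({**base, "source": ip_range["CidrIp"]})
        for ip_range in perm.get("Ipv6Ranges", []):
            rules.append({**base, "source": ip_range["CidrIpv6"]})
        for pair in perm.get("UserIdGroupPairs", []):
            rules.append({**base, "source": pair["GroupId"]})
    return rules


# Hypothetical sample data, not real AWS output.
sample_permissions = [
    {
        "IpProtocol": "tcp",
        "FromPort": 6379,
        "ToPort": 6379,
        "IpRanges": [{"CidrIp": "10.0.0.0/16"}],
        "UserIdGroupPairs": [{"GroupId": "sg-0123456789abcdef0"}],
    }
]
print(normalize_permissions(sample_permissions))
```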
Process Outbound Rules:
Parses outbound (egress) security group rules into a normalized JSON format.
# sg is assumed to be one raw security group dict, as returned by EC2 describe_security_groups
outbound_rules = elasticache_handler.process_outbound_rules(sg["IpPermissionsEgress"])
print(outbound_rules)
Get Unique VPC IDs:
Extracts unique VPC IDs from clusters, replication groups, and subnet groups.
vpc_ids = await elasticache_handler.get_unique_vpc_ids(clusters, replication_groups, subnet_groups)
print(vpc_ids)
Get VPC Details:
Retrieves details for each VPC (CIDR, state, default flag).
vpc_details = await elasticache_handler.get_vpcs(vpc_ids=["vpc-1234567890abcdef0"])
print(vpc_details)