¿Cómo utilizar Reactor Core para la transmisión de datos en Google Cloud Pub/Sub?

Nov 14, 2025Dejar un mensaje

Como proveedor de Reactor Core, me complace compartir con ustedes cómo utilizar Reactor Core para la transmisión de datos en Google Cloud Pub/Sub. Google Cloud Pub/Sub es un servicio de mensajería en tiempo real totalmente administrado que le permite enviar y recibir mensajes entre aplicaciones independientes. Reactor Core, por otro lado, es una biblioteca de programación reactiva para JVM que proporciona una API para crear aplicaciones asincrónicas y basadas en eventos. La combinación de estas dos tecnologías puede aportar importantes beneficios a sus aplicaciones de transmisión de datos.

Entendiendo Google Cloud Pub/Sub

Google Cloud Pub/Sub se basa en un modelo de publicación-suscripción. En este modelo, los editores envían mensajes a temas y los suscriptores reciben mensajes de las suscripciones. Un tema es un canal lógico donde se envían mensajes y una suscripción es un punto final que recibe mensajes de un tema.

Las características clave de Google Cloud Pub/Sub incluyen:

  • Escalabilidad: Puede manejar una gran cantidad de mensajes por segundo, lo que lo hace adecuado para escenarios de transmisión de datos de gran volumen.
  • Fiabilidad: Los mensajes se almacenan de forma duradera y Pub/Sub garantiza la entrega al menos una vez.
  • Flexibilidad: Admite múltiples lenguajes de programación y se puede integrar con varios servicios de Google Cloud.

Por qué utilizar Reactor Core con Google Cloud Pub/Sub

Reactor Core proporciona un modelo de programación reactiva que es muy adecuado para manejar operaciones asincrónicas y sin bloqueo. Cuando se utiliza con Google Cloud Pub/Sub, puede ofrecer las siguientes ventajas:

  • Procesamiento asincrónico: Reactor Core le permite procesar mensajes de Pub/Sub de forma asincrónica, lo que significa que su aplicación puede continuar realizando otras tareas mientras espera mensajes. Esto puede mejorar el rendimiento general de su aplicación.
  • Gestión de contrapresión: Reactor Core tiene soporte de contrapresión incorporado. En el contexto de Pub/Sub, esto significa que si su aplicación no puede procesar los mensajes tan rápido como llegan, puede indicarle a Pub/Sub que reduzca la velocidad de entrega de mensajes.
  • Secuencias componibles: Puede componer fácilmente diferentes flujos reactivos en Reactor Core. Por ejemplo, puede transformar, filtrar o agregar mensajes de Pub/Sub antes de continuar con su procesamiento.

Configurar el entorno

Antes de poder comenzar a usar Reactor Core con Google Cloud Pub/Sub, debe configurar su entorno de desarrollo.

Requisitos previos

  • Cuenta de nube de Google: Debes tener una cuenta de Google Cloud y habilitar la API Pub/Sub.
  • Kit de desarrollo de Java (JDK): Reactor Core es una biblioteca de Java, por lo que debe tener instalado JDK 8 o posterior.
  • Maven o Gradle: Puede utilizar Maven o Gradle para administrar sus dependencias de Java.

Agregar dependencias

Si está utilizando Maven, agregue las siguientes dependencias a supom.xml:

<dependencias> <dependencia> <groupId>io.projectreactor</groupId> <artifactId>reactor - core</artifactId> <versión>3.4.15</versión> </dependencia> <dependencia> <groupId>com.google.cloud</groupId> <artifactId>google - cloud - pubsub</artifactId> <versión>1.122.0</versión> </dependencia> </dependencias>

Si está utilizando Gradle, agregue lo siguiente a suconstruir.gradle:

dependencias {implementación 'io.projectreactor:reactor - core:3.4.15' implementación 'com.google.cloud:google - cloud - pubsub:1.122.0' }

Publicar mensajes en Google Cloud Pub/Sub con Reactor Core

Comencemos con un ejemplo de publicación de mensajes en un tema de Pub/Sub usando Reactor Core.

importar com.google.api.gax.core.CredentialsProvider; importar com.google.api.gax.core.FixedCredentialsProvider; importar com.google.auth.oauth2.GoogleCredentials; importar com.google.cloud.pubsub.v1.Publisher; importar com.google.protobuf.ByteString; importar com.google.pubsub.v1.ProjectTopicName; importar com.google.pubsub.v1.PubsubMessage; importar reactor.core.publisher.Flux; importar java.io.FileInputStream; importar java.io.IOException; importar java.util.UUID; public class PubSubPublisherExample { public static void main(String[] args) throws IOException { // Configurar credenciales GoogleCredentials credenciales = GoogleCredentials.fromStream(new FileInputStream("ruta/a/sus/credenciales.json")); CredentialsProvider credencialesProvider = FixCredentialsProvider.create(credenciales); // Crea un nombre de tema ProjectTopicName topicName = ProjectTopicName.of("tu - proyecto - id", "tu - tema - nombre"); // Crear un editor Editor editor = Publisher.newBuilder(topicName) .setCredentialsProvider(credentialsProvider) .build(); // Crear un flujo de mensajes Flux<PubsubMessage> messageFlux = Flux.range(1, 10) .map(i -> { String messageId = UUID.randomUUID().toString(); ByteString data = ByteString.copyFromUtf8("Message " + i + " with ID: " + messageId); return PubsubMessage.newBuilder() .setData(data) .putAttributes("mensajeId", mensajeId) .build() }); // Publicar mensajes messageFlux.flatMap(message -> { return Publisher.publish(message).toFuture().thenApply(resultado -> { System.out.println("Mensaje publicado con ID: " + resultado); return resultado; }); }).subscribe(); // Cerrar el editor editor.shutdown(); } }

En este ejemplo, primero configuramos las credenciales de Google Cloud. Luego creamos unEditorobjeto para el tema especificado. Usamos unFlujopara generar un flujo de mensajes. Luego, cada mensaje se publica de forma asincrónica utilizando elplanoMapaoperador.

Silicon Steel Iron Core factoryReactor Core

Suscribirse a Google Cloud Pub/Sub con Reactor Core

Ahora, veamos cómo suscribirse a una suscripción Pub/Sub y procesar mensajes usando Reactor Core.

importar com.google.api.gax.core.CredentialsProvider; importar com.google.api.gax.core.FixedCredentialsProvider; importar com.google.auth.oauth2.GoogleCredentials; importar com.google.cloud.pubsub.v1.AckReplyConsumer; importar com.google.cloud.pubsub.v1.MessageReceiver; importar com.google.cloud.pubsub.v1.Subscriber; importar com.google.pubsub.v1.ProjectSubscriptionName; importar com.google.pubsub.v1.PubsubMessage; importar reactor.core.publisher.Flux; importar reactor.core.publisher.Mono; importar java.io.FileInputStream; importar java.io.IOException; importar java.util.concurrent.atomic.AtomicInteger; public class PubSubSubscriberExample { public static void main(String[] args) throws IOException { // Configurar credenciales GoogleCredentials credenciales = GoogleCredentials.fromStream(new FileInputStream("ruta/a/su/credenciales.json")); CredentialsProvider credencialesProvider = FixCredentialsProvider.create(credenciales); // Crea un nombre de suscripción NombreDeSuscripciónProyectonombreSuscripción = NombreDeSuscripciónProyecto.of("tu - proyecto - id", "tu - suscripción - nombre"); // Crea un receptor de mensajes personalizado AtomicInteger counter = new AtomicInteger(0); Flux<PubsubMessage> messageFlux = Flux.create(sink -> { MessageReceiver receptor = (mensaje, consumidor) -> { fregadero.next(message); consumer.ack(); }; Suscriptor suscriptor = Subscriber.newBuilder(subscriptionName, receptor) .setCredentialsProvider(credentialsProvider) .build(); suscriptor.startAsync().awaitRunning(); fregadero.onCancel(suscriptor::stopAsync }); // Procesar mensajes messageFlux.subscribe(message -> { System.out.println("Mensaje recibido: " + message.getData().toStringUtf8()); counter.incrementAndGet(); System.out.println("Mensajes totales recibidos: " + counter.get()); }); } }

En este ejemplo, creamos un personalizadoReceptor de mensajesque emite los mensajes recibidos a unFlujo. ElFlujoLuego se suscribe y cada mensaje se imprime en la consola. También realizamos un seguimiento del número total de mensajes recibidos.

Casos de uso avanzados

Transformación y agregación de mensajes

Reactor Core le permite transformar y agregar mensajes fácilmente desde Pub/Sub. Por ejemplo, puede convertir los datos del mensaje de JSON a un objeto Java o puede agregar mensajes durante un período de tiempo determinado.

importar com.google.pubsub.v1.PubsubMessage; importar reactor.core.publisher.Flux; importar java.time.Duration; clase pública MessageTransformationExample { public static void main(String[] args) { Flux<PubsubMessage> messageFlux = getMessageFluxFromPubSub(); Flujo<String> transformadoFlux = messageFlux .map(message -> message.getData().toStringUtf8()) .map(text -> text.toUpperCase()); Flujo<Integer> agregadoFlux = transformadoFlux .bufferTimeout(10, Duration.ofSeconds(5)) .map(list -> list.size()); agregadoFlux.subscribe(count -> System.out.println("Número de mensajes en la ventana: " + count)); } private static Flux<PubsubMessage> getMessageFluxFromPubSub() { // Código para obtener el flujo de mensajes de Pub/Sub return Flux.empty(); } }

En este ejemplo, primero transformamos cada mensaje a mayúsculas. Luego agregamos los mensajes en un búfer de 10 mensajes o una ventana de tiempo de 5 segundos y contamos la cantidad de mensajes en cada ventana.

Conclusión

El uso de Reactor Core para la transmisión de datos en Google Cloud Pub/Sub puede aportar importantes beneficios a sus aplicaciones. Proporciona una forma asincrónica y reactiva de manejar mensajes, lo que puede mejorar el rendimiento y la escalabilidad de sus soluciones de transmisión de datos. Ya sea que esté publicando o suscribiéndose a Pub/Sub, Reactor Core ofrece una API potente y flexible para administrar el flujo de datos.

Si estás interesado en usarNúcleo del reactorpara sus proyectos de Google Cloud Pub/Sub o explorar otras soluciones relacionadas comoNúcleo de hierro y acero al silicio, no dude en contactarnos para adquisiciones y discusiones adicionales. Estamos comprometidos a proporcionar productos y servicios de alta calidad para satisfacer sus necesidades.

Referencias

  • Documentación de Google Cloud Pub/Sub
  • Documentación del núcleo del reactor