diff --git a/spring/pom.xml b/spring/pom.xml
index 038fecb8d..10e3192a7 100644
--- a/spring/pom.xml
+++ b/spring/pom.xml
@@ -32,7 +32,7 @@
io.cloudevents.spring
- 2.4.3
+ 2.7.18
@@ -63,6 +63,11 @@
spring-messaging
true
+
+ org.springframework.amqp
+ spring-rabbit
+ true
+
io.cloudevents
cloudevents-core
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventMessageConverter.java b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventMessageConverter.java
new file mode 100644
index 000000000..079b69f7d
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventMessageConverter.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventContext;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
+import io.cloudevents.core.message.impl.MessageUtils;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.support.converter.MessageConverter;
+
+/**
+ * A {@link MessageConverter} that can translate to and from a {@link Message} and a {@link CloudEvent}.
+ * The {@link CloudEventContext} is canonicalized, with key names given a {@code cloudEvents_} prefix in the
+ * {@link MessageProperties}.
+ *
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.CloudEventMessageConverter used as stencil for the implementation
+ */
+public class CloudEventMessageConverter implements MessageConverter {
+
+ @Override
+ public CloudEvent fromMessage(Message message) {
+ return createMessageReader(message).toEvent();
+ }
+
+ @Override
+ public Message toMessage(Object object, MessageProperties messageProperties) {
+ if (object instanceof CloudEvent) {
+ CloudEvent event = (CloudEvent) object;
+ return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(messageProperties));
+ }
+ return null;
+ }
+
+ private MessageReader createMessageReader(Message message) {
+ return MessageUtils.parseStructuredOrBinaryMessage(
+ () -> contentType(message.getMessageProperties()),
+ format -> structuredMessageReader(message, format),
+ () -> version(message.getMessageProperties()),
+ version -> binaryMessageReader(message, version)
+ );
+ }
+
+ private String version(MessageProperties properties) {
+ Object header = properties.getHeader(CloudEventsHeaders.SPEC_VERSION);
+ if (header == null) {
+ header = properties.getHeader(CloudEventsHeaders.ALT_SPEC_VERSION);
+ }
+ return header == null ? null : header.toString();
+ }
+
+ private MessageReader binaryMessageReader(Message message, SpecVersion version) {
+ return new MessageBinaryMessageReader(version, message.getMessageProperties(), message.getBody());
+ }
+
+ private MessageReader structuredMessageReader(Message message, EventFormat format) {
+ return new GenericStructuredMessageReader(format, message.getBody());
+ }
+
+ private String contentType(MessageProperties properties) {
+ String contentType = properties.getContentType();
+ if (contentType == null) {
+ Object header = properties.getHeader(CloudEventsHeaders.CONTENT_TYPE);
+ if (header == null) {
+ header = properties.getHeader(CloudEventsHeaders.ALT_CONTENT_TYPE);
+ }
+ return header == null ? null : header.toString();
+ }
+ return contentType;
+ }
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventsHeaders.java b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventsHeaders.java
new file mode 100644
index 000000000..537c9382b
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventsHeaders.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cloudevents.spring.amqp;
+
+/**
+ * Constants used throughout the Spring AMQP binding for cloud events.
+ */
+public class CloudEventsHeaders {
+
+ /**
+ * CloudEvent attributes MUST be prefixed with either "cloudEvents_" or "cloudEvents:" for use in the application-properties section.
+ *
+ * @see
+ * AMQP Protocol Binding for CloudEvents
+ */
+ public static final String CE_PREFIX = "cloudEvents_";
+ /**
+ * CloudEvents AMQP consumers SHOULD understand the "cloudEvents" prefix with both the '_' and the ':' separators as permitted within the constraints of the client model.
+ *
+ * @see
+ * AMQP Protocol Binding for CloudEvents
+ */
+ public static final String ALT_CE_PREFIX = "cloudEvents:";
+ /**
+ * The spec version header name.
+ */
+ public static final String SPEC_VERSION = CE_PREFIX + "specversion";
+ /**
+ * The alternative spec version header name.
+ */
+ public static final String ALT_SPEC_VERSION = ALT_CE_PREFIX + "specversion";
+ /**
+ * The data content-type header name.
+ */
+ public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype";
+
+ /**
+ * The alternative data content-type header name.
+ */
+ public static final String ALT_CONTENT_TYPE = ALT_CE_PREFIX + "datacontenttype";
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/MessageBinaryMessageReader.java b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBinaryMessageReader.java
new file mode 100644
index 000000000..a69877bca
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBinaryMessageReader.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
+import static org.springframework.amqp.support.AmqpHeaders.CONTENT_TYPE;
+
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.impl.StringUtils;
+import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
+import org.springframework.amqp.core.MessageProperties;
+
+/**
+ * Utility for converting {@link MessageProperties} (message headers) to `CloudEvent` contexts.
+ *
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.MessageBinaryMessageReader used as stencil for the implementation
+ */
+class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl {
+
+ private final Map headers;
+
+ public MessageBinaryMessageReader(SpecVersion version, MessageProperties properties, byte[] payload) {
+ super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
+ this.headers = properties.getHeaders();
+ }
+
+ @Override
+ protected boolean isContentTypeHeader(String key) {
+ return CONTENT_TYPE.equalsIgnoreCase(key);
+ }
+
+ @Override
+ protected boolean isCloudEventsHeader(String key) {
+ return key != null && key.length() > CE_PREFIX.length() && (StringUtils.startsWithIgnoreCase(key, CE_PREFIX)
+ || StringUtils.startsWithIgnoreCase(key, ALT_CE_PREFIX));
+ }
+
+ @Override
+ protected String toCloudEventsKey(String key) {
+ return key.substring(CE_PREFIX.length()).toLowerCase();
+ }
+
+ @Override
+ protected void forEachHeader(BiConsumer fn) {
+ headers.forEach(fn);
+ }
+
+ @Override
+ protected String toCloudEventsValue(Object value) {
+ return value.toString();
+ }
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/MessageBuilderMessageWriter.java b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBuilderMessageWriter.java
new file mode 100644
index 000000000..f51046133
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBuilderMessageWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.rw.CloudEventContextWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageProperties;
+
+/**
+ * Internal utility class for copying CloudEvent
context to {@link MessageProperties} (message
+ * headers).
+ *
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.MessageBuilderMessageWriter used as stencil for the implementation
+ */
+class MessageBuilderMessageWriter implements CloudEventWriter, MessageWriter {
+
+ private final Map headers = new HashMap<>();
+
+ public MessageBuilderMessageWriter(MessageProperties properties) {
+ this.headers.putAll(properties.getHeaders());
+ }
+
+ @Override
+ public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
+ headers.put(CONTENT_TYPE, format.serializedContentType());
+ return MessageBuilder.withBody(value).copyHeaders(headers).build();
+ }
+
+ @Override
+ public Message end(CloudEventData value) throws CloudEventRWException {
+ return MessageBuilder.withBody(value == null ? new byte[0] : value.toBytes()).copyHeaders(headers).build();
+ }
+
+ @Override
+ public Message end() {
+ return MessageBuilder.withBody(new byte[0]).copyHeaders(headers).build();
+ }
+
+ @Override
+ public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
+ headers.put(CE_PREFIX + name, value);
+ return this;
+ }
+
+ @Override
+ public MessageBuilderMessageWriter create(SpecVersion version) {
+ headers.put(SPEC_VERSION, version.toString());
+ return this;
+ }
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/package-info.java b/spring/src/main/java/io/cloudevents/spring/amqp/package-info.java
new file mode 100644
index 000000000..83706d3e8
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Provides classes related to working with Cloud Events within the context of Spring Amqp.
+ */
+package io.cloudevents.spring.amqp;
diff --git a/spring/src/test/java/io/cloudevents/spring/amqp/CloudEventMessageConverterTests.java b/spring/src/test/java/io/cloudevents/spring/amqp/CloudEventMessageConverterTests.java
new file mode 100644
index 000000000..f70690513
--- /dev/null
+++ b/spring/src/test/java/io/cloudevents/spring/amqp/CloudEventMessageConverterTests.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import static org.assertj.core.api.Assertions.*;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.rw.CloudEventRWException;
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageProperties;
+
+/**
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.CloudEventMessageConverterTests used as stencil for the implementation
+ */
+class CloudEventMessageConverterTests {
+
+ private static final String JSON = "{\"specversion\":\"1.0\"," //
+ + "\"id\":\"12345\"," //
+ + "\"source\":\"https://spring.io/events\"," //
+ + "\"type\":\"io.spring.event\"," //
+ + "\"datacontenttype\":\"application/json\"," //
+ + "\"data\":{\"value\":\"Dave\"}" //
+ + "}";
+
+ private final CloudEventMessageConverter converter = new CloudEventMessageConverter();
+
+ @Test
+ void noSpecVersion() {
+ Message message = MessageBuilder.withBody(new byte[0]).build();
+ assertThatExceptionOfType(CloudEventRWException.class).isThrownBy(() -> {
+ assertThat(converter.fromMessage(message)).isNull();
+ });
+ }
+
+ @Test
+ void notValidCloudEvent() {
+ Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0").build();
+ assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> {
+ assertThat(converter.fromMessage(message)).isNull();
+ });
+ }
+
+ @Test
+ void validCloudEvent() {
+ Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0")
+ .setHeader("cloudEvents_id", "12345").setHeader("cloudEvents_source", "https://spring.io/events")
+ .setHeader("cloudEvents_type", "io.spring.event").build();
+ CloudEvent event = converter.fromMessage(message);
+ assertThat(event).isNotNull();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
+ @Test
+ void structuredCloudEvent() {
+ byte[] payload = JSON.getBytes(StandardCharsets.UTF_8);
+ Message message = MessageBuilder.withBody(payload)
+ .setContentType("application/cloudevents+json").build();
+ CloudEvent event = converter.fromMessage(message);
+ assertThat(event).isNotNull();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
+ @Test
+ void fromCloudEvent() {
+ CloudEvent attributes = CloudEventBuilder.v1().withId("A234-1234-1234")
+ .withSource(URI.create("https://spring.io/")).withType("org.springframework")
+ .withData("hello".getBytes(StandardCharsets.UTF_8)).build();
+ Message message = converter.toMessage(attributes, new MessageProperties());
+ Map headers = message.getMessageProperties().getHeaders();
+ assertThat(headers.get("cloudEvents_id")).isEqualTo("A234-1234-1234");
+ assertThat(headers.get("cloudEvents_specversion")).isEqualTo("1.0");
+ assertThat(headers.get("cloudEvents_source")).isEqualTo("https://spring.io/");
+ assertThat(headers.get("cloudEvents_type")).isEqualTo("org.springframework");
+ assertThat("hello".getBytes(StandardCharsets.UTF_8)).isEqualTo(message.getBody());
+ }
+
+ @Test
+ void fromNonCloudEvent() {
+ assertThat(converter.toMessage(new byte[0], new MessageProperties())).isNull();
+ }
+}
diff --git a/spring/src/test/java/io/cloudevents/spring/mvc/MvcRestControllerTests.java b/spring/src/test/java/io/cloudevents/spring/mvc/MvcRestControllerTests.java
index 53477b5c7..2e89441a6 100644
--- a/spring/src/test/java/io/cloudevents/spring/mvc/MvcRestControllerTests.java
+++ b/spring/src/test/java/io/cloudevents/spring/mvc/MvcRestControllerTests.java
@@ -29,7 +29,7 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
-import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
diff --git a/spring/src/test/java/io/cloudevents/spring/webflux/WebFluxRestControllerTests.java b/spring/src/test/java/io/cloudevents/spring/webflux/WebFluxRestControllerTests.java
index 33ec82689..9bef18c68 100644
--- a/spring/src/test/java/io/cloudevents/spring/webflux/WebFluxRestControllerTests.java
+++ b/spring/src/test/java/io/cloudevents/spring/webflux/WebFluxRestControllerTests.java
@@ -30,7 +30,7 @@
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.codec.CodecCustomizer;
-import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
@@ -198,4 +198,4 @@ public String toString() {
return "Foo [value=" + this.value + "]";
}
-}
\ No newline at end of file
+}