Commit ea8581eb authored by Artem Hrybeniuk's avatar Artem Hrybeniuk
Browse files

WIP: Firehose notification system

parent ba0c9556
/*
* Copyright 2022 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.api.v1.impl;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import org.apache.commons.lang3.StringUtils;
import org.gringlobal.api.exception.InvalidApiUsageException;
import org.gringlobal.api.v1.ApiBaseController;
import org.gringlobal.component.firehose.FirehoseSubscription;
import org.gringlobal.model.SysUser;
import org.gringlobal.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Set;
@RestController("firehoseNotificationApi1")
@RequestMapping(FirehoseNotificationController.API_URL)
@Api(tags = { "firehosev1" })
public class FirehoseNotificationController {
private static final Logger LOG = LoggerFactory.getLogger(FirehoseNotificationController.class);
/** The Constant API_URL. */
public static final String API_URL = ApiBaseController.APIv1_BASE + "/firehose";
@Resource(name = "subscriptionSet")
private Set<FirehoseSubscription> subscriptionSet;
@Autowired
private UserService userService;
@PostMapping(path = "/subscribe")
@PreAuthorize("isAuthenticated()")
@Operation(operationId = "subscribe", description = "Subscribe for specific notification")
public FirehoseSubscription subscribe(@RequestBody final FirehoseSubscription firehoseSubscription) {
validateSubscription(firehoseSubscription);
final Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication instanceof OAuth2Authentication) {
final Authentication userAuth = ((OAuth2Authentication) authentication).getUserAuthentication();
if (userAuth != null) {
final SysUser currentUser = (SysUser) userAuth.getPrincipal();
var email = ((SysUser) userService.loadUserByUsername(currentUser.getUsername())).getCooperator().getEmail();
if (StringUtils.isNotBlank(email)) {
firehoseSubscription.setEmail(email);
Date nextDeliveryDate = new Date(new Date().getTime() + firehoseSubscription.getDelay().delay);
firehoseSubscription.setNextDeliveryDate(nextDeliveryDate);
subscriptionSet.add(firehoseSubscription);
return firehoseSubscription;
} else {
LOG.warn("Exception in creating subscription. SysUser - {} has no email address", currentUser);
throw new InvalidApiUsageException("Email address not found for the user - " + currentUser.getUsername());
}
}
}
throw new InvalidApiUsageException("Not using user authentication");
}
private void validateSubscription(FirehoseSubscription firehoseSubscription) {
if (firehoseSubscription.getDelay() == null) {
throw new InvalidApiUsageException("Delay must not be empty");
}
if (firehoseSubscription.getNotificationType() == null) {
throw new InvalidApiUsageException("Notification Type must not be empty");
}
if (firehoseSubscription.getEntityType() == null) {
throw new InvalidApiUsageException("Entity Type must not be empty");
}
}
}
......@@ -15,17 +15,27 @@
*/
package org.gringlobal.application.config;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.genesys.blocks.util.CurrentApplicationContext;
import org.genesys.client.model.Accession;
import org.gringlobal.component.firehose.FirehoseEvent;
import org.gringlobal.component.firehose.FirehoseEventListener;
import org.gringlobal.component.firehose.FirehoseJPAListener;
import org.gringlobal.component.firehose.FirehoseMessageProcessor;
import org.gringlobal.component.firehose.FirehoseNotificationProcessor;
import org.gringlobal.component.firehose.FirehoseSubscription;
import org.gringlobal.component.firehose.service.FirehoseNotificationService;
import org.gringlobal.component.firehose.service.impl.FirehoseCompletedActionServiceImpl;
import org.gringlobal.component.firehose.service.impl.FirehoseCreatedEntityServiceImpl;
import org.gringlobal.model.CooperatorOwnedModel;
import org.gringlobal.model.Inventory;
import org.gringlobal.service.ElasticsearchService;
......@@ -34,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.filter.AssignableTypeFilter;
......@@ -42,22 +53,23 @@ import org.springframework.core.type.filter.AssignableTypeFilter;
* @author Artem Hrybeniuk
*/
@Configuration
@ComponentScan(basePackages = { "org.gringlobal.component.firehose.service"})
public class FirehoseConfig {
private final static Logger LOG = LoggerFactory.getLogger(FirehoseConfig.class);
@Bean(name = "createdEventSet")
public NavigableSet<FirehoseEvent> createdEvents() {
return Collections.synchronizedNavigableSet(new TreeSet<>(FirehoseEvent::compareTo));
@Bean(name = "createdEventMap")
public Map<String, FirehoseEvent> createdEvents() {
return Collections.synchronizedMap(new HashMap<>());
}
@Bean(name = "updatedEventSet")
public NavigableSet<FirehoseEvent> updatedEvents() {
return Collections.synchronizedNavigableSet(new TreeSet<>(FirehoseEvent::compareTo));
@Bean(name = "updatedEventMap")
public Map<String, FirehoseEvent> updatedEvents() {
return Collections.synchronizedMap(new HashMap<>());
}
@Bean(name = "removedEventSet")
public NavigableSet<FirehoseEvent> removedEvents() {
return Collections.synchronizedNavigableSet(new TreeSet<>(FirehoseEvent::compareTo));
@Bean(name = "removedEventMap")
public Map<String, FirehoseEvent> removedEvents() {
return Collections.synchronizedMap(new HashMap<>());
}
@Bean
......@@ -95,4 +107,23 @@ public class FirehoseConfig {
return new FirehoseEventListener();
}
@Bean
public FirehoseNotificationProcessor firehoseNotificationProcessor() {
return new FirehoseNotificationProcessor();
}
@Bean(name = "notificationServices")
public List<FirehoseNotificationService> notificationServices() {
var services = new ArrayList<FirehoseNotificationService>();
var createdEntityService = CurrentApplicationContext.getContext().getBean(FirehoseCreatedEntityServiceImpl.class);
var completedActionService = CurrentApplicationContext.getContext().getBean(FirehoseCompletedActionServiceImpl.class);
services.add(createdEntityService);
services.add(completedActionService);
return services;
}
@Bean(name = "subscriptionSet")
public NavigableSet<FirehoseSubscription> subscriptionSet() {
return Collections.synchronizedNavigableSet(new TreeSet<>(FirehoseSubscription::compareTo));
}
}
......@@ -15,6 +15,9 @@
*/
package org.gringlobal.component.firehose;
import org.jetbrains.annotations.NotNull;
import java.io.Serializable;
import java.util.Date;
import java.util.Objects;
......@@ -22,10 +25,13 @@ import java.util.Objects;
*
* @author Artem Hrybeniuk
*/
public class FirehoseEvent {
public class FirehoseEvent implements Comparable<FirehoseEvent>, Serializable {
private static final long serialVersionUID = 476874138554129332L;
private Class<?> clazz;
private Long id;
private String key;
private Date timestamp;
......@@ -40,6 +46,8 @@ public class FirehoseEvent {
public FirehoseEvent(Class<?> clazz, Long id, Date timestamp, EventType eventType) {
this.clazz = clazz;
this.id = id;
this.key = (this.id == null ? "null" : this.id) + ":" + (this.clazz == null ? "null" : this.clazz.getName());
this.timestamp = timestamp;
this.eventType = eventType;
}
......@@ -47,6 +55,8 @@ public class FirehoseEvent {
public FirehoseEvent(Class<?> clazz, Long id, Date timestamp, EventType eventType, Object entity) {
this.clazz = clazz;
this.id = id;
this.key = (this.id == null ? "null" : this.id) + ":" + (this.clazz == null ? "null" : this.clazz.getName());
this.timestamp = timestamp;
this.eventType = eventType;
this.entity = entity;
......@@ -62,6 +72,7 @@ public class FirehoseEvent {
public void setId(Long id) {
this.id = id;
this.key = (this.id == null ? "null" : this.id) + ":" + (this.clazz == null ? "null" : this.clazz.getName());
}
public Date getTimestamp() {
......@@ -84,6 +95,15 @@ public class FirehoseEvent {
return eventType;
}
/**
* Object key
*
* @return id+className
*/
public String getKey() {
return this.key;
}
@Override
public String toString() {
return "Firehose." + this.eventType + " for type " + this.clazz + " id=" + this.id + " @" + this.timestamp;
......@@ -103,32 +123,29 @@ public class FirehoseEvent {
}
/**
*
* Compare by timestamp, then id, then clazz
*
* @param a
* @param b
* @return
*/
public static int compareTo(FirehoseEvent a, FirehoseEvent b) {
if (a.timestamp.equals(b.timestamp)) {
if (b.id == null && a.id == null) {
@Override
public int compareTo(@NotNull FirehoseEvent firehoseEvent) {
if (this.timestamp.equals(firehoseEvent.timestamp)) {
if (firehoseEvent.id == null && this.id == null) {
return 0;
} else if (b.id == null) {
} else if (firehoseEvent.id == null) {
return -1;
} else if (a.id == null) {
} else if (this.id == null) {
return 1;
} else {
return (int) (a.id - b.id);
return (int) (this.id - firehoseEvent.id);
}
} else {
return a.timestamp.compareTo(b.timestamp);
return this.timestamp.compareTo(firehoseEvent.timestamp);
}
}
/**
* Match by id and clazz
*
* @param a
*
* @param b
* @return
*/
......@@ -136,6 +153,6 @@ public class FirehoseEvent {
if (b == null) {
return false;
}
return Objects.equals(id, b.id) && Objects.equals(clazz, b.clazz);
return Objects.equals(key, b.key);
}
}
......@@ -15,8 +15,9 @@
*/
package org.gringlobal.component.firehose;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Resource;
......@@ -37,14 +38,14 @@ import org.springframework.transaction.event.TransactionalEventListener;
@Component
public class FirehoseEventListener {
@Resource(name = "updatedEventSet")
private Set<FirehoseEvent> updatedEvents;
@Resource(name = "updatedEventMap")
private Map<String, FirehoseEvent> updatedEvents;
@Resource(name = "removedEventSet")
private Set<FirehoseEvent> removedEvents;
@Resource(name = "removedEventMap")
private Map<String, FirehoseEvent> removedEvents;
@Resource(name = "createdEventSet")
private Set<FirehoseEvent> createdEvents;
@Resource(name = "createdEventMap")
private Map<String, FirehoseEvent> createdEvents;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleEvent(FirehoseEvent firehoseEvent) {
......@@ -52,11 +53,11 @@ public class FirehoseEventListener {
if (FirehoseEvent.EventType.DELETE == eventType) {
// Remove given FirehoseEvent from CreatedEvents and UpdatedEvents
createdEvents.removeIf(firehoseEvent::sameReference);
updatedEvents.removeIf(firehoseEvent::sameReference);
removedEvents.add(firehoseEvent);
createdEvents.remove(firehoseEvent.getKey());
updatedEvents.remove(firehoseEvent.getKey());
removedEvents.put(firehoseEvent.getKey(), firehoseEvent);
} else if (FirehoseEvent.EventType.CREATE.equals(eventType)) {
createdEvents.add(firehoseEvent);
createdEvents.put(firehoseEvent.getKey(), firehoseEvent);
} else if (FirehoseEvent.EventType.UPDATE.equals(eventType)) {
putUpdatedEvent(firehoseEvent);
}
......@@ -66,29 +67,32 @@ public class FirehoseEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleDeleteAllEvent(FirehoseDeleteAllEvent firehoseDeleteAllEvent) {
// Remove from created and updated only
createdEvents.removeIf(event -> Objects.equals(event.getClazz(), firehoseDeleteAllEvent.getClazz()));
updatedEvents.removeIf(event -> Objects.equals(event.getClazz(), firehoseDeleteAllEvent.getClazz()));
var deletedType = firehoseDeleteAllEvent.getClazz();
for (var event : new ArrayList<>(createdEvents.values())) {
if (Objects.equals(event.getClazz(), deletedType)) {
createdEvents.remove(event.getKey());
}
}
for (var event : new ArrayList<>(updatedEvents.values())) {
if (Objects.equals(event.getClazz(), deletedType)) {
updatedEvents.remove(event.getKey());
}
}
}
private void putUpdatedEvent(FirehoseEvent firehoseEvent) {
// replace equal FirehoseEvent in CreatedEvents if contains
boolean replaceCreatedEvent = replaceIfContains(createdEvents, firehoseEvent);
if (!replaceCreatedEvent) {
// replace equal FirehoseEvent in UpdatedEvents if contains
boolean replaceUpdatedEvent = replaceIfContains(updatedEvents, firehoseEvent);
if (!replaceUpdatedEvent) {
// add FirehoseEvent to UpdatedEvents
updatedEvents.add(firehoseEvent);
}
if (! replaceCreatedEvent) {
// replace/put FirehoseEvent to UpdatedEvents
updatedEvents.put(firehoseEvent.getKey(), firehoseEvent);
}
}
private boolean replaceIfContains(Set<FirehoseEvent> events, FirehoseEvent firehoseEvent) {
private boolean replaceIfContains(Map<String, FirehoseEvent> events, FirehoseEvent firehoseEvent) {
// replace Entity and ModifiedDate for equal FirehoseEvent in given Collection
if (events.removeIf(firehoseEvent::sameReference)) {
events.add(firehoseEvent);
return true;
}
return false;
return events.replace(firehoseEvent.getKey(), firehoseEvent) != null;
}
}
......@@ -122,7 +122,7 @@ public class FirehoseJPAListener implements InitializingBean {
// Create and send saved object as FirehoseEvent to the FirehoseEventListener
FirehoseEvent firehoseEvent = createFirehoseEvent(model, timestamp, null);
var result = joinPoint.proceed();
if (result != null && result instanceof EmptyModel) {
if (result instanceof EmptyModel) {
firehoseEvent.setId(((EmptyModel) result).getId());
firehoseEvent.setEntity(result);
}
......
......@@ -16,13 +16,21 @@
package org.gringlobal.component.firehose;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.gringlobal.component.firehose.FirehoseEvent.EventType;
import org.gringlobal.component.firehose.service.FirehoseNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
......@@ -30,8 +38,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
*
* Inspects the insert/update/delete queues and issues notifications.
* Inspects the insert/update/delete queues and processes events by FirehoseNotificationServices for subscriptions.
*
* @author Artem Hrybeniuk
*/
......@@ -43,14 +50,20 @@ public class FirehoseMessageProcessor {
@Value("${firehose.delay:10}")
private int firehoseDelay;
@Resource(name = "updatedEventSet")
private NavigableSet<FirehoseEvent> updatedEvents;
@Resource(name = "updatedEventMap")
private Map<String, FirehoseEvent> updatedEvents;
@Resource(name = "removedEventMap")
private Map<String, FirehoseEvent> removedEvents;
@Resource(name = "createdEventMap")
private Map<String, FirehoseEvent> createdEvents;
@Resource(name = "removedEventSet")
private NavigableSet<FirehoseEvent> removedEvents;
@Resource(name = "notificationServices")
private List<FirehoseNotificationService> notificationServices;
@Resource(name = "createdEventSet")
private NavigableSet<FirehoseEvent> createdEvents;
@Resource(name = "subscriptionSet")
private Set<FirehoseSubscription> subscriptionSet;
@Scheduled(fixedDelay = 1000)
public void processQueues() {
......@@ -68,34 +81,42 @@ public class FirehoseMessageProcessor {
}
private void processEvents(EventType eventType, List<FirehoseEvent> events) {
if (events == null) {
if (CollectionUtils.isEmpty(events)) {
return;
}
events.forEach(firehoseEvent -> {
if (eventType == FirehoseEvent.EventType.CREATE) {
LOG.info("Sending CREATE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
} else if (eventType == FirehoseEvent.EventType.UPDATE) {
LOG.info("Sending UPDATE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
} else if (eventType == FirehoseEvent.EventType.DELETE) {
LOG.info("Sending DELETE notification for {} id={} @ {}", firehoseEvent.getClazz(), firehoseEvent.getId(), firehoseEvent.getTimestamp());
for (var service : notificationServices) {
if (!service.getEventType().equals(eventType)) {
continue;
}
});
var serviceNotificationType = service.getNotificationType();
var subscriptions = subscriptionSet.stream()
.filter(subscription -> Objects.equals(serviceNotificationType, subscription.getNotificationType()))
.collect(Collectors.toSet());
for (var subscription : subscriptions) {
service.processEventsForSubscription(events, eventType, subscription);
}
}
}
private List<FirehoseEvent> getEventsBeforeStopDate(NavigableSet<FirehoseEvent> events, Date stopDate) {
private List<FirehoseEvent> getEventsBeforeStopDate(Map<String, FirehoseEvent> events, Date stopDate) {
if (events.isEmpty()) {
return null;
}
var eventsBeforeStopDate = events.headSet(new FirehoseEvent(null, null, stopDate, null));
NavigableSet<FirehoseEvent> eventNavigableSet = Collections.synchronizedNavigableSet(new TreeSet<>(events.values()));
var eventsBeforeStopDate = eventNavigableSet.headSet(new FirehoseEvent(null, null, stopDate, null));
if (eventsBeforeStopDate.size() == 0) {
return null;
}
LOG.debug("Matched {} of {} events", eventsBeforeStopDate.size(), events.size());
var eventsForSending = new ArrayList<>(eventsBeforeStopDate);
eventsBeforeStopDate.clear();
LOG.debug("Keeping {} events, remaining {} of {} events", eventsForSending.size(), eventsBeforeStopDate.size(), events.size());
eventsForSending.forEach(event -> events.remove(event.getKey()));
if (LOG.isDebugEnabled()) {
LOG.debug("Matched {} of {} events, keeping {} events", eventsBeforeStopDate.size(), eventNavigableSet.size(), events.size());
}
return eventsForSending;
}
......
/*
* Copyright 2022 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gringlobal.component.firehose;
import com.google.common.base.Objects;
import org.gringlobal.component.firehose.service.FirehoseNotificationService;
public class FirehoseNotificationEvent {
private Object entity;
private Class<?> clazz;
private FirehoseNotificationService.FirehoseNotificationType notificationType;
public FirehoseNotificationEvent() {
}
public FirehoseNotificationEvent(Object entity, Class<?> clazz, FirehoseNotificationService.FirehoseNotificationType notificationType) {
this.entity = entity;
this.clazz = clazz;
this.notificationType = notificationType;
}
public Object getEntity() {
return entity;
}
public void setEntity(Object entity) {
this.entity = entity;
}
public FirehoseNotificationService.FirehoseNotificationType getNotificationType() {
return notificationType;