From 1e3ece801f565492ba9c7298f842d85dc02556fa Mon Sep 17 00:00:00 2001 From: Lan Liang Date: Wed, 10 Aug 2022 14:32:03 +0800 Subject: [PATCH] Topic stats Add more field, base pulsar 2.10.0 --- pkg/pulsar/utils/data.go | 89 ++++++++++++++++++++++++++++++++-------- 1 file changed, 72 insertions(+), 17 deletions(-) diff --git a/pkg/pulsar/utils/data.go b/pkg/pulsar/utils/data.go index 4941456dd..6903f6d8a 100644 --- a/pkg/pulsar/utils/data.go +++ b/pkg/pulsar/utils/data.go @@ -193,29 +193,62 @@ type TopicStats struct { Subscriptions map[string]SubscriptionStats `json:"subscriptions"` Replication map[string]ReplicatorStats `json:"replication"` DeDuplicationStatus string `json:"deduplicationStatus"` + + PublishRateLimitedTimes int64 `json:"publishRateLimitedTimes"` + EarliestMsgPublishTimeInBacklogs int64 `json:"earliestMsgPublishTimeInBacklogs"` + MsgChunkPublished bool `json:"msgChunkPublished"` + OffloadedStorageSize int64 `json:"offloadedStorageSize"` + LastOffloadLedgerID int64 `json:"lastOffloadLedgerId"` + LastOffloadSuccessTimeStamp int64 `json:"lastOffloadSuccessTimeStamp"` + LastOffloadFailureTimeStamp int64 `json:"lastOffloadFailureTimeStamp"` + WaitingPublishers int64 `json:"waitingPublishers"` + NonContiguousDeletedMessagesRanges int64 `json:"nonContiguousDeletedMessagesRanges"` + NonContiguousDeletedMessagesRangesSS int64 `json:"nonContiguousDeletedMessagesRangesSerializedSize"` + Compaction TopicCompaction `json:"compaction"` } type PublisherStats struct { - ProducerID int64 `json:"producerId"` - MsgRateIn float64 `json:"msgRateIn"` - MsgThroughputIn float64 `json:"msgThroughputIn"` - AverageMsgSize float64 `json:"averageMsgSize"` - Metadata map[string]string `json:"metadata"` + ProducerID int64 `json:"producerId"` + ProducerName string `json:"producerName"` + MsgRateIn float64 `json:"msgRateIn"` + MsgThroughputIn float64 `json:"msgThroughputIn"` + AverageMsgSize float64 `json:"averageMsgSize"` + Metadata map[string]string `json:"metadata"` + ClientVersion string `json:"clientVersion"` + ConnectedSince string `json:"connectedSince"` + Address string `json:"address"` + SupportsPartialProducer bool `json:"supportsPartialProducer"` + ChunkedMessageRate float64 `json:"chunkedMessageRate"` } type SubscriptionStats struct { - BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"` - IsReplicated bool `json:"isReplicated"` - MsgRateOut float64 `json:"msgRateOut"` - MsgThroughputOut float64 `json:"msgThroughputOut"` - MsgRateRedeliver float64 `json:"msgRateRedeliver"` - MsgRateExpired float64 `json:"msgRateExpired"` - MsgBacklog int64 `json:"msgBacklog"` - MsgDelayed int64 `json:"msgDelayed"` - UnAckedMessages int64 `json:"unackedMessages"` - SubType string `json:"type"` - ActiveConsumerName string `json:"activeConsumerName"` - Consumers []ConsumerStats `json:"consumers"` + BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"` + IsReplicated bool `json:"isReplicated"` + MsgRateOut float64 `json:"msgRateOut"` + MsgThroughputOut float64 `json:"msgThroughputOut"` + MsgRateRedeliver float64 `json:"msgRateRedeliver"` + MsgRateExpired float64 `json:"msgRateExpired"` + MsgBacklog int64 `json:"msgBacklog"` + MsgDelayed int64 `json:"msgDelayed"` + UnAckedMessages int64 `json:"unackedMessages"` + SubType string `json:"type"` + ActiveConsumerName string `json:"activeConsumerName"` + Consumers []ConsumerStats `json:"consumers"` + TotalMsgExpired int64 `json:"totalMsgExpired"` + BacklogSize int64 `json:"backlogSize"` + EarliestMsgPublishTimeInBacklog int64 `json:"earliestMsgPublishTimeInBacklog"` + MsgBacklogNoDelayed int64 `json:"msgBacklogNoDelayed"` + LastExpireTimestamp int64 `json:"lastExpireTimestamp"` + LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"` + LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"` + LastAckedTimestamp int64 `json:"lastAckedTimestamp"` + LastMarkDeleteAdvancedTimestamp int64 `json:"lastMarkDeleteAdvancedTimestamp"` + IsDurable bool `json:"isDurable"` + AllowOutOfOrderDelivery bool `json:"allowOutOfOrderDelivery"` + NonContiguousDeletedMessagesRanges int64 `json:"nonContiguousDeletedMessagesRanges"` + NonContiguousDeletedMessagesRangesSS int64 `json:"nonContiguousDeletedMessagesRangesSerializedSize"` + Durable bool `json:"durable"` + Replicated bool `json:"replicated"` } type ConsumerStats struct { @@ -227,6 +260,9 @@ type ConsumerStats struct { MsgRateRedeliver float64 `json:"msgRateRedeliver"` ConsumerName string `json:"consumerName"` Metadata map[string]string `json:"metadata"` + Address string `json:"address"` + ConnectedSince string `json:"connectedSince"` + ClientVersion string `json:"clientVersion"` } type ReplicatorStats struct { @@ -318,6 +354,25 @@ type PartitionedTopicStats struct { DeDuplicationStatus string `json:"deduplicationStatus"` Metadata PartitionedTopicMetadata `json:"metadata"` Partitions map[string]TopicStats `json:"partitions"` + + PublishRateLimitedTimes int64 `json:"publishRateLimitedTimes"` + EarliestMsgPublishTimeInBacklogs int64 `json:"earliestMsgPublishTimeInBacklogs"` + MsgChunkPublished bool `json:"msgChunkPublished"` + OffloadedStorageSize int64 `json:"offloadedStorageSize"` + LastOffloadLedgerID int64 `json:"lastOffloadLedgerId"` + LastOffloadSuccessTimeStamp int64 `json:"lastOffloadSuccessTimeStamp"` + LastOffloadFailureTimeStamp int64 `json:"lastOffloadFailureTimeStamp"` + WaitingPublishers int64 `json:"waitingPublishers"` + NonContiguousDeletedMessagesRanges int64 `json:"nonContiguousDeletedMessagesRanges"` + NonContiguousDeletedMessagesRangesSS int64 `json:"nonContiguousDeletedMessagesRangesSerializedSize"` + Compaction TopicCompaction `json:"compaction"` +} + +type TopicCompaction struct { + LastCompactionRemovedEventCount int64 `json:"lastCompactionRemovedEventCount"` + LastCompactionSucceedTimestamp int64 `json:"lastCompactionSucceedTimestamp"` + LastCompactionFailedTimestamp int64 `json:"lastCompactionFailedTimestamp"` + LastCompactionDurationTimeInMills int64 `json:"lastCompactionDurationTimeInMills"` } type SchemaData struct {